Back to SDKs
Python

Python SDK Guide

A complete, runnable example for integrating Zywrap's V1 offline data (including dynamic schemas) into your Python (Flask/FastAPI) application.

Option 1: Cloud Proxy (Quick Start)

Don't want to host the offline data yourself? Use our official PyPI package to connect directly to the Zywrap Cloud Proxy in seconds.

bash
pip install zywrap
python
from zywrap import Zywrap, ZywrapError

# Create the client with your API key
client = Zywrap("YOUR_API_KEY")

# Input variables sent along with the wrapper code
seo_variables = {
    "pageTopic": "AI project management software",
    "pageIntent": "Compare software options",
    "primaryKeyword": "agency project management",
    "brandVoice": "clear and trustworthy",
}

try:
    response = client.execute(
        model="openai-gpt-5.4",
        wrapper_codes=["mc_seo_meta_titles_descriptions_base"],
        variables=seo_variables,
    )
    print(response["data"])
except ZywrapError as exc:
    # Report the failure instead of letting the exception propagate
    print(f"Failed to execute: {exc}")
OR

Option 2: Advanced Local Database Sync

Self-host the wrapper data to ensure zero-latency lookups, complete offline capability, and highly customized API integrations.

Step 1: Download Your Data Bundle

Download the V1 ZIP file containing the highly compressed Tabular JSON data. Extract zywrap-data.json from the bundle and place it in the same directory as the import script (Step 4), which reads it from there.

You must be logged in to download the SDK data bundle.

Step 2: Database Setup

Run this SQL. It features the relational use_cases table containing the dynamic schema_data.

sql

-- Database Schema for Zywrap Offline SDK v1.0 (PostgreSQL)
--
-- Catalog tables (ai_models, categories, languages, use_cases, wrappers,
-- block_templates, settings) are truncated and refilled by the import script.
-- usage_logs is not in that TRUNCATE list and has no foreign keys.
-- The sample API queries only return catalog rows where status = TRUE.

-- AI models, keyed by model code.
CREATE TABLE "ai_models" (
  "code" VARCHAR(255) PRIMARY KEY,
  "name" VARCHAR(255) NOT NULL,
  "status" BOOLEAN DEFAULT TRUE,
  "ordering" INT
);

-- Top-level categories; use_cases reference these by code.
CREATE TABLE "categories" (
  "code" VARCHAR(255) PRIMARY KEY,
  "name" VARCHAR(255) NOT NULL,
  "status" BOOLEAN DEFAULT TRUE,
  "ordering" INT
);

-- Target languages, keyed by a short code (max 10 characters).
CREATE TABLE "languages" (
  "code" VARCHAR(10) PRIMARY KEY,
  "name" VARCHAR(255) NOT NULL,
  "status" BOOLEAN DEFAULT TRUE,
  "ordering" INT
);

-- Use cases. schema_data holds the dynamic input schema as JSONB (may be NULL).
-- Deleting a category sets category_code to NULL rather than deleting the use case.
CREATE TABLE "use_cases" (
  "code" VARCHAR(255) PRIMARY KEY,
  "name" VARCHAR(255) NOT NULL,
  "description" TEXT,
  "category_code" VARCHAR(255) REFERENCES categories(code) ON DELETE SET NULL,
  "schema_data" JSONB,
  "status" BOOLEAN DEFAULT TRUE,
  "ordering" BIGINT
);

-- Wrappers belong to a use case; "base" and "featured" are boolean flags.
-- Deleting a use case sets use_case_code to NULL rather than deleting the wrapper.
CREATE TABLE "wrappers" (
  "code" VARCHAR(255) PRIMARY KEY,
  "name" VARCHAR(255) NOT NULL,
  "description" TEXT,
  "use_case_code" VARCHAR(255) REFERENCES use_cases(code) ON DELETE SET NULL,
  "featured" BOOLEAN DEFAULT FALSE,
  "base" BOOLEAN DEFAULT FALSE,
  "status" BOOLEAN DEFAULT TRUE,
  "ordering" BIGINT
);

-- Block templates grouped by "type"; a code only has to be unique within its type.
CREATE TABLE "block_templates" (
  "type" VARCHAR(50) NOT NULL,
  "code" VARCHAR(255) NOT NULL,
  "name" VARCHAR(255) NOT NULL,
  "status" BOOLEAN DEFAULT TRUE,
  PRIMARY KEY ("type", "code")
);

-- Generic key/value store; the import script writes the 'data_version' key here.
CREATE TABLE "settings" (
  "setting_key" VARCHAR(255) PRIMARY KEY,
  "setting_value" TEXT
);

-- One row per logged request. wrapper_code and model_code are plain strings
-- (no foreign keys).
CREATE TABLE "usage_logs" (
  "id" BIGSERIAL PRIMARY KEY,
  "trace_id" VARCHAR(255),
  "wrapper_code" VARCHAR(255),
  "model_code" VARCHAR(255),
  "prompt_tokens" INT DEFAULT 0,
  "completion_tokens" INT DEFAULT 0,
  "total_tokens" INT DEFAULT 0,
  "credits_used" BIGINT DEFAULT 0,
  "latency_ms" INT DEFAULT 0,
  "status" VARCHAR(50) DEFAULT 'success',
  "error_message" TEXT,
  "created_at" TIMESTAMPTZ DEFAULT NOW()
);

-- Indexes for performance
-- Cover the columns used to filter usage logs and the two foreign-key lookups
-- (use cases by category, wrappers by use case).
CREATE INDEX idx_usage_wrapper ON usage_logs(wrapper_code);
CREATE INDEX idx_usage_model ON usage_logs(model_code);
CREATE INDEX idx_use_case_cat ON use_cases(category_code);
CREATE INDEX idx_wrapper_uc ON wrappers(use_case_code);

Step 3: Database Connection

Save this as db.py. We recommend using psycopg2 for PostgreSQL.

python

# FILE: db.py
# Uses the 'psycopg2' library for PostgreSQL
# pip install psycopg2-binary

import psycopg2
import psycopg2.extras 
import sys

# Replace with your actual database credentials
# The whole dict is unpacked into psycopg2.connect() as keyword arguments,
# so every key here must be a connection parameter that call accepts.
# The values below are placeholders for a local PostgreSQL instance.
DB_SETTINGS = {
    "dbname": "zywrap_db",
    "user": "postgres",
    "password": "password",
    "host": "localhost",
    "port": "5432"
}

def get_db_connection():
    """Open a new PostgreSQL connection using ``DB_SETTINGS``.

    Returns:
        A new psycopg2 connection; the caller is responsible for closing it.

    If the connection attempt raises ``psycopg2.OperationalError``, the error
    is printed to stderr and the process exits with status 1.
    """
    try:
        connection = psycopg2.connect(**DB_SETTINGS)
    except psycopg2.OperationalError as exc:
        print(f"FATAL: Could not connect to the database.\n{exc}", file=sys.stderr)
        sys.exit(1)
    else:
        return connection

Step 4: Import Tabular Data

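This script parses the compressed V1 JSON, clears the existing catalog tables, and re-inserts every record using parameterized queries inside a single transaction, so a failed import is rolled back and leaves your previous data untouched.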

python

# FILE: import.py
# USAGE: python import.py
# This script assumes you have 'zywrap-data.json' in the same directory.

import json
import sys
from db import get_db_connection

def extract_tabular(tabular_data):
    """Expand a tabular payload into a list of row dictionaries.

    Args:
        tabular_data: Mapping with a ``cols`` list of column names and a
            ``data`` list of rows, each row being a sequence of values.

    Returns:
        One dict per row, pairing each column name with the value at the same
        position. Returns an empty list when the payload, its ``cols`` or its
        ``data`` is missing or empty.
    """
    if not tabular_data:
        return []

    columns = tabular_data.get('cols')
    rows = tabular_data.get('data')
    if not columns or not rows:
        return []

    records = []
    for row in rows:
        # zip stops at the shorter side, so short rows simply omit trailing columns
        records.append(dict(zip(columns, row)))
    return records

def main():
    """Replace the local catalog tables with the contents of zywrap-data.json.

    Reads ``zywrap-data.json`` from the current working directory, truncates
    the catalog tables and re-inserts every section present in the file. All
    statements run in one transaction that is committed only at the very end.

    Exits with status 1 when the file is missing, is not valid JSON, or any
    database statement fails (the transaction is rolled back first).
    """
    print("Starting lightning-fast v1.0 data import...")

    try:
        with open('zywrap-data.json', 'r', encoding='utf-8') as f:
            data = json.load(f)
    except FileNotFoundError:
        print("FATAL: zywrap-data.json not found.", file=sys.stderr)
        sys.exit(1)
    except json.JSONDecodeError as e:
        print(f"FATAL: zywrap-data.json is not valid JSON.\n{e}", file=sys.stderr)
        sys.exit(1)

    conn = get_db_connection()
    try:
        with conn.cursor() as cur:
            # 1. Clear existing data
            print("Clearing tables...")
            cur.execute("TRUNCATE wrappers, use_cases, categories, languages, block_templates, ai_models, settings RESTART IDENTITY CASCADE")

            # psycopg2 does not autocommit by default: the first execute() opened a
            # transaction, and nothing below is visible to other sessions until
            # conn.commit(). A failure therefore also undoes the TRUNCATE.

            # 2. Import Categories (before use cases, which reference them)
            if 'categories' in data:
                for c in extract_tabular(data['categories']):
                    cur.execute(
                        "INSERT INTO categories (code, name, status, ordering) VALUES (%s, %s, TRUE, %s)",
                        (c['code'], c['name'], c.get('ordering', 99999))
                    )
                print("Categories imported successfully.")

            # 3. Import Use Cases (before wrappers, which reference them)
            if 'useCases' in data:
                for uc in extract_tabular(data['useCases']):
                    # schema_data is JSONB; serialize the schema, or store NULL when absent/empty
                    schema_json = json.dumps(uc['schema']) if uc.get('schema') else None
                    cur.execute(
                        "INSERT INTO use_cases (code, name, description, category_code, schema_data, status, ordering) VALUES (%s, %s, %s, %s, %s, TRUE, %s)",
                        (uc['code'], uc['name'], uc.get('desc'), uc.get('cat'), schema_json, uc.get('ordering', 999999999))
                    )
                print("Use Cases imported successfully.")

            # 4. Import Wrappers
            if 'wrappers' in data:
                for w in extract_tabular(data['wrappers']):
                    featured = bool(w.get('featured'))
                    base = bool(w.get('base'))
                    cur.execute(
                        "INSERT INTO wrappers (code, name, description, use_case_code, featured, base, status, ordering) VALUES (%s, %s, %s, %s, %s, %s, TRUE, %s)",
                        (w['code'], w['name'], w.get('desc'), w.get('usecase'), featured, base, w.get('ordering', 999999999))
                    )
                print("Wrappers imported successfully.")

            # 5. Import Languages (ordering follows their position in the file, starting at 1)
            if 'languages' in data:
                for position, lang in enumerate(extract_tabular(data['languages']), start=1):
                    cur.execute(
                        "INSERT INTO languages (code, name, status, ordering) VALUES (%s, %s, TRUE, %s)",
                        (lang['code'], lang['name'], position)
                    )
                print("Languages imported successfully.")

            # 6. Import AI Models
            if 'aiModels' in data:
                for m in extract_tabular(data['aiModels']):
                    cur.execute(
                        "INSERT INTO ai_models (code, name, status, ordering) VALUES (%s, %s, TRUE, %s)",
                        (m['code'], m['name'], m.get('ordering', 99999))
                    )
                print("AI Models imported successfully.")

            # 7. Import Block Templates (one tabular payload per template type)
            if 'templates' in data:
                for type_name, tabular in data['templates'].items():
                    for tpl in extract_tabular(tabular):
                        cur.execute(
                            "INSERT INTO block_templates (type, code, name, status) VALUES (%s, %s, %s, TRUE)",
                            (type_name, tpl['code'], tpl['name'])
                        )
                print("Block templates imported successfully.")

            # 8. Store the version
            if 'version' in data:
                cur.execute(
                    "INSERT INTO settings (setting_key, setting_value) VALUES ('data_version', %s) ON CONFLICT (setting_key) DO UPDATE SET setting_value = EXCLUDED.setting_value",
                    (data['version'],)
                )
                print("Data version saved to settings table.")

            conn.commit()
            print(f"\n✅ v1.0 Import complete! Version: {data.get('version', 'N/A')}")

    except Exception as e:
        conn.rollback()
        print(f"FATAL: Database error during import.\n{e}", file=sys.stderr)
        # Signal the failure to the shell / CI; the finally block still closes the connection.
        sys.exit(1)
    finally:
        conn.close()

if __name__ == "__main__":
    main()

Step 5: The Dynamic API & Playground

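Save the HTML below as playground.html and the Python code that follows it as app.py. The Flask server reads from your local DB, handles dynamic schemas, forwards the final request to the Zywrap API, and logs local usage automatically; the playground page calls it at http://localhost:5000/api.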

html

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Zywrap V1 Offline Playground</title>
    <style>
        :root {
            --primary: #2563eb; --primary-hover: #1d4ed8;
            --bg: #f8fafc; --card-bg: #ffffff;
            --text-main: #0f172a; --text-muted: #64748b;
            --border: #e2e8f0; --radius: 0.75rem;
        }
        * { box-sizing: border-box; margin: 0; padding: 0; }
        body { font-family: system-ui, -apple-system, sans-serif; background: var(--bg); color: var(--text-main); line-height: 1.5; padding: 2rem; }
        .container { max-width: 1200px; margin: 0 auto; display: grid; grid-template-columns: 1fr 1.8fr; gap: 2rem; }
        @media (max-width: 768px) { .container { grid-template-columns: 1fr; } }
        
        .card { background: var(--card-bg); border-radius: var(--radius); border: 1px solid var(--border); box-shadow: 0 1px 3px rgba(0,0,0,0.05); padding: 1.5rem; margin-bottom: 1.5rem; }
        .card-title { font-size: 1.25rem; font-weight: 700; margin-bottom: 1rem; color: var(--text-main); }
        
        .form-group { margin-bottom: 1rem; }
        label { display: block; font-size: 0.875rem; font-weight: 600; margin-bottom: 0.35rem; color: #334155; }
        
        .input, select, textarea { 
            width: 100%; padding: 0.625rem 0.75rem; border: 1px solid var(--border); 
            border-radius: 0.5rem; font-size: 0.875rem; background: #fff; transition: border-color 0.2s; 
        }
        .input:focus, select:focus, textarea:focus { outline: none; border-color: var(--primary); box-shadow: 0 0 0 3px rgba(37,99,235,0.1); }
        textarea { resize: vertical; min-height: 80px; }
        
        .grid-2 { display: grid; grid-template-columns: 1fr 1fr; gap: 0.75rem; }
        
        .btn { 
            width: 100%; background: var(--primary); color: white; border: none; padding: 0.875rem; 
            border-radius: 0.5rem; font-weight: 600; font-size: 1rem; cursor: pointer; transition: background 0.2s; 
        }
        .btn:hover { background: var(--primary-hover); }
        .btn:disabled { opacity: 0.7; cursor: not-allowed; }
        
        .schema-section { background: #f8fafc; border: 1px solid var(--border); border-radius: 0.5rem; padding: 1rem; margin-bottom: 1rem; }
        .schema-title { font-size: 0.875rem; font-weight: 700; border-bottom: 1px solid var(--border); padding-bottom: 0.5rem; margin-bottom: 0.75rem; color: #475569; }
        
        #output { background: #f1f5f9; padding: 1.25rem; border-radius: 0.5rem; font-family: ui-monospace, monospace; font-size: 0.875rem; white-space: pre-wrap; min-height: 300px; border: 1px solid var(--border); overflow-x: auto; }
    </style>
</head>
<body>
    <div class="container">
        <div>
            <div class="card">
                <h2 class="card-title">Configuration</h2>
                
                <div class="form-group">
                    <label>1. Category</label>
                    <select id="category"><option value="">Loading...</option></select>
                </div>
                
                <div class="form-group">
                    <label>2. AI Solution</label>
                    <select id="usecase" disabled><option value="">-- Select Category First --</option></select>
                </div>

                <div class="form-group">
                    <label>3. Configuration Style</label>
                    <select id="wrapper" disabled><option value="">-- Select Solution First --</option></select>
                </div>

                <div class="form-group">
                    <label>4. AI Model</label>
                    <select id="model"><option value="">Loading...</option></select>
                </div>

                <div class="form-group">
                    <label>5. Target Language</label>
                    <select id="language"><option value="">Loading...</option></select>
                </div>
            </div>

            <div class="card">
                <h2 class="card-title" style="font-size: 1rem;">Advanced Overrides</h2>
                <div class="grid-2 overrides">
                    <div><label>Tone</label><select id="toneCode"><option value="">-- Default --</option></select></div>
                    <div><label>Style</label><select id="styleCode"><option value="">-- Default --</option></select></div>
                    <div><label>Formatting</label><select id="formatCode"><option value="">-- Default --</option></select></div>
                    <div><label>Complexity</label><select id="complexityCode"><option value="">-- Default --</option></select></div>
                    <div><label>Length</label><select id="lengthCode"><option value="">-- Default --</option></select></div>
                    <div><label>Audience</label><select id="audienceCode"><option value="">-- Default --</option></select></div>
                    <div><label>Goal</label><select id="responseGoalCode"><option value="">-- Default --</option></select></div>
                    <div><label>Output Type</label><select id="outputCode"><option value="">-- Default --</option></select></div>
                </div>
            </div>
        </div>

        <div>
            <div class="card">
                <div id="dynamic-schema-container"></div>
                
                <div class="form-group">
                    <label id="prompt-label">Prompt / Additional Context</label>
                    <textarea id="prompt" class="input" placeholder="Type your request or additional instructions here..."></textarea>
                </div>
                
                <button id="run-button" class="btn">Generate Response</button>
            </div>

            <div class="card">
                <h2 class="card-title">AI Response</h2>
                <pre id="output">Output will appear here...</pre>
            </div>
        </div>
    </div>

    <script>
        // Points to the Flask Backend
        const API_ENDPOINT = 'http://localhost:5000/api';
        
        // Cached references to the DOM nodes this script reads and updates.
        // Each id must match an element id in the markup above.
        const elements = {
            category: document.getElementById('category'),
            usecase: document.getElementById('usecase'), // 🚀 NEW
            wrapper: document.getElementById('wrapper'),
            model: document.getElementById('model'),
            language: document.getElementById('language'),
            schemaContainer: document.getElementById('dynamic-schema-container'), // host for the generated schema inputs
            prompt: document.getElementById('prompt'),
            promptLabel: document.getElementById('prompt-label'),
            runBtn: document.getElementById('run-button'),
            output: document.getElementById('output')
        };

        /**
         * GET a JSON payload from the backend router.
         *
         * @param {string} action - Value of the `action` query parameter.
         * @param {string} [params] - Extra query string, already formatted and
         *     URL-encoded by the caller (e.g. "category=seo").
         * @returns {Promise<any>} The parsed JSON response body.
         * @throws {Error} If the server answers with a non-2xx status, so callers
         *     never mistake an error payload for data.
         */
        async function fetchAPI(action, params = '') {
            const res = await fetch(`${API_ENDPOINT}?action=${action}&${params}`);
            if (!res.ok) {
                throw new Error(`Request "${action}" failed with HTTP ${res.status}`);
            }
            return await res.json();
        }

        /**
         * Replace the options of a <select> with a placeholder plus one option per item,
         * then enable the control.
         *
         * @param {HTMLSelectElement} el - The select to refill.
         * @param {Array<{code: string, name: string}>} data - Items to list.
         * @param {string} [placeholder] - Label of the leading empty-value option.
         */
        function populateSelect(el, data, placeholder = '-- Select --') {
            el.innerHTML = `<option value="">${placeholder}</option>`;
            for (const entry of data) {
                const option = document.createElement('option');
                option.value = entry.code;
                // textContent keeps names from being interpreted as markup
                option.textContent = entry.name;
                el.appendChild(option);
            }
            el.disabled = false;
        }

        /**
         * Load the data needed on page load (categories, models, languages and
         * block templates) in parallel and fill the corresponding dropdowns.
         *
         * If any of the requests fails, the error is written to the output panel
         * and the dropdowns are left untouched.
         */
        async function init() {
            let categories, models, langs, templates;
            try {
                [categories, models, langs, templates] = await Promise.all([
                    fetchAPI('get_categories'),
                    fetchAPI('get_ai_models'),
                    fetchAPI('get_languages'),
                    fetchAPI('get_block_templates')
                ]);
            } catch (error) {
                // Without this the page would sit on "Loading..." with no hint of what went wrong.
                elements.output.textContent = 'Error: failed to load playground data. ' + error.message;
                return;
            }

            populateSelect(elements.category, categories);
            populateSelect(elements.model, models, 'Default Model');
            populateSelect(elements.language, langs, 'English (Default)');

            // Maps each block-template type returned by the backend to the id of its override <select>.
            const overrideMap = {
                tones: 'toneCode', styles: 'styleCode', formattings: 'formatCode',
                complexities: 'complexityCode', lengths: 'lengthCode', audienceLevels: 'audienceCode',
                responseGoals: 'responseGoalCode', outputTypes: 'outputCode'
            };
            for (const [type, elId] of Object.entries(overrideMap)) {
                // Types missing from the response keep their "-- Default --" option only.
                if (templates[type]) populateSelect(document.getElementById(elId), templates[type], '-- Default --');
            }
        }

        // 🚀 CASCADING LOGIC 1: Category -> Load Use Cases
        elements.category.addEventListener('change', async () => {
            // A category change invalidates the wrapper list and any rendered schema inputs.
            const resetWrapper = () => {
                elements.wrapper.innerHTML = '<option value="">-- Select Solution First --</option>';
                elements.wrapper.disabled = true;
                elements.schemaContainer.innerHTML = '';
            };

            if (!elements.category.value) {
                elements.usecase.innerHTML = '<option value="">-- Select Category First --</option>';
                elements.usecase.disabled = true;
                resetWrapper();
                return;
            }

            elements.usecase.innerHTML = '<option value="">Loading...</option>';
            // Encode the code so reserved URL characters (&, #, +, spaces) cannot corrupt the query string.
            const useCases = await fetchAPI('get_use_cases', 'category=' + encodeURIComponent(elements.category.value));
            populateSelect(elements.usecase, useCases, '-- Select a Solution --');

            resetWrapper();
        });

        // 🚀 CASCADING LOGIC 2: Use Case -> Load Wrappers
        elements.usecase.addEventListener('change', async () => {
            if (!elements.usecase.value) {
                elements.wrapper.innerHTML = '<option value="">-- Select Solution First --</option>';
                elements.wrapper.disabled = true;
                elements.schemaContainer.innerHTML = '';
                return;
            }

            elements.wrapper.innerHTML = '<option value="">Loading...</option>';
            // Encode the code so reserved URL characters cannot corrupt the query string.
            const wrappers = await fetchAPI('get_wrappers', 'usecase=' + encodeURIComponent(elements.usecase.value));

            // Format the labels cleanly (Base vs Variation)
            elements.wrapper.innerHTML = '<option value="">-- Select a Style --</option>';

            let autoSelectCode = null; // Track which wrapper to auto-select

            wrappers.forEach((w, index) => {
                const opt = document.createElement('option');
                opt.value = w.code;
                // Names are split on an em dash: the left part labels the base template,
                // the right part (when present) labels a variation.
                const parts = w.name.split('—');
                const displayName = w.base ? `✨ Base Template - ${parts[0].trim()}` : `↳ Variation: ${parts.length > 1 ? parts[1].trim() : w.name}`;
                opt.textContent = displayName;
                elements.wrapper.appendChild(opt);

                // Identify the Base template, or fallback to the first available variation
                if (w.base) {
                    autoSelectCode = w.code;
                } else if (index === 0 && !autoSelectCode) {
                    autoSelectCode = w.code;
                }
            });
            elements.wrapper.disabled = false;

            // Auto-select and fire 'change' so the wrapper listener loads the schema
            if (autoSelectCode) {
                elements.wrapper.value = autoSelectCode;
                elements.wrapper.dispatchEvent(new Event('change'));
            }
        });

        // 🚀 CASCADING LOGIC 3: Wrapper -> Load Schema
        elements.wrapper.addEventListener('change', async () => {
            elements.schemaContainer.innerHTML = '';
            elements.promptLabel.textContent = "Prompt / Additional Context";
            if (!elements.wrapper.value) return;

            // Encode the code so reserved URL characters cannot corrupt the query string.
            const schema = await fetchAPI('get_schema', 'wrapper=' + encodeURIComponent(elements.wrapper.value));
            if (!schema || (!schema.req && !schema.opt)) return;

            elements.promptLabel.textContent = "Additional Free-form Instructions";

            // Build one titled section of inputs from a { key: { d: default, p: isPlaceholder } } map.
            // Nodes are created through the DOM API rather than an HTML string, so keys or
            // default values containing quotes or angle brackets cannot break the markup.
            const buildSection = (title, data) => {
                if (!data || Object.keys(data).length === 0) return null;

                const section = document.createElement('div');
                section.className = 'schema-section';

                const heading = document.createElement('h3');
                heading.className = 'schema-title';
                heading.textContent = title;

                const grid = document.createElement('div');
                grid.className = 'grid-2';

                for (const [key, def] of Object.entries(data)) {
                    const isPlaceholder = def.p !== undefined ? def.p : false;
                    const defaultVal = def.d !== undefined ? def.d : '';

                    const label = document.createElement('label');
                    // camelCase key -> "Camel Case" label
                    label.textContent = key.replace(/([A-Z])/g, ' $1').replace(/^./, str => str.toUpperCase());

                    const input = document.createElement('input');
                    input.type = 'text';
                    input.className = 'input schema-input';
                    input.dataset.key = key; // read back by the run handler
                    if (isPlaceholder) {
                        // The default is only a hint; the field itself stays empty.
                        input.setAttribute('placeholder', defaultVal);
                    } else if (defaultVal) {
                        input.setAttribute('value', defaultVal);
                    }

                    const field = document.createElement('div');
                    field.append(label, input);
                    grid.appendChild(field);
                }

                section.append(heading, grid);
                return section;
            };

            const sections = [
                buildSection('Core Inputs', schema.req),
                buildSection('Additional Context', schema.opt)
            ].filter(Boolean);

            // Clear again before inserting: another change may have rendered while this
            // request was in flight, and the sections must replace, not pile up.
            elements.schemaContainer.innerHTML = '';
            sections.forEach(section => elements.schemaContainer.appendChild(section));
        });

        // Execute
        elements.runBtn.addEventListener('click', async () => {
            if (!elements.wrapper.value) {
                return alert("Please select a wrapper.");
            }

            // Lock the button and show progress while the request is running
            elements.output.textContent = 'Executing...';
            elements.runBtn.disabled = true;
            elements.runBtn.textContent = 'Generating...';

            // Free-form text from the textarea, sent as-is (trimmed)
            const promptText = elements.prompt.value.trim();

            // Non-empty schema inputs, keyed by their data-key attribute
            const variables = {};
            for (const field of document.querySelectorAll('.schema-input')) {
                const trimmed = field.value.trim();
                if (trimmed !== '') {
                    variables[field.dataset.key] = trimmed;
                }
            }

            // Override selects that are not left on their default, keyed by element id
            const overrides = {};
            for (const select of document.querySelectorAll('.overrides select')) {
                if (select.value) {
                    overrides[select.id] = select.value;
                }
            }

            try {
                const payload = {
                    model: elements.model.value,
                    wrapperCode: elements.wrapper.value,
                    language: elements.language.value,
                    prompt: promptText,
                    variables: variables,
                    overrides: overrides
                };
                const response = await fetch(API_ENDPOINT + '?action=execute', {
                    method: 'POST',
                    headers: { 'Content-Type': 'application/json' },
                    body: JSON.stringify(payload)
                });

                // Read as text first so a non-JSON error body can still be shown
                const raw = await response.text();
                if (!response.ok) {
                    let message = raw;
                    try {
                        message = JSON.parse(raw).error || message;
                    } catch (parseError) {
                        // Body was not JSON; keep the raw text as the message
                    }
                    throw new Error(message);
                }

                const result = JSON.parse(raw);
                elements.output.textContent = result.output || JSON.stringify(result, null, 2);
            } catch (error) {
                elements.output.textContent = 'Error: ' + error.message;
            } finally {
                elements.runBtn.disabled = false;
                elements.runBtn.textContent = 'Generate Response';
            }
        });

        init();
    </script>
</body>
</html>
python

# FILE: app.py
# A simple Flask server to replicate the 'api.php' V1 playground backend.
#
# REQUIREMENTS:
# pip install flask flask-cors requests psycopg2-binary
#
# USAGE:
# 1. Save this as 'app.py'
# 2. Run: flask --app app run
# 3. Open 'playground.html' in your browser.

import json
import time
import requests
import sys
from db import get_db_connection
from flask import Flask, request, jsonify, Response
from flask_cors import CORS
from psycopg2.extras import RealDictCursor 

app = Flask(__name__)
CORS(app) 

ZYWRAP_API_KEY = "YOUR_ZYWRAP_API_KEY"
ZYWRAP_PROXY_URL = 'https://api.zywrap.com/v1/proxy'

# --- Database Helper Functions ---

def get_categories(cur):
    """Return the ``code`` and ``name`` of every active category, sorted by ``ordering``."""
    query = "SELECT code, name FROM categories WHERE status = TRUE ORDER BY ordering ASC"
    cur.execute(query)
    rows = cur.fetchall()
    return rows

# 🚀 NEW: Fetch Solutions (Use Cases) by Category
def get_use_cases(cur, category_code):
    """Return the ``code`` and ``name`` of the active use cases in one category.

    Rows are sorted by ``ordering``; ``category_code`` is bound as a query
    parameter rather than interpolated into the SQL.
    """
    params = (category_code,)
    cur.execute("""
        SELECT code, name 
        FROM use_cases 
        WHERE category_code = %s AND status = TRUE 
        ORDER BY ordering ASC
    """, params)
    rows = cur.fetchall()
    return rows

# 🚀 UPDATED: Fetch Wrappers (Styles) by Use Case
def get_wrappers_by_use_case(cur, use_case_code):
    """Return the active wrappers of one use case, sorted by ``ordering``.

    Each row carries ``code``, ``name``, ``featured`` and ``base``.
    """
    params = (use_case_code,)
    cur.execute("""
        SELECT code, name, featured, base 
        FROM wrappers 
        WHERE use_case_code = %s AND status = TRUE
        ORDER BY ordering ASC
    """, params)
    rows = cur.fetchall()
    return rows

def get_schema_by_wrapper(cur, wrapper_code):
    """Return the ``schema_data`` of the use case that owns ``wrapper_code``.

    Returns ``None`` when no row matches, i.e. the wrapper does not exist or
    the wrapper or its use case is inactive.
    """
    params = (wrapper_code,)
    cur.execute("""
        SELECT uc.schema_data 
        FROM use_cases uc 
        JOIN wrappers w ON w.use_case_code = uc.code 
        WHERE w.code = %s AND w.status = TRUE AND uc.status = TRUE
    """, params)
    row = cur.fetchone()
    if not row:
        return None
    return row['schema_data']

def get_languages(cur):
    """Return the ``code`` and ``name`` of every active language, sorted by ``ordering``."""
    query = "SELECT code, name FROM languages WHERE status = TRUE ORDER BY ordering ASC"
    cur.execute(query)
    rows = cur.fetchall()
    return rows

def get_ai_models(cur):
    """Return the ``code`` and ``name`` of every active AI model, sorted by ``ordering``."""
    query = "SELECT code, name FROM ai_models WHERE status = TRUE ORDER BY ordering ASC"
    cur.execute(query)
    rows = cur.fetchall()
    return rows

def get_block_templates(cur):
    """Return the active block templates grouped by their ``type``.

    Returns:
        A dict mapping each template type to a list of
        ``{'code': ..., 'name': ...}`` dicts, in the order the query returns
        them (by type, then name).
    """
    cur.execute("SELECT type, code, name FROM block_templates WHERE status = TRUE ORDER BY type, name ASC")
    by_type = {}
    for record in cur.fetchall():
        entry = {'code': record['code'], 'name': record['name']}
        by_type.setdefault(record['type'], []).append(entry)
    return by_type

# ✅ HYBRID PROXY EXECUTION
def execute_zywrap_proxy(api_key, model, wrapper_code, prompt, language=None, variables=None, overrides=None):
    """Send one execution request to the Zywrap proxy and return its final result.

    The proxy answers with a stream of ``data: <json>`` lines. The last event
    that is a JSON object containing ``output`` or ``error`` is used as the result.

    Args:
        api_key: Zywrap API key; surrounding whitespace is stripped.
        model: Model code to run.
        wrapper_code: Code of the wrapper to execute.
        prompt: Free-form prompt text.
        language: Optional language code; omitted from the payload when falsy.
        variables: Optional dict of schema variables; ``None`` is sent as ``{}``.
        overrides: Optional dict merged into the top level of the payload.

    Returns:
        A ``(body, status_code)`` tuple. ``body`` is the decoded JSON (or an
        ``{'error': ...}`` dict). ``status_code`` is 200 on success, 400 when
        the stream ended with an ``error`` event, the upstream status for
        non-200 responses, and 500 for transport or stream-parse failures.
    """
    payload_data = {
        'model': model,
        'wrapperCodes': [wrapper_code],
        'prompt': prompt,
        'variables': variables if variables is not None else {},
        'source': 'python_sdk'
    }

    if language:
        payload_data['language'] = language
    # NOTE(review): overrides are merged last, so a key such as 'model' or
    # 'prompt' supplied here replaces the value set above. Confirm this is
    # intended before exposing overrides to untrusted clients.
    if overrides:
        payload_data.update(overrides)

    clean_key = api_key.strip()
    headers = {
        'Content-Type': 'application/json',
        'Authorization': f'Bearer {clean_key}',
        'User-Agent': 'ZywrapPythonSDK/1.1'
    }

    try:
        # The context manager releases the streamed connection on every exit path.
        with requests.post(ZYWRAP_PROXY_URL, json=payload_data, headers=headers, stream=True, timeout=300) as response:
            if response.status_code != 200:
                try:
                    return response.json(), response.status_code
                except ValueError:
                    return {'error': response.text}, response.status_code

            final_json = None
            for line in response.iter_lines():
                if not line:
                    continue  # blank lines between events
                decoded_line = line.decode('utf-8').strip()
                if not decoded_line.startswith('data: '):
                    continue
                try:
                    data = json.loads(decoded_line[6:])
                except json.JSONDecodeError:
                    continue  # non-JSON data lines are skipped
                # Later matching events overwrite earlier ones: the last one wins.
                if isinstance(data, dict) and ('output' in data or 'error' in data):
                    final_json = data

            if final_json:
                status_code = 400 if 'error' in final_json else 200
                return final_json, status_code
            return {'error': 'Stream parse failed'}, 500

    except requests.exceptions.RequestException as e:
        error_msg = str(e)
        if e.response is not None:
            try:
                return e.response.json(), e.response.status_code
            except ValueError:
                return {'error': e.response.text}, e.response.status_code
        return {'error': error_msg}, 500


# --- API Router ---
@app.route('/api', methods=['GET', 'POST'])
def api_router():
    """Dispatch playground API requests by their ``action`` value.

    GET requests read ``action`` from the query string and return the
    matching lookup as JSON. POST requests read ``action`` from the query
    string or, failing that, from the JSON body; ``execute`` forwards the
    request to the Zywrap proxy, writes one row to ``usage_logs`` and returns
    the proxy result together with its status code.

    Returns:
        A Flask JSON response. Unknown actions produce
        ``{'error': 'Invalid action'}`` with HTTP 400; unexpected exceptions
        produce ``{'error': <message>}`` with HTTP 500.
    """
    conn = get_db_connection()
    try:
        with conn.cursor(cursor_factory=RealDictCursor) as cur:

            if request.method == 'GET':
                action = request.args.get('action')
                if action == 'get_categories': return jsonify(get_categories(cur))
                if action == 'get_use_cases': return jsonify(get_use_cases(cur, request.args.get('category')))
                if action == 'get_wrappers': return jsonify(get_wrappers_by_use_case(cur, request.args.get('usecase')))
                if action == 'get_languages': return jsonify(get_languages(cur))
                if action == 'get_ai_models': return jsonify(get_ai_models(cur))
                if action == 'get_block_templates': return jsonify(get_block_templates(cur))
                if action == 'get_schema': return jsonify(get_schema_by_wrapper(cur, request.args.get('wrapper')))

            if request.method == 'POST':
                # silent=True returns None instead of raising on a missing or
                # malformed JSON body; anything that is not a JSON object is
                # treated as an empty payload so the lookups below cannot fail
                # with AttributeError.
                input_data = request.get_json(silent=True)
                if not isinstance(input_data, dict):
                    input_data = {}
                action = request.args.get('action') or input_data.get('action')

                if action == 'execute':
                    start_time = time.time()

                    result, status_code = execute_zywrap_proxy(
                        ZYWRAP_API_KEY,
                        input_data.get('model'),
                        input_data.get('wrapperCode', ''),
                        input_data.get('prompt', ''),
                        input_data.get('language'),
                        input_data.get('variables', {}),
                        input_data.get('overrides', {})
                    )

                    latency_ms = int((time.time() - start_time) * 1000)

                    # Usage logging is best-effort: a logging failure must not
                    # change the response sent back to the caller.
                    try:
                        status_text = 'success' if status_code == 200 else 'error'
                        trace_id = result.get('id')

                        # `or {}` also covers an explicit null in the payload.
                        usage = result.get('usage') or {}
                        p_tokens = usage.get('prompt_tokens', 0)
                        c_tokens = usage.get('completion_tokens', 0)
                        t_tokens = usage.get('total_tokens', 0)

                        cost = result.get('cost') or {}
                        credits_used = cost.get('credits_used', 0)
                        error_message = result.get('error') if status_text == 'error' else None

                        if error_message:
                            # Keep the stored message short: 255 characters plus an ellipsis.
                            error_msg_str = str(error_message)
                            error_message = error_msg_str[:255] + '...' if len(error_msg_str) > 255 else error_msg_str

                        cur.execute("""
                            INSERT INTO usage_logs 
                            (trace_id, wrapper_code, model_code, prompt_tokens, completion_tokens, total_tokens, credits_used, latency_ms, status, error_message) 
                            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                        """, (
                            trace_id, input_data.get('wrapperCode'), input_data.get('model', 'default'),
                            p_tokens, c_tokens, t_tokens, credits_used, latency_ms, status_text, error_message
                        ))
                        conn.commit()
                    except Exception as log_err:
                        print(f"Failed to write to usage_logs: {log_err}", file=sys.stderr)
                        conn.rollback()

                    return jsonify(result), status_code

            return jsonify({'error': 'Invalid action'}), 400

    except Exception as e:
        return jsonify({'error': str(e)}), 500
    finally:
        conn.close()

if __name__ == '__main__':
    # Announce the local URL, then start the Flask server on port 5000 with debug enabled.
    print("Zywrap Python SDK Playground backend listening at http://localhost:5000")
    app.run(port=5000, debug=True)

Step 6: Synchronize Data

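Run this script on a schedule (for example, from a cron job). When the API answers with FULL_RESET, it downloads and unzips the data bundle automatically and then runs import.py; when the API answers with DELTA_UPDATE, it applies the changes with PostgreSQL ON CONFLICT upserts.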

python

# FILE: zywrap-sync.py
# USAGE: python zywrap-sync.py
# REQUIREMENTS: pip install requests psycopg2-binary

import json
import os
import sys
import zipfile

import psycopg2.extras
import requests

from db import get_db_connection

# --- CONFIGURATION ---
# Sent as the Bearer token in the Authorization header of the sync requests.
DEVELOPER_API_KEY = 'YOUR_ZYWRAP_API_KEY_HERE'
# Sync endpoint; queried with the local data version as the `fromVersion` parameter.
ZYWRAP_API_ENDPOINT = 'https://api.zywrap.com/v1/sdk/v1/sync'
# ---------------------

def get_current_version(cur):
    """Return the stored ``data_version`` setting, or ``''`` when no row exists.

    Args:
        cur: An open database cursor whose rows support positional indexing.
    """
    cur.execute("SELECT setting_value FROM settings WHERE setting_key = 'data_version'")
    row = cur.fetchone()
    if not row:
        return ''
    return row[0]

def save_new_version(cur, version):
    """Store ``version`` as the ``data_version`` setting, inserting or overwriting it.

    Args:
        cur: An open database cursor.
        version: The version value to persist.
    """
    upsert_sql = (
        "INSERT INTO settings (setting_key, setting_value) "
        "VALUES ('data_version', %s) "
        "ON CONFLICT (setting_key) DO UPDATE SET setting_value = EXCLUDED.setting_value"
    )
    cur.execute(upsert_sql, (version,))

# --- HELPER FUNCTIONS ---

def upsert_batch(cur, table, rows, cols, pk='code'):
    """Insert ``rows`` into ``table``, updating existing rows on key conflict.

    Uses PostgreSQL ``INSERT ... ON CONFLICT`` through
    ``psycopg2.extras.execute_values``.

    Args:
        cur: An open database cursor.
        table: Target table name (interpolated into the SQL; pass trusted
            values only).
        rows: Sequence of tuples, one per record, ordered like ``cols``.
            Nothing is executed when it is empty.
        cols: Column names matching each tuple in ``rows``.
        pk: Conflict column. The special value ``'compound_template'``
            selects the composite key ``(type, code)``.

    Raises:
        Exception: Whatever the database driver raises. The error is printed
            and re-raised, because a failed statement aborts the surrounding
            PostgreSQL transaction; continuing would only make every later
            statement fail too.
    """
    if not rows:
        return

    col_names = ", ".join(cols)
    updates = [f"{c} = EXCLUDED.{c}" for c in cols if c != pk and c != 'type']
    update_clause = ", ".join(updates)
    conflict_target = f"({pk})" if pk != 'compound_template' else "(type, code)"
    # "DO UPDATE SET" with nothing after it is invalid SQL, so fall back to
    # DO NOTHING when no updatable columns remain.
    conflict_action = f"DO UPDATE SET {update_clause}" if update_clause else "DO NOTHING"

    query = f"""
        INSERT INTO {table} ({col_names}) VALUES %s
        ON CONFLICT {conflict_target} {conflict_action}
    """
    try:
        psycopg2.extras.execute_values(cur, query, rows, page_size=1000)
    except Exception as e:
        print(f"   [!] Error upserting {table}: {e}")
        raise
    print(f"   [+] Upserted {len(rows)} records into '{table}'.")

def delete_batch(cur, table, ids, pk='code'):
    """Delete every row of ``table`` whose ``pk`` column is one of ``ids``.

    Args:
        cur: An open database cursor.
        table: Target table name (interpolated into the SQL; pass trusted
            values only).
        ids: Key values to delete. Nothing is executed when it is empty.
        pk: Name of the column compared against ``ids``.

    Raises:
        Exception: Whatever the database driver raises. The error is printed
            and re-raised, because a failed statement aborts the surrounding
            PostgreSQL transaction; continuing would only make every later
            statement fail too.
    """
    if not ids:
        return
    query = f"DELETE FROM {table} WHERE {pk} = ANY(%s)"
    try:
        cur.execute(query, (list(ids),))
    except Exception as e:
        print(f"   [!] Error deleting from {table}: {e}")
        raise
    print(f"   [-] Deleted {len(ids)} records from '{table}'.")

# --- MAIN LOGIC ---

def _first_not_none(record, *keys):
    """Return the first value stored in ``record`` under ``keys`` that is not None."""
    for key in keys:
        value = record.get(key)
        if value is not None:
            return value
    return None


def _run_full_reset(patch, headers):
    """Download the full data bundle, unzip it in the current folder and run import.py."""
    zip_path = 'zywrap-data.zip'
    download_url = patch['wrappers']['downloadUrl']

    print("⬇️  Attempting automatic download from Zywrap...")
    # The context manager releases the streamed connection on every path.
    with requests.get(download_url, headers=headers, stream=True, timeout=300) as dl:
        if dl.status_code != 200:
            print(f"❌ Automatic download failed. HTTP Status: {dl.status_code}")
            if os.path.exists(zip_path):
                os.remove(zip_path)
            return

        with open(zip_path, 'wb') as f:
            for chunk in dl.iter_content(chunk_size=8192):
                f.write(chunk)

    mb_size = round(os.path.getsize(zip_path) / 1024 / 1024, 2)
    print(f"✅ Data bundle downloaded successfully ({mb_size} MB).")

    # Only the extraction is guarded here, so that a failure inside import.py
    # is not misreported as an unzip problem.
    try:
        print("📦 Attempting auto-unzip...")
        with zipfile.ZipFile(zip_path, 'r') as z:
            z.extractall('.')
    except Exception as z_err:
        print(f"⚠️ Failed to auto-unzip (Check directory permissions): {z_err}")
        print("\n👉 ACTION REQUIRED:")
        print(f"   1. Please manually unzip '{zip_path}' in this folder.")
        print("   2. Then run: python import.py")
        return

    print("✅ Auto-unzip successful. Running import script...")
    os.remove(zip_path)

    # "import" is a Python keyword, so import.py has to be loaded by file path.
    import importlib.util
    spec = importlib.util.spec_from_file_location("import_script", "import.py")
    import_module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(import_module)
    import_module.main()


def _apply_delta_update(cur, patch):
    """Apply the upserts, deletes and new version from a DELTA_UPDATE patch (no commit)."""
    meta = patch.get('metadata', {})

    # Ordering values use _first_not_none so that a legitimate 0 is kept
    # rather than being skipped as falsy.

    # Categories
    rows = [
        (r['code'], r['name'], bool(r.get('status', True)),
         _first_not_none(r, 'position', 'displayOrder', 'ordering'))
        for r in meta.get('categories', [])
    ]
    upsert_batch(cur, 'categories', rows, ['code', 'name', 'status', 'ordering'])

    # Languages
    rows = [
        (r['code'], r['name'], bool(r.get('status', True)), r.get('ordering'))
        for r in meta.get('languages', [])
    ]
    upsert_batch(cur, 'languages', rows, ['code', 'name', 'status', 'ordering'])

    # AI Models
    rows = [
        (r['code'], r['name'], bool(r.get('status', True)),
         _first_not_none(r, 'displayOrder', 'ordering'))
        for r in meta.get('aiModels', [])
    ]
    upsert_batch(cur, 'ai_models', rows, ['code', 'name', 'status', 'ordering'])

    # Templates: the payload maps each template type to its list of items.
    rows = [
        (type_name, i['code'], i.get('label') or i.get('name'), bool(i.get('status', True)))
        for type_name, items in meta.get('templates', {}).items()
        for i in items
    ]
    upsert_batch(cur, 'block_templates', rows, ['type', 'code', 'name', 'status'], pk='compound_template')

    # Use Cases: the dynamic schema is stored as a JSON string.
    rows = [
        (uc['code'], uc['name'], uc.get('description'), uc.get('categoryCode'),
         json.dumps(uc['schema']) if uc.get('schema') else None,
         bool(uc.get('status', True)),
         _first_not_none(uc, 'displayOrder', 'ordering'))
        for uc in patch.get('useCases', {}).get('upserts', [])
    ]
    upsert_batch(cur, 'use_cases', rows, ['code', 'name', 'description', 'category_code', 'schema_data', 'status', 'ordering'])

    # Wrappers
    rows = [
        (w['code'], w['name'], w.get('description'),
         w.get('useCaseCode') or w.get('categoryCode'),
         bool(w.get('featured') or w.get('isFeatured')),
         bool(w.get('base') or w.get('isBaseWrapper')),
         bool(w.get('status', True)),
         _first_not_none(w, 'displayOrder', 'ordering'))
        for w in patch.get('wrappers', {}).get('upserts', [])
    ]
    upsert_batch(cur, 'wrappers', rows, ['code', 'name', 'description', 'use_case_code', 'featured', 'base', 'status', 'ordering'])

    # Deletes
    delete_batch(cur, 'wrappers', patch.get('wrappers', {}).get('deletes', []))
    delete_batch(cur, 'use_cases', patch.get('useCases', {}).get('deletes', []))

    # Version
    if patch.get('newVersion'):
        save_new_version(cur, patch['newVersion'])


def main():
    """Synchronize the local database with the Zywrap sync endpoint.

    Reads the local data version, asks the API for changes since that version
    and then either performs a full reset (download bundle, unzip, run
    import.py) or applies a delta update in a single transaction. Any other
    mode is reported as "No updates needed."

    Requests are made with TLS certificate verification enabled and with a
    timeout, since the API key travels in the Authorization header. Any
    unexpected error rolls the transaction back and is reported on stderr;
    the connection is always closed.
    """
    print("--- 🚀 Starting Zywrap V1 Sync ---")
    conn = get_db_connection()
    try:
        with conn.cursor() as cur:
            current_version = get_current_version(cur)
            print(f"🔹 Local Version: {current_version or 'None'}")

            # Commit right away to end the transaction opened by the SELECT
            # and release its lock on the settings table; otherwise import.py
            # deadlocks when it tries to TRUNCATE.
            conn.commit()

            # 1. Fetch update info
            headers = {'Authorization': f'Bearer {DEVELOPER_API_KEY}', 'Accept': 'application/json'}
            params = {'fromVersion': current_version}

            try:
                response = requests.get(ZYWRAP_API_ENDPOINT, headers=headers, params=params, timeout=300)
                response.raise_for_status()
            except requests.exceptions.RequestException as e:
                print(f"❌ API Error: {e}")
                return

            patch = response.json()
            mode = patch.get('mode', 'UNKNOWN')
            print(f"🔹 Sync Mode: {mode}")

            if mode == 'FULL_RESET':
                # --- SCENARIO A: FULL RESET ---
                _run_full_reset(patch, headers)
            elif mode == 'DELTA_UPDATE':
                # --- SCENARIO B: DELTA UPDATE ---
                _apply_delta_update(cur, patch)
                conn.commit()
                print("✅ Delta Sync Complete.")
            else:
                print("✅ No updates needed.")

    except Exception as e:
        if not conn.closed:
            conn.rollback()
        print(f"FATAL: Sync Failed: {e}", file=sys.stderr)
    finally:
        if not conn.closed:
            conn.close()

# Run the sync only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()

Programmatically Download

Stream the download directly to disk in small chunks so that the whole ZIP file is never held in memory.

python

# FILE: download_bundle.py
import requests
import sys
import os

# --- CONFIGURATION ---
# Sent as the Bearer token in the Authorization header; the script exits if the placeholder is left in.
ZYWRAP_API_KEY = 'YOUR_API_KEY_HERE'
# Endpoint the data bundle is requested from.
API_ENDPOINT = 'https://api.zywrap.com/v1/sdk/v1/download'
# Local path the downloaded bundle is written to.
OUTPUT_FILE = 'zywrap-data.zip'
# ---------------------

def download_sdk_bundle():
    """Download the SDK data bundle from ``API_ENDPOINT`` into ``OUTPUT_FILE``.

    The response is streamed to a temporary ``.part`` file and moved over
    ``OUTPUT_FILE`` only after the download finished, so a failed request
    never deletes or truncates a bundle left by an earlier successful run.
    TLS certificate verification stays enabled because the API key travels
    in the Authorization header.

    Exits the process with status 1 when the API key is still the
    placeholder, when the API answers with an HTTP error, or on any other
    failure.
    """
    print("Downloading latest V1 wrapper data from Zywrap...")

    if not ZYWRAP_API_KEY or 'YOUR_API_KEY_HERE' in ZYWRAP_API_KEY:
        print("FATAL: Please replace 'YOUR_API_KEY_HERE' with your actual Zywrap API key.", file=sys.stderr)
        sys.exit(1)

    headers = {'Authorization': f'Bearer {ZYWRAP_API_KEY}'}
    partial_file = f"{OUTPUT_FILE}.part"

    try:
        # stream=True ensures we don't load the entire zip into RAM at once;
        # the context manager releases the connection on every path.
        with requests.get(API_ENDPOINT, headers=headers, stream=True, timeout=300) as response:
            response.raise_for_status()

            # Write directly to disk
            with open(partial_file, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)

        # Publish the finished download under its final name.
        os.replace(partial_file, OUTPUT_FILE)

        print(f"✅ Sync complete. Data saved to {OUTPUT_FILE}.")
        print(f"Run 'unzip {OUTPUT_FILE}' to extract the 'zywrap-data.json' file, then run 'python import.py'.")

    except requests.exceptions.HTTPError as e:
        if os.path.exists(partial_file): os.remove(partial_file)
        print(f"FATAL: API request failed with status code {e.response.status_code}.", file=sys.stderr)
        sys.exit(1)
    except Exception as e:
        if os.path.exists(partial_file): os.remove(partial_file)
        print(f"FATAL: An error occurred: {e}", file=sys.stderr)
        sys.exit(1)

# Start the download only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    download_sdk_bundle()
Ready to ship?

Start building with Zywrap today.

Stop wrestling with prompts. Integrate powerful, structured AI into your applications in minutes.