
Python SDK Guide
A complete, runnable example for integrating Zywrap's V1 offline data (including dynamic schemas) into your Python (Flask/FastAPI) application.
Step 1: Download Your Data Bundle
Download the V1 ZIP file containing the highly compressed Tabular JSON data. Extract the zywrap-data.json file from the bundle and place it next to the import script.
You must be logged in to download the SDK data bundle.
Step 2: Database Setup
Run this SQL. It features the relational use_cases table containing the dynamic schema_data.
-- Database Schema for Zywrap Offline SDK v1.0 (PostgreSQL)

-- Lookup table of available AI models; 'ordering' drives display order.
CREATE TABLE "ai_models" (
"code" VARCHAR(255) PRIMARY KEY,
"name" VARCHAR(255) NOT NULL,
"status" BOOLEAN DEFAULT TRUE,
"ordering" INT
);

-- Wrapper categories (parents of use_cases via category_code).
CREATE TABLE "categories" (
"code" VARCHAR(255) PRIMARY KEY,
"name" VARCHAR(255) NOT NULL,
"status" BOOLEAN DEFAULT TRUE,
"ordering" INT
);

-- Supported output languages; short codes (e.g. ISO-style), hence VARCHAR(10).
CREATE TABLE "languages" (
"code" VARCHAR(10) PRIMARY KEY,
"name" VARCHAR(255) NOT NULL,
"status" BOOLEAN DEFAULT TRUE,
"ordering" INT
);

-- Use cases; schema_data holds the dynamic form schema as JSONB.
-- ON DELETE SET NULL keeps a use case alive if its category is removed.
CREATE TABLE "use_cases" (
"code" VARCHAR(255) PRIMARY KEY,
"name" VARCHAR(255) NOT NULL,
"description" TEXT,
"category_code" VARCHAR(255) REFERENCES categories(code) ON DELETE SET NULL,
"schema_data" JSONB,
"status" BOOLEAN DEFAULT TRUE,
"ordering" BIGINT
);

-- Wrappers belong to a use case; 'featured'/'base' are display flags.
CREATE TABLE "wrappers" (
"code" VARCHAR(255) PRIMARY KEY,
"name" VARCHAR(255) NOT NULL,
"description" TEXT,
"use_case_code" VARCHAR(255) REFERENCES use_cases(code) ON DELETE SET NULL,
"featured" BOOLEAN DEFAULT FALSE,
"base" BOOLEAN DEFAULT FALSE,
"status" BOOLEAN DEFAULT TRUE,
"ordering" BIGINT
);

-- Block templates are keyed by (type, code) — the same code may exist
-- under several template types.
CREATE TABLE "block_templates" (
"type" VARCHAR(50) NOT NULL,
"code" VARCHAR(255) NOT NULL,
"name" VARCHAR(255) NOT NULL,
"status" BOOLEAN DEFAULT TRUE,
PRIMARY KEY ("type", "code")
);

-- Simple key/value store; holds 'data_version' used by the sync script.
CREATE TABLE "settings" (
"setting_key" VARCHAR(255) PRIMARY KEY,
"setting_value" TEXT
);

-- Local request log written by app.py after each proxy call.
-- NOTE(review): wrapper_code/model_code deliberately have no foreign keys —
-- presumably so log rows survive deletion of wrappers/models during sync;
-- confirm this is intended before adding constraints.
CREATE TABLE "usage_logs" (
"id" BIGSERIAL PRIMARY KEY,
"trace_id" VARCHAR(255),
"wrapper_code" VARCHAR(255),
"model_code" VARCHAR(255),
"prompt_tokens" INT DEFAULT 0,
"completion_tokens" INT DEFAULT 0,
"total_tokens" INT DEFAULT 0,
"credits_used" BIGINT DEFAULT 0,
"latency_ms" INT DEFAULT 0,
"status" VARCHAR(50) DEFAULT 'success',
"error_message" TEXT,
"created_at" TIMESTAMPTZ DEFAULT NOW()
);

-- Indexes for performance
CREATE INDEX idx_usage_wrapper ON usage_logs(wrapper_code);
CREATE INDEX idx_usage_model ON usage_logs(model_code);
CREATE INDEX idx_use_case_cat ON use_cases(category_code);
CREATE INDEX idx_wrapper_uc ON wrappers(use_case_code);
Step 3: Database Connection
Save this as db.py. We recommend using psycopg2 for PostgreSQL.
# FILE: db.py
# Uses the 'psycopg2' library for PostgreSQL
# pip install psycopg2-binary
import psycopg2
import psycopg2.extras
import sys
# Replace with your actual database credentials
# NOTE(review): credentials are hard-coded for the tutorial; in production
# load them from environment variables or a secrets manager instead.
DB_SETTINGS = {
    "dbname": "zywrap_db",
    "user": "postgres",
    "password": "password",
    "host": "localhost",
    "port": "5432"
}


def get_db_connection():
    """Establishes and returns a new database connection."""
    # A fresh connection is opened per call; every caller in this guide
    # closes it in a `finally` block.
    try:
        conn = psycopg2.connect(**DB_SETTINGS)
        return conn
    except psycopg2.OperationalError as e:
        # NOTE(review): sys.exit(1) terminates the whole process. That is fine
        # for the CLI scripts, but inside the Flask app a transient DB outage
        # would kill the server — consider raising instead in that context.
        print(f"FATAL: Could not connect to the database.\n{e}", file=sys.stderr)
        sys.exit(1)
Step 4: Import Tabular Data
This script parses the compressed V1 JSON and inserts it into your database using parameterized queries, all within a single transaction.
# FILE: import.py
# USAGE: python import.py
# This script assumes you have 'zywrap-data.json' in the same directory.
import json
import sys
from db import get_db_connection
def extract_tabular(tabular_data):
    """Expand compact tabular JSON ({'cols': [...], 'data': [[...], ...]})
    into a list of per-row dictionaries. Returns [] for missing/empty input."""
    if not tabular_data:
        return []
    headers = tabular_data.get('cols')
    records = tabular_data.get('data')
    if not headers or not records:
        return []
    return [dict(zip(headers, record)) for record in records]
def main():
    """Rebuild every SDK table from zywrap-data.json in one transaction.

    The TRUNCATE plus all inserts either commit together or roll back
    together, so a failed import never leaves half-loaded tables.
    Exits with status 1 on any fatal error so cron/CI can detect failure.
    """
    print("Starting lightning-fast v1.0 data import...")
    try:
        with open('zywrap-data.json', 'r', encoding='utf-8') as f:
            data = json.load(f)
    except FileNotFoundError:
        print("FATAL: zywrap-data.json not found.", file=sys.stderr)
        sys.exit(1)
    conn = get_db_connection()
    try:
        with conn.cursor() as cur:
            # 1. Clear existing data. CASCADE clears dependent rows too;
            # RESTART IDENTITY resets sequences (usage_logs is untouched).
            print("Clearing tables...")
            cur.execute("TRUNCATE wrappers, use_cases, categories, languages, block_templates, ai_models, settings RESTART IDENTITY CASCADE")
            # NOTE: psycopg2 does NOT autocommit by default — everything from
            # the TRUNCATE onward runs in one implicit transaction finalized
            # by conn.commit() below. (The original comment claimed the
            # opposite, which was misleading.)
            # 2. Import Categories
            if 'categories' in data:
                for c in extract_tabular(data['categories']):
                    cur.execute(
                        "INSERT INTO categories (code, name, status, ordering) VALUES (%s, %s, TRUE, %s)",
                        (c['code'], c['name'], c.get('ordering', 99999))
                    )
                print("Categories imported successfully.")
            # 3. Import Use Cases (schema becomes the JSONB schema_data column)
            if 'useCases' in data:
                for uc in extract_tabular(data['useCases']):
                    schema_json = json.dumps(uc['schema']) if uc.get('schema') else None
                    cur.execute(
                        "INSERT INTO use_cases (code, name, description, category_code, schema_data, status, ordering) VALUES (%s, %s, %s, %s, %s, TRUE, %s)",
                        (uc['code'], uc['name'], uc.get('desc'), uc.get('cat'), schema_json, uc.get('ordering', 999999999))
                    )
                print("Use Cases imported successfully.")
            # 4. Import Wrappers
            if 'wrappers' in data:
                for w in extract_tabular(data['wrappers']):
                    featured = bool(w.get('featured'))
                    base = bool(w.get('base'))
                    cur.execute(
                        "INSERT INTO wrappers (code, name, description, use_case_code, featured, base, status, ordering) VALUES (%s, %s, %s, %s, %s, %s, TRUE, %s)",
                        (w['code'], w['name'], w.get('desc'), w.get('usecase'), featured, base, w.get('ordering', 999999999))
                    )
                print("Wrappers imported successfully.")
            # 5. Import Languages — bundle order defines display order
            if 'languages' in data:
                for ord_counter, l in enumerate(extract_tabular(data['languages']), start=1):
                    cur.execute(
                        "INSERT INTO languages (code, name, status, ordering) VALUES (%s, %s, TRUE, %s)",
                        (l['code'], l['name'], ord_counter)
                    )
                print("Languages imported successfully.")
            # 6. Import AI Models
            if 'aiModels' in data:
                for m in extract_tabular(data['aiModels']):
                    cur.execute(
                        "INSERT INTO ai_models (code, name, status, ordering) VALUES (%s, %s, TRUE, %s)",
                        (m['code'], m['name'], m.get('ordering', 99999))
                    )
                print("AI Models imported successfully.")
            # 7. Import Block Templates (composite key: type + code)
            if 'templates' in data:
                for type_name, tabular in data['templates'].items():
                    for tpl in extract_tabular(tabular):
                        cur.execute(
                            "INSERT INTO block_templates (type, code, name, status) VALUES (%s, %s, %s, TRUE)",
                            (type_name, tpl['code'], tpl['name'])
                        )
                print("Block templates imported successfully.")
            # 8. Store the bundle version so zywrap-sync.py can request deltas
            if 'version' in data:
                cur.execute(
                    "INSERT INTO settings (setting_key, setting_value) VALUES ('data_version', %s) ON CONFLICT (setting_key) DO UPDATE SET setting_value = EXCLUDED.setting_value",
                    (data['version'],)
                )
                print("Data version saved to settings table.")
        conn.commit()
        print(f"\n✅ v1.0 Import complete! Version: {data.get('version', 'N/A')}")
    except Exception as e:
        conn.rollback()
        print(f"FATAL: Database error during import.\n{e}", file=sys.stderr)
        sys.exit(1)  # non-zero exit so schedulers notice the failure
    finally:
        conn.close()


if __name__ == "__main__":
    main()
Step 5: The Dynamic API & Playground
This Flask server reads from your local DB, handles dynamic schemas, forwards the final request to the Zywrap API, and logs local usage automatically.
# FILE: app.py
# A simple Flask server to replicate the 'api.php' V1 playground backend.
#
# REQUIREMENTS:
# pip install flask flask-cors requests psycopg2-binary
#
# USAGE:
# 1. Save this as 'app.py'
# 2. Run: flask --app app run
# 3. Open 'playground.html' in your browser.
import json
import time
import requests
import sys
from db import get_db_connection
from flask import Flask, request, jsonify, Response
from flask_cors import CORS
from psycopg2.extras import RealDictCursor
app = Flask(__name__)
# Allow cross-origin requests so the static playground.html can call this API.
CORS(app)
# Replace with your real key; every proxy call is authenticated with it.
ZYWRAP_API_KEY = "YOUR_ZYWRAP_API_KEY"
ZYWRAP_PROXY_URL = 'https://api.zywrap.com/v1/proxy'
# --- Database Helper Functions ---
def get_categories(cur):
    """Return all active categories in display order."""
    sql = "SELECT code, name FROM categories WHERE status = TRUE ORDER BY ordering ASC"
    cur.execute(sql)
    return cur.fetchall()
def get_languages(cur):
    """Return all active languages in display order."""
    sql = "SELECT code, name FROM languages WHERE status = TRUE ORDER BY ordering ASC"
    cur.execute(sql)
    return cur.fetchall()
def get_ai_models(cur):
    """Return all active AI models in display order."""
    sql = "SELECT code, name FROM ai_models WHERE status = TRUE ORDER BY ordering ASC"
    cur.execute(sql)
    return cur.fetchall()
def get_block_templates(cur):
    """Return active block templates grouped by type:
    {type: [{'code': ..., 'name': ...}, ...]}."""
    cur.execute("SELECT type, code, name FROM block_templates WHERE status = TRUE ORDER BY type, name ASC")
    grouped = {}
    for row in cur.fetchall():
        entry = {'code': row['code'], 'name': row['name']}
        grouped.setdefault(row['type'], []).append(entry)
    return grouped
def get_wrappers_by_category(cur, category_code):
    """List active wrappers whose (active) use case belongs to *category_code*."""
    sql = """
        SELECT w.code, w.name, w.featured, w.base
        FROM wrappers w
        JOIN use_cases uc ON w.use_case_code = uc.code
        WHERE uc.category_code = %s AND w.status = TRUE AND uc.status = TRUE
        ORDER BY w.ordering ASC
    """
    cur.execute(sql, (category_code,))
    return cur.fetchall()
def get_schema_by_wrapper(cur, wrapper_code):
    """Return the dynamic JSONB schema of a wrapper's use case, or None."""
    sql = """
        SELECT uc.schema_data
        FROM use_cases uc
        JOIN wrappers w ON w.use_case_code = uc.code
        WHERE w.code = %s AND w.status = TRUE AND uc.status = TRUE
    """
    cur.execute(sql, (wrapper_code,))
    row = cur.fetchone()
    if not row:
        return None
    return row['schema_data']
# ✅ HYBRID PROXY EXECUTION
def execute_zywrap_proxy(api_key, model, wrapper_code, prompt, language=None, variables=None, overrides=None):
    """Forward a prompt to the Zywrap proxy and return (json_payload, http_status).

    The proxy responds as a server-sent-event stream; we keep the last
    'data: ...' frame that carries an 'output' or 'error' key.

    FIX: variables/overrides now default to None instead of {} — mutable
    default arguments are shared across calls, so a caller mutating them
    would leak state into later requests. Passing {} explicitly still works.
    """
    payload_data = {
        'model': model,
        'wrapperCodes': [wrapper_code],
        'prompt': prompt,
        'variables': variables if variables is not None else {},
        'source': 'python_sdk'
    }
    if language:
        payload_data['language'] = language
    if overrides:
        payload_data.update(overrides)
    clean_key = api_key.strip()
    headers = {
        'Content-Type': 'application/json',
        'Authorization': f'Bearer {clean_key}',
        'User-Agent': 'ZywrapPythonSDK/1.1'
    }
    try:
        response = requests.post(ZYWRAP_PROXY_URL, json=payload_data, headers=headers, stream=True, timeout=300)
        if response.status_code == 200:
            final_json = None
            for line in response.iter_lines():
                if not line:
                    continue
                decoded_line = line.decode('utf-8').strip()
                if not decoded_line.startswith('data: '):
                    continue
                json_str = decoded_line[6:]  # strip the 'data: ' SSE prefix
                try:
                    data = json.loads(json_str)
                except json.JSONDecodeError:
                    continue  # ignore keep-alive / partial frames
                if data and ('output' in data or 'error' in data):
                    final_json = data
            if final_json:
                # The proxy can report failures inside a 200 stream; surface them as 400.
                status_code = 400 if 'error' in final_json else 200
                return final_json, status_code
            return {'error': 'Stream parse failed'}, 500
        # Non-200: relay whatever the proxy said, JSON if possible.
        try:
            return response.json(), response.status_code
        except ValueError:
            return {'error': response.text}, response.status_code
    except requests.exceptions.RequestException as e:
        error_msg = str(e)
        if e.response is not None:
            # FIX: was a bare `except:` — narrowed to the JSON-decode failure.
            try:
                return e.response.json(), e.response.status_code
            except ValueError:
                return {'error': e.response.text}, e.response.status_code
        return {'error': error_msg}, 500
# --- API Router ---
@app.route('/api', methods=['GET', 'POST'])
def api_router():
    """Single dispatch endpoint replicating the PHP playground backend.

    GET  ?action=...       -> read-only lookups against the local mirror tables.
    POST action=execute    -> forwards to the Zywrap proxy, then logs usage locally.
    Unknown actions return 400; unexpected errors return 500.
    """
    conn = get_db_connection()
    try:
        # Use RealDictCursor to automatically convert SQL rows to dictionaries
        with conn.cursor(cursor_factory=RealDictCursor) as cur:
            if request.method == 'GET':
                action = request.args.get('action')
                if action == 'get_categories': return jsonify(get_categories(cur))
                if action == 'get_languages': return jsonify(get_languages(cur))
                if action == 'get_ai_models': return jsonify(get_ai_models(cur))
                if action == 'get_block_templates': return jsonify(get_block_templates(cur))
                if action == 'get_wrappers': return jsonify(get_wrappers_by_category(cur, request.args.get('category')))
                if action == 'get_schema': return jsonify(get_schema_by_wrapper(cur, request.args.get('wrapper')))
            if request.method == 'POST':
                # NOTE(review): get_json() returns None for a non-JSON body, so
                # the .get below would raise AttributeError; the outer handler
                # turns that into a 500. Consider get_json(silent=True) or {}.
                input_data = request.get_json()
                # Action may arrive as a query param or inside the JSON body.
                action = request.args.get('action') or input_data.get('action')
                if action == 'execute':
                    # ⏱️ Start Local Timer
                    start_time = time.time()
                    result, status_code = execute_zywrap_proxy(
                        ZYWRAP_API_KEY,
                        input_data.get('model'),
                        input_data.get('wrapperCode', ''),
                        input_data.get('prompt', ''),
                        input_data.get('language'),
                        input_data.get('variables', {}),
                        input_data.get('overrides', {})
                    )
                    # ⏱️ End Local Timer
                    latency_ms = int((time.time() - start_time) * 1000)
                    # --- 📝 LOGGING TO LOCAL DATABASE ---
                    # Best-effort: a logging failure must never break the client
                    # response, hence the inner try/except with rollback.
                    try:
                        status_text = 'success' if status_code == 200 else 'error'
                        trace_id = result.get('id')
                        usage = result.get('usage', {})
                        p_tokens = usage.get('prompt_tokens', 0)
                        c_tokens = usage.get('completion_tokens', 0)
                        t_tokens = usage.get('total_tokens', 0)
                        credits_used = result.get('cost', {}).get('credits_used', 0)
                        error_message = result.get('error') if status_text == 'error' else None
                        if error_message:
                            # Cap stored error text at 255 chars (column is TEXT,
                            # so this is for tidiness, not a schema limit).
                            error_msg_str = str(error_message)
                            error_message = error_msg_str[:255] + '...' if len(error_msg_str) > 255 else error_msg_str
                        cur.execute("""
                            INSERT INTO usage_logs
                            (trace_id, wrapper_code, model_code, prompt_tokens, completion_tokens, total_tokens, credits_used, latency_ms, status, error_message)
                            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                        """, (
                            trace_id, input_data.get('wrapperCode'), input_data.get('model', 'default'),
                            p_tokens, c_tokens, t_tokens, credits_used, latency_ms, status_text, error_message
                        ))
                        conn.commit()
                    except Exception as log_err:
                        print(f"Failed to write to usage_logs: {log_err}", file=sys.stderr)
                        conn.rollback()
                    return jsonify(result), status_code
            return jsonify({'error': 'Invalid action'}), 400
    except Exception as e:
        return jsonify({'error': str(e)}), 500
    finally:
        conn.close()
if __name__ == '__main__':
    # Development entry point only; serve with a WSGI server (gunicorn,
    # uwsgi, ...) and debug=False in production.
    print(f"Zywrap Python SDK Playground backend listening at http://localhost:5000")
    app.run(debug=True, port=5000)
Step 6: Synchronize Data
Run this script on a cron job. It features automatic unzipping for FULL resets and ON CONFLICT optimized upserts for DELTA updates.
# FILE: zywrap-sync.py
# USAGE: python zywrap-sync.py
# REQUIREMENTS: pip install requests psycopg2-binary
import requests
import sys
import os
import zipfile
import psycopg2.extras
from db import get_db_connection
# --- CONFIGURATION ---
# The developer key authenticates both the sync check and the bundle download.
DEVELOPER_API_KEY = 'YOUR_ZYWRAP_API_KEY_HERE'
ZYWRAP_API_ENDPOINT = 'https://api.zywrap.com/v1/sdk/v1/sync'
# ---------------------
def get_current_version(cur):
    """Fetch the locally stored data_version, or '' when none is set."""
    cur.execute("SELECT setting_value FROM settings WHERE setting_key = 'data_version'")
    row = cur.fetchone()
    if row:
        return row[0]
    return ''
def save_new_version(cur, version):
    """Upsert the 'data_version' key in the settings table."""
    sql = "INSERT INTO settings (setting_key, setting_value) VALUES ('data_version', %s) ON CONFLICT (setting_key) DO UPDATE SET setting_value = EXCLUDED.setting_value"
    cur.execute(sql, (version,))
# --- HELPER FUNCTIONS ---
def upsert_batch(cur, table, rows, cols, pk='code'):
    """Optimized Upsert using Postgres ON CONFLICT"""
    # No-op on empty input so callers can pass delta lists unconditionally.
    if not rows: return
    col_names = ", ".join(cols)
    # On conflict, update every non-key column. 'type' is excluded because it
    # is part of the composite key for block_templates.
    updates = [f"{c} = EXCLUDED.{c}" for c in cols if c != pk and c != 'type']
    update_clause = ", ".join(updates)
    # 'compound_template' is a sentinel pk value meaning "conflict on (type, code)".
    conflict_target = f"({pk})" if pk != 'compound_template' else "(type, code)"
    query = f"""
    INSERT INTO {table} ({col_names}) VALUES %s
    ON CONFLICT {conflict_target} DO UPDATE SET {update_clause}
    """
    try:
        # execute_values expands the single VALUES %s into batched multi-row
        # inserts (up to page_size rows per statement).
        psycopg2.extras.execute_values(cur, query, rows, page_size=1000)
        print(f" [+] Upserted {len(rows)} records into '{table}'.")
    except Exception as e:
        # NOTE(review): swallowing the error keeps the sync loop going, but in
        # PostgreSQL a failed statement aborts the open transaction — every
        # later statement before rollback will also fail. Confirm this
        # best-effort behavior is intended.
        print(f" [!] Error upserting {table}: {e}")
def delete_batch(cur, table, ids, pk='code'):
    """Delete rows whose *pk* appears in *ids*; no-op for an empty list."""
    if not ids:
        return
    statement = f"DELETE FROM {table} WHERE {pk} = ANY(%s)"
    try:
        cur.execute(statement, (list(ids),))
        print(f" [-] Deleted {len(ids)} records from '{table}'.")
    except Exception as e:
        print(f" [!] Error deleting from {table}: {e}")
# --- MAIN LOGIC ---
def main():
    """Run one synchronization cycle against the Zywrap sync endpoint.

    FULL_RESET   -> download the zip bundle, unzip it, re-run import.py.
    DELTA_UPDATE -> apply incremental upserts/deletes, store the new version.
    Exits with status 1 on a fatal error so cron can detect failure.
    """
    # BUGFIX: `json` is used below (use-case schema serialization) but was
    # never imported at the top of this script, causing a NameError mid-sync.
    import json

    print("--- 🚀 Starting Zywrap V1 Sync ---")
    conn = get_db_connection()
    try:
        with conn.cursor() as cur:
            current_version = get_current_version(cur)
            print(f"🔹 Local Version: {current_version or 'None'}")
            # Commit immediately to release the read-lock on the settings table!
            # Without this, import.py will deadlock when trying to TRUNCATE.
            conn.commit()
            # 1. Ask the API what (if anything) changed since our version
            headers = {'Authorization': f'Bearer {DEVELOPER_API_KEY}', 'Accept': 'application/json'}
            params = {'fromVersion': current_version}
            try:
                # SECURITY NOTE(review): verify=False disables TLS certificate
                # validation — acceptable only for local testing; remove it (or
                # set verify=True) before production use.
                response = requests.get(ZYWRAP_API_ENDPOINT, headers=headers, params=params, verify=False)
                response.raise_for_status()
            except Exception as e:
                print(f"❌ API Error: {e}")
                return
            patch = response.json()
            mode = patch.get('mode', 'UNKNOWN')
            print(f"🔹 Sync Mode: {mode}")
            # --- SCENARIO A: FULL RESET ---
            if mode == 'FULL_RESET':
                zip_path = 'zywrap-data.zip'
                download_url = patch['wrappers']['downloadUrl']
                print(f"⬇️ Attempting automatic download from Zywrap...")
                dl = requests.get(download_url, headers=headers, stream=True, verify=False)
                if dl.status_code == 200:
                    with open(zip_path, 'wb') as f:
                        for chunk in dl.iter_content(chunk_size=8192):
                            f.write(chunk)
                    mb_size = round(os.path.getsize(zip_path) / 1024 / 1024, 2)
                    print(f"✅ Data bundle downloaded successfully ({mb_size} MB).")
                    try:
                        print("📦 Attempting auto-unzip...")
                        with zipfile.ZipFile(zip_path, 'r') as z:
                            z.extractall('.')
                        print("✅ Auto-unzip successful. Running import script...")
                        os.remove(zip_path)
                        # 'import' is a Python keyword, so import.py cannot be
                        # imported normally — load it by file path instead.
                        import importlib.util
                        spec = importlib.util.spec_from_file_location("import_script", "import.py")
                        import_module = importlib.util.module_from_spec(spec)
                        spec.loader.exec_module(import_module)
                        import_module.main()
                    except Exception as z_err:
                        print("⚠️ Failed to auto-unzip (Check directory permissions).")
                        print("\n👉 ACTION REQUIRED:")
                        print(f" 1. Please manually unzip '{zip_path}' in this folder.")
                        print(" 2. Then run: python import.py")
                else:
                    print(f"❌ Automatic download failed. HTTP Status: {dl.status_code}")
                    if os.path.exists(zip_path):
                        os.remove(zip_path)
            # --- SCENARIO B: DELTA UPDATE ---
            elif mode == 'DELTA_UPDATE':
                meta = patch.get('metadata', {})
                # Categories (the API has used several names for the order field)
                rows = [(r['code'], r['name'], bool(r.get('status', True)), r.get('position') or r.get('displayOrder') or r.get('ordering')) for r in meta.get('categories', [])]
                upsert_batch(cur, 'categories', rows, ['code', 'name', 'status', 'ordering'])
                # Languages
                rows = [(r['code'], r['name'], bool(r.get('status', True)), r.get('ordering')) for r in meta.get('languages', [])]
                upsert_batch(cur, 'languages', rows, ['code', 'name', 'status', 'ordering'])
                # AI Models
                rows = [(r['code'], r['name'], bool(r.get('status', True)), r.get('displayOrder') or r.get('ordering')) for r in meta.get('aiModels', [])]
                upsert_batch(cur, 'ai_models', rows, ['code', 'name', 'status', 'ordering'])
                # Templates (composite type+code key -> 'compound_template' sentinel)
                rows = []
                for type_name, items in meta.get('templates', {}).items():
                    for i in items:
                        rows.append((type_name, i['code'], i.get('label') or i.get('name'), bool(i.get('status', True))))
                upsert_batch(cur, 'block_templates', rows, ['type', 'code', 'name', 'status'], pk='compound_template')
                # Use Cases (schema is serialized to JSON text for the JSONB column)
                upserts = patch.get('useCases', {}).get('upserts', [])
                if upserts:
                    rows = []
                    for uc in upserts:
                        schema_str = json.dumps(uc['schema']) if uc.get('schema') else None
                        rows.append((uc['code'], uc['name'], uc.get('description'), uc.get('categoryCode'), schema_str, bool(uc.get('status', True)), uc.get('displayOrder') or uc.get('ordering')))
                    upsert_batch(cur, 'use_cases', rows, ['code', 'name', 'description', 'category_code', 'schema_data', 'status', 'ordering'])
                # Wrappers
                upserts = patch.get('wrappers', {}).get('upserts', [])
                if upserts:
                    rows = []
                    for w in upserts:
                        rows.append((w['code'], w['name'], w.get('description'), w.get('useCaseCode') or w.get('categoryCode'), bool(w.get('featured') or w.get('isFeatured')), bool(w.get('base') or w.get('isBaseWrapper')), bool(w.get('status', True)), w.get('displayOrder') or w.get('ordering')))
                    upsert_batch(cur, 'wrappers', rows, ['code', 'name', 'description', 'use_case_code', 'featured', 'base', 'status', 'ordering'])
                # Deletes — children (wrappers) before parents (use_cases)
                delete_batch(cur, 'wrappers', patch.get('wrappers', {}).get('deletes', []))
                delete_batch(cur, 'use_cases', patch.get('useCases', {}).get('deletes', []))
                # Persist the new version so the next run requests a smaller delta
                if patch.get('newVersion'):
                    save_new_version(cur, patch['newVersion'])
                conn.commit()
                print("✅ Delta Sync Complete.")
            else:
                print("✅ No updates needed.")
    except Exception as e:
        if not conn.closed:
            conn.rollback()
        print(f"FATAL: Sync Failed: {e}", file=sys.stderr)
        sys.exit(1)  # non-zero exit so schedulers notice the failure
    finally:
        if not conn.closed:
            conn.close()


if __name__ == "__main__":
    main()
Programmatically Download
Stream the download directly to disk to prevent memory issues.
# FILE: download_bundle.py
import requests
import sys
import os
# --- CONFIGURATION ---
# Your Zywrap API key; the download endpoint rejects the placeholder value.
ZYWRAP_API_KEY = 'YOUR_API_KEY_HERE'
API_ENDPOINT = 'https://api.zywrap.com/v1/sdk/v1/download'
# Destination path for the streamed bundle.
OUTPUT_FILE = 'zywrap-data.zip'
# ---------------------
def download_sdk_bundle():
    """Download the latest V1 data bundle to OUTPUT_FILE, streaming to disk.

    Exits with status 1 on a placeholder API key, an HTTP error, or any
    other failure; partial downloads are removed so a retry starts clean.
    """
    print("Downloading latest V1 wrapper data from Zywrap...")
    if not ZYWRAP_API_KEY or 'YOUR_API_KEY_HERE' in ZYWRAP_API_KEY:
        print("FATAL: Please replace 'YOUR_API_KEY_HERE' with your actual Zywrap API key.", file=sys.stderr)
        sys.exit(1)
    headers = {'Authorization': f'Bearer {ZYWRAP_API_KEY}'}
    try:
        # stream=True ensures we don't load the entire zip into RAM at once
        # SECURITY NOTE(review): verify=False disables TLS certificate
        # validation; keep it only for local testing and remove in production.
        response = requests.get(API_ENDPOINT, headers=headers, stream=True, timeout=300, verify=False)
        response.raise_for_status()
        # Write directly to disk
        with open(OUTPUT_FILE, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)
        print(f"✅ Sync complete. Data saved to {OUTPUT_FILE}.")
        print(f"Run 'unzip {OUTPUT_FILE}' to extract the 'zywrap-data.json' file, then run 'python import.py'.")
    except requests.exceptions.HTTPError as e:
        # Remove any partially written file before exiting.
        if os.path.exists(OUTPUT_FILE): os.remove(OUTPUT_FILE)
        print(f"FATAL: API request failed with status code {e.response.status_code}.", file=sys.stderr)
        sys.exit(1)
    except Exception as e:
        if os.path.exists(OUTPUT_FILE): os.remove(OUTPUT_FILE)
        print(f"FATAL: An error occurred: {e}", file=sys.stderr)
        sys.exit(1)


if __name__ == "__main__":
    download_sdk_bundle()

