Python SDK
Complete instrumentation for Python AI agents — tracing, custom metrics, governance, drift detection, and more.
Installation
pip install turingpulse-sdk
Configuration
Initialize TuringPulse at application startup. Get your API key from the TuringPulse dashboard.
Basic Configuration
from turingpulse_sdk import init, TuringPulseConfig
init(TuringPulseConfig(
    api_key="sk_...",             # Or set TP_API_KEY env var
    workflow_name="My Workflow",  # Or set TP_WORKFLOW_NAME env var
))
Full Configuration Options
from turingpulse_sdk import init, TuringPulseConfig
config = TuringPulseConfig(
    # Required
    api_key="sk_...",             # API key (or TP_API_KEY env var)
    workflow_name="My Workflow",  # Display name (or TP_WORKFLOW_NAME env var)

    # Endpoint (optional — defaults to https://api.turingpulse.ai)
    # endpoint="https://api.turingpulse.ai",  # Or TP_ENDPOINT env var

    # Data capture
    capture_arguments=False,      # Log function inputs (TP_CAPTURE_ARGUMENTS)
    capture_return_value=False,   # Log function outputs (TP_CAPTURE_RETURN_VALUE)

    # Security — redact sensitive fields before telemetry leaves your environment
    redact_fields=[               # Keys to mask with [REDACTED]
        "password", "api_key", "secret", "token", "authorization"
    ],

    # Network tuning
    timeout_seconds=10.0,         # HTTP timeout (TP_TIMEOUT_SECONDS, max 120)
    max_retries=3,                # Retry attempts (TP_MAX_RETRIES)
    policy_check_timeout=1.5,     # HITL policy check timeout (TP_POLICY_CHECK_TIMEOUT)

    # Governance defaults (applied to all instrumented functions)
    # governance_defaults=GovernanceDefaults(hitl=False, reviewers=[]),
)
init(config)
Set TP_API_KEY as an environment variable instead of hardcoding your key. The SDK also supports TP_WORKFLOW_NAME and TP_ENDPOINT.
Basic Instrumentation
Use the @instrument decorator to capture traces from any function.
from turingpulse_sdk import instrument
@instrument(name="my-agent")
def process_query(query: str) -> str:
    response = llm.chat(query)
    return response

# Call normally - traces are captured automatically
result = process_query("What's the weather today?")
Decorator Options
@instrument(
    name="customer-support-agent",
    operation="handle_query",
    labels={
        "team": "support",
        "channel": "web",
        "priority": "high",
    },
    trace=True,
)
def my_agent(query: str, context: dict) -> dict:
    return {"response": "..."}
Custom Metrics & KPIs
Track custom metrics and set up automatic alerts when thresholds are breached.
Defining KPIs
from turingpulse_sdk import instrument, KPIConfig
@instrument(
    name="document-processor",
    kpis=[
        # Track latency from execution duration
        KPIConfig(
            kpi_id="latency_ms",
            description="Response Latency",
            use_duration=True,
            alert_threshold=5000,
            comparator="gt",
        ),
        # Extract token usage from the return value
        KPIConfig(
            kpi_id="token_count",
            description="Token Usage",
            value=lambda ctx: ctx.result.get("tokens", 0) if ctx.result else 0,
            alert_threshold=8000,
            comparator="gt",
        ),
        # Alert when the accuracy score drops below the threshold
        KPIConfig(
            kpi_id="accuracy",
            description="Accuracy Score",
            value=lambda ctx: ctx.result.get("score", 0) if ctx.result else 0,
            alert_threshold=0.85,
            comparator="lt",
        ),
        # Read the cost directly from a key in the result dict
        KPIConfig(
            kpi_id="cost_usd",
            description="Execution Cost",
            from_result_path="cost",
            alert_threshold=0.50,
            comparator="gt",
        ),
    ],
)
def process_document(doc: str) -> dict:
    result = llm.analyze(doc)
    return {
        "analysis": result.text,
        "tokens": result.usage.total_tokens,
        "score": result.confidence,
        "cost": result.cost,  # read by the cost_usd KPI via from_result_path
    }
KPI Comparators
| Comparator | Description | Alert When |
|---|---|---|
| gt | Greater than | value > threshold |
| gte | Greater than or equal | value >= threshold |
| lt | Less than | value < threshold |
| lte | Less than or equal | value <= threshold |
| eq | Equal | value == threshold |
| neq | Not equal | value != threshold |
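The comparator semantics in the table map directly onto Python's comparison operators. The `alert_triggered` helper below is a hypothetical illustration of when each comparator fires, not part of the SDK:

```python
import operator

# Map each comparator name from the table to its Python operator
COMPARATORS = {
    "gt": operator.gt,
    "gte": operator.ge,
    "lt": operator.lt,
    "lte": operator.le,
    "eq": operator.eq,
    "neq": operator.ne,
}

def alert_triggered(value, threshold, comparator: str) -> bool:
    """Return True when a KPI value breaches its threshold."""
    return COMPARATORS[comparator](value, threshold)

alert_triggered(6200, 5000, "gt")   # latency above the 5000 ms ceiling: True
alert_triggered(0.91, 0.85, "lt")   # accuracy still above the 0.85 floor: False
```

Note that "gt" and "lt" express opposite intents: use "gt" for metrics where high is bad (latency, cost) and "lt" for metrics where low is bad (accuracy).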
Define KPIs with KPIConfig on your @instrument() decorators instead of recording metrics manually. KPIs are evaluated automatically after each run and can trigger alerts when thresholds are breached.
Governance & Human Oversight
Configure human-in-the-loop (HITL), human-after-the-loop (HATL), and human-on-the-loop (HOTL) workflows.
Human-in-the-Loop (HITL)
Require human approval before execution completes.
from turingpulse_sdk import instrument, GovernanceDirective
@instrument(
    name="high-risk-action",
    governance=GovernanceDirective(
        hitl=True,
        reviewers=["manager@company.com"],
        escalation_channels=["pagerduty://critical"],
        auto_escalate_after_seconds=3600,
        severity="high",
    ),
)
def execute_trade(symbol: str, amount: float):
    return trading_api.execute(symbol, amount)
Human-after-the-Loop (HATL)
Execute immediately, queue for review after completion.
@instrument(
    name="content-generator",
    governance=GovernanceDirective(
        hatl=True,
        reviewers=["qa@company.com"],
        notes="Review generated content for accuracy and tone",
    ),
)
def generate_content(topic: str):
    return content_llm.generate(topic)
Human-on-the-Loop (HOTL)
Real-time monitoring with alerts, no blocking.
@instrument(
    name="realtime-monitor",
    governance=GovernanceDirective(
        hotl=True,
        escalation_channels=["slack://alerts"],
    ),
)
def process_transaction(tx: dict):
    return payment_processor.process(tx)
Platform Configuration
Alert channels, baselines, anomaly rules, and workflow registration are configured through the TuringPulse platform UI or REST API. This keeps operational configuration separate from your application code.
- Alert Channels — Controls → Alert Channels
- KPI Thresholds — Controls → Thresholds
- Drift Rules — Controls → Drift Rules
- Anomaly Rules — Controls → Anomalies
Nested Spans
Nested spans are captured automatically when using framework integrations (LangGraph, LangChain, etc.). Each LLM call, tool invocation, and retrieval step creates a child span within the parent trace.
For custom workflows, nest @instrument calls:
from turingpulse_sdk import instrument
@instrument(name="multi-step-agent")
def complex_workflow(query: str):
    context = retrieve_context(query)
    response = generate_response(query, context)
    return validate_output(response)

@instrument(name="retrieval-step")
def retrieve_context(query: str):
    return vector_db.search(query)

@instrument(name="generation-step")
def generate_response(query: str, context: list):
    return llm.chat(query, context=context)

@instrument(name="validation-step")
def validate_output(response):
    return validator.check(response)
Custom Metadata & Tags
from turingpulse_sdk import instrument
@instrument(
    name="enriched-agent",
    labels={"department": "sales", "experiment": "v2-prompt"},
    metadata={"model_version": "gpt-4-turbo"},
)
def process_with_metadata(query: str, user_id: str):
    result = llm.chat(query)
    return result.content
Labels and metadata are static values set on the @instrument() decorator. For dynamic values computed at runtime (like token counts), use KPIConfig with value extractors.
Deploy Tracking
Register deployments to correlate behavior changes with code releases.
from turingpulse_sdk import register_deploy
# Auto-detect from CI/CD environment
register_deploy(
    workflow_id="my-agent",
    auto_detect=True,  # Detects GitHub Actions, GitLab CI, etc.
)

# Or provide explicit values
register_deploy(
    workflow_id="my-agent",
    version="v1.2.3",
    git_sha="abc123def",
    commit_message="Improve prompt template",
)
Drift Detection & Fingerprinting
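Conceptually, fingerprinting masks sensitive keys and hashes what remains, so a changed prompt or config produces a new hash while a rotated secret does not. A simplified illustration of that idea (`redact_and_hash` is hypothetical, not the SDK's actual algorithm):

```python
import hashlib
import json

def redact_and_hash(config: dict, sensitive_keys: set) -> str:
    """Sketch: mask sensitive values, then hash the rest so config
    changes can be detected without ever storing secrets."""
    redacted = {
        k: "[REDACTED]" if k in sensitive_keys else v
        for k, v in config.items()
    }
    # Canonical JSON (sorted keys) keeps the hash stable across key order
    canonical = json.dumps(redacted, sort_keys=True)
    return hashlib.sha256(canonical.encode()).hexdigest()

sensitive = {"api_key", "password", "secret", "token"}
v1 = redact_and_hash({"model": "gpt-4", "temperature": 0.2, "api_key": "sk_a"}, sensitive)
v2 = redact_and_hash({"model": "gpt-4", "temperature": 0.7, "api_key": "sk_a"}, sensitive)
rotated = redact_and_hash({"model": "gpt-4", "temperature": 0.2, "api_key": "sk_b"}, sensitive)

assert v1 != v2        # temperature change: drift is detectable
assert v1 == rotated   # rotating a redacted secret: no false alarm
```

This is why sensitive_config_keys below matters: redaction happens before hashing, so secrets never influence (or leak through) the fingerprint.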
from turingpulse_sdk import init, TuringPulseConfig, FingerprintConfig

init(TuringPulseConfig(
    api_key="sk_live_...",
    workflow_name="My Workflow",
    fingerprint=FingerprintConfig(
        enabled=True,
        capture_prompts=True,    # Hash prompts for change detection
        capture_configs=True,    # Hash model configurations
        capture_structure=True,  # Track agent DAG structure
        sensitive_config_keys=[  # Keys to redact before hashing
            "api_key", "password", "secret", "token"
        ],
    ),
))

# When prompts/configs change, TuringPulse will:
# 1. Detect the change automatically
# 2. Correlate with any metric changes
# 3. Alert if drift is detected
Async Support
from turingpulse_sdk import instrument
import asyncio
@instrument(name="async-agent")
async def async_workflow(queries: list[str]):
    tasks = [process_query(q) for q in queries]
    results = await asyncio.gather(*tasks)
    return results

@instrument(name="query-processor")
async def process_query(query: str):
    response = await llm.achat(query)
    return response

# Run with asyncio
results = asyncio.run(async_workflow(["q1", "q2", "q3"]))
Error Handling
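Exceptions raised inside an instrumented function propagate to the caller as usual. As a simplified mental model of that behavior (an illustration only, not the SDK's implementation; `record_outcome` and the `log` list are hypothetical stand-ins):

```python
import functools

log = []  # stand-in for the telemetry backend

def record_outcome(name):
    """Hypothetical sketch of a tracing decorator: record the outcome
    of each call, then let any exception propagate unchanged."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            try:
                result = fn(*args, **kwargs)
                log.append({"name": name, "status": "ok"})
                return result
            except Exception as exc:
                log.append({"name": name, "status": "error",
                            "error_type": type(exc).__name__})
                raise  # caller still sees the original exception
        return wrapper
    return decorator

@record_outcome("divider")
def divide(a, b):
    return a / b

divide(10, 2)  # recorded with status "ok"
try:
    divide(1, 0)
except ZeroDivisionError:
    pass  # recorded as an error, then re-raised to the caller
```

The practical consequence: re-raise errors you want surfaced in traces and alerts, and return a normal value for errors you consider handled, as the example below does.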
from turingpulse_sdk import instrument
@instrument(name="error-aware-agent")
def risky_operation(data: dict):
    try:
        result = external_api.call(data)
        return result
    except RateLimitError:
        raise  # re-raise so the failure is visible upstream
    except ValidationError as e:
        # Handle expected errors and return a structured result
        return {"error": "Invalid input", "details": str(e)}
    except Exception:
        raise
Next Steps
- LangGraph Integration — Auto-instrumentation for LangGraph agents
- LangChain Integration — Auto-instrumentation for LangChain
- OpenAI Integration — Instrument OpenAI API calls
- Quickstart Guide — End-to-end setup walkthrough