Skip to main content

Python SDK

Official Python SDK for FlowMaestro with both synchronous and asynchronous clients.

Installation

pip install flowmaestro

Requirements: Python 3.9+

Quick Start

Synchronous Client

from flowmaestro import FlowMaestroClient

with FlowMaestroClient(api_key="fm_live_...") as client:
# Execute a workflow
response = client.workflows.execute("wf_123", inputs={"name": "John"})
execution_id = response["data"]["execution_id"]

# Wait for completion
result = client.executions.wait_for_completion(execution_id)
print(f"Result: {result['outputs']}")

Async Client

import asyncio
from flowmaestro import AsyncFlowMaestroClient

async def main():
async with AsyncFlowMaestroClient(api_key="fm_live_...") as client:
# Execute a workflow
response = await client.workflows.execute("wf_123", inputs={"name": "John"})
execution_id = response["data"]["execution_id"]

# Wait for completion
result = await client.executions.wait_for_completion(execution_id)
print(f"Result: {result['outputs']}")

asyncio.run(main())

Configuration

from flowmaestro import FlowMaestroClient

client = FlowMaestroClient(
# Required: Your API key
api_key="fm_live_your_api_key",

# Optional: Custom base URL (default: https://api.flowmaestro.io)
base_url="https://api.flowmaestro.io",

# Optional: Request timeout in seconds (default: 30.0)
timeout=30.0,

# Optional: Max retry attempts (default: 3)
max_retries=3,

# Optional: Custom headers
headers={
"X-Custom-Header": "value"
}
)

Workflows

List Workflows

response = client.workflows.list(page=1, per_page=20)

for workflow in response["data"]:
print(f"{workflow['name']} ({workflow['id']})")

print(f"Total: {response['pagination']['total_count']}")

Get Workflow

response = client.workflows.get("wf_abc123")

workflow = response["data"]
print(f"Workflow: {workflow['name']}")
print(f"Input Schema: {workflow['input_schema']}")

Execute Workflow

response = client.workflows.execute("wf_abc123", inputs={
"customer_email": "john@example.com",
"customer_name": "John Doe"
})

print(f"Execution ID: {response['data']['execution_id']}")
print(f"Status: {response['data']['status']}") # "pending"

Executions

Get Execution Status

response = client.executions.get("exec_xyz789")

execution = response["data"]
print(f"Status: {execution['status']}")

if execution["status"] == "completed":
print(f"Outputs: {execution['outputs']}")

Wait for Completion

# Wait with polling (default: 1 second interval, 5 minute timeout)
result = client.executions.wait_for_completion(
"exec_xyz789",
poll_interval=1.0, # Check every second
timeout=300.0 # 5 minute timeout
)

if result["status"] == "completed":
print(f"Success: {result['outputs']}")
else:
print(f"Failed: {result['error']}")

Stream Execution Events

# Stream events using Server-Sent Events
for event in client.executions.stream("exec_xyz789"):
print(f"Event: {event['type']}")

if event["type"] == "execution:completed":
print(f"Outputs: {event.get('outputs')}")
break

if event["type"] == "execution:failed":
print(f"Error: {event.get('error')}")
break

Async Streaming

async for event in client.executions.stream("exec_xyz789"):
if event["type"] == "node:started":
print(f"Starting node: {event['node_id']}")
elif event["type"] == "execution:completed":
print(f"Done: {event['outputs']}")
break

Cancel Execution

response = client.executions.cancel("exec_xyz789")
print(f"Cancelled: {response['data']['status'] == 'cancelled'}")

Agents & Threads

List Agents

response = client.agents.list()

for agent in response["data"]:
print(f"{agent['name']} ({agent['model']})")

Create Thread

response = client.agents.create_thread(
"agent_abc123",
metadata={"user_id": "user_456"}
)

thread_id = response["data"]["id"]
print(f"Thread ID: {thread_id}")

Send Message

response = client.threads.send_message("thread_xyz789", "What is my order status?")

print(f"Response: {response['data']['content']}")
print(f"Tokens used: {response['data']['usage']['total_tokens']}")

Stream Message Response

for event in client.threads.send_message_stream("thread_xyz789", "Tell me a story"):
if event["type"] == "message:token":
print(event.get("token", ""), end="", flush=True)
elif event["type"] == "message:completed":
print("\n\nDone!")
break

Get Thread Messages

response = client.threads.list_messages("thread_xyz789")

for msg in response["data"]:
print(f"[{msg['role']}]: {msg['content']}")

Delete Thread

client.threads.delete("thread_xyz789")

Knowledge Bases

List Knowledge Bases

response = client.knowledge_bases.list()

for kb in response["data"]:
print(f"{kb['name']} - {kb['document_count']} documents")

Query Knowledge Base

response = client.knowledge_bases.query(
"kb_abc123",
"How do I reset my password?",
top_k=5
)

for result in response["data"]["results"]:
print(f"Score: {result['similarity']:.3f}")
print(f"Content: {result['content']}")
print(f"Source: {result['document_name']}")
print("---")

Triggers

List Triggers

response = client.triggers.list(workflow_id="wf_abc123")

for trigger in response["data"]:
print(f"{trigger['name']} ({trigger['type']})")

Execute Trigger

response = client.triggers.execute("trigger_abc123", inputs={
"event_type": "user_signup",
"user_id": "user_123"
})

print(f"Execution ID: {response['data']['execution_id']}")

Webhooks

Create Webhook

response = client.webhooks.create(
name="My Webhook",
url="https://api.example.com/webhook",
events=["execution.completed", "execution.failed"]
)

# Store the secret securely!
print(f"Webhook ID: {response['data']['id']}")
print(f"Secret: {response['data']['secret']}")

List Webhooks

response = client.webhooks.list()

for webhook in response["data"]:
print(f"{webhook['name']} - {webhook['url']}")

Test Webhook

response = client.webhooks.test("wh_abc123")

if response["data"]["success"]:
print(f"Webhook working! Response time: {response['data']['response_time_ms']}ms")
else:
print(f"Webhook failed: {response['data']['error_message']}")

Delete Webhook

client.webhooks.delete("wh_abc123")

Error Handling

from flowmaestro import (
FlowMaestroClient,
FlowMaestroError,
AuthenticationError,
NotFoundError,
RateLimitError,
ValidationError
)

try:
response = client.workflows.execute("wf_invalid")
except NotFoundError:
print("Workflow not found")
except AuthenticationError:
print("Invalid API key")
except RateLimitError as e:
print(f"Rate limited. Retry after {e.retry_after}s")
except ValidationError as e:
print(f"Validation error: {e.details}")
except FlowMaestroError as e:
print(f"API error: {e.code} - {e.message}")

Type Hints

The SDK is fully typed and includes a py.typed marker for PEP 561 compliance:

from flowmaestro.types import Workflow, Execution, ExecutionStatus

def process_workflow(workflow: Workflow) -> None:
print(f"Processing: {workflow['name']}")

Context Managers

Both clients support context managers for automatic cleanup:

# Sync
with FlowMaestroClient(api_key="...") as client:
response = client.workflows.list()

# Async
async with AsyncFlowMaestroClient(api_key="...") as client:
response = await client.workflows.list()

Or manually close when done:

client = FlowMaestroClient(api_key="...")
try:
response = client.workflows.list()
finally:
client.close()