Workflows are the foundation of Polos. They’re durable functions that survive failures, maintain state, and resume exactly where they stopped.
What is a workflow?
A workflow is a function that can contain steps, call other workflows, wait for events, and use normal programming constructs like loops and conditionals.
from polos import workflow, WorkflowContext

@workflow
async def process_order(ctx: WorkflowContext, input: OrderInput):
    # Step 1: Validate order
    order = await ctx.step.run("validate", validate_order, input)
    # Step 2: Charge payment
    payment = await ctx.step.run("charge", charge_stripe, order)
    # Step 3: Send confirmation
    await ctx.step.run("send_email", send_confirmation, order)
    return OrderOutput(order_id=order.id, status="completed")
What makes workflows special:
- Durable - Survive crashes and resume from the last completed step
- Stateful - Maintain context across execution
- Composable - Call other workflows and wait for results
- Observable - Track execution with real-time events
Why workflows?
Traditional functions fail completely when something goes wrong. Workflows persist progress at each step:
Without workflows:
async def process_order(order):
    validate_order(order)      # ✓ Completes
    charge_stripe(order)       # ✓ Completes
    send_confirmation(order)   # ❌ Server crashes
    # On restart: Everything runs again
    # Result: Customer charged twice
With workflows:
@workflow
async def process_order(ctx: WorkflowContext, input: OrderInput):
    order = await ctx.step.run("validate", validate_order, input)   # ✓ Cached
    payment = await ctx.step.run("charge", charge_stripe, order)    # ✓ Cached
    await ctx.step.run("send_email", send_confirmation, order)      # ❌ Crashes
    # On restart: Resumes from send_email
    # Result: Customer charged once, email sent successfully
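The resume behavior can be illustrated without Polos at all. The following is a minimal sketch, assuming a hypothetical in-memory `StepLog` that stands in for durable storage; the class, the helper functions, and the simulated crash are all invented for illustration and are not the Polos implementation.

```python
import asyncio

class StepLog:
    """Hypothetical stand-in for a durable store of completed step outputs."""

    def __init__(self):
        self.outputs = {}

    async def run(self, name, func, *args):
        # Replay: a step that already completed returns its saved output.
        if name in self.outputs:
            return self.outputs[name]
        result = func(*args)
        # The output is recorded only after the step succeeds.
        self.outputs[name] = result
        return result

charges = []                      # records every real call to charge()
state = {"email_fails": True}     # makes the first email attempt crash

def charge(order_id):
    charges.append(order_id)
    return f"payment-for-{order_id}"

def send_email(order_id):
    if state["email_fails"]:
        state["email_fails"] = False
        raise RuntimeError("simulated crash")
    return "sent"

async def process_order(log, order_id):
    payment = await log.run("charge", charge, order_id)
    status = await log.run("send_email", send_email, order_id)
    return payment, status

async def demo():
    log = StepLog()
    try:
        await process_order(log, "123")     # first attempt dies in send_email
    except RuntimeError:
        pass
    # The retry replays "charge" from the log and only re-runs send_email.
    return await process_order(log, "123")
```

Running `asyncio.run(demo())` completes the order on the second attempt while `charge` is called only once. A real engine would persist the log outside the process so that it survives a restart; this sketch keeps it in memory for brevity.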
Workflow context
Every workflow receives a context object with execution metadata and methods:
@workflow
async def example_workflow(ctx: WorkflowContext, input: dict):
    # Execution identifiers
    execution_id = ctx.execution_id
    workflow_id = ctx.workflow_id
    # User context
    user_id = ctx.user_id
    session_id = ctx.session_id
    # Timestamps
    created_at = ctx.created_at
    started_at = ctx.started_at
    # Step methods
    result = await ctx.step.run("step_name", function, args)
    await ctx.step.wait_for("wait_step", seconds=60)
    return {"status": "completed"}
Key concepts
Steps
Steps are the unit of durability. Each step’s output is persisted when the step completes.
@workflow
async def example(ctx: WorkflowContext, input: dict):
    # Each step is durable
    step1 = await ctx.step.run("step1", func1, input)
    step2 = await ctx.step.run("step2", func2, step1)
    step3 = await ctx.step.run("step3", func3, step2)
See Steps for details.
Workflow composition
Workflows can call other workflows or agents. They can wait for those to complete or simply trigger them as fire-and-forget.
@workflow
async def parent(ctx: WorkflowContext, input: ParentInput):
    # Call child workflow but don't wait for it to complete
    exec_handle = await ctx.step.invoke(
        "call_child",
        child_workflow,
        {"data": input.data}
    )
    # Call agent and wait
    response = await ctx.step.agent_invoke_and_wait(
        "research_agent",
        research_agent.with_input(input.data)
    )
    return response.result

@workflow
async def child_workflow(ctx: WorkflowContext, input: dict):
    # Process data
    return {"processed": True}
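The difference between triggering a child and waiting for it has a rough analogue in plain `asyncio`. The sketch below uses no Polos APIs; `child` and `parent` are hypothetical coroutines, and `asyncio.create_task` only approximates fire-and-forget within a single process.

```python
import asyncio

async def child(data):
    await asyncio.sleep(0.01)
    return {"processed": data}

async def parent(data):
    # Fire-and-forget analogue: schedule the child and keep a handle to it.
    handle = asyncio.create_task(child(data))
    returned_before_child_finished = not handle.done()

    # Invoke-and-wait analogue: execution stops here until the call returns.
    waited = await child(data.upper())

    # The handle can still be awaited later if the result is needed.
    background = await handle
    return returned_before_child_finished, waited, background
```

The parent gets control back immediately after scheduling the first child, which is the property a fire-and-forget invocation is meant to provide.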
Control flow
Workflows use ordinary Python control flow, including conditionals, loops, and parallel execution with asyncio.gather:
import asyncio

@workflow
async def conditional_workflow(ctx: WorkflowContext, input: ConditionalWorkflowInput):
    # Conditionals
    if input.amount > 1000:
        await ctx.step.run("high_value", process_high_value, input)
    else:
        await ctx.step.run("standard", process_standard, input)
    # Loops: each iteration gets its own step name
    for item in input.items:
        await ctx.step.run(f"process_{item.id}", process_item, item)
    # Parallel execution
    results = await asyncio.gather(
        ctx.step.run("task_a", task_a, input),
        ctx.step.run("task_b", task_b, input),
        ctx.step.run("task_c", task_c, input)
    )
    return results
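One detail of the parallel case is worth spelling out: `asyncio.gather` returns results in the order the awaitables were passed, not the order they finished. The sketch below shows this with plain coroutines; `tracked` and the delays are hypothetical and stand in for real steps.

```python
import asyncio

async def run_parallel():
    finished = []  # completion order, recorded as each task ends

    async def tracked(name, delay):
        await asyncio.sleep(delay)
        finished.append(name)
        return name

    # task_a is the slowest, yet it stays first in the results list.
    results = await asyncio.gather(
        tracked("task_a", 0.06),
        tracked("task_b", 0.02),
        tracked("task_c", 0.04),
    )
    return results, finished
```

Because the result order is fixed, code that unpacks `results` positionally behaves the same regardless of which task happens to complete first.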
Waits and events
Workflows can pause and resume:
@workflow
async def approval_workflow(ctx: WorkflowContext, input: ApprovalWorkflowInput):
    # Process request
    request = await ctx.step.run("process", process_request, input)
    # Wait for approval event
    approval = await ctx.step.wait_for_event(
        "wait_approval",
        topic="approval.response"
    )
    if approval.data["approved"]:
        result = await ctx.step.run("execute", execute_action, request)
        return result
See Waits and Events for details.
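As a rough mental model, waiting on a topic resembles awaiting a future that another part of the system resolves later. The following sketch assumes a hypothetical in-memory `EventBus`; it is not how Polos stores or delivers events, and unlike a durable wait it would not survive a process restart.

```python
import asyncio

class EventBus:
    """Hypothetical in-memory event bus, for illustration only."""

    def __init__(self):
        self._waiters = {}

    def wait_for(self, topic):
        # Create a future that resolves when an event arrives on the topic.
        future = asyncio.get_running_loop().create_future()
        self._waiters.setdefault(topic, []).append(future)
        return future

    def publish(self, topic, data):
        # Wake every coroutine currently waiting on this topic.
        for future in self._waiters.pop(topic, []):
            future.set_result(data)

async def approval_flow(bus, log):
    log.append("processed")
    approval = await bus.wait_for("approval.response")  # paused here
    log.append("resumed")
    return "executed" if approval["approved"] else "rejected"

async def demo():
    bus, log = EventBus(), []
    flow = asyncio.create_task(approval_flow(bus, log))
    await asyncio.sleep(0)        # let the flow run up to its wait
    paused_log = list(log)        # snapshot taken while the flow is paused
    bus.publish("approval.response", {"approved": True})
    return paused_log, await flow, log
```

The flow does no work between the wait and the published event, which is the behavior the approval example relies on.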
Starting workflows
Workflows can be started in three ways:
1. Invoke using API or SDK
from polos import PolosClient

client = PolosClient()
result = await order_workflow.invoke(
    client, payload={"order_id": "123"}
)
You can also invoke multiple workflows or agents in parallel:
from polos import (
    batch_invoke, batch_agent_invoke, BatchWorkflowInput, PolosClient
)

client = PolosClient()

# Batch invoke multiple workflows
handles = await client.batch_invoke([
    BatchWorkflowInput(id="workflow-1", payload={"foo": "bar"}),
    BatchWorkflowInput(id="workflow-2", payload={"baz": 42}),
    BatchWorkflowInput(id="workflow-3", payload={"qux": "test"}),
])

# Batch invoke multiple agents
agent_handles = await client.batch_agent_invoke([
    grammar_agent.with_input("Check this text"),
    tone_agent.with_input("Check this text"),
    accuracy_agent.with_input("Check this text"),
])
Note: Batch invoke cannot be called from within a workflow. Use ctx.step.batch_invoke() or ctx.step.batch_agent_invoke() when calling from within workflows. See Steps for details.
2. Scheduled execution
@workflow(schedule="0 9 * * *")  # Daily at 9am
async def daily_report(ctx: WorkflowContext, input: SchedulePayload):
    data = await ctx.step.run("fetch", fetch_data)
    report = await ctx.step.run("generate", generate_report, data)
    await ctx.step.run("send", send_email, report)
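The schedule string in the example reads as a standard five-field cron expression. The helper below is a small sketch for decoding such strings; `describe_cron` is a hypothetical name, and it assumes Polos accepts the conventional minute, hour, day-of-month, month, day-of-week layout, which the "Daily at 9am" comment is consistent with but does not confirm.

```python
CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")

def describe_cron(expression):
    """Label each field of a five-field cron expression."""
    parts = expression.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(f"expected 5 fields, got {len(parts)}")
    return dict(zip(CRON_FIELDS, parts))

# "0 9 * * *" means minute 0 of hour 9, on every day, month, and weekday.
fields = describe_cron("0 9 * * *")
```

The time zone a schedule runs in is not stated here, so it is worth checking before relying on an exact firing time.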
3. Event-triggered
@workflow(trigger_on_event="user.signup")
async def onboard_user(ctx: WorkflowContext, event: EventPayload):
    user_id = event.data["user_id"]
    await ctx.step.run("welcome", send_welcome_email, user_id)
    await ctx.step.run("setup", create_sample_data, user_id)
Key takeaways
- Workflows are durable functions - They survive failures and resume from the last completed step
- Steps are the unit of durability - Each step’s output is persisted
- WorkflowContext provides execution metadata - Access IDs, timestamps, and step methods
- Compose workflows - Call other workflows or agents, and either wait for results or fire and forget
- Use normal control flow - Conditionals, loops, and parallel execution
- Start workflows - Via API, schedule, or events