Documentation Index
Fetch the complete documentation index at: https://polos.dev/docs/llms.txt
Use this file to discover all available pages before exploring further.
Workflow is the core building block in Polos. A workflow is durable code - it survives failures and resumes exactly where it stopped.
What is a workflow?
A workflow is a Python function decorated with @workflow. It receives a WorkflowContext (ctx) and your input data.
from polos import workflow, WorkflowContext
@workflow
async def research(ctx: WorkflowContext, input: ResearchInput):
# Step 1: Search for information
results = await ctx.step.run("search_web", search_web, input.topic)
# Step 2: Analyze results (dynamic logic)
if len(results) > 10:
summary = await ctx.step.run("summarize", summarize_results, results)
else:
summary = await ctx.step.run("detailed_analysis", detailed_analysis, results)
# Step 3: Generate report
report = await ctx.step.run("generate_report", generate_report, summary)
return report
async def search_web(query: str):
response = await http_client.get(f"https://api.search.com?q={query}")
return response.json()
Let’s unpack what’s happening here:
ctx.step.run("search_web", search_web, input.topic) tells Polos to execute search_web (a regular Python function) as a durable step. If the workflow crashes and replays, completed steps are skipped.
Step: The unit of durability
Steps are how Polos achieves durability. Each step has a unique step key (like "search_web" or "generate_report"). When a workflow replays after a failure, Polos checks which steps already completed and skips them.
What should be a step?
- ✅ External API calls (OpenAI, Stripe, databases)
- ✅ Non-deterministic operations (LLM calls,
time.time(), random())
- ✅ Side effects (sending emails, charging cards, writing to DB)
Critical rule: Each step in a workflow must have a unique step key. In loops, use variables:
@workflow
async def process_items(ctx: WorkflowContext, input: Input):
# ✅ GOOD: Unique step key per iteration
for i, item in enumerate(input.items):
await ctx.step.run(f"process_{i}", process_item, item)
Workflow composition
Workflows can invoke other workflows. The parent suspends while children execute - no compute is consumed during waits.
@workflow
async def parent_workflow(ctx: WorkflowContext, input: ParentInput):
# Invoke child and wait for result
result = await ctx.step.invoke_and_wait(
"call_child",
child_workflow,
ChildInput(message="Hello!")
)
# Parent resumes here with result
return result
Waiting
Workflows can pause for time or events. Workers suspend during waits consuming no compute.
@workflow
async def approval_workflow(ctx: WorkflowContext, input: Input):
# Request approval
await ctx.step.run("request", request_approval, input.data)
# Wait for event (hours or days) - no compute consumed
decision = await ctx.step.wait_for_event("wait_approval", topic=f"approval/{input.id}")
# Resumes here when event arrives
if decision.data["approved"]:
await ctx.step.run("execute", execute_action, input.data)
else:
await ctx.step.run("handle_rejection", handle_rejection, decision.data)
Starting workflows
Workflows can be triggered in three ways:
1. Direct invocation
result = await research.invoke(ResearchInput(topic="AI agents"))
2. Event-triggered
@workflow(trigger_on_event="user/signup")
async def onboard_user(ctx: WorkflowContext, event: UserSignUpEvent):
# runs when event occurs
3. Scheduled
@workflow(schedule="0 9 * * *") # Daily at 9am
async def daily_report(ctx: WorkflowContext, input: SchedulePayload):
# runs at the defined schedule
Key takeaways
- Workflows are durable - they survive failures and resume from the last completed step
- Steps are the unit of durability - use them for API calls, side effects, and non-deterministic operations
- Step keys must be unique per execution
- Workers suspend during waits (child workflows, events, timeouts) - no compute consumed
Learn more
For detailed guides on workflows and steps, see:
- Workflows – Complete workflow reference
- Steps – Built-in step functions