
Workflows are the foundation of Polos. They’re durable functions that survive failures, maintain state, and resume exactly where they stopped.

What is a workflow?

A workflow is a function that can contain steps, call other workflows, wait for events, and use normal programming constructs like loops and conditionals.
from polos import workflow, WorkflowContext

@workflow
async def process_order(ctx: WorkflowContext, input: OrderInput):
    # Step 1: Validate order
    order = await ctx.step.run("validate", validate_order, input)

    # Step 2: Charge payment
    payment = await ctx.step.run("charge", charge_stripe, order)

    # Step 3: Send confirmation
    await ctx.step.run("send_email", send_confirmation, order)

    return OrderOutput(order_id=order.id, status="completed")
What makes workflows special:
  • Durable - Survive crashes and resume from the last completed step
  • Stateful - Maintain context across the whole execution, including restarts
  • Composable - Call other workflows and wait for results
  • Observable - Track execution with real-time events

Why workflows?

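A plain function keeps no record of its progress: if the process crashes partway through, a retry starts again from the first line. A workflow persists the result of each step as it completes, so a restart skips the steps that already finished.

Without workflows: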
async def process_order(order):
    validate_order(order)           # ✓ Completes
    charge_stripe(order)            # ✓ Completes
    send_confirmation(order)        # ❌ Server crashes
    # On restart: Everything runs again
    # Result: Customer charged twice
With workflows:
@workflow
async def process_order(ctx: WorkflowContext, input: OrderInput):
    order = await ctx.step.run("validate", validate_order, input)   # ✓ Cached
    payment = await ctx.step.run("charge", charge_stripe, order)    # ✓ Cached
    await ctx.step.run("send_email", send_confirmation, order)      # ❌ Crashes
    # On restart: Resumes from send_email
    # Result: Customer charged once, email sent successfully
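
To make the resume behaviour concrete, here is a minimal in-memory sketch of step caching. It is not how Polos is implemented: `ToyStepRunner` is a hypothetical stand-in for `ctx.step`, the `store` dict stands in for durable storage, and the crash is simulated with an exception. It only illustrates the idea that a completed step's result is looked up by name instead of being executed again.

```python
import asyncio


class ToyStepRunner:
    """Hypothetical stand-in for ctx.step: caches each step's result by name."""

    def __init__(self, store):
        self.store = store  # outlives the runner, like durable storage would

    async def run(self, name, func, *args):
        if name in self.store:       # step completed in an earlier attempt
            return self.store[name]
        result = await func(*args)
        self.store[name] = result    # recorded only after the step completes
        return result


async def simulate_crash_and_restart():
    store = {}            # survives the simulated restart
    charges = []          # every real call to charge_stripe
    email_attempts = []   # every call to send_confirmation

    async def validate_order(order):
        return {**order, "valid": True}

    async def charge_stripe(order):
        charges.append(order["id"])
        return {"charged": order["id"]}

    async def send_confirmation(order):
        email_attempts.append(order["id"])
        if len(email_attempts) == 1:
            raise RuntimeError("simulated crash")
        return "sent"

    async def process_order(step, order):
        order = await step.run("validate", validate_order, order)
        await step.run("charge", charge_stripe, order)
        return await step.run("send_email", send_confirmation, order)

    try:
        result = await process_order(ToyStepRunner(store), {"id": "123"})
    except RuntimeError:
        # "Restart": a fresh runner over the same store replays the function
        result = await process_order(ToyStepRunner(store), {"id": "123"})
    return result, charges


if __name__ == "__main__":
    print(asyncio.run(simulate_crash_and_restart()))
```

On the second attempt the function body runs from the top, but `validate` and `charge` return their stored results, so only `send_email` actually executes.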

Workflow context

Every workflow receives a context object with execution metadata and methods:
@workflow
async def example_workflow(ctx: WorkflowContext, input: dict):
    # Execution identifiers
    execution_id = ctx.execution_id
    workflow_id = ctx.workflow_id

    # User context
    user_id = ctx.user_id
    session_id = ctx.session_id

    # Timestamps
    created_at = ctx.created_at
    started_at = ctx.started_at

    # Step methods
    result = await ctx.step.run("step_name", function, args)
    await ctx.step.wait_for("wait_step", seconds=60)

    return {"status": "completed"}

Key concepts

Steps

Steps are the unit of durability. Each step’s output is persisted when the step completes.
@workflow
async def example(ctx: WorkflowContext, input: dict):
    # Each step is durable
    step1 = await ctx.step.run("step1", func1, input)
    step2 = await ctx.step.run("step2", func2, step1)
    step3 = await ctx.step.run("step3", func3, step2)
See Steps for details.

Workflow composition

Workflows can call other workflows or agents. The caller can either wait for the result or trigger the call fire-and-forget and continue immediately.
@workflow
async def parent(ctx: WorkflowContext, input: ParentInput):
    # Call child workflow but don't wait for it to complete
    exec_handle = await ctx.step.invoke(
        "call_child",
        child_workflow,
        {"data": input.data}
    )

    # Call agent and wait
    response = await ctx.step.agent_invoke_and_wait(
        "research_agent",
        research_agent.with_input(input.data)
    )
    return response.result

@workflow
async def child_workflow(ctx: WorkflowContext, input: dict):
    # Process data
    return {"processed": True}
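
The difference between the two calling styles can be sketched with plain `asyncio`. This is an analogy only, not the Polos API: `asyncio.create_task` plays the role of a fire-and-forget invoke that returns a handle, and a direct `await` plays the role of invoke-and-wait. The `child` and `parent` functions here are hypothetical.

```python
import asyncio


async def child(data):
    await asyncio.sleep(0.01)  # pretend to do some work
    return {"processed": data}


async def parent():
    # Fire-and-forget: schedule the child and keep going with only a handle
    handle = asyncio.create_task(child("background"))
    started_without_waiting = not handle.done()

    # Invoke-and-wait: the parent is suspended until the child returns
    waited = await child("foreground")

    # A handle can still be awaited later if the result turns out to matter
    background = await handle
    return started_without_waiting, waited, background


if __name__ == "__main__":
    print(asyncio.run(parent()))
```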

Control flow

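Workflows use ordinary Python control flow, including conditionals, loops, and asyncio.gather for parallel steps:
import asyncio

@workflow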
async def conditional_workflow(ctx: WorkflowContext, input: ConditionalWorkflowInput):
    # Conditionals
    if input.amount > 1000:
        await ctx.step.run("high_value", process_high_value, input)
    else:
        await ctx.step.run("standard", process_standard, input)

    # Loops
    for item in input.items:
        await ctx.step.run(f"process_{item.id}", process_item, item)

    # Parallel execution
    results = await asyncio.gather(
        ctx.step.run("task_a", task_a, input),
        ctx.step.run("task_b", task_b, input),
        ctx.step.run("task_c", task_c, input)
    )

    return results
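
One detail of the parallel pattern is worth spelling out: `asyncio.gather` returns results in the order the awaitables were passed in, not the order they finish. The sketch below uses plain coroutines (a hypothetical `task` helper with artificial delays) in place of `ctx.step.run` calls to show this.

```python
import asyncio


async def task(name, delay, finished):
    await asyncio.sleep(delay)
    finished.append(name)  # record actual completion order
    return name


async def run_parallel():
    finished = []
    # task_a is the slowest, yet it still comes first in the results
    results = await asyncio.gather(
        task("task_a", 0.06, finished),
        task("task_b", 0.02, finished),
        task("task_c", 0.04, finished),
    )
    return results, finished


if __name__ == "__main__":
    print(asyncio.run(run_parallel()))
```

So `results[0]` always belongs to the first call in the `gather`, which makes it safe to unpack the list positionally.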

Waits and events

Workflows can pause until an event arrives, then resume from that point:
@workflow
async def approval_workflow(ctx: WorkflowContext, input: ApprovalWorkflowInput):
    # Process request
    request = await ctx.step.run("process", process_request, input)

    # Wait for approval event
    approval = await ctx.step.wait_for_event(
        "wait_approval",
        topic="approval.response"
    )

    if approval.data["approved"]:
        result = await ctx.step.run("execute", execute_action, request)
        return result
See Waits and Events for details.
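
As a rough mental model of waiting on a topic, the sketch below pauses a coroutine on a future until something publishes to that topic. `ToyEventBus` is a hypothetical in-memory helper, not part of Polos, and it has none of the durability a real workflow wait has. The sketch also returns an explicit value when the request is rejected, a branch the example above leaves implicit.

```python
import asyncio


class ToyEventBus:
    """Hypothetical in-memory event bus, for illustration only."""

    def __init__(self):
        self._waiters = {}

    def wait_for(self, topic):
        # Hand back a future that resolves when the topic is published
        future = asyncio.get_running_loop().create_future()
        self._waiters.setdefault(topic, []).append(future)
        return future

    def publish(self, topic, data):
        for future in self._waiters.pop(topic, []):
            if not future.done():
                future.set_result(data)


async def approval_flow(bus, log):
    log.append("processed")
    approval = await bus.wait_for("approval.response")  # pauses here
    if approval["approved"]:
        log.append("executed")
        return "done"
    return "rejected"


async def demo(approved):
    bus, log = ToyEventBus(), []
    flow = asyncio.create_task(approval_flow(bus, log))
    await asyncio.sleep(0)       # let the flow run until it pauses
    log_while_paused = list(log)
    bus.publish("approval.response", {"approved": approved})
    return log_while_paused, await flow, log


if __name__ == "__main__":
    print(asyncio.run(demo(True)))
```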

Starting workflows

Workflows can be started in three ways:

1. Invoke using API or SDK

from polos import PolosClient

client = PolosClient()
result = await order_workflow.invoke(
    client, payload={"order_id": "123"}
)
You can also invoke multiple workflows or agents in parallel:
from polos import (
    batch_invoke, batch_agent_invoke, BatchWorkflowInput, PolosClient
)

client = PolosClient()

# Batch invoke multiple workflows
handles = await client.batch_invoke([
    BatchWorkflowInput(id="workflow-1", payload={"foo": "bar"}),
    BatchWorkflowInput(id="workflow-2", payload={"baz": 42}),
    BatchWorkflowInput(id="workflow-3", payload={"qux": "test"}),
])

# Batch invoke multiple agents
agent_handles = await client.batch_agent_invoke([
    grammar_agent.with_input("Check this text"),
    tone_agent.with_input("Check this text"),
    accuracy_agent.with_input("Check this text"),
])
Note: client.batch_invoke() and client.batch_agent_invoke() cannot be called from within a workflow. Inside a workflow, use ctx.step.batch_invoke() or ctx.step.batch_agent_invoke() instead. See Steps for details.

2. Scheduled execution

@workflow(schedule="0 9 * * *")  # Daily at 9am
async def daily_report(ctx: WorkflowContext, input: SchedulePayload):
    data = await ctx.step.run("fetch", fetch_data)
    report = await ctx.step.run("generate", generate_report, data)
    await ctx.step.run("send", send_email, report)
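
The schedule string appears to be a standard five-field cron expression (minute, hour, day of month, month, day of week), which matches the "Daily at 9am" comment. The sketch below labels the fields and computes the next run for the simple daily case only. The helper names are hypothetical, it is not a full cron parser, and it says nothing about which time zone Polos uses to evaluate schedules.

```python
from datetime import datetime, timedelta

CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def describe_cron(expr):
    """Label the five fields of a standard cron expression."""
    parts = expr.split()
    if len(parts) != 5:
        raise ValueError("expected five space-separated fields")
    return dict(zip(CRON_FIELDS, parts))


def next_daily_run(expr, now):
    """Next fire time for a plain daily 'M H * * *' expression only."""
    fields = describe_cron(expr)
    rest = (fields["day_of_month"], fields["month"], fields["day_of_week"])
    if rest != ("*", "*", "*"):
        raise ValueError("this sketch handles only daily schedules")
    candidate = now.replace(
        hour=int(fields["hour"]),
        minute=int(fields["minute"]),
        second=0,
        microsecond=0,
    )
    if candidate <= now:  # today's slot has already passed
        candidate += timedelta(days=1)
    return candidate


if __name__ == "__main__":
    print(describe_cron("0 9 * * *"))
    print(next_daily_run("0 9 * * *", datetime(2024, 1, 1, 8, 30)))
```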

3. Event-triggered

@workflow(trigger_on_event="user.signup")
async def onboard_user(ctx: WorkflowContext, event: EventPayload):
    user_id = event.data["user_id"]
    await ctx.step.run("welcome", send_welcome_email, user_id)
    await ctx.step.run("setup", create_sample_data, user_id)
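
Conceptually, an event trigger is a registry that maps a topic to the functions that should start when that topic is published. The sketch below shows that shape with a hypothetical `on_event` decorator and `publish` function. These names are illustrative assumptions and do not describe how Polos registers or dispatches triggers.

```python
import asyncio

_handlers = {}  # topic -> list of registered coroutine functions


def on_event(topic):
    """Hypothetical decorator: register a handler for a topic."""
    def register(func):
        _handlers.setdefault(topic, []).append(func)
        return func
    return register


@on_event("user.signup")
async def onboard_user(event):
    return f"welcome {event['user_id']}"


async def publish(topic, data):
    # Start every handler registered for the topic and collect the results
    handlers = _handlers.get(topic, [])
    return await asyncio.gather(*(handler(data) for handler in handlers))


if __name__ == "__main__":
    print(asyncio.run(publish("user.signup", {"user_id": "u1"})))
```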

Key takeaways

  • Workflows are durable functions - They survive failures and resume from the last completed step
  • Steps are the unit of durability - Each step’s output is persisted
  • WorkflowContext provides execution metadata - Access IDs, timestamps, and step methods
  • Compose workflows - Call other workflows or agents, either waiting for results or triggering them fire-and-forget
  • Use normal control flow - Conditionals, loops, and parallel execution
  • Start workflows - Via API, schedule, or events