> ## Documentation Index
> Fetch the complete documentation index at: https://polos.dev/docs/llms.txt
> Use this file to discover all available pages before exploring further.

# Overview

Workflows are the foundation of Polos. They're durable functions that survive failures, maintain state, and resume exactly where they stopped.

## What is a workflow?

A workflow is a function that can contain steps, call other workflows, wait for events, and use normal programming constructs like loops and conditionals.

<CodeGroup>
  ```python Python theme={null}
  from polos import workflow, WorkflowContext

  @workflow
  async def process_order(ctx: WorkflowContext, input: OrderInput):
      # Step 1: Validate order
      order = await ctx.step.run("validate", validate_order, input)

      # Step 2: Charge payment
      payment = await ctx.step.run("charge", charge_stripe, order)

      # Step 3: Send confirmation
      await ctx.step.run("send_email", send_confirmation, order)

      return OrderOutput(order_id=order.id, status="completed")
  ```

  ```typescript TypeScript theme={null}
  import { defineWorkflow, WorkflowContext } from '@polos/sdk';

  const processOrder = defineWorkflow<OrderInput, {}, OrderOutput>(
    { id: 'process-order' },
    async (ctx, input) => {
      // Step 1: Validate order
      const order = await ctx.step.run('validate', () => validateOrder(input));

      // Step 2: Charge payment
      const payment = await ctx.step.run('charge', () => chargeStripe(order));

      // Step 3: Send confirmation
      await ctx.step.run('send_email', () => sendConfirmation(order));

      return { orderId: order.id, status: 'completed' };
    }
  );
  ```
</CodeGroup>

**What makes workflows special:**

* **Durable** - Survive crashes and resume from the last completed step
* **Stateful** - Maintain context across execution
* **Composable** - Call other workflows and wait for results
* **Observable** - Track execution with real-time events

## Why workflows?

Traditional functions fail completely when something goes wrong. Workflows persist progress at each step:

**Without workflows:**

<CodeGroup>
  ```python Python theme={null}
  async def process_order(order):
      validate_order(order)           # ✓ Completes
      charge_stripe(order)            # ✓ Completes
      send_confirmation(order)        # ❌ Server crashes
      # On restart: Everything runs again
      # Result: Customer charged twice
  ```

  ```typescript TypeScript theme={null}
  async function processOrder(order: Order) {
    validateOrder(order);             // ✓ Completes
    chargeStripe(order);              // ✓ Completes
    sendConfirmation(order);          // ❌ Server crashes
    // On restart: Everything runs again
    // Result: Customer charged twice
  }
  ```
</CodeGroup>

**With workflows:**

<CodeGroup>
  ```python Python theme={null}
  @workflow
  async def process_order(ctx: WorkflowContext, input: OrderInput):
      order = await ctx.step.run("validate", validate_order, input)   # ✓ Cached
      payment = await ctx.step.run("charge", charge_stripe, order)    # ✓ Cached
      await ctx.step.run("send_email", send_confirmation, order)      # ❌ Crashes
      # On restart: Resumes from send_email
      # Result: Customer charged once, email sent successfully
  ```

  ```typescript TypeScript theme={null}
  const processOrder = defineWorkflow<OrderInput, {}, OrderOutput>(
    { id: 'process-order' },
    async (ctx, input) => {
      const order = await ctx.step.run('validate', () => validateOrder(input));   // ✓ Cached
      const payment = await ctx.step.run('charge', () => chargeStripe(order));    // ✓ Cached
      await ctx.step.run('send_email', () => sendConfirmation(order));            // ❌ Crashes
      // On restart: Resumes from send_email
      // Result: Customer charged once, email sent successfully
    }
  );
  ```
</CodeGroup>

## Workflow context

Every workflow receives a context object with execution metadata and methods:

<CodeGroup>
  ```python Python theme={null}
  @workflow
  async def example_workflow(ctx: WorkflowContext, input: dict):
      # Execution identifiers
      execution_id = ctx.execution_id
      workflow_id = ctx.workflow_id

      # User context
      user_id = ctx.user_id
      session_id = ctx.session_id

      # Timestamps
      created_at = ctx.created_at
      started_at = ctx.started_at

      # Step methods
      result = await ctx.step.run("step_name", function, args)
      await ctx.step.wait_for("wait_step", seconds=60)

      return {"status": "completed"}
  ```

  ```typescript TypeScript theme={null}
  import { defineWorkflow, WorkflowContext } from '@polos/sdk';

  const exampleWorkflow = defineWorkflow<Record<string, unknown>, {}, { status: string }>(
    { id: 'example-workflow' },
    async (ctx, input) => {
      // Execution identifiers
      const executionId = ctx.executionId;
      const workflowId = ctx.workflowId;

      // User context
      const userId = ctx.userId;
      const sessionId = ctx.sessionId;

      // Timestamps
      const createdAt = ctx.createdAt;
      const startedAt = ctx.startedAt;

      // Step methods
      const result = await ctx.step.run('step_name', () => someFunction(args));
      await ctx.step.waitFor('wait_step', { seconds: 60 });

      return { status: 'completed' };
    }
  );
  ```
</CodeGroup>

## Key concepts

### Steps

Steps are the unit of durability. Each step's output is persisted when the step completes.

<CodeGroup>
  ```python Python theme={null}
  @workflow
  async def example(ctx: WorkflowContext, input: dict):
      # Each step is durable
      step1 = await ctx.step.run("step1", func1, input)
      step2 = await ctx.step.run("step2", func2, step1)
      step3 = await ctx.step.run("step3", func3, step2)
  ```

  ```typescript TypeScript theme={null}
  const example = defineWorkflow<Record<string, unknown>, {}, void>(
    { id: 'example' },
    async (ctx, input) => {
      // Each step is durable
      const step1 = await ctx.step.run('step1', () => func1(input));
      const step2 = await ctx.step.run('step2', () => func2(step1));
      const step3 = await ctx.step.run('step3', () => func3(step2));
    }
  );
  ```
</CodeGroup>

See [Steps](/workflows/steps) for details.

### Workflow composition

Workflows can call other workflows or agents. They can wait for those to complete or simply trigger them as fire-and-forget.

<CodeGroup>
  ```python Python theme={null}
  @workflow
  async def parent(ctx: WorkflowContext, input: ParentInput):
      # Call child workflow but don't wait for it to complete
      exec_handle = await ctx.step.invoke(
          "call_child",
          child_workflow,
          {"data": input.data}
      )

      # Call agent and wait
      response = await ctx.step.agent_invoke_and_wait(
          "research_agent",
          research_agent.with_input(input.data)
      )
      return response.result

  @workflow
  async def child_workflow(ctx: WorkflowContext, input: dict):
      # Process data
      return {"processed": True}
  ```

  ```typescript TypeScript theme={null}
  const parent = defineWorkflow<ParentInput, {}, unknown>(
    { id: 'parent' },
    async (ctx, input) => {
      // Call child workflow but don't wait for it to complete
      const execHandle = await ctx.step.invoke(
        'call_child',
        childWorkflow,
        { data: input.data }
      );

      // Call agent and wait
      const response = await ctx.step.agentInvokeAndWait(
        'research_agent',
        researchAgent.withInput(input.data)
      );
      return response.result;
    }
  );

  const childWorkflow = defineWorkflow<Record<string, unknown>, {}, { processed: boolean }>(
    { id: 'child-workflow' },
    async (ctx, input) => {
      // Process data
      return { processed: true };
    }
  );
  ```
</CodeGroup>

### Control flow

Use normal control flow:

<CodeGroup>
  ```python Python theme={null}
  @workflow
  async def conditional_workflow(ctx: WorkflowContext, input: ConditionalWorkflowInput):
      # Conditionals
      if input.amount > 1000:
          await ctx.step.run("high_value", process_high_value, input)
      else:
          await ctx.step.run("standard", process_standard, input)

      # Loops
      for item in input.items:
          await ctx.step.run(f"process_{item.id}", process_item, item)

      # Parallel execution
      results = await asyncio.gather(
          ctx.step.run("task_a", task_a, input),
          ctx.step.run("task_b", task_b, input),
          ctx.step.run("task_c", task_c, input)
      )

      return results
  ```

  ```typescript TypeScript theme={null}
  const conditionalWorkflow = defineWorkflow<ConditionalWorkflowInput, {}, unknown[]>(
    { id: 'conditional-workflow' },
    async (ctx, input) => {
      // Conditionals
      if (input.amount > 1000) {
        await ctx.step.run('high_value', () => processHighValue(input));
      } else {
        await ctx.step.run('standard', () => processStandard(input));
      }

      // Loops
      for (const item of input.items) {
        await ctx.step.run(`process_${item.id}`, () => processItem(item));
      }

      // Parallel execution
      const results = await Promise.all([
        ctx.step.run('task_a', () => taskA(input)),
        ctx.step.run('task_b', () => taskB(input)),
        ctx.step.run('task_c', () => taskC(input)),
      ]);

      return results;
    }
  );
  ```
</CodeGroup>

### Waits and events

Workflows can pause and resume:

<CodeGroup>
  ```python Python theme={null}
  @workflow
  async def approval_workflow(ctx: WorkflowContext, input: ApprovalWorkflowInput):
      # Process request
      request = await ctx.step.run("process", process_request, input)

      # Wait for approval event
      approval = await ctx.step.wait_for_event(
          "wait_approval",
          topic="approval.response"
      )

      if approval.data["approved"]:
          result = await ctx.step.run("execute", execute_action, request)
          return result
  ```

  ```typescript TypeScript theme={null}
  const approvalWorkflow = defineWorkflow<ApprovalWorkflowInput, {}, unknown>(
    { id: 'approval-workflow' },
    async (ctx, input) => {
      // Process request
      const request = await ctx.step.run('process', () => processRequest(input));

      // Wait for approval event
      const approval = await ctx.step.waitForEvent(
        'wait_approval',
        { topic: 'approval.response' }
      );

      if (approval.data['approved']) {
        const result = await ctx.step.run('execute', () => executeAction(request));
        return result;
      }
    }
  );
  ```
</CodeGroup>

See [Waits](/workflows/waits) and [Events](/workflows/event-triggered-workflows) for details.

## Starting workflows

Workflows can be started in three ways:

### 1. Invoke using API or SDK

<CodeGroup>
  ```python Python theme={null}
  client = PolosClient()
  result = await order_workflow.invoke(
      client, payload={"order_id": "123"}
  )
  ```

  ```typescript TypeScript theme={null}
  import { PolosClient } from '@polos/sdk';

  const client = PolosClient.fromEnv();
  const result = await orderWorkflow.invoke(
    client, { orderId: '123' }
  );
  ```
</CodeGroup>

You can also invoke multiple workflows or agents in parallel:

<CodeGroup>
  ```python Python theme={null}
  from polos import (
      batch_invoke, batch_agent_invoke, BatchWorkflowInput, PolosClient
  )

  client = PolosClient()

  # Batch invoke multiple workflows
  handles = await client.batch_invoke([
      BatchWorkflowInput(id="workflow-1", payload={"foo": "bar"}),
      BatchWorkflowInput(id="workflow-2", payload={"baz": 42}),
      BatchWorkflowInput(id="workflow-3", payload={"qux": "test"}),
  ])

  # Batch invoke multiple agents
  agent_handles = await client.batch_agent_invoke([
      grammar_agent.with_input("Check this text"),
      tone_agent.with_input("Check this text"),
      accuracy_agent.with_input("Check this text"),
  ])
  ```

  ```typescript TypeScript theme={null}
  import { PolosClient } from '@polos/sdk';

  const client = PolosClient.fromEnv();

  // Batch invoke multiple workflows
  const handles = await client.batchInvoke([
    { workflow: 'workflow-1', payload: { foo: 'bar' } },
    { workflow: 'workflow-2', payload: { baz: 42 } },
    { workflow: 'workflow-3', payload: { qux: 'test' } },
  ]);

  // Batch invoke multiple agents
  const agentHandles = await client.batchAgentInvoke([
    grammarAgent.withInput('Check this text'),
    toneAgent.withInput('Check this text'),
    accuracyAgent.withInput('Check this text'),
  ]);
  ```
</CodeGroup>

**Note:** Batch invoke cannot be called from within a workflow. Use `ctx.step.batch_invoke()` or `ctx.step.batch_agent_invoke()` when calling from within workflows. See [Steps](/workflows/steps) for details.

### 2. Scheduled execution

<CodeGroup>
  ```python Python theme={null}
  @workflow(schedule="0 9 * * *")  # Daily at 9am
  async def daily_report(ctx: WorkflowContext, input: SchedulePayload):
      data = await ctx.step.run("fetch", fetch_data)
      report = await ctx.step.run("generate", generate_report, data)
      await ctx.step.run("send", send_email, report)
  ```

  ```typescript TypeScript theme={null}
  const dailyReport = defineWorkflow<SchedulePayload, {}, void>(
    { id: 'daily-report', schedule: '0 9 * * *' }, // Daily at 9am
    async (ctx, input) => {
      const data = await ctx.step.run('fetch', () => fetchData());
      const report = await ctx.step.run('generate', () => generateReport(data));
      await ctx.step.run('send', () => sendEmail(report));
    }
  );
  ```
</CodeGroup>

### 3. Event-triggered

<CodeGroup>
  ```python Python theme={null}
  @workflow(trigger_on_event="user.signup")
  async def onboard_user(ctx: WorkflowContext, event: EventPayload):
      user_id = event.data["user_id"]
      await ctx.step.run("welcome", send_welcome_email, user_id)
      await ctx.step.run("setup", create_sample_data, user_id)
  ```

  ```typescript TypeScript theme={null}
  const onboardUser = defineWorkflow<EventPayload, {}, void>(
    { id: 'onboard-user', triggerOnEvent: 'user.signup' },
    async (ctx, event) => {
      const userId = event.data['user_id'];
      await ctx.step.run('welcome', () => sendWelcomeEmail(userId));
      await ctx.step.run('setup', () => createSampleData(userId));
    }
  );
  ```
</CodeGroup>

## Key takeaways

* **Workflows are durable functions** - They survive failures and resume from the last completed step
* **Steps are the unit of durability** - Each step's output is persisted
* **WorkflowContext provides execution metadata** - Access IDs, timestamps, and step methods
* **Compose workflows** - Call other workflows and wait for results
* **Use normal control flow** - Conditionals, loops, and parallel execution
* **Start workflows** - Via API, schedule, or events
