Documentation Index
Fetch the complete documentation index at: https://polos.dev/docs/llms.txt
Use this file to discover all available pages before exploring further.
Workflow state persists data across the entire workflow execution. Use state for counters, accumulators, configuration, or any data that multiple steps need to access and modify.
Defining workflow state
Create a schema for state using WorkflowState:
from polos import workflow, WorkflowContext, WorkflowState
class ProcessingState(WorkflowState):
processed_count: int = 0
failed_items: list[str] = []
started_at: str = ""
@workflow(state_schema=ProcessingState)
async def batch_processor(ctx: WorkflowContext, input: BatchProcessorInput):
# Access state via ctx.state
ctx.state.started_at = input.timestamp
for item in input.items:
try:
await ctx.step.run(f"process_{item}", process_item, item)
ctx.state.processed_count += 1
except Exception as e:
ctx.state.failed_items.append(item)
return BatchProcessorOutput(
processed=ctx.state.processed_count,
failed=len(ctx.state.failed_items)
)
When to use state
Use state for:
- ✅ Counters and accumulators
- ✅ Progress tracking
- ✅ Configuration set during execution
- ✅ Data shared across multiple steps
Don’t use state for:
- ❌ Large data (state has 1MB limit)
- ❌ Temporary variables (use local variables)
State is saved when the workflow completes or fails.
Setting initial state during workflow invocation
You can set the initial workflow state when invoking it:
class ParentState(WorkflowState):
tasks_started: int = 0
class ChildState(WorkflowState):
parent_id: str = ""
config: dict = {}
@workflow(state_schema=ParentState)
async def parent_workflow(ctx: WorkflowContext, input: ParentInput):
ctx.state.tasks_started += 1
# Invoke child workflow with some initial values for its state
result = await ctx.step.invoke_and_wait(
"call_child",
child_workflow,
payload={"data": input.data},
initial_state=ChildState(
parent_id=ctx.execution_id,
config={"mode": "production"}
)
)
return result
@workflow(state_schema=ChildState)
async def child_workflow(ctx: WorkflowContext, input: ChildInput):
# Access initial state
print(f"Parent ID: {ctx.state.parent_id}")
print(f"Config: {ctx.state.config}")
await ctx.step.run("process", process_data, input.data)
return {"processed": True}
Invoke parent workflow with some initial values for its state:
client = PolosClient()
result = await parent_workflow(
client,
ParentInput(data={"resume_from": 10}),
initial_state=ParentState(tasks_started=10)
)
Best practices
-
Do not use state to store large objects. Polos enforces a limit of 1MB (serialized JSON) on state size.
-
Use default values
class ConfigState(WorkflowState):
retry_limit: int = 3
timeout_seconds: int = 30
debug_mode: bool = False