Customize automatic retry
Copy
from polos import workflow, WorkflowContext
@workflow(id="retry_example")
async def retry_example(ctx: WorkflowContext, payload):
    """Run ``unreliable_operation`` as a step with custom retry settings.

    The step is registered under the key ``"unreliable_op"`` and is given
    ``max_retries=3``, ``base_delay=1.0`` and ``max_delay=10.0`` instead of
    the framework defaults.

    NOTE(review): the delays are presumably in seconds, with the wait between
    attempts growing from ``base_delay`` up to ``max_delay`` -- confirm
    against the Polos step documentation.

    Args:
        ctx: Workflow context used to execute the step.
        payload: Workflow input; not used by this example.

    Returns:
        A dict holding the step's return value under ``"result"``.
    """
    result = await ctx.step.run(
        "unreliable_op",
        unreliable_operation,
        max_retries=3,
        base_delay=1.0,
        max_delay=10.0,
    )
    return {"result": result}
Error recovery
Copy
from polos import StepExecutionError
@workflow(id="error_recovery")
async def error_recovery(ctx: WorkflowContext, payload):
    """Process each item as its own step, recording failures instead of aborting.

    A ``StepExecutionError`` from one item is caught and recorded, and the
    loop continues with the next item.

    Args:
        ctx: Workflow context used to execute the steps.
        payload: Workflow input; must contain an iterable under ``"items"``.

    Returns:
        A dict with the number of successful items under ``"processed"`` and
        the number of failed items under ``"failed"``.
    """
    results, errors = [], []
    for item in payload["items"]:
        try:
            # NOTE(review): the step key is derived from the item, so repeated
            # items would produce the same key -- confirm that is acceptable.
            result = await ctx.step.run(f"process_{item}", process_item, item)
        except StepExecutionError as e:
            errors.append({"item": item, "error": str(e)})
        else:
            # Success path kept out of the ``try`` so only the step is guarded.
            results.append(result)
    return {"processed": len(results), "failed": len(errors)}
Fallback pattern
Copy
@workflow(id="fallback_pattern")
async def fallback_pattern(ctx: WorkflowContext, payload):
    """Try the primary step and fall back to an alternative if it fails.

    ``primary_process`` runs under the step key ``"primary"``. If that step
    raises ``StepExecutionError``, ``fallback_process`` runs under the key
    ``"fallback"`` with the same payload. An error raised by the fallback
    step is not caught here and propagates to the caller.

    Args:
        ctx: Workflow context used to execute the steps.
        payload: Workflow input, forwarded unchanged to whichever step runs.

    Returns:
        The return value of the primary step, or of the fallback step when
        the primary step failed.
    """
    try:
        return await ctx.step.run("primary", primary_process, payload)
    except StepExecutionError:
        return await ctx.step.run("fallback", fallback_process, payload)
Compensation (rollback)
Copy
@workflow(id="compensation")
async def compensation(ctx: WorkflowContext, payload):
    """Reserve inventory, charge payment, and undo completed steps on failure.

    Each forward step that finishes is recorded in ``completed``. If any
    forward step raises ``StepExecutionError``, the recorded steps are
    compensated in reverse order, each under the step key ``undo_<step>``.

    ``compensate`` is not defined in this snippet.
    NOTE(review): it is presumably a mapping from forward step name to its
    undo callable, defined elsewhere -- confirm.

    Args:
        ctx: Workflow context used to execute the steps.
        payload: Workflow input, forwarded to every forward and undo step.

    Returns:
        ``{"status": "success"}`` when both steps complete, otherwise
        ``{"status": "rolled_back"}`` once the compensations have run.
    """
    completed = []
    try:
        await ctx.step.run("reserve", reserve_inventory, payload)
        completed.append("reserve")
        await ctx.step.run("charge", charge_payment, payload)
        completed.append("charge")
        return {"status": "success"}
    except StepExecutionError:
        # Rollback in reverse order
        for step in reversed(completed):
            await ctx.step.run(f"undo_{step}", compensate[step], payload)
        # Return only after every completed step has been compensated.
        return {"status": "rolled_back"}
Run it
Copy
# Fetch the Polos repository and enter the error-handling example directory.
git clone https://github.com/polos-dev/polos.git
cd polos/python-examples/11-error-handling
# Create a local .env from the provided template (edit it as needed).
cp .env.example .env
# Presumably installs the example's dependencies via uv -- confirm against the repo README.
uv sync
# Run the worker and the client in two separate terminals.
python worker.py # Terminal 1
python main.py # Terminal 2