Run workflows in parallel and aggregate results
from polos import workflow, WorkflowContext
from polos.types.types import BatchWorkflowInput

@workflow(id="parallel_review")
async def parallel_review(ctx: WorkflowContext, payload):
    reviewers = ["alice", "bob", "charlie"]

    # Create batch of review requests, one per reviewer
    requests = [
        BatchWorkflowInput(
            id="single_review",
            payload={"reviewer": r, "document_id": payload["doc_id"]},
        )
        for r in reviewers
    ]

    # Run all in parallel and wait for every result
    results = await ctx.step.batch_invoke_and_wait("reviews", requests)

    # Aggregate: count approvals across reviewers
    approved = sum(1 for r in results if r.result.get("approved"))
    return {"approved": approved, "total": len(results)}
@workflow(id="launch_background_tasks")
async def launch_tasks(ctx: WorkflowContext, payload):
requests = [
BatchWorkflowInput(id="background_task", payload={"task_id": i})
for i in range(10)
]
# Launch without waiting
handles = await ctx.step.batch_invoke("launch", requests)
return {"launched": len(handles), "ids": [h.id for h in handles]}
@workflow(id="process_chunks")
async def process_chunks(ctx: WorkflowContext, payload):
data = payload["data"]
chunk_size = 10
# Split into chunks
chunks = [data[i:i+chunk_size] for i in range(0, len(data), chunk_size)]
# Process in parallel
requests = [
BatchWorkflowInput(id="process_chunk", payload={"chunk": c, "index": i})
for i, c in enumerate(chunks)
]
results = await ctx.step.batch_invoke_and_wait("chunks", requests)
# Combine results
all_processed = []
for r in results:
all_processed.extend(r.result.get("processed", []))
return {"total": len(all_processed)}

Run the example:

git clone https://github.com/polos-dev/polos.git
cd polos/python-examples/13-parallel-review
cp .env.example .env
uv sync
python worker.py  # Terminal 1
python main.py    # Terminal 2