Documentation Index
Fetch the complete documentation index at: https://polos.dev/docs/llms.txt
Use this file to discover all available pages before exploring further.
Workflows can pause execution to wait for time delays or external events. During waits, the worker suspends execution and no compute resources are consumed.
Why use waits?
Waits enable workflows to:
- Delay execution - Wait hours, days, or weeks between steps
- Coordinate with external systems - Wait for webhooks, approvals, or user actions
- Implement timeouts - Fail gracefully if events don’t arrive
- Save costs - No compute consumed while waiting
Types of waits
1. Time based: wait_for
Pause for a duration:
from polos import workflow, WorkflowContext
@workflow
async def delayed_workflow(ctx: WorkflowContext, input: WorkflowInput):
# Process immediately
data = await ctx.step.run("fetch", fetch_data)
# Wait 1 hour (worker suspends)
await ctx.step.wait_for("wait_1_hour", hours=1)
# Resume after 1 hour
result = await ctx.step.run("process", process_data, data)
return result
Available time units:
# Wait 30 seconds
await ctx.step.wait_for("wait", seconds=30)
# Wait 5 minutes
await ctx.step.wait_for("wait", minutes=5)
# Wait 2 hours
await ctx.step.wait_for("wait", hours=2)
# Wait 3 days
await ctx.step.wait_for("wait", days=3)
# Wait 1 week
await ctx.step.wait_for("wait", weeks=1)
# Combine units
await ctx.step.wait_for("wait", hours=1, minutes=30) # 1.5 hours
2. Timestamp-based: wait_until
Wait until a specific datetime
from datetime import datetime, timezone
@workflow
async def scheduled_action(ctx: WorkflowContext, input: dict):
# Schedule for specific time
target_time = datetime(2025, 12, 31, 23, 59, 0, tzinfo=timezone.utc)
# Wait until exactly 23:59 on Dec 31, 2025
await ctx.step.wait_until("wait_new_year", target_time)
# Executes at the specified time
await ctx.step.run("celebrate", send_celebration)
Timezone handling:
from datetime import datetime
from zoneinfo import ZoneInfo
# Wait until 9 AM Eastern Time
target = datetime(2025, 2, 1, 9, 0, 0, tzinfo=ZoneInfo("America/New_York"))
await ctx.step.wait_until("wait_morning", target)
# If target is in the past, raises error
3. Event-based: wait_for_event
Wait for an external event:
@workflow
async def approval_workflow(ctx: WorkflowContext, input: dict):
# Submit for approval
await ctx.step.run("submit", submit_for_approval, input)
# Wait for approval event (hours or days)
approval = await ctx.step.wait_for_event(
"wait_approval",
topic="approval/response"
)
# Resume when event arrives
if approval.data["approved"]:
await ctx.step.run("execute", execute_action, input)
else:
return {"status": "rejected"}
With timeout:
# Wait for event with 24-hour timeout
try:
approval = await ctx.step.wait_for_event(
"wait_approval",
topic="approval.response",
timeout=86400 # 24 hours in seconds
)
# Event received
return {"approved": approval.data["approved"]}
except StepExecutionError:
# Timeout expired without event
return {"status": "timeout"}
Wait behavior
Worker suspension
When a workflow waits:
- Worker suspends execution - Workflow state is saved
- No compute consumed - Worker is freed to handle other workflows
- Orchestrator tracks wait - Schedules resumption
- Workflow resumes - Worker picks up from where it stopped
Short vs long waits
Short waits (≤10 seconds):
- Worker sleeps inline (doesn’t suspend)
- Slightly more efficient (no orchestrator round-trip)
- Configurable via
POLOS_WAIT_THRESHOLD_SECONDS environment variable
Long waits (>10 seconds):
- Worker suspends and is freed
- Orchestrator schedules resumption
- Essential for hours/days/weeks
# Short wait - worker sleeps inline
await ctx.step.wait_for("short", seconds=5)
# Long wait - worker suspends
await ctx.step.wait_for("long", minutes=30)
Publishing events to resume waits
From within a workflow
@workflow
async def publisher_workflow(ctx: WorkflowContext, input: dict):
# Process data
result = await ctx.step.run("process", process_data, input)
# Publish event (resumes workflows waiting on this topic)
await ctx.step.publish_event(
"notify",
topic="approval/response",
data={
"approved": True,
"approved_by": "alice@example.com"
}
)
return result
From external systems (API)
import httpx
async def send_approval(approved: bool, approved_by: str):
"""Send approval event via API."""
async with httpx.AsyncClient() as client:
await client.post(
"https://api.polos.ai/api/v1/events/publish",
headers={
"Authorization": "Bearer YOUR_API_KEY",
"Content-Type": "application/json"
},
json={
"topic": "approval/response",
"events": [{
"data": {
"approved": approved,
"approved_by": approved_by
}
}]
}
)
# Resume waiting workflow
await send_approval(approved=True, approved_by="alice@example.com")