Monitor workflow execution in real time by streaming lifecycle events. Events provide visibility into steps, tool calls, and execution progress.
What are lifecycle events?
Lifecycle events are emitted automatically during workflow execution:
workflow_start - Workflow begins execution
workflow_finish - Workflow completes
step_start - Step begins
step_finish - Step completes
Agents emit additional events:
agent_start - Agent execution begins
agent_finish - Agent execution completes (includes usage stats)
text_delta - Text streaming chunks (only with agent.stream())
tool_call - Tool execution triggered (only with agent.stream())
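As a rough illustration of how client code might tell these event types apart, the sketch below groups the names listed above into sets. The set names and the classify helper are hypothetical and are not part of the polos package.

```python
# Event names are taken from the lists above; the grouping is an illustrative choice.
WORKFLOW_EVENTS = {"workflow_start", "workflow_finish", "step_start", "step_finish"}
AGENT_EVENTS = {"agent_start", "agent_finish", "text_delta", "tool_call"}
# Per the list above, these two are only emitted with agent.stream().
STREAM_ONLY_EVENTS = {"text_delta", "tool_call"}


def classify(event_type: str) -> str:
    """Return 'workflow', 'agent', or 'unknown' for an event type string."""
    if event_type in WORKFLOW_EVENTS:
        return "workflow"
    if event_type in AGENT_EVENTS:
        return "agent"
    return "unknown"
```

A consumer could use a helper like this to route agent events to a chat view and workflow events to a progress view.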
Streaming workflow events
Listen to events for a specific workflow execution:
from polos import workflow, WorkflowContext, PolosClient, events

# PipelineInput, extract_data, transform_data, load_data, and child_pipeline
# are assumed to be defined elsewhere in your project.

@workflow
async def data_pipeline(ctx: WorkflowContext, input: PipelineInput):
    data = await ctx.step.run("extract", extract_data, input)
    transformed = await ctx.step.run("transform", transform_data, data)
    final = await ctx.step.invoke_and_wait("child_pipeline", child_pipeline, transformed)
    await ctx.step.run("load", load_data, final)

# Start workflow (run this part inside an async function)
client = PolosClient()
handle = await data_pipeline.invoke(
    client,
    PipelineInput(source="database", table="orders")
)

# Stream events
async for event in events.stream_workflow(client, handle.root_workflow_id, handle.id):
    if event.event_type == "step_start":
        print(f"Step started: {event.data.get('step_key')}")
    elif event.event_type == "step_finish":
        print(f"Step finished: {event.data.get('step_key')}")
    elif event.event_type == "workflow_finish":
        print("Workflow completed")
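If the consumer needs state rather than log lines, the branching in the loop above could be moved into a small tracker. This is a minimal sketch that works on plain event_type strings and data dicts; the StepProgress class is a hypothetical helper, not something polos provides.

```python
from dataclasses import dataclass, field


@dataclass
class StepProgress:
    """Tracks which steps are running and which have finished (illustrative helper)."""

    running: set = field(default_factory=set)
    finished: list = field(default_factory=list)

    def handle(self, event_type: str, data: dict) -> None:
        """Update state from one lifecycle event; other event types are ignored."""
        step_key = data.get("step_key")
        if event_type == "step_start":
            self.running.add(step_key)
        elif event_type == "step_finish":
            self.running.discard(step_key)
            self.finished.append(step_key)
```

Inside the streaming loop, a call such as progress.handle(event.event_type, event.data) would keep the tracker up to date.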
Event structure
Each event contains an event_type, a data payload, and execution metadata nested under data._metadata.
Example event data:
{
  "id": "9124c653-41e8-4a7d-ad43-76d642b422bd",
  "sequence_id": 1001,
  "event_type": "step_start",
  "data": {
    "step_key": "extract",
    "step_type": "run",
    "_metadata": {
      "execution_id": "exec_123",
      "workflow_id": "data-pipeline"
    }
  },
  "created_at": "2025-01-29T10:30:00Z"
}
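When events arrive as raw JSON (for example over a WebSocket), it can help to parse them into a typed object. The sketch below assumes the payload has exactly the shape shown above; LifecycleEvent and parse_event are hypothetical names, not part of the polos API.

```python
import json
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Optional

# Sample payload copied from the example event above.
SAMPLE = """{
  "id": "9124c653-41e8-4a7d-ad43-76d642b422bd",
  "sequence_id": 1001,
  "event_type": "step_start",
  "data": {
    "step_key": "extract",
    "step_type": "run",
    "_metadata": {"execution_id": "exec_123", "workflow_id": "data-pipeline"}
  },
  "created_at": "2025-01-29T10:30:00Z"
}"""


@dataclass
class LifecycleEvent:
    id: str
    sequence_id: int
    event_type: str
    data: dict
    created_at: datetime

    @property
    def workflow_id(self) -> Optional[str]:
        """Workflow that emitted the event, read from data._metadata."""
        return self.data.get("_metadata", {}).get("workflow_id")


def parse_event(raw: str) -> LifecycleEvent:
    """Parse one JSON-encoded event into a LifecycleEvent."""
    obj: dict[str, Any] = json.loads(raw)
    # Replace the trailing "Z" so fromisoformat also works on Python < 3.11.
    created = datetime.fromisoformat(obj["created_at"].replace("Z", "+00:00"))
    return LifecycleEvent(
        id=obj["id"],
        sequence_id=obj["sequence_id"],
        event_type=obj["event_type"],
        data=obj.get("data", {}),
        created_at=created,
    )
```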
A workflow's event stream contains events for every step and subworkflow it executes. The data_pipeline example above has 3 steps and 1 subworkflow, so its stream contains the following events:
workflow_start -> "data": {"_metadata": {"workflow_id": "data_pipeline"}}
step_start -> "data": {"_metadata": {"workflow_id": "data_pipeline"}, "step_key": "extract"}
step_finish -> "data": {"_metadata": {"workflow_id": "data_pipeline"}, "step_key": "extract"}
step_start -> "data": {"_metadata": {"workflow_id": "data_pipeline"}, "step_key": "transform"}
step_finish -> "data": {"_metadata": {"workflow_id": "data_pipeline"}, "step_key": "transform"}
workflow_start -> "data": {"_metadata": {"workflow_id": "child_pipeline"}}
< events for steps and subworkflows from child_pipeline >
workflow_finish -> "data": {"_metadata": {"workflow_id": "child_pipeline"}}
step_start -> "data": {"_metadata": {"workflow_id": "data_pipeline"}, "step_key": "load"}
step_finish -> "data": {"_metadata": {"workflow_id": "data_pipeline"}, "step_key": "load"}
workflow_finish -> "data": {"_metadata": {"workflow_id": "data_pipeline"}}
The _metadata field identifies which workflow or subworkflow emitted the event.
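Because every event names its emitting workflow, a consumer can separate parent events from subworkflow events. The following sketch groups finished steps by workflow_id using plain dicts shaped like the events above; the steps_by_workflow helper and the child step key "validate" are made up for illustration.

```python
from collections import defaultdict


def steps_by_workflow(events: list) -> dict:
    """Map each workflow_id to the ordered list of step keys that finished in it."""
    finished = defaultdict(list)
    for ev in events:
        if ev["event_type"] != "step_finish":
            continue
        data = ev["data"]
        workflow_id = data["_metadata"]["workflow_id"]
        finished[workflow_id].append(data["step_key"])
    return dict(finished)


def _ev(event_type: str, workflow_id: str, step_key: str = None) -> dict:
    """Build a minimal event dict for the demo below."""
    data = {"_metadata": {"workflow_id": workflow_id}}
    if step_key is not None:
        data["step_key"] = step_key
    return {"event_type": event_type, "data": data}


# Abbreviated version of the sequence above; "validate" is a hypothetical child step.
demo_events = [
    _ev("workflow_start", "data_pipeline"),
    _ev("step_start", "data_pipeline", "extract"),
    _ev("step_finish", "data_pipeline", "extract"),
    _ev("workflow_start", "child_pipeline"),
    _ev("step_start", "child_pipeline", "validate"),
    _ev("step_finish", "child_pipeline", "validate"),
    _ev("workflow_finish", "child_pipeline"),
    _ev("step_start", "data_pipeline", "load"),
    _ev("step_finish", "data_pipeline", "load"),
    _ev("workflow_finish", "data_pipeline"),
]

grouped = steps_by_workflow(demo_events)
```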
Building real-time UIs
Lifecycle events are useful for streaming progress updates to a UI. You can send events to the frontend over a WebSocket:
from polos import events, PolosClient

async def stream_to_websocket(client: PolosClient, root_workflow_id: str, execution_id: str, websocket):
    """Stream workflow events to WebSocket client."""
    async for event in events.stream_workflow(client, root_workflow_id, execution_id):
        # Send event to frontend
        await websocket.send_json({
            "type": event.event_type,
            "data": event.data,
            "timestamp": event.created_at
        })
        # Stop streaming when this execution (not a subworkflow) completes
        if event.event_type == "workflow_finish" and event.data["_metadata"]["execution_id"] == execution_id:
            break
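The execution_id check in the stop condition matters because subworkflows emit their own workflow_finish events before the parent finishes. The sketch below shows this with a fake in-memory stream standing in for events.stream_workflow. It assumes the execution ID sits under data["_metadata"], as in the event structure shown earlier. All helper names and the "exec_child" and "exec_root" IDs are hypothetical.

```python
import asyncio


def is_root_finish(event_type: str, data: dict, execution_id: str) -> bool:
    """True only for the workflow_finish event of the given execution."""
    meta = data.get("_metadata", {})
    return event_type == "workflow_finish" and meta.get("execution_id") == execution_id


async def fake_stream():
    """Stand-in for events.stream_workflow: yields (event_type, data) tuples."""
    sequence = [
        ("workflow_start", {"_metadata": {"execution_id": "exec_root"}}),
        ("workflow_start", {"_metadata": {"execution_id": "exec_child"}}),
        ("workflow_finish", {"_metadata": {"execution_id": "exec_child"}}),
        ("step_start", {"_metadata": {"execution_id": "exec_root"}, "step_key": "load"}),
        ("step_finish", {"_metadata": {"execution_id": "exec_root"}, "step_key": "load"}),
        ("workflow_finish", {"_metadata": {"execution_id": "exec_root"}}),
    ]
    for item in sequence:
        await asyncio.sleep(0)  # yield control, as a real network stream would
        yield item


async def collect_until_root_finish(stream, execution_id: str) -> list:
    """Consume the stream and stop at the root execution's workflow_finish."""
    seen = []
    async for event_type, data in stream:
        seen.append(event_type)
        if is_root_finish(event_type, data, execution_id):
            break
    return seen


seen_types = asyncio.run(collect_until_root_finish(fake_stream(), "exec_root"))
```

Without the execution_id comparison, the loop would stop at the child's workflow_finish and miss the load step.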
Frontend (JavaScript):
const ws = new WebSocket('ws://localhost:8000/stream');

ws.onmessage = (event) => {
  const data = JSON.parse(event.data);
  if (data.type === 'step_start') {
    console.log(`Step started: ${data.data.step_key}`);
  } else if (data.type === 'step_finish') {
    console.log(`Step finished: ${data.data.step_key}`);
  } else if (data.type === 'workflow_finish') {
    console.log('Workflow completed!');
  }
};