Event-triggered workflows execute automatically when events are published to specific topics. They are well suited to reactive systems, webhook handling, and event-driven architectures.
Defining event-triggered workflows
Use trigger_on_event to specify which topic should trigger the workflow:
from polos import workflow, WorkflowContext, EventPayload

@workflow(
    id="user-signup-handler",
    trigger_on_event="user/signup"
)
async def user_signup_handler(ctx: WorkflowContext, payload: EventPayload):
    # Triggered when an event is published to the "user/signup" topic
    user_id = payload.data["user_id"]
    email = payload.data["email"]

    # Send welcome email
    await ctx.step.run("send_welcome", send_welcome_email, user_id, email)

    # Create sample data
    await ctx.step.run("setup_account", create_sample_data, user_id)

    return {"status": "onboarded", "user_id": user_id}
Event payload structure
Event-triggered workflows receive the event in their payload:
@workflow(
    id="notification-handler",
    trigger_on_event="notifications/new"
)
async def notification_handler(ctx: WorkflowContext, payload: EventPayload):
    # Single event
    print(f"Event ID: {payload.id}")
    print(f"Sequence ID: {payload.sequence_id}")
    print(f"Topic: {payload.topic}")
    print(f"Event Type: {payload.event_type}")
    print(f"Data: {payload.data}")  # This is a dict
    print(f"Created At: {payload.created_at}")

    # Process the event
    notification_data = payload.data
    await ctx.step.run("send", send_notification, notification_data)
Event structure:
{
  "id": "evt_123abc",
  "sequence_id": 456,
  "topic": "notifications/new",
  "event_type": "notification_created",
  "data": {
    "user_id": "user_456",
    "message": "You have a new message"
  },
  "created_at": "2025-01-28T10:30:00Z"
}
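When testing handlers against raw event JSON, a small typed wrapper can make the fields above explicit. The following is a minimal sketch in plain Python and is not part of the Polos SDK: the `Event` dataclass and the `parse_event` helper are hypothetical names, and the field set is assumed from the example structure above.

```python
import json
from dataclasses import dataclass
from datetime import datetime


@dataclass(frozen=True)
class Event:
    """Hypothetical mirror of the event fields shown above."""
    id: str
    sequence_id: int
    topic: str
    event_type: str
    data: dict
    created_at: datetime


def parse_event(raw: str) -> Event:
    """Parse a JSON event string into an Event (illustrative helper)."""
    doc = json.loads(raw)
    # Normalize the trailing "Z" so fromisoformat also works on older Pythons.
    created = datetime.fromisoformat(doc["created_at"].replace("Z", "+00:00"))
    return Event(
        id=doc["id"],
        sequence_id=doc["sequence_id"],
        topic=doc["topic"],
        event_type=doc["event_type"],
        data=doc["data"],
        created_at=created,
    )
```

Parsing `created_at` into a timezone-aware `datetime` keeps later comparisons and sorting unambiguous.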
Publishing events
Publish events from workflows or external systems:
From within a workflow
@workflow
async def create_user(ctx: WorkflowContext, input: CreateUserInput):
    # Create user
    user = await ctx.step.run("create", create_user_record, input)

    # Publish event (triggers event-triggered workflows)
    await ctx.step.publish_event(
        "publish_signup",
        topic="user/signup",
        data={
            "user_id": user.id,
            "email": user.email,
            "name": user.name
        },
        event_type="user_created"
    )

    return {"user_id": user.id}
From external systems (API)
import asyncio
import httpx

async def publish_event(topic: str, data: dict):
    """Publish an event via API."""
    async with httpx.AsyncClient() as client:
        response = await client.post(
            "https://api.polos.ai/api/v1/events/publish",
            headers={
                "Authorization": "Bearer YOUR_API_KEY",
                "Content-Type": "application/json"
            },
            json={
                "topic": topic,
                "events": [{
                    "data": data,
                    "event_type": "custom_event"
                }]
            }
        )
        # Raise an exception on 4xx/5xx responses
        response.raise_for_status()

# Trigger workflows listening to "order/completed"
asyncio.run(publish_event(
    "order/completed",
    {"order_id": "ord_123", "total": 99.99}
))
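Because the request body carries an `events` list, it may be convenient to build that body separately from the HTTP call. The sketch below does only that, in plain Python. The `build_publish_body` helper is a hypothetical name, and sending more than one entry per request is an assumption based on the list shape; the example above only shows a single entry.

```python
import json


def build_publish_body(topic: str, items: list[dict], event_type: str) -> dict:
    """Build a request body with one "events" entry per item.

    Assumption: the endpoint accepts several entries in "events".
    """
    return {
        "topic": topic,
        "events": [{"data": item, "event_type": event_type} for item in items],
    }


body = build_publish_body(
    "order/completed",
    [
        {"order_id": "ord_123", "total": 99.99},
        {"order_id": "ord_124", "total": 15.0},
    ],
    event_type="order_completed",
)
print(json.dumps(body, indent=2))
```

Keeping the body construction in a pure function makes it easy to unit test without a network call.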
Batch events for processing
Process multiple events together for efficiency:
@workflow(
    id="batch-processor",
    trigger_on_event="analytics/events",
    batch_size=10,  # Process up to 10 events
    batch_timeout_seconds=30  # Or wait max 30 seconds
)
async def batch_processor(ctx: WorkflowContext, payload: BatchEventPayload):
    # Multiple events in the payload
    events = payload.events
    print(f"Processing batch of {len(events)} events")

    # Extract all event data
    analytics_data = [event.data for event in events]

    # Process batch
    await ctx.step.run("batch_insert", insert_analytics, analytics_data)

    return {"processed": len(events)}
Batching behavior:
- Workflow triggers when either batch_size is reached or batch_timeout_seconds elapses
- If 10 events arrive in 5 seconds → triggers immediately with 10 events
- If only 3 events arrive in 30 seconds → triggers with 3 events after timeout
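The size-or-timeout rule above can be modeled with a short simulation. This is a sketch of the rule only, not of how Polos implements batching; `simulate_batches` is a hypothetical helper, and it assumes the timeout clock starts when the first event of a batch arrives.

```python
def simulate_batches(arrivals, batch_size, batch_timeout_seconds):
    """Group sorted arrival times (seconds) into (trigger_time, events) batches.

    Assumption: the timeout is measured from the first event in each batch.
    """
    batches = []
    current = []
    for t in arrivals:
        # Flush a pending batch whose timeout expired before this arrival.
        if current and t - current[0] >= batch_timeout_seconds:
            batches.append((current[0] + batch_timeout_seconds, current))
            current = []
        current.append(t)
        # Flush immediately once the batch is full.
        if len(current) == batch_size:
            batches.append((t, current))
            current = []
    if current:
        # Remaining events are released when their timeout elapses.
        batches.append((current[0] + batch_timeout_seconds, current))
    return batches


# Ten events within 5 seconds: one full batch, triggered on the tenth arrival.
full = simulate_batches([i * 0.5 for i in range(10)], batch_size=10, batch_timeout_seconds=30)
# Three events within 30 seconds: one partial batch, triggered at the timeout.
partial = simulate_batches([0, 10, 20], batch_size=10, batch_timeout_seconds=30)
print(len(full[0][1]), full[0][0])
print(len(partial[0][1]), partial[0][0])
```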
Batch payload structure:
{
  "events": [
    {
      "id": "evt_1",
      "sequence_id": 1001,
      "topic": "analytics/events",
      "data": {"action": "click"},
      "created_at": "2025-01-28T10:30:00Z"
    },
    {
      "id": "evt_2",
      "sequence_id": 1002,
      "topic": "analytics/events",
      "data": {"action": "view"},
      "created_at": "2025-01-28T10:30:05Z"
    }
  ]
}
Multiple handlers for one topic
Multiple workflows can listen to the same topic:
@workflow(
    id="immediate-handler",
    trigger_on_event="order/created"
)
async def immediate_handler(ctx: WorkflowContext, payload: EventPayload):
    """Process each order immediately."""
    order_id = payload.data["order_id"]
    await ctx.step.run("send_confirmation", send_order_confirmation, order_id)
    return {"handler": "immediate"}

@workflow(
    id="batched-handler",
    trigger_on_event="order/created",
    batch_size=5,
    batch_timeout_seconds=60
)
async def batched_handler(ctx: WorkflowContext, payload: BatchEventPayload):
    """Process orders in batches for analytics."""
    events = payload.events
    order_ids = [e.data["order_id"] for e in events]
    await ctx.step.run("batch_analytics", update_analytics, order_ids)
    return {"handler": "batched", "count": len(events)}
When an event is published:
- immediate-handler triggers once per event
- batched-handler triggers once per batch (up to 5 events or 60 seconds)
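The fan-out idea, where one published event reaches every subscriber of a topic, can be illustrated with a tiny in-memory model. This is a conceptual sketch only: `subscribe`, `publish`, and the `subscribers` registry are hypothetical names, not Polos APIs, and the model ignores batching and durability.

```python
from collections import defaultdict

# topic -> list of handler callables (a stand-in for registered workflows)
subscribers = defaultdict(list)


def subscribe(topic, handler):
    """Register a handler for a topic."""
    subscribers[topic].append(handler)


def publish(topic, data):
    """Deliver one event to every handler registered for the topic."""
    return [handler(data) for handler in subscribers[topic]]


subscribe("order/created", lambda d: f"confirm {d['order_id']}")
subscribe("order/created", lambda d: f"analytics {d['order_id']}")

results = publish("order/created", {"order_id": "ord_123"})
print(results)  # ['confirm ord_123', 'analytics ord_123']
```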
Event filtering
Filter events by event_type:
@workflow(
    id="high-priority-handler",
    trigger_on_event="notifications/system"
)
async def high_priority_handler(ctx: WorkflowContext, payload: EventPayload):
    # Only process high priority notifications
    if payload.event_type == "high_priority":
        await ctx.step.run("alert", send_alert, payload.data)
    else:
        print(f"Skipping event type: {payload.event_type}")
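When a handler needs to distinguish more than one or two event types, the if/else check can grow unwieldy. One option is a lookup table keyed by event_type, sketched here in plain Python. The `HANDLERS` mapping, the `dispatch` function, and the `"low_priority"` type are hypothetical and only illustrate the pattern; inside a workflow, the chosen function would presumably be run through a step as in the example above.

```python
def send_alert(data):
    """Placeholder alert action for this sketch."""
    return f"ALERT: {data['message']}"


def log_only(data):
    """Placeholder logging action for this sketch."""
    return f"log: {data['message']}"


# Hypothetical mapping from event_type to a handler function.
HANDLERS = {
    "high_priority": send_alert,
    "low_priority": log_only,
}


def dispatch(event_type, data):
    """Route an event to its handler; return None for unhandled types."""
    handler = HANDLERS.get(event_type)
    if handler is None:
        return None
    return handler(data)
```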
Alternative approach: Use multiple topics
# Publish to specific topics
await ctx.step.publish_event(
    "publish",
    topic="notifications/high_priority",  # Specific topic
    data=notification_data
)

@workflow(
    id="high-priority-handler",
    trigger_on_event="notifications/high_priority"  # Only high priority
)
async def high_priority_handler(ctx: WorkflowContext, payload: EventPayload):
    # All events on this topic are high priority
    await ctx.step.run("alert", send_alert, payload.data)
Event topic patterns
Use topic hierarchies for organization:
# User events
"user/signup"
"user/login"
"user/deleted"
# Order events
"order/created"
"order/completed"
"order/cancelled"
# System events
"system/error"
"system/warning"
"system/maintenance"
Key takeaways
- Event-triggered workflows execute automatically when events are published
- Use trigger_on_event to specify the topic
- Publish events from workflows or external systems via API
- Batch events for efficiency with batch_size and batch_timeout_seconds
- Multiple handlers can listen to the same topic