Streaming provides real-time feedback as agents execute, allowing you to display progress to users and handle responses incrementally.
Basic streaming
Use agent.stream() to stream agent responses:
import asyncio
from polos import PolosClient

async def main():
    client = PolosClient()

    # weather_agent: an agent you have already defined elsewhere
    stream = await weather_agent.stream(client, "What's the weather in Tokyo?")

    # Stream text as it's generated
    async for chunk in stream.text_chunks:
        print(chunk, end="", flush=True)
    print("\n")

if __name__ == "__main__":
    asyncio.run(main())
Stream iterators
Polos provides multiple ways to consume streamed responses:
Text chunks
Stream only the text content:
stream = await agent.stream(client, "Explain quantum computing")

async for chunk in stream.text_chunks:
    print(chunk, end="", flush=True)
Use case: Display text to users in real-time, like ChatGPT’s streaming interface.
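The incremental-display loop itself does not depend on Polos. This sketch substitutes a mock async generator for stream.text_chunks (mock_text_chunks and display_stream are illustrative names, not Polos APIs) to show the accumulate-and-print pattern in isolation:

```python
import asyncio

async def mock_text_chunks():
    # Stands in for stream.text_chunks; yields text piece by piece
    for piece in ["Quantum ", "computing ", "uses ", "qubits."]:
        await asyncio.sleep(0)  # yield control, simulating network waits
        yield piece

async def display_stream():
    collected = []
    async for chunk in mock_text_chunks():
        print(chunk, end="", flush=True)  # render each piece immediately
        collected.append(chunk)
    print()  # final newline once the stream ends
    return "".join(collected)

full_text = asyncio.run(display_stream())
```

Accumulating chunks as you print also gives you the complete text at the end without a second request.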
All events
Access full event stream including tool calls, steps, and metadata:
stream = await agent.stream(client, "Search for Python tutorials and summarize them")

async for event in stream.events:
    if event.event_type == "text_delta":
        print(event.data.get("content", ""), end="", flush=True)
    elif event.event_type == "tool_call":
        tool_name = event.data.get("tool_name")
        print(f"\n[Calling tool: {tool_name}]\n")
Use case: Build rich UIs that show tool execution progress, reasoning steps, etc.
Final text
Get the complete accumulated text after streaming finishes:
stream = await agent.stream(client, "Write a haiku about programming")
final_text = await stream.text()
print(final_text)
Use case: When you need the complete response but want streaming for progress indication.
Complete result
Get the full result object with usage stats, tool calls, and metadata:
stream = await agent.stream(client, "What's 2+2?")
result = await stream.result()

print(f"Result: {result.result}")
print(f"Total steps: {result.total_steps}")
print(f"Tokens used: {result.usage.total_tokens}")

# Access tool results
for tool_call in result.tool_results:
    print(f"Tool: {tool_call.tool_name}")
    print(f"Result: {tool_call.result}")
Use case: Analytics, logging, cost tracking, debugging.
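For cost tracking, the token counts on result.usage can be converted into a dollar estimate. A minimal sketch; estimate_cost is a hypothetical helper, and the per-1K-token prices are placeholders, not real model rates:

```python
def estimate_cost(prompt_tokens: int, completion_tokens: int,
                  prompt_price_per_1k: float = 0.0005,
                  completion_price_per_1k: float = 0.0015) -> float:
    """Estimate request cost in dollars from token counts.

    The default prices are illustrative placeholders; substitute
    your model's actual per-1K-token rates.
    """
    return ((prompt_tokens / 1000) * prompt_price_per_1k
            + (completion_tokens / 1000) * completion_price_per_1k)

# e.g. log this alongside result.usage after streaming finishes
cost = estimate_cost(prompt_tokens=900, completion_tokens=347)
```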
Lifecycle events
By default, agents emit lifecycle events during execution, regardless of whether you use agent.run() or agent.stream(). These events help you track progress and display status to users.
Event types
agent_start
- Marks the beginning of agent execution
- Contains agent ID and initial configuration
agent_finish
- Marks the end of execution
- Includes usage statistics (tokens, cost)
- Contains final result
step_start
- Indicates a workflow step has begun (for example, an LLM call or a tool call)
step_finish
- Confirms step completion
- Includes step output and duration
text_delta (only with agent.stream())
- Incremental text chunks as the LLM generates them
- Real-time content streaming
tool_call (only with agent.stream())
- Emitted when the agent requests a tool execution
- Includes tool name and arguments
- The tool's output is available via the step_finish event once the tool finishes executing
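One way to consume these event types without a long if/elif chain is a dispatch table keyed by event_type. A sketch only: the Event dataclass below is a stand-in for whatever object Polos actually yields, and the flat tool_name key is assumed for brevity:

```python
from dataclasses import dataclass, field

@dataclass
class Event:
    # Minimal stand-in for a Polos stream event
    event_type: str
    data: dict = field(default_factory=dict)

log = []

# Map each event type to a handler callable
handlers = {
    "agent_start":  lambda e: log.append("started"),
    "tool_call":    lambda e: log.append(f"tool:{e.data.get('tool_name')}"),
    "text_delta":   lambda e: log.append(e.data.get("content", "")),
    "agent_finish": lambda e: log.append("finished"),
}

def dispatch(event: Event) -> None:
    # Unknown event types are ignored rather than raising
    handler = handlers.get(event.event_type)
    if handler:
        handler(event)

for ev in [Event("agent_start"),
           Event("tool_call", {"tool_name": "search_web"}),
           Event("text_delta", {"content": "hi"}),
           Event("agent_finish")]:
    dispatch(ev)
```

The same table can be reused for agent.run() event listeners, since both paths emit the lifecycle events listed above.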
Using lifecycle events
stream = await agent.stream(client, "Research AI agents and create a summary")
async for event in stream.events:
    if event.event_type == "agent_start":
        print("🚀 Agent started")
    elif event.event_type == "step_start":
        step_name = event.data.get("step_key")
        print(f"⏳ Running step: {step_name}")
    elif event.event_type == "tool_call":
        tool_name = event.data.get("tool_call", {}).get("function", {}).get("name")
        print(f"🔧 Calling tool: {tool_name}")
    elif event.event_type == "text_delta":
        content = event.data.get("content", "")
        print(content, end="", flush=True)
    elif event.event_type == "step_finish":
        step_name = event.data.get("step_key")
        print(f"\n✓ Completed step: {step_name}")
    elif event.event_type == "agent_finish":
        usage = event.data.get("usage", {})
        print(f"\n✅ Agent finished - Tokens: {usage.get('total_tokens')}")
Example output:
🚀 Agent started
🔧 Calling tool: search_web
⏳ Running step: search_web
✓ Completed step: search_web
⏳ Running step: generate_summary
Based on recent research, AI agents are...
✓ Completed step: generate_summary
✅ Agent finished - Tokens: 1247
Stream with agent.run()
Even when using agent.run() (non-streaming), lifecycle events are still emitted. You can listen to them separately:
from polos import PolosClient, events
import json
client = PolosClient()
# Start the agent - non-streaming
handle = await weather_agent.invoke(
    client,
    payload={
        "input": "What is the weather in New York and Mumbai? Compare them.",
        "streaming": False,  # Set to False for non-streaming execution
    },
)

# Listen to events using stream_workflow
print("Streaming events:\n")

async for event in events.stream_workflow(client, handle.root_workflow_id, handle.id):
    if event.event_type == "step_start":
        print(f"\n\nStep started: {event.data.get('step_key')}")
    elif event.event_type == "step_finish":
        # Print the result of the step
        print(f"\n\nStep finished: {json.dumps(event.data.get('data', {}).get('result', {}), indent=2)}")
    elif event.event_type == "agent_finish":
        print("\n\nAgent finished")
Building UIs with streaming
Basic text streaming UI
async def stream_to_ui(agent, user_message):
    stream = await agent.stream(client, user_message)

    async for chunk in stream.text_chunks:
        # Send chunk to frontend
        await websocket.send_json({
            "type": "text_chunk",
            "content": chunk
        })

    # Send completion signal
    await websocket.send_json({"type": "done"})
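If the frontend consumes Server-Sent Events rather than a WebSocket, each chunk can be framed as a text/event-stream message instead. sse_frame is a hypothetical helper, but the framing follows the standard SSE wire format:

```python
import json

def sse_frame(event: str, payload: dict) -> str:
    """Format one Server-Sent Events message (text/event-stream).

    A blank line terminates each message, which is how SSE
    clients detect frame boundaries.
    """
    return f"event: {event}\ndata: {json.dumps(payload)}\n\n"

frame = sse_frame("text_chunk", {"content": "Hello"})
```

Inside the streaming loop you would yield one frame per chunk from an HTTP response with the Content-Type header set to text/event-stream.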
Rich progress UI
async def stream_with_progress(agent, user_message):
    stream = await agent.stream(client, user_message)

    async for event in stream.events:
        if event.event_type == "tool_call":
            await websocket.send_json({
                "type": "tool_call",
                "tool": event.data.get("tool_call", {}).get("function", {}).get("name"),
                "status": "executing"
            })
        elif event.event_type == "text_delta":
            await websocket.send_json({
                "type": "text",
                "content": event.data.get("content", "")
            })
        elif event.event_type == "agent_finish":
            await websocket.send_json({
                "type": "complete",
                "usage": event.data.get("usage")
            })
Error handling in streams
Streams automatically handle errors and emit error events:
stream = await agent.stream(client, "Do something risky")

try:
    async for event in stream.events:
        if event.event_type == "error":
            error_msg = event.data.get("error")
            print(f"Error: {error_msg}")
            break
        elif event.event_type == "text_delta":
            print(event.data.get("content", ""), end="")
except Exception as e:
    print(f"Stream failed: {e}")
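When a stream fails partway through, one common recovery is to retry the whole request with backoff. This generic helper is a sketch (the source does not say whether Polos has built-in retries; with_retries is an illustrative name), shown here with a flaky stand-in coroutine:

```python
import asyncio

async def with_retries(make_coro, attempts: int = 3, base_delay: float = 0.5):
    """Run make_coro(), retrying on failure with exponential backoff.

    make_coro must be a zero-argument callable returning a fresh
    coroutine each attempt (a coroutine object can't be awaited twice).
    """
    for attempt in range(attempts):
        try:
            return await make_coro()
        except Exception:
            if attempt == attempts - 1:
                raise  # out of attempts; surface the last error
            await asyncio.sleep(base_delay * (2 ** attempt))

# Demo: a coroutine that fails twice, then succeeds
calls = {"n": 0}

async def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient failure")
    return "ok"

result = asyncio.run(with_retries(flaky, attempts=3, base_delay=0.0))
```

In practice make_coro would wrap the full stream-and-consume logic, e.g. `lambda: consume(agent.stream(client, prompt))`, so a retry restarts the request from scratch.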
Key takeaways
- stream.text_chunks - Simple text-only streaming
- stream.events - Full access to lifecycle events, tool calls, and metadata
- stream.text() - Get complete accumulated text
- stream.result() - Get full result with usage and tool information
- Lifecycle events - Emitted for both agent.run() and agent.stream()
- Rich UIs - Use events to show tool execution, progress, and real-time updates