> ## Documentation Index
> Fetch the complete documentation index at: https://polos.dev/docs/llms.txt
> Use this file to discover all available pages before exploring further.

# Streaming

Streaming provides real-time feedback as agents execute, allowing you to display progress to users and handle responses incrementally.

## Basic streaming

Use `agent.stream()` to stream agent responses:

<CodeGroup>
  ```python Python theme={null}
  import asyncio
  from polos import PolosClient

  async def main():
      client = PolosClient()
      stream = await weather_agent.stream(client, "What's the weather in Tokyo?")

      # Stream text as it's generated
      async for chunk in stream.text_chunks:
          print(chunk, end="", flush=True)

      print("\n")

  if __name__ == "__main__":
      asyncio.run(main())
  ```

  ```typescript TypeScript theme={null}
  import { PolosClient } from "@polos/sdk";

  async function main() {
    const client = PolosClient.fromEnv();
    const stream = await weatherAgent.stream(client, "What's the weather in Tokyo?");

    // Stream text as it's generated
    for await (const chunk of stream.textChunks) {
      process.stdout.write(chunk);
    }

    console.log("\n");
  }

  main();
  ```
</CodeGroup>

## Stream iterators

Polos provides multiple ways to consume streamed responses:

### Text chunks

Stream only the text content:

<CodeGroup>
  ```python Python theme={null}
  stream = await agent.stream(client, "Explain quantum computing")

  async for chunk in stream.text_chunks:
      print(chunk, end="", flush=True)
  ```

  ```typescript TypeScript theme={null}
  const stream = await agent.stream(client, "Explain quantum computing");

  for await (const chunk of stream.textChunks) {
    process.stdout.write(chunk);
  }
  ```
</CodeGroup>

**Use case:** Display text to users in real-time, like ChatGPT's streaming interface.

### All events

Access full event stream including tool calls, steps, and metadata:

<CodeGroup>
  ```python Python theme={null}
  stream = await agent.stream(client, "Search for Python tutorials and summarize them")

  async for event in stream.events:
      if event.event_type == "text_delta":
          print(event.data.get("content", ""), end="", flush=True)
      elif event.event_type == "tool_call":
          tool_name = event.data.get("tool_name")
          print(f"\n[Calling tool: {tool_name}]\n")
  ```

  ```typescript TypeScript theme={null}
  const stream = await agent.stream(client, "Search for Python tutorials and summarize them");

  for await (const event of stream.events) {
    if (event.eventType === "text_delta") {
      process.stdout.write(event.data?.content ?? "");
    } else if (event.eventType === "tool_call") {
      const toolName = event.data?.toolName;
      console.log(`\n[Calling tool: ${toolName}]\n`);
    }
  }
  ```
</CodeGroup>

**Use case:** Build rich UIs that show tool execution progress, reasoning steps, etc.

### Final text

Get the complete accumulated text after streaming finishes:

<CodeGroup>
  ```python Python theme={null}
  stream = await agent.stream(client, "Write a haiku about programming")

  final_text = await stream.text()
  print(final_text)
  ```

  ```typescript TypeScript theme={null}
  const stream = await agent.stream(client, "Write a haiku about programming");

  const finalText = await stream.text();
  console.log(finalText);
  ```
</CodeGroup>

**Use case:** When you need the complete response but want streaming for progress indication.

### Complete result

Get the full result object with usage stats, tool calls, and metadata:

<CodeGroup>
  ```python Python theme={null}
  stream = await agent.stream(client, "What's 2+2?")

  result = await stream.result()

  print(f"Result: {result.result}")
  print(f"Total steps: {result.total_steps}")
  print(f"Tokens used: {result.usage.total_tokens}")

  # Access tool calls
  for tool_call in result.tool_results:
      print(f"Tool: {tool_call.tool_name}")
      print(f"Input: {tool_call.result}")
  ```

  ```typescript TypeScript theme={null}
  const stream = await agent.stream(client, "What's 2+2?");

  const result = await stream.result();

  console.log(`Result: ${result.result}`);
  console.log(`Total steps: ${result.totalSteps}`);
  console.log(`Tokens used: ${result.usage.totalTokens}`);

  // Access tool calls
  for (const toolCall of result.toolResults) {
    console.log(`Tool: ${toolCall.toolName}`);
    console.log(`Input: ${toolCall.result}`);
  }
  ```
</CodeGroup>

**Use case:** Analytics, logging, cost tracking, debugging.

## Lifecycle events

By default, agents emit lifecycle events during execution - regardless of whether you use `agent.run()` or `agent.stream()`. These events help you track progress and display status to users.

### Event types

**`agent_start`**

* Marks the beginning of agent execution
* Contains agent ID and initial configuration

**`agent_finish`**

* Marks the end of execution
* Includes usage statistics (tokens, cost)
* Contains final result

**`step_start`**

* Indicates a workflow step has begun - for example, LLM or tool call

**`step_finish`**

* Confirms step completion
* Includes step output and duration

**`text_delta`** (only with `agent.stream()`)

* Incremental text chunks as the LLM generates them
* Real-time content streaming

**`tool_call`** (only with `agent.stream()`)

* Emitted when the agent asks for a tool execution
* Includes tool name and arguments
* The results of the tool execution are available via step\_finish event when the tool finishes execution

### Using lifecycle events

<CodeGroup>
  ```python Python theme={null}
  stream = await agent.stream(client, "Research AI agents and create a summary")

  async for event in stream.events:
      if event.event_type == "agent_start":
          print("🚀 Agent started")

      elif event.event_type == "step_start":
          step_name = event.data.get("step_key")
          print(f"⏳ Running step: {step_name}")

      elif event.event_type == "tool_call":
          tool_name = event.data.get("tool_call", {}).get("function", {}).get("name")
          print(f"🔧 Calling tool: {tool_name}")

      elif event.event_type == "text_delta":
          content = event.data.get("content", "")
          print(content, end="", flush=True)

      elif event.event_type == "step_finish":
          step_name = event.data.get("step_key")
          print(f"\n✓ Completed step: {step_name}")

      elif event.event_type == "agent_finish":
          usage = event.data.get("usage", {})
          print(f"\n✅ Agent finished - Tokens: {usage.get('total_tokens')}")
  ```

  ```typescript TypeScript theme={null}
  const stream = await agent.stream(client, "Research AI agents and create a summary");

  for await (const event of stream.events) {
    if (event.eventType === "agent_start") {
      console.log("🚀 Agent started");
    } else if (event.eventType === "step_start") {
      const stepName = event.data?.stepKey;
      console.log(`⏳ Running step: ${stepName}`);
    } else if (event.eventType === "tool_call") {
      const toolName = event.data?.toolCall?.function?.name;
      console.log(`🔧 Calling tool: ${toolName}`);
    } else if (event.eventType === "text_delta") {
      const content = event.data?.content ?? "";
      process.stdout.write(content);
    } else if (event.eventType === "step_finish") {
      const stepName = event.data?.stepKey;
      console.log(`\n✓ Completed step: ${stepName}`);
    } else if (event.eventType === "agent_finish") {
      const usage = event.data?.usage ?? {};
      console.log(`\n✅ Agent finished - Tokens: ${usage.totalTokens}`);
    }
  }
  ```
</CodeGroup>

**Example output:**

```
🚀 Agent started
🔧 Calling tool: search_web
⏳ Running step: search_web
✓ Completed step: search_web
⏳ Running step: generate_summary
Based on recent research, AI agents are...
✓ Completed step: generate_summary
✅ Agent finished - Tokens: 1247
```

## Stream with agent.run()

Even when using `agent.run()` (non-streaming), lifecycle events are still emitted. You can listen to them separately:

<CodeGroup>
  ```python Python theme={null}
  from polos import PolosClient, events
  import json

  client = PolosClient()

  # Start the agent - non-streaming
  handle = await weather_agent.invoke(
      client,
      payload={
          "input": "What is the weather in New York and Mumbai? Compare them.",
          "streaming": False,  # Set to False for non-streaming execution
      }
  )

  # Listen to events using stream_workflow
  print("Streaming events:\n")
  async for event in events.stream_workflow(client, handle.root_workflow_id, handle.id):
      if event.event_type == "step_start":
          print(f"\n\nStep started: {event.data.get('step_key')}")
      elif event.event_type == "step_finish":
          # Print the result of the step
          print(f"\n\nStep finished: {json.dumps(event.data.get('data', {}).get('result', {}), indent=2)}")
      elif event.event_type == "agent_finish":
          print("\n\nAgent finished")
  ```

  ```typescript TypeScript theme={null}
  import { PolosClient, events } from "@polos/sdk";

  const client = PolosClient.fromEnv();

  // Start the agent - non-streaming
  const handle = await weatherAgent.invoke(client, {
    input: "What is the weather in New York and Mumbai? Compare them.",
    streaming: false, // Set to false for non-streaming execution
  });

  // Listen to events using streamWorkflow
  console.log("Streaming events:\n");
  for await (const event of events.streamWorkflow(client, handle.rootWorkflowId, handle.id)) {
    if (event.eventType === "step_start") {
      console.log(`\n\nStep started: ${event.data?.stepKey}`);
    } else if (event.eventType === "step_finish") {
      // Print the result of the step
      console.log(`\n\nStep finished: ${JSON.stringify(event.data?.data?.result ?? {}, null, 2)}`);
    } else if (event.eventType === "agent_finish") {
      console.log("\n\nAgent finished");
    }
  }
  ```
</CodeGroup>

## Building UIs with streaming

### Basic text streaming UI

<CodeGroup>
  ```python Python theme={null}
  async def stream_to_ui(agent, user_message):
      stream = await agent.stream(client, user_message)

      async for chunk in stream.text_chunks:
          # Send chunk to frontend
          await websocket.send_json({
              "type": "text_chunk",
              "content": chunk
          })

      # Send completion signal
      await websocket.send_json({"type": "done"})
  ```

  ```typescript TypeScript theme={null}
  async function streamToUi(agent: Agent, userMessage: string) {
    const stream = await agent.stream(client, userMessage);

    for await (const chunk of stream.textChunks) {
      // Send chunk to frontend
      await websocket.send(JSON.stringify({
        type: "text_chunk",
        content: chunk,
      }));
    }

    // Send completion signal
    await websocket.send(JSON.stringify({ type: "done" }));
  }
  ```
</CodeGroup>

### Rich progress UI

<CodeGroup>
  ```python Python theme={null}
  async def stream_with_progress(agent, user_message):
      stream = await agent.stream(client, user_message)

      async for event in stream.events:
          if event.event_type == "tool_call":
              await websocket.send_json({
                  "type": "tool_call",
                  "tool": event.data.get("tool_call", {}).get("function", {}).get("tool_name"),
                  "status": "executing"
              })

          elif event.event_type == "text_delta":
              await websocket.send_json({
                  "type": "text",
                  "content": event.data.get("content", "")
              })

          elif event.event_type == "agent_finish":
              await websocket.send_json({
                  "type": "complete",
                  "usage": event.data.get("usage")
              })
  ```

  ```typescript TypeScript theme={null}
  async function streamWithProgress(agent: Agent, userMessage: string) {
    const stream = await agent.stream(client, userMessage);

    for await (const event of stream.events) {
      if (event.eventType === "tool_call") {
        await websocket.send(JSON.stringify({
          type: "tool_call",
          tool: event.data?.toolCall?.function?.toolName,
          status: "executing",
        }));
      } else if (event.eventType === "text_delta") {
        await websocket.send(JSON.stringify({
          type: "text",
          content: event.data?.content ?? "",
        }));
      } else if (event.eventType === "agent_finish") {
        await websocket.send(JSON.stringify({
          type: "complete",
          usage: event.data?.usage,
        }));
      }
    }
  }
  ```
</CodeGroup>

## Error handling in streams

Streams automatically handle errors and emit error events:

<CodeGroup>
  ```python Python theme={null}
  stream = await agent.stream(client, "Do something risky")

  try:
      async for event in stream.events:
          if event.event_type == "error":
              error_msg = event.data.get("error")
              print(f"Error: {error_msg}")
              break

          elif event.event_type == "text_delta":
              print(event.data.get("content", ""), end="")
  except Exception as e:
      print(f"Stream failed: {e}")
  ```

  ```typescript TypeScript theme={null}
  const stream = await agent.stream(client, "Do something risky");

  try {
    for await (const event of stream.events) {
      if (event.eventType === "error") {
        const errorMsg = event.data?.error;
        console.error(`Error: ${errorMsg}`);
        break;
      } else if (event.eventType === "text_delta") {
        process.stdout.write(event.data?.content ?? "");
      }
    }
  } catch (e) {
    console.error(`Stream failed: ${e}`);
  }
  ```
</CodeGroup>

## Key takeaways

* `stream.text_chunks` / `stream.textChunks` - Simple text-only streaming
* `stream.events` - Full access to lifecycle events, tool calls, and metadata
* `stream.text()` - Get complete accumulated text
* `stream.result()` - Get full result with usage and tool information
* **Lifecycle events** - Emitted for both `agent.run()` and `agent.stream()`
* **Rich UIs** - Use events to show tool execution, progress, and real-time updates
