2 changes: 2 additions & 0 deletions docs/develop/python/index.mdx
@@ -40,6 +40,8 @@ From there, you can dive deeper into any of the Temporal primitives to start bui
- [Message passing](/develop/python/workflows/message-passing)
- [Schedules](/develop/python/workflows/schedules)
- [Timers](/develop/python/workflows/timers)
- [Versioning](/develop/python/workflows/versioning)
- [Streaming](/develop/python/workflows/streaming)

## [Activities](/develop/python/activities)

1 change: 1 addition & 0 deletions docs/develop/python/workflows/index.mdx
@@ -26,3 +26,4 @@ import * as Components from '@site/src/components';
- [Schedules](/develop/python/workflows/schedules)
- [Timers](/develop/python/workflows/timers)
- [Versioning](/develop/python/workflows/versioning)
- [Streaming](/develop/python/workflows/streaming)
179 changes: 179 additions & 0 deletions docs/develop/python/workflows/streaming.mdx
@@ -0,0 +1,179 @@
---
id: streaming
title: Streaming from Workflows - Python SDK
sidebar_label: Streaming
description: Streaming shows an application's progress in real time—like incremental results and status updates—as it runs, instead of waiting until the final result is complete.
toc_max_heading_level: 4
keywords:
- streaming
- AI agent
- Python SDK
- pub/sub
- PubSubMixin
- PubSubClient
- server-sent events
- SSE
- LLM
tags:
- Streaming
- Workflows
- AI agent
- Python SDK
- Temporal SDKs
---

:::tip Support, stability, and dependency info

Streaming is currently in [Public Preview](/evaluate/development-production-features/release-stages#public-preview).

:::

Temporal's message-passing primitives — [Signals](/sending-messages#sending-signals), [Updates](/sending-messages#sending-updates), and [Queries](/sending-messages#sending-queries) — provide the building blocks for real-time streaming from durable Workflows without additional infrastructure.
The `temporalio.contrib.pubsub` module provides a reusable pub/sub implementation on top of these primitives.

See [Streaming from Workflows](/workflow-streaming) for a conceptual overview of the streaming problem and the architecture patterns this page implements.

## Prerequisites {#prerequisites}

- Python 3.12 or later
- `temporalio` SDK installed
- An LLM provider with a streaming API (the examples use the OpenAI Responses API)

## Use PubSubMixin in your Workflow {#use-pubsubmixin}

:::info
The code that follows is part of a working streaming agents [sample](https://github.com/jssmith/temporal-streaming-agents-samples).
:::

Add `PubSubMixin` as a base class to your Workflow. The mixin registers the Signal handler that receives batched events from Activities, the Update handler that external clients use to long-poll for new events, and the Query handler that returns the current stream offset.

Call `self.init_pubsub()` in your `__init__` method to initialize the mixin's internal state.
Pass `prior_state` if your Workflow accepts previously accumulated pub/sub state, such as when continuing from a prior session.

```python
from temporalio import workflow
from temporalio.contrib.pubsub import PubSubMixin

@workflow.defn
class AnalyticsWorkflow(PubSubMixin):
    @workflow.init
    def __init__(self, state: WorkflowState) -> None:
        self.init_pubsub(prior_state=state.pubsub_state)
```

The Workflow can also publish directly for lifecycle events that originate inside the Workflow rather than in an Activity, such as tool-call start and completion:

```python
import json

EVENTS_TOPIC = "events"

def _emit(self, event_type: str, **data) -> None:
    event = {
        "type": event_type,
        "timestamp": workflow.now().isoformat(),
        "data": data,
    }
    self.publish(EVENTS_TOPIC, json.dumps(event).encode())
```

`self.publish()` is provided by `PubSubMixin` and appends the event to the durable in-Workflow buffer, incrementing the global offset. Any blocked poll Update handlers are woken up immediately.

## Publish events from an Activity {#publish-from-activity}

Inside an Activity, create a `PubSubClient` and use it to batch and publish streaming events to the Workflow via Signal.

```python
from temporalio import activity
from temporalio.contrib.pubsub import PubSubClient

@activity.defn
async def model_call(input: ModelCallInput) -> ModelCallResult:
    pubsub = PubSubClient.create(batch_interval=2.0)

    async with pubsub:
        async with openai_client.responses.stream(**kwargs) as stream:
            async for event in stream:
                activity.heartbeat()
                # Flush immediately for significant events such as the end of a reasoning block
                pubsub.publish(
                    EVENTS_TOPIC,
                    translate(event),
                    priority=is_thinking_complete(event),
                )

    return ModelCallResult(...)
```

The `async with pubsub` context manager starts a background flush timer and guarantees a final flush on exit. No manual `asyncio.wait()` or cancellation logic is needed.

**How batching works:**

- Events are buffered and flushed to the Workflow via Signal at the `batch_interval` (default: 2 seconds). This is a Nagle-like strategy: accumulate small events, send in batches.
- Calling `pubsub.publish(..., priority=True)` triggers an immediate flush for the current batch. Use this for events that are significant on their own, like at the end of a reasoning block or an error.
- The `async with pubsub` block guarantees a final flush on exit so no events are dropped when the Activity completes.

The pattern generalizes to any LLM provider with a streaming API. The `translate()` function in the example converts provider-specific stream events into your application's event schema.

## Subscribe from an external client {#subscribe-from-client}

The BFF (backend-for-frontend), or any external caller, subscribes to the Workflow's event stream using `PubSubClient.subscribe()`, which returns an async iterator that internally long-polls the Workflow via Updates.

```python
from temporalio.contrib.pubsub import PubSubClient

pubsub = PubSubClient.create(client, session_id)
start_offset = await pubsub.get_offset()

async def event_stream():
    async for item in pubsub.subscribe(
        topics=[EVENTS_TOPIC], from_offset=start_offset
    ):
        event = json.loads(item.data)
        yield f"data: {json.dumps(event)}\n\n"
        if event.get("type") == "AGENT_COMPLETE":
            return

return StreamingResponse(event_stream(), media_type="text/event-stream")
```

**How the long-poll works:**

- Each call to `pubsub.subscribe()` sends an Update to the Workflow with the client's current offset.
- The Update handler inside `PubSubMixin` calls `workflow.wait_condition()` to block until new events are available at or beyond that offset.
- When events arrive via a Signal from an Activity or via `self.publish()` in the Workflow, the `wait_condition` returns and the Update handler delivers the new batch.
- The subscribe iterator re-polls automatically, tracking the offset across iterations.

Because events are stored durably inside the Workflow, the client can reconnect at any time, even after a BFF restart, and resume from its last known offset without losing events.
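The buffer-and-wake mechanism can be sketched with plain asyncio standing in for the Temporal primitives. In the real mixin, `publish` is driven by the Signal handler and the blocking poll is the Update handler built on `workflow.wait_condition()`; this standalone version keeps only the offset semantics.

```python
import asyncio

class EventBuffer:
    """Illustrative offset-indexed event buffer with blocking reads."""

    def __init__(self) -> None:
        self.events: list[bytes] = []
        self._new_events = asyncio.Event()

    def publish(self, payload: bytes) -> None:
        self.events.append(payload)   # global offset == len(self.events)
        self._new_events.set()        # wake any blocked pollers

    async def poll(self, from_offset: int) -> tuple[list[bytes], int]:
        # Block until events exist at or beyond the requested offset
        while len(self.events) <= from_offset:
            self._new_events.clear()
            await self._new_events.wait()
        return self.events[from_offset:], len(self.events)
```

A real implementation also batches per topic and survives replay; the sketch shows only why a poll at the current offset blocks until the next publish.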

## Per-turn event indexing {#per-turn-event-indexing}

Events use a global offset that increments across all turns within a session.
Capturing the offset before signaling `start_turn` ensures the SSE stream delivers only events from the current turn.

```python
pubsub = PubSubClient.create(client, session_id)
start_offset = await pubsub.get_offset()

# Signal the Workflow to start a new turn
await handle.signal(AnalyticsWorkflow.start_turn, StartTurnInput(message=text))

# Subscribe from start_offset onward — only events from this turn
async for item in pubsub.subscribe(topics=[EVENTS_TOPIC], from_offset=start_offset):
    ...
```

On reconnect, pass the client's last known offset to `subscribe()`.
The Workflow replays events from that point forward, so no events are lost even if the BFF restarts mid-stream.

## Sample application {#sample-application}

The [streaming agents sample](https://github.com/jssmith/temporal-streaming-agents-samples) is a complete chat-based analytics agent that demonstrates this pattern end-to-end.
The agent queries a Chinook music store database (SQLite) using SQL, Python, and shell tools.
It reasons about results, recovers from errors, and streams its progress to a React frontend via SSE.

The sample includes:

- `backend-temporal/` — FastAPI proxy and Temporal Worker using `PubSubMixin`
- `backend-ephemeral/` — A non-Temporal backend for direct comparison
- `frontend/` — React frontend that renders streamed events in real time

To test it out, clone the repo and run the app.
184 changes: 184 additions & 0 deletions docs/encyclopedia/workflow/streaming.mdx
@@ -0,0 +1,184 @@
---
id: workflow-streaming
title: Streaming from Workflows
sidebar_label: Streaming
description: Streaming shows an application's progress in real time—like incremental results and status updates—as it runs, instead of waiting until the final result is complete.
slug: /workflow-streaming
toc_max_heading_level: 4
keywords:
- streaming
- pub/sub
- server-sent events
- SSE
- signals
- updates
- AI agent
- LLM
tags:
- Streaming
- Workflows
- AI agent
---

This page discusses streaming from Temporal Workflows, covering [the streaming problem](#the-streaming-problem), [architecture with and without Temporal](#architecture), the [pub/sub transport pattern](#pubsub-pattern), [concurrency model](#concurrency-model), [durability considerations](#durability-and-resumability), and an [AI agent use case](#ai-agents).

## What is streaming from a Workflow? {#what-is-streaming}

Streaming means delivering progress to an external observer as it happens, rather than only when an operation completes. A Workflow that streams exposes its internal progress, such as partial results, status updates, and incremental output, to clients in real time.

Some common examples are:

- **Incremental output**: Results rendered progressively as they are produced, rather than all at once at the end.
- **Status updates**: Notifications about what the Workflow is currently doing, which step is running, how long it has been running, or whether it succeeded or failed.
- **Progress events**: Checkpoints, milestones, or sub-results published as the Workflow moves through its execution.

Streaming keeps users engaged during long-running operations, makes system behavior more transparent, and enables callers to act on partial results, for example by cancelling work that's heading in the wrong direction.

## The streaming problem {#the-streaming-problem}

Without durable infrastructure, a backend-for-frontend (BFF) service runs the business logic, buffers streaming events in memory, and pushes them to a client via Server-Sent Events (SSE) or WebSockets.
If that server restarts while work is in progress, all in-flight state is lost and the client receives no further updates.

The streaming problem has two core dimensions:

1. **Where does session state live?** An in-memory BFF loses progress and history on restart.
For long-running or expensive operations, this means lost work and a degraded user experience.
2. **How resumable is the underlying operation?** Some operations can be resumed mid-stream after a failure, while others have to restart from the beginning. The appropriate architecture depends on the cost of restarting work.

A key question is the level of durability appropriate for a given application. Making state durable introduces latency and consumes system resources. If failures are rare or the impact is low, durable streaming may not be justified. But when work is expensive, long-running, or stateful, losing progress to a transient failure is costly.

## Architecture {#architecture}

### Without Temporal {#architecture-without-temporal}

```
Client ──(SSE)──▶ BFF ──(stream)──▶ Service
```

The BFF runs the business logic, buffers events in memory, and streams them to the client via SSE.
If the server restarts, all in-flight work and session state is lost.

### With Temporal {#architecture-with-temporal}

```
Client ──(SSE)──▶ BFF ──(subscribe: long-poll Update)──▶ Workflow
                                                            │ execute_activity
                                                            ▼
                                                         Activity ── publish (batched Signal) ──▶ Workflow
                                                            │
                                                            ▼
                                                    external service
```

The BFF becomes a stateless proxy. Session state, history, and the event stream all live in the Workflow. The BFF can be restarted at any time without losing work.

## Pub/Sub pattern {#pubsub-pattern}

The streaming transport follows a pub/sub pattern in two directions:

1. **Activity → Workflow (publish)**: The Activity publishes events via batched Signals as it receives them from an external service.
2. **Workflow → external client (subscribe)**: The external client subscribes via long-poll Updates.

[Signals](/sending-messages#sending-signals), [Updates](/sending-messages#sending-updates), and [Queries](/sending-messages#sending-queries) handle both directions of the stream without additional infrastructure like Redis or Kafka.

### Activity to Workflow (publish) {#publish}

As an Activity receives incremental output from an external service, it translates those outputs into application events and publishes them through a pub/sub client. The client batches events and flushes them to the Workflow via Signal at a configurable interval.

This is a Nagle-like batching strategy: buffer events, flush on a timer. The client can also flush immediately for high-priority events.

The Workflow receives published events through a Signal handler that appends them to a durable event buffer. The Workflow itself can also publish events directly for lifecycle events that originate inside the Workflow rather than inside an Activity.
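A minimal sketch of the batching strategy, with plain asyncio in place of an Activity context. Here `flush` stands in for sending one batched Signal to the Workflow; the class and its names are illustrative, not an SDK API.

```python
import asyncio

class Batcher:
    """Nagle-like event batching: buffer, flush on a timer, or flush now."""

    def __init__(self, flush, batch_interval: float = 2.0) -> None:
        self._flush = flush               # callable that sends one batch
        self._interval = batch_interval
        self._pending: list[bytes] = []
        self._timer: asyncio.Task | None = None

    async def __aenter__(self) -> "Batcher":
        self._timer = asyncio.create_task(self._loop())
        return self

    async def __aexit__(self, *exc) -> None:
        self._timer.cancel()
        try:
            await self._timer
        except asyncio.CancelledError:
            pass
        self._drain()                     # final flush: nothing is dropped

    def publish(self, payload: bytes, priority: bool = False) -> None:
        self._pending.append(payload)
        if priority:
            self._drain()                 # significant event: flush immediately

    def _drain(self) -> None:
        if self._pending:
            batch, self._pending = self._pending, []
            self._flush(batch)

    async def _loop(self) -> None:
        while True:
            await asyncio.sleep(self._interval)
            self._drain()
```

The context manager is what guarantees the final flush on exit, which is why no events are dropped when the Activity completes.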

### Workflow to external client (subscribe) {#subscribe}

The external client subscribes to the Workflow's event stream using long-poll Updates.
Each poll includes the client's current offset into the event buffer.
The Update handler inside the Workflow uses a wait condition to block until new events are available at or beyond that offset, then returns them.

The subscribe iterator handles the poll loop, offset tracking, and reconnection internally. From the caller's perspective, subscribing is a normal async iteration.

Because events are stored durably in the Workflow, the external client can reconnect at any time, even after a BFF restart, and resume from its last known offset. No events are lost.

## Concurrency model {#concurrency-model}

A Workflow's main execution loop and its message handlers run concurrently on a single thread. A `wait_condition` yields so that the main loop and poll handlers can interleave at each `await` point.

A typical streaming sequence looks like this:

1. The client sends a Signal to trigger work and immediately opens a subscribe Update at the current event buffer offset.
2. The Workflow starts an Activity and yields.
3. The Activity publishes batches of events as Signals while it processes output from an external service.
4. Each incoming Signal appends to the event buffer, waking up any blocked poll Update handlers.
5. The poll handlers return the new batch and the client re-polls with the updated offset.
6. When the Activity completes, the Workflow may publish lifecycle events and start the next Activity or publish a final completion event.

This interleaving is deterministic and replay-safe because all coordination happens through Temporal's event-sourced execution model.

## Durability and resumability {#durability-and-resumability}

The appropriate level of durability depends on the cost of lost work:

- **Low-stakes, short-running operations**: An in-memory BFF may be sufficient. Restarting work on failure is cheap.
- **Expensive, long-running operations**: Losing in-flight progress to a restart is costly. A Temporal-backed Workflow preserves state, history, and the event stream across restarts.

These streaming patterns work regardless of whether the underlying service supports mid-stream resumption. Even if an individual Activity must restart from the beginning on failure, the Workflow's retry policy handles that automatically and previously published events remain in the durable buffer.

## Use case: AI agents {#ai-agents}

AI agents are a compelling use case for Workflow streaming because agent loops are long-running, stateful, and expensive to restart.

### What AI agents stream {#ai-agent-events}

AI agent streams commonly include:

- **LLM tokens**: Model responses rendered incrementally as they are generated.
- **Reasoning outputs**: Internal chain-of-thought exposed separately from the final response.
- **Application messages**: Tool calls, web search results, agent handoffs, and other progress indicators from the application or from services behind the model API.

Streaming keeps users engaged, builds trust through transparency, and enables agent steering, such as cancelling unproductive work or interrupting to provide additional context.
This is especially valuable for agents that do significant work between user interactions.

### LLM provider resumability {#llm-resumability}

Whether a specific LLM API call can resume mid-stream after a failure varies by provider:

- **OpenAI**: Supports a fully resumable background mode.
- **Google Gemini**: Provides access to end results of an interrupted stream once the call completes.
- **Anthropic**: Accepts a response prefix that can resume a streaming response.
- **Other providers**: May have no streaming recovery at all.

The Temporal streaming pattern described on this page works regardless of provider resumability.
If an Activity must restart from the beginning, Temporal's retry policy handles the retry and previously published events remain in the Workflow's durable buffer.

### Multi-turn sessions and event indexing {#multi-turn-sessions}

In a conversational agent, the Workflow persists across multiple turns. Each turn produces a new stream of events, but all events share a single global offset that increments across the lifetime of the session.

Before triggering a new turn, the client queries the Workflow's current event buffer offset.
The client then subscribes starting from that offset, receiving only events from the current turn instead of replayed events from prior turns.

On reconnect, the client resumes from its last known offset. The Workflow holds all events durably and serves them on demand, even after a BFF restart.

### Architecture {#ai-agent-architecture}

In an AI agent application, the generic architecture maps as follows:

```
Browser ──(SSE)──▶ BFF ──(subscribe: long-poll Update)──▶ Workflow
                                                             │ execute_activity
                                                             ▼
                                                       LLM Activity ── publish (batched Signal) ──▶ Workflow
                                                             │
                                                             ▼
                                                          LLM API
```

The BFF is a stateless proxy. The Workflow holds conversation history, the current agent state, and the full event stream for the session. The LLM Activity streams model output from the provider API and batches events back to the Workflow via Signals.

## SDK guides {#sdk-guides}

- [How to stream from Workflows using the Python SDK](/develop/python/workflows/streaming)