LangGraph streaming with workflow streams #1500
Pull request overview
This PR adds an experimental integration path between LangGraph’s get_stream_writer() streaming mechanism and Temporal “Workflow Streams”, so activity-wrapped LangGraph nodes can publish incremental chunks that external subscribers can tail via WorkflowStreamClient.
Changes:
- Add `streaming_topic` and `streaming_batch_interval` options to `LangGraphPlugin` and thread them into the activity wrapper.
- Extend LangGraph runtime config restoration to accept an optional `stream_writer`, enabling activity invocations to install a per-call writer.
- Add tests covering (1) activity-side `get_stream_writer()` streaming to workflow streams and (2) workflow-side forwarding of `astream()` chunks to an external topic.
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| `tests/contrib/langgraph/test_streaming.py` | Adds workflow-stream-backed streaming tests (activity-side writer + workflow-side republish of `astream()` chunks). |
| `temporalio/contrib/langgraph/_plugin.py` | Introduces plugin-level streaming configuration and passes it into the activity wrapper. |
| `temporalio/contrib/langgraph/_langgraph_config.py` | Adds `stream_writer` plumbing to `set_langgraph_config` so activities can inject a writer into the reconstructed `Runtime`. |
| `temporalio/contrib/langgraph/_activity.py` | Uses `WorkflowStreamClient.from_within_activity()` to back the injected writer with workflow-stream topic publishes when streaming is enabled. |
| `temporalio/contrib/langgraph/__init__.py` | Minor export ordering change (no functional changes). |
```python
if iscoroutinefunction(func):
    result = await func(*input.args, **kwargs)
else:
    result = func(*input.args, **kwargs)
if isinstance(result, Command):
    return ActivityOutput(langgraph_command=result)
return ActivityOutput(result=result)
except GraphInterrupt as e:
    return ActivityOutput(langgraph_interrupts=e.args[0])
```

```python
if streaming_topic is None:
    return await run(stream_writer=None)
async with WorkflowStreamClient.from_within_activity(
    batch_interval=streaming_batch_interval,
) as client:
    topic = client.topic(streaming_topic)
    return await run(stream_writer=topic.publish)
```
How did you resolve this? Are we just living with this? What if, for example, `wrap_activity` routed sync funcs through `asyncio.to_thread` whenever `streaming_topic` is set? Would that enable streaming to work for sync nodes/tasks?
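To make the suggestion concrete, here is a hedged sketch (not the plugin's actual `wrap_activity`, and `call_node` / `streaming_enabled` are made-up names) of a dispatch helper that offloads sync node functions to a worker thread when streaming is on, so the event loop stays free to flush stream batches while the node runs:

```python
import asyncio
from inspect import iscoroutinefunction

# Hypothetical sketch: when streaming is enabled, run sync nodes via
# asyncio.to_thread so the event loop can keep flushing stream batches;
# the injected stream_writer would need to be thread-safe for this.
async def call_node(func, /, *args, streaming_enabled: bool = False, **kwargs):
    if iscoroutinefunction(func):
        return await func(*args, **kwargs)
    if streaming_enabled:
        # Offload to a thread; the loop stays responsive for publishes.
        return await asyncio.to_thread(func, *args, **kwargs)
    return func(*args, **kwargs)
```
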
```python
def wrap_activity(
    func: Callable,
    *,
    streaming_topic: str | None = None,
```
So to make streaming work you need (a) `streaming_topic` set on the plugin, (b) a `WorkflowStream()` constructed in the workflow's `__init__`, and (c) an external `WorkflowStreamClient` to subscribe. That could be a lot for new users to keep track of. For instance, there is no compile-time or even first-invocation check that (a) is paired with (b). If the user forgets (b), I think what would happen right now (correct me if wrong) is: the activity-side signal goes to an unhandled signal name (Temporal will buffer it indefinitely), the activity succeeds, and the subscriber's poll update fails, all silently, with no log message pointing at the missing `WorkflowStream`. Copilot flagged a piece of this. I'd push for one of:

- Have the plugin's interceptor verify, on workflow start, that a `WorkflowStream` is registered when `streaming_topic` is set, and raise a clear error if not; or
- Publish a one-liner from `wrap_activity` (e.g. an `activity.logger.warning` on the first publish if the signal name has no handler), though that's harder to detect from the activity side, so maybe lean toward the first option, which doesn't sound too hard.
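The shape of the first option could be something like the following sketch. All names here (`validate_streaming_setup`, the duck-typed `WorkflowStream` check) are assumptions for illustration, not the plugin's interceptor API:

```python
# Hypothetical fail-fast check to run at workflow start: if streaming is
# configured but the workflow never constructed a WorkflowStream, raise
# instead of letting chunks pile up as unhandled signals.
def validate_streaming_setup(workflow_instance: object, streaming_topic) -> None:
    if streaming_topic is None:
        return  # streaming disabled, nothing to check
    has_stream = any(
        type(value).__name__ == "WorkflowStream"
        for value in vars(workflow_instance).values()
    )
    if not has_stream:
        raise RuntimeError(
            f"streaming_topic={streaming_topic!r} is set on the plugin, but "
            "this workflow did not construct a WorkflowStream in __init__; "
            "published chunks would be buffered as unhandled signals."
        )
```
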
```python
# TODO: Remove activity_options when we have support for @task(metadata=...)
activity_options: dict[str, dict[str, Any]] | None = None,
default_activity_options: dict[str, Any] | None = None,
streaming_topic: str | None = None,
```
So `streaming_topic` is per plugin, i.e. all nodes/tasks under one `LangGraphPlugin` share one topic. Do we want something like `streaming_topic: str | Callable[[NodeInfo], str] | None` so a single plugin can fan out to multiple topics? Could that ever be a use case? Just thinking about this, not necessarily something we need to change.
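Resolution of such an overload would be trivial; this is a sketch of the proposal only, and `NodeInfo` plus the `Callable` variant are hypothetical, not the plugin's current API:

```python
from dataclasses import dataclass
from typing import Callable, Optional, Union

# Hypothetical per-node metadata the resolver would receive.
@dataclass
class NodeInfo:
    node_name: str

TopicOption = Union[str, Callable[[NodeInfo], str], None]

def resolve_topic(option: TopicOption, node: NodeInfo) -> Optional[str]:
    if option is None:
        return None  # streaming disabled
    if callable(option):
        return option(node)  # fan out: one topic per node
    return option  # single shared topic (current behavior)
```
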
```python
    return {"value": state["value"] + "a"}

async def token_node(state: State) -> dict[str, str]:
    tokens = ["a", "b", "c"]
    writer = get_stream_writer()
```
`get_stream_writer()` is the LangGraph `custom` stream-mode hook. There are other stream modes such as `messages` / `values` / `updates`, and those flow through `astream()`'s iterator, not the writer; this PR doesn't wire those up. A user might misinterpret this and think the plugin automatically captures all LangGraph streaming. Your second test (`test_workflow_publishes_astream_chunks`) demonstrates the workflow-side bridge as the workaround, which is good, but we may want to call out that pattern more explicitly in the readme/docs ("for `messages`/`values`/`updates`, iterate `astream()` in your workflow and republish via `WorkflowStream.topic(...).publish`"), so users know what's covered and what isn't.
Alternatively, was there any thinking around why we only want the custom stream-mode hook here? Maybe just a nice comment here would help?
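For reference, the workflow-side bridge pattern could be documented with a sketch along these lines. The `stream.topic(...).publish` call follows the names used in this review, and the helper itself is illustrative, not verified against the actual API:

```python
# Hedged sketch of the workflow-side bridge: stream modes like
# messages/values/updates arrive via astream()'s iterator rather than
# get_stream_writer(), so the workflow republishes them to a topic itself.
async def republish_astream(graph, state, stream, topic_name: str) -> None:
    topic = stream.topic(topic_name)
    # With a list stream_mode, LangGraph yields (mode, chunk) pairs.
    async for mode, chunk in graph.astream(state, stream_mode=["updates", "custom"]):
        await topic.publish({"mode": mode, "chunk": chunk})
```
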
```python
if streaming_topic is None:
    return await run(stream_writer=None)
async with WorkflowStreamClient.from_within_activity(
```
Small question here around deduplication. IIUC each call to `from_within_activity()` mints a fresh `_publisher_id`, and dedup is (publisher_id, sequence)-keyed, so attempt 1's tokens stay in the workflow log alongside attempt 2's. So if the activity makes multiple attempts (e.g. it fails and is retried), each attempt would yield a different sample. This could be desirable, or it could be confusing to have extra streamed output. Wdyt? Is this behavior worth stating in the docs at least? And perhaps we consider an option to use a deterministic publisher_id derived from (workflow_id, activity_id) so retries dedup.
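The deterministic-id option would be a one-liner; this is a sketch of the idea only (the function name and the choice of hash/truncation are assumptions, not anything in the PR):

```python
import hashlib

# Hypothetical: derive a stable publisher_id from workflow + activity
# identity so a retried attempt reuses the same id, and its re-published
# chunks dedup against attempt 1's (publisher_id, sequence) keys.
def deterministic_publisher_id(workflow_id: str, activity_id: str) -> str:
    raw = f"{workflow_id}:{activity_id}".encode()
    return hashlib.sha256(raw).hexdigest()[:16]
```
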
```markdown
## Streaming

When `streaming_topic` is set on `LangGraphPlugin`, calls to `stream_writer` leverage Temporal [Workflow Streams](https://github.com/temporalio/sdk-python/tree/main/temporalio/contrib/workflow_streams). Async nodes are recommended for this feature.
```
This is a bit thin for someone trying to get started actually using this. I'd expand it to a self-contained snippet showing:

- the plugin construction with `streaming_topic`,
- `WorkflowStream()` in the workflow's `__init__`,
- an external subscriber loop with `WorkflowStreamClient.create(...).topic(...).subscribe(...)`,
- an explicit "this is `stream_mode='custom'`; for other modes, bridge via `astream()` in the workflow" note,
- retry semantics ("each attempt republishes from scratch").

Or just link to a hello world example if that's easier (I imagine we'll want one or two new additions to the samples repo?)
```python
"""LangGraph plugin for Temporal SDK.

.. warning::
    This package is experimental and may change in future versions.
    Use with caution in production environments.

This plugin runs `LangGraph <https://github.com/langchain-ai/langgraph>`_ nodes
and tasks as Temporal Activities, giving your AI agent workflows durable
execution, automatic retries, and timeouts. It supports both the LangGraph Graph
API (``StateGraph``) and Functional API (``@entrypoint`` / ``@task``).
"""
```
Nit: can we go ahead and flesh out this docstring with an Args: section that documents (at least) `streaming_topic` and `streaming_batch_interval` (what they do, how they relate, when to lower/raise the interval)? And I guess the other args too while we're at it?
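Something along these lines; the wording is a suggestion inferred from this PR's overview, not taken from the implementation:

```python
# Hedged sketch of the requested Args: section for the module docstring;
# the described semantics are inferred from the PR description.
PLUGIN_ARGS_DOC = """\
Args:
    streaming_topic: Workflow-stream topic that chunks written via
        get_stream_writer() inside activity-run nodes are published to.
        None (the default) disables streaming.
    streaming_batch_interval: How long the activity-side publisher
        buffers chunks before flushing a batch. Lower it for
        lower-latency token streaming; raise it to cut signal volume
        for chatty nodes.
"""
```
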
Test plan