
LangGraph streaming with workflow streams#1500

Open
brianstrauch wants to merge 10 commits into main from langgraph-streaming

Conversation

@brianstrauch
Contributor

@brianstrauch brianstrauch commented May 1, 2026

  • Wire LangGraph's get_stream_writer() to Temporal: inside activity-wrapped nodes, the writer is backed by a WorkflowStreamClient that signals chunks back to the owning workflow's WorkflowStream on a topic, letting external subscribers tail node output via WorkflowStreamClient.
  • Thread an optional stream_writer through set_langgraph_config so the activity wrapper can install one per invocation; default behavior (no writer) is unchanged.

Test plan

  • test_streaming_via_workflow_streams — node calls get_stream_writer() inside an activity; external WorkflowStreamClient receives {token: a/b/c} then {done: True}, and the workflow result reflects the accumulated state.
  • test_workflow_publishes_astream_chunks — workflow iterates app.astream(...) and republishes each chunk on its own topic; subscriber sees per-node progress.

@brianstrauch brianstrauch requested a review from a team as a code owner May 1, 2026 20:27
@brianstrauch brianstrauch requested a review from jssmith May 1, 2026 20:28
@brianstrauch brianstrauch requested a review from DABH May 1, 2026 21:33

Copilot AI left a comment


Pull request overview

This PR adds an experimental integration path between LangGraph’s get_stream_writer() streaming mechanism and Temporal “Workflow Streams”, so activity-wrapped LangGraph nodes can publish incremental chunks that external subscribers can tail via WorkflowStreamClient.

Changes:

  • Add streaming_topic and streaming_batch_interval options to LangGraphPlugin and thread them into the activity wrapper.
  • Extend LangGraph runtime config restoration to accept an optional stream_writer, enabling activity invocations to install a per-call writer.
  • Add tests covering (1) activity-side get_stream_writer() streaming to workflow streams and (2) workflow-side forwarding of astream() chunks to an external topic.

Reviewed changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 3 comments.

Show a summary per file
File Description
tests/contrib/langgraph/test_streaming.py Adds workflow-stream-backed streaming tests (activity-side writer + workflow-side republish of astream() chunks).
temporalio/contrib/langgraph/_plugin.py Introduces plugin-level streaming configuration and passes it into the activity wrapper.
temporalio/contrib/langgraph/_langgraph_config.py Adds stream_writer plumbing to set_langgraph_config so activities can inject a writer into the reconstructed Runtime.
temporalio/contrib/langgraph/_activity.py Uses WorkflowStreamClient.from_within_activity() to back the injected writer with workflow-stream topic publishes when streaming is enabled.
temporalio/contrib/langgraph/__init__.py Minor export ordering change (no functional changes).


Comment thread temporalio/contrib/langgraph/__init__.py
Comment thread temporalio/contrib/langgraph/_plugin.py
Comment on lines +75 to +91
    try:
        if iscoroutinefunction(func):
            result = await func(*input.args, **kwargs)
        else:
            result = func(*input.args, **kwargs)
        if isinstance(result, Command):
            return ActivityOutput(langgraph_command=result)
        return ActivityOutput(result=result)
    except GraphInterrupt as e:
        return ActivityOutput(langgraph_interrupts=e.args[0])

if streaming_topic is None:
    return await run(stream_writer=None)
async with WorkflowStreamClient.from_within_activity(
    batch_interval=streaming_batch_interval,
) as client:
    topic = client.topic(streaming_topic)
    return await run(stream_writer=topic.publish)
Contributor


How did you resolve this? Are we just living with this? What if for example wrap_activity routed sync funcs through asyncio.to_thread whenever streaming_topic is set? Would this enable streaming to work for sync nodes/tasks?
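One shape the `asyncio.to_thread` routing could take is sketched below. This is a hypothetical stand-in, not the PR's code: `run_node` and the `stream_writer` parameter are invented for illustration (LangGraph's real writer is ambient via `get_stream_writer()`), but it shows the key move — bridging publishes from the worker thread back onto the event loop with `call_soon_threadsafe` so a sync node can stream mid-execution:

```python
import asyncio
from inspect import iscoroutinefunction

async def run_node(func, *args, stream_writer=None, **kwargs):
    """Hypothetical dispatch: async nodes run inline; sync nodes are routed
    through asyncio.to_thread when a writer is installed, with publishes
    hopped back onto the event loop so they stay thread-safe."""
    if iscoroutinefunction(func):
        return await func(*args, stream_writer=stream_writer, **kwargs)
    if stream_writer is None:
        return func(*args, **kwargs)
    loop = asyncio.get_running_loop()

    def thread_safe_writer(chunk):
        # Schedule the real writer on the loop thread, not the worker thread.
        loop.call_soon_threadsafe(stream_writer, chunk)

    result = await asyncio.to_thread(
        func, *args, stream_writer=thread_safe_writer, **kwargs
    )
    await asyncio.sleep(0)  # let any just-scheduled writer callbacks drain
    return result
```

With this routing, a sync node's writes arrive while the node is still running, rather than being impossible (or blocking the loop) as they would inline.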

def wrap_activity(
    func: Callable,
    *,
    streaming_topic: str | None = None,
Contributor


So to make streaming work you need (a) streaming_topic set on the plugin, (b) a WorkflowStream() constructed in the workflow's __init__, and (c) an external WorkflowStreamClient to subscribe. That is a lot for new users to keep track of. For instance, there is no compile-time or even first-invocation check that (a) is paired with (b). If the user forgets (b), I think what would happen right now (correct me if wrong) is: the activity-side signal goes to an unhandled signal name (Temporal will buffer it indefinitely), the activity succeeds, and the subscriber's poll update fails, all silently, with no log message pointing at the missing WorkflowStream. Copilot flagged a piece of this. I'd push for one of:

  • Have the plugin's interceptor verify, on workflow start, that a WorkflowStream is registered when streaming_topic is set, and raise a clear error if not; or
  • Publish a one-liner from wrap_activity (e.g. an activity.logger.warning on the first publish if the signal name has no handler), though that's harder to detect from the activity side, so maybe lean towards the first option which doesn't sound too hard
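For the first option, the start-time check could be as simple as the sketch below. Everything here is hypothetical — `validate_streaming_config` and the `"__workflow_stream_publish"` signal name are assumed conventions, not the actual WorkflowStream internals — but it shows the fail-fast behavior being asked for:

```python
def validate_streaming_config(streaming_topic, registered_signal_handlers):
    """Hypothetical start-of-workflow check: if the plugin was configured
    with a streaming_topic, require that the workflow actually registered
    a WorkflowStream, instead of letting signals buffer silently."""
    if streaming_topic is None:
        return  # streaming disabled; nothing to verify
    # Assumed convention: WorkflowStream installs a well-known signal handler.
    if "__workflow_stream_publish" not in registered_signal_handlers:
        raise RuntimeError(
            f"LangGraphPlugin was configured with streaming_topic="
            f"{streaming_topic!r}, but this workflow did not construct a "
            "WorkflowStream in __init__. Streamed chunks would be buffered "
            "as unhandled signals and never delivered."
        )
```

Running this once from the plugin's workflow interceptor at start would turn the current silent failure mode into an immediate, actionable error.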

    # TODO: Remove activity_options when we have support for @task(metadata=...)
    activity_options: dict[str, dict[str, Any]] | None = None,
    default_activity_options: dict[str, Any] | None = None,
    streaming_topic: str | None = None,
Contributor


So streaming_topic is per plugin i.e. all nodes/tasks under one LangGraphPlugin share one topic. Do we want something like streaming_topic: str | Callable[[NodeInfo], str] | None so a single plugin can fan out to multiple topics? Could that ever be a use case? Just thinking about this, not necessarily something we need to change
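Resolving such a union type at publish time would be a few lines; the sketch below is illustrative only (`NodeInfo` and `resolve_topic` are invented names, not existing plugin types):

```python
from dataclasses import dataclass
from typing import Callable, Optional, Union

@dataclass
class NodeInfo:
    """Hypothetical stand-in for per-node metadata the plugin could expose."""
    node_name: str

TopicSpec = Union[str, Callable[[NodeInfo], str], None]

def resolve_topic(spec: TopicSpec, node: NodeInfo) -> Optional[str]:
    """Resolve the plugin-level streaming_topic setting for one node."""
    if spec is None:
        return None          # streaming disabled
    if callable(spec):
        return spec(node)    # fan out: one topic per node (or any scheme)
    return spec              # single shared topic (current behavior)
```

A caller could then pass either `streaming_topic="tokens"` or `streaming_topic=lambda n: f"tokens/{n.node_name}"` without the publish path caring which.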

    return {"value": state["value"] + "a"}

async def token_node(state: State) -> dict[str, str]:
    tokens = ["a", "b", "c"]
    writer = get_stream_writer()
Contributor


get_stream_writer() is the LangGraph "custom" stream-mode hook. There are other stream modes such as messages / values / updates, and these flow through astream()'s iterator, not the writer, and this PR doesn't wire those. A user might misinterpret and think that the plugin automatically captures all LangGraph streaming. Your second test (test_workflow_publishes_astream_chunks) demonstrates the workflow-side bridge as the workaround, which is good, but we may want to call out that pattern more explicitly in the readme/docs ("for messages/values/updates, iterate astream() in your workflow and republish via WorkflowStream.topic(...).publish"), so users know what's covered and what isn't.

Alternatively, was there any thinking around why we just want the custom stream-mode hook here? Maybe just a nice comment here would help?
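The workflow-side bridge pattern for the other stream modes can be simulated in miniature. The sketch below uses a fake `astream` generator and a plain callback in place of `WorkflowStream.topic(...).publish`, so nothing here is the real API — it just shows the shape of "iterate astream and republish each chunk":

```python
import asyncio

async def fake_astream(state):
    """Toy stand-in for app.astream(..., stream_mode=["updates", "custom"]):
    LangGraph yields (mode, chunk) pairs when multiple modes are requested."""
    yield ("updates", {"token_node": {"value": state + "a"}})
    yield ("custom", {"token": "a"})

async def bridge_astream_to_topic(astream, publish):
    """Workflow-side bridge (sketch): republish every chunk on the workflow's
    own stream topic; `publish` is any callable, e.g. topic.publish."""
    async for mode, chunk in astream:
        publish({"mode": mode, "chunk": chunk})
```

This covers messages / values / updates without any activity-side plumbing, which is presumably why the PR only wires the custom writer directly.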



if streaming_topic is None:
    return await run(stream_writer=None)
async with WorkflowStreamClient.from_within_activity(
Contributor


Small question here around deduplication. iiuc each call to from_within_activity() mints a fresh _publisher_id, and dedup is (publisher_id, sequence)-keyed, so attempt 1's tokens stay in the workflow log alongside attempt 2's. So if the activity fails and is retried, each attempt would publish its own copy of the streamed output. This could be desirable, or it could be confusing to see duplicated chunks. Wdyt? Is this behavior worth stating in the docs at least, and perhaps we consider an option to use a deterministic publisher_id derived from (workflow_id, activity_id) so retries dedup?
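A deterministic publisher id is cheap to derive; the helper below is a hypothetical sketch (the function name and the choice of SHA-256 are assumptions), showing how ids stable across retries but distinct across activities could be produced:

```python
import hashlib

def deterministic_publisher_id(workflow_id: str, activity_id: str) -> str:
    """Hypothetical: derive a publisher id from identifiers that stay
    constant across retries of the same activity, so (publisher_id, sequence)
    dedup collapses replayed attempts instead of interleaving them."""
    raw = f"{workflow_id}:{activity_id}".encode()
    return hashlib.sha256(raw).hexdigest()[:16]
```

Since the Temporal activity id is stable across retry attempts of the same scheduled activity, every attempt would publish under the same id and sequence space, and the stream would keep at most one copy of each chunk.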


## Streaming

When `streaming_topic` is set on `LangGraphPlugin`, calls to `stream_writer` leverage Temporal [Workflow Streams](https://github.com/temporalio/sdk-python/tree/main/temporalio/contrib/workflow_streams). Async nodes are recommended for this feature.
Contributor


This is a bit thin for someone trying to actually get started with this. I'd expand it to a self-contained snippet showing:

  • the plugin construction with streaming_topic,
  • WorkflowStream() in the workflow's __init__,
  • an external subscriber loop with WorkflowStreamClient.create(...).topic(...).subscribe(...),
  • an explicit "this is stream_mode='custom'; for other modes, bridge via astream() in the workflow" note,
  • retry semantics ("each attempt republishes from scratch").

Or just link to a hello world example if that's easier (I imagine we'll want one or two new additions to the samples repo?)
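To make the shape of such a snippet concrete without depending on the real APIs, here is a toy model of the three moving pieces using a plain asyncio queue. `ToyTopic` stands in for both `WorkflowStream.topic(...)` (workflow/activity side) and `WorkflowStreamClient...topic(...).subscribe(...)` (external side); none of this is the actual workflow_streams API:

```python
import asyncio

class ToyTopic:
    """Toy stand-in for a workflow-stream topic: a publish() the activity's
    stream_writer targets, and an async subscribe() an external client tails."""

    def __init__(self):
        self.queue: asyncio.Queue = asyncio.Queue()

    def publish(self, chunk):
        self.queue.put_nowait(chunk)

    async def subscribe(self):
        while True:
            chunk = await self.queue.get()
            yield chunk
            if chunk.get("done"):
                break  # terminal marker ends the subscription

async def demo():
    topic = ToyTopic()
    # "activity" side: stream tokens, then a terminal marker
    for t in ["a", "b", "c"]:
        topic.publish({"token": t})
    topic.publish({"done": True})
    # "external subscriber" side: tail until done
    return [chunk async for chunk in topic.subscribe()]
```

A real docs snippet would replace the toy with plugin construction (`streaming_topic=...`), a `WorkflowStream()` in the workflow's `__init__`, and a `WorkflowStreamClient` subscriber, but the data flow is the same.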

Comment on lines 40 to 50
"""LangGraph plugin for Temporal SDK.

.. warning::
This package is experimental and may change in future versions.
Use with caution in production environments.

This plugin runs `LangGraph <https://github.com/langchain-ai/langgraph>`_ nodes
and tasks as Temporal Activities, giving your AI agent workflows durable
execution, automatic retries, and timeouts. It supports both the LangGraph Graph
API (``StateGraph``) and Functional API (``@entrypoint`` / ``@task``).
"""
Contributor


Nit: can we go ahead and flesh out this docstring with an Args: section that documents (at least) streaming_topic and streaming_batch_interval (what they do, how they relate, when to lower/raise the interval)? And I guess the other args too while we're at it?
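One possible shape for that Args section is below. The parameter descriptions are inferred from this PR's diff, so treat them as a draft to be corrected, not the settled semantics (in particular the batching description is an assumption):

```python
class LangGraphPlugin:
    """LangGraph plugin for Temporal SDK.

    .. warning::
        This package is experimental and may change in future versions.

    Args:
        activity_options: Per-node activity options keyed by node name.
            (TODO: remove once ``@task(metadata=...)`` is supported.)
        default_activity_options: Activity options applied to every node
            that has no entry in ``activity_options``.
        streaming_topic: If set, chunks written via LangGraph's
            ``get_stream_writer()`` inside activities are published to this
            workflow-stream topic. ``None`` (the default) disables streaming.
        streaming_batch_interval: How long the activity-side stream client
            batches chunks before signaling the workflow. Lower it for
            lower latency; raise it to reduce signal volume.
    """
```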
