From 98824af7aff48617c5908ce0ef4a5fd188cc61b8 Mon Sep 17 00:00:00 2001 From: Declan Brady Date: Tue, 23 Jun 2026 09:57:18 -0400 Subject: [PATCH 1/2] refactor(harness)!: consolidate the Pydantic-AI harness, remove tracing handler Collapse _pydantic_ai_async / _pydantic_ai_tracing into _pydantic_ai_sync (convert helper) and _pydantic_ai_turn (stream_pydantic_ai_events). Span tracing is derived from the canonical stream by UnifiedEmitter, so create_pydantic_ai_tracing_handler is removed. BREAKING CHANGE: create_pydantic_ai_tracing_handler is removed. Co-Authored-By: Claude Opus 4.8 (1M context) --- src/agentex/lib/adk/__init__.py | 5 +- .../lib/adk/_modules/_pydantic_ai_async.py | 65 ---- .../lib/adk/_modules/_pydantic_ai_sync.py | 38 +-- .../lib/adk/_modules/_pydantic_ai_tracing.py | 221 ------------- .../lib/adk/_modules/_pydantic_ai_turn.py | 57 +++- tests/lib/adk/test_pydantic_ai_async.py | 294 +----------------- tests/lib/adk/test_pydantic_ai_sync.py | 252 ++++++++++----- .../lib/adk/test_pydantic_ai_sync_unified.py | 209 ------------- tests/lib/adk/test_pydantic_ai_turn.py | 4 +- .../harness/test_harness_pydantic_ai_async.py | 47 +-- .../harness/test_harness_pydantic_ai_sync.py | 49 +-- 11 files changed, 240 insertions(+), 1001 deletions(-) delete mode 100644 src/agentex/lib/adk/_modules/_pydantic_ai_async.py delete mode 100644 src/agentex/lib/adk/_modules/_pydantic_ai_tracing.py delete mode 100644 tests/lib/adk/test_pydantic_ai_sync_unified.py diff --git a/src/agentex/lib/adk/__init__.py b/src/agentex/lib/adk/__init__.py index 86fa90253..23c81690a 100644 --- a/src/agentex/lib/adk/__init__.py +++ b/src/agentex/lib/adk/__init__.py @@ -11,10 +11,8 @@ emit_langgraph_messages, convert_langgraph_to_agentex_events, ) -from agentex.lib.adk._modules._pydantic_ai_async import stream_pydantic_ai_events +from agentex.lib.adk._modules._pydantic_ai_turn import PydanticAITurn, stream_pydantic_ai_events from agentex.lib.adk._modules._pydantic_ai_sync import 
convert_pydantic_ai_to_agentex_events -from agentex.lib.adk._modules._pydantic_ai_tracing import create_pydantic_ai_tracing_handler -from agentex.lib.adk._modules._pydantic_ai_turn import PydanticAITurn from agentex.lib.adk._modules._claude_code_sync import convert_claude_code_to_agentex_events from agentex.lib.adk._modules._claude_code_turn import ( ClaudeCodeTurn, @@ -75,7 +73,6 @@ # Pydantic AI "stream_pydantic_ai_events", "convert_pydantic_ai_to_agentex_events", - "create_pydantic_ai_tracing_handler", "PydanticAITurn", # Claude Code "convert_claude_code_to_agentex_events", diff --git a/src/agentex/lib/adk/_modules/_pydantic_ai_async.py b/src/agentex/lib/adk/_modules/_pydantic_ai_async.py deleted file mode 100644 index 85abfb845..000000000 --- a/src/agentex/lib/adk/_modules/_pydantic_ai_async.py +++ /dev/null @@ -1,65 +0,0 @@ -"""Async Pydantic AI streaming helper for Agentex. - -Consumes a Pydantic AI ``agent.run_stream_events(...)`` async iterator and -pushes Agentex streaming updates to Redis via the ``adk.streaming`` -contexts. For use with async ACP agents that stream via Redis rather than -HTTP yields. - -Text and thinking tokens stream as deltas inside coalesced streaming -contexts. Tool requests and tool results are posted as open+close pairs -on a streaming context (the unified surface persists ``initial_content`` -when a context is closed without deltas). This matches the ``auto_send`` -convention used by all other async/Temporal harnesses. - -Tracing is opt-in via a ``tracing_handler`` parameter — see -``create_pydantic_ai_tracing_handler`` in -``agentex.lib.adk._modules._pydantic_ai_tracing``. 
-""" - -from __future__ import annotations - -from typing import TYPE_CHECKING - -if TYPE_CHECKING: - from agentex.lib.adk._modules._pydantic_ai_tracing import ( - AgentexPydanticAITracingHandler, - ) - - -async def stream_pydantic_ai_events( - stream, - task_id: str, - tracing_handler: "AgentexPydanticAITracingHandler | None" = None, -) -> str: - """Stream Pydantic AI events to Agentex via Redis. - - Args: - stream: Async iterator yielded by ``agent.run_stream_events(...)``. - task_id: The Agentex task ID to stream messages to. - tracing_handler: Optional handler from - ``create_pydantic_ai_tracing_handler(...)``. When provided, each - tool call in the run is also recorded as an Agentex child span - beneath the handler's configured ``parent_span_id``. Streaming - behavior is unchanged when omitted. - - Returns: - The accumulated text content of the **last** text part in the run. - Multi-step runs (where the model emits text, then a tool call, then - more text) return only the final text segment, matching the - ``stream_langgraph_events`` convention. 
- """ - from agentex.lib.core.harness.emitter import UnifiedEmitter - from agentex.lib.adk._modules._pydantic_ai_turn import PydanticAITurn - - turn = PydanticAITurn( - stream, - model=None, - tracing_handler=tracing_handler, - ) - emitter = UnifiedEmitter( - task_id=task_id, - trace_id=None, - parent_span_id=None, - ) - result = await emitter.auto_send_turn(turn) - return result.final_text diff --git a/src/agentex/lib/adk/_modules/_pydantic_ai_sync.py b/src/agentex/lib/adk/_modules/_pydantic_ai_sync.py index e4ac31e7e..0f9aaeb55 100644 --- a/src/agentex/lib/adk/_modules/_pydantic_ai_sync.py +++ b/src/agentex/lib/adk/_modules/_pydantic_ai_sync.py @@ -41,14 +41,9 @@ async def handle_message_send(params): import json import inspect -from typing import TYPE_CHECKING, Any, Callable, AsyncIterator +from typing import Any, Callable, AsyncIterator from pydantic_ai.run import AgentRunResultEvent - -if TYPE_CHECKING: - from agentex.lib.adk._modules._pydantic_ai_tracing import ( - AgentexPydanticAITracingHandler, - ) from pydantic_ai.messages import ( TextPart, PartEndEvent, @@ -124,7 +119,6 @@ def _tool_return_content(result: ToolReturnPart | Any) -> Any: async def convert_pydantic_ai_to_agentex_events( stream_response: AsyncIterator[Any], - tracing_handler: "AgentexPydanticAITracingHandler | None" = None, on_result: Callable[[AgentRunResultEvent], Any] | None = None, ) -> AsyncIterator[StreamTaskMessageStart | StreamTaskMessageDelta | StreamTaskMessageFull | StreamTaskMessageDone]: """Convert a Pydantic AI agent event stream into Agentex stream events. @@ -148,11 +142,6 @@ async def convert_pydantic_ai_to_agentex_events( stream_response: The async iterator yielded by Pydantic AI's ``agent.run_stream_events(...)`` context manager (or a stream of ``AgentStreamEvent`` items received in an ``event_stream_handler``). - tracing_handler: Optional handler from - ``create_pydantic_ai_tracing_handler(...)``. 
When provided, each - tool call in the run is also recorded as an Agentex child span - beneath the handler's configured ``parent_span_id``. Streaming - behavior is unchanged when omitted. on_result: Optional callback invoked with the terminal ``AgentRunResultEvent`` when the run completes. Both sync and async callables are accepted. No ``StreamTaskMessage*`` events are @@ -306,26 +295,6 @@ async def convert_pydantic_ai_to_agentex_events( if message_index is None: continue yield StreamTaskMessageDone(type="done", index=message_index) - # Tool-call parts end with the model's full args known. Open a - # tracing child span for the tool execution now; close it when - # FunctionToolResultEvent arrives below. - if tracing_handler is not None and isinstance(event.part, ToolCallPart) and event.part.tool_call_id: - args: dict[str, Any] | str | None - raw_args = event.part.args - if isinstance(raw_args, dict): - args = dict(raw_args) - elif isinstance(raw_args, str): - try: - args = json.loads(raw_args) if raw_args else {} - except json.JSONDecodeError: - args = {"_raw": raw_args} - else: - args = {} - await tracing_handler.on_tool_start( - tool_call_id=event.part.tool_call_id, - tool_name=event.part.tool_name, - arguments=args, - ) elif isinstance(event, FunctionToolResultEvent): result = event.part @@ -345,11 +314,6 @@ async def convert_pydantic_ai_to_agentex_events( content=content_payload, ), ) - if tracing_handler is not None and tool_call_id: - await tracing_handler.on_tool_end( - tool_call_id=tool_call_id, - result=content_payload, - ) elif isinstance(event, (FunctionToolCallEvent, FinalResultEvent, AgentRunResultEvent)): # Already covered by PartStart/PartDelta/PartEnd events above, or diff --git a/src/agentex/lib/adk/_modules/_pydantic_ai_tracing.py b/src/agentex/lib/adk/_modules/_pydantic_ai_tracing.py deleted file mode 100644 index e199d0a8c..000000000 --- a/src/agentex/lib/adk/_modules/_pydantic_ai_tracing.py +++ /dev/null @@ -1,221 +0,0 @@ -"""Tracing handler 
that records Agentex spans for tool calls in a pydantic-ai agent run. - -.. deprecated:: - ``AgentexPydanticAITracingHandler`` and ``create_pydantic_ai_tracing_handler`` - are superseded by the unified harness surface (``UnifiedEmitter`` in - ``agentex.lib.core.harness``). The unified surface derives tool and - reasoning spans directly from the canonical ``StreamTaskMessage*`` stream, - so no separate handler is required. Both symbols remain fully importable - and functional; they will be removed in a future release. New code should - construct a ``UnifiedEmitter`` with a ``trace_id`` instead: - - from agentex.lib.core.harness import UnifiedEmitter - from agentex.lib.adk._modules._pydantic_ai_turn import PydanticAITurn - - emitter = UnifiedEmitter(task_id=task_id, trace_id=trace_id, parent_span_id=parent_span_id) - turn = PydanticAITurn(agent.run_stream_events(prompt), model="openai:gpt-4o") - async for event in emitter.yield_turn(turn): - yield event - -# NOTE: A runtime ``warnings.warn(..., DeprecationWarning)`` is intentionally -# omitted here. The repo's pyproject ``filterwarnings = ["error"]`` would turn -# it into a test/caller failure, and the async helper (``stream_pydantic_ai_events``) -# still threads this handler through for existing callers that lack a ``trace_id`` -# on the async path. The runtime warning and caller migration are deferred until -# ``trace_id`` threading lands on the async helper in a future API-versioning change. - -Mirrors the LangGraph tracing handler pattern: the caller creates a handler -bound to a ``trace_id`` and a ``parent_span_id``, then hands it to -``stream_pydantic_ai_events(..., tracing_handler=handler)``. The streamer -calls ``on_tool_start`` / ``on_tool_end`` as it observes the corresponding -events in the agent stream, and the handler records one Agentex child span -per tool call. 
- -Why a handler-on-the-streamer rather than an OpenTelemetry bridge: -pydantic-ai exposes its stream of ``AgentStreamEvent`` directly, and that -stream already contains every signal we need to record tool spans. Going -through an OTel processor would require setting up an OTel ``TracerProvider`` -plus a bridge processor — that's a much larger investment, and orthogonal -to the streaming path we already own. This handler hooks into the same -event stream the UI-streaming helper consumes, so a single pass over the -events produces both: live deltas on Redis and child spans on the AgentEx -tracing pipeline. - -Why span IDs are derived from ``tool_call_id`` instead of held in a dict: -pydantic-ai's ``TemporalAgent`` splits the agent run across one or more -Temporal activities. The ``event_stream_handler`` is invoked once per -activity, with a fresh handler instance each time. So ``on_tool_start`` -(emitted inside the model activity that issued the tool call) and -``on_tool_end`` (emitted inside the next model activity, after the tool -runs) land in different handler instances — an in-memory dict can't pair -them. Deriving the span ID deterministically from ``(trace_id, -tool_call_id)`` makes the open/close pairing stateless: ``on_tool_end`` -re-derives the same ID and PATCHes the existing span directly. - -Span hierarchy produced:: - - (e.g. "Turn N", created by the caller) - ├── tool: (one child span per tool call) - └── tool: -""" - -from __future__ import annotations - -import uuid -from typing import Any -from datetime import UTC, datetime - -from agentex import AsyncAgentex -from agentex.lib.utils.logging import make_logger -from agentex.lib.adk._modules.tracing import TracingModule -from agentex.lib.adk.utils._modules.client import create_async_agentex_client - -logger = make_logger(__name__) - - -# Stable namespace for deriving tool-call span IDs. 
The exact UUID value is -# arbitrary; it just needs to be a constant so the same (trace_id, tool_call_id) -# always maps to the same span ID across handler invocations. -_TOOL_SPAN_NAMESPACE = uuid.UUID("8c2f9a2b-3e4d-4b5a-9c1f-0a1b2c3d4e5f") - - -def _tool_span_id(trace_id: str, tool_call_id: str) -> str: - """Deterministic span ID for a given tool call within a trace.""" - return str(uuid.uuid5(_TOOL_SPAN_NAMESPACE, f"{trace_id}:{tool_call_id}")) - - -class AgentexPydanticAITracingHandler: - """Records Agentex tracing spans for tool calls observed in a pydantic-ai event stream. - - .. deprecated:: - Superseded by ``UnifiedEmitter`` (``agentex.lib.core.harness``), which - derives tool and reasoning spans from the canonical ``StreamTaskMessage*`` - stream automatically when ``trace_id`` is provided. This class remains - fully functional but will be removed in a future release. New code should - use ``UnifiedEmitter`` with a trace context instead of constructing this - handler directly. - - Pass an instance to ``stream_pydantic_ai_events(..., tracing_handler=...)`` - or call ``on_tool_start`` / ``on_tool_end`` yourself if you're consuming - the event stream by hand. - """ - - def __init__( - self, - trace_id: str, - parent_span_id: str | None = None, - task_id: str | None = None, - tracing: TracingModule | None = None, - client: AsyncAgentex | None = None, - ) -> None: - self._trace_id = trace_id - self._parent_span_id = parent_span_id - # task_id on the span record (separate from trace_id) is what the - # AgentEx UI's per-task spans dropdown filters by. If you want your - # tool spans visible in that dropdown, set this to the task ID. - self._task_id = task_id - # ``_tracing`` is retained for callers / tests that want to inject a - # mocked TracingModule, even though the on_tool_* methods now go - # direct to the AgentEx client (see module docstring for why). 
- self._tracing_eager = tracing - self._tracing_lazy: TracingModule | None = None - # Defer client construction until first use so httpx binds to the - # running event loop (matches the TracingModule pattern). - self._client_eager = client - self._client_lazy: AsyncAgentex | None = None - - @property - def _tracing(self) -> TracingModule: - if self._tracing_eager is not None: - return self._tracing_eager - if self._tracing_lazy is None: - self._tracing_lazy = TracingModule() - return self._tracing_lazy - - @property - def _client(self) -> AsyncAgentex: - if self._client_eager is not None: - return self._client_eager - if self._client_lazy is None: - self._client_lazy = create_async_agentex_client() - return self._client_lazy - - async def on_tool_start( - self, - tool_call_id: str, - tool_name: str, - arguments: dict[str, Any] | str | None, - ) -> None: - """Open a child span for a tool call. - - Uses a deterministic span ID derived from ``tool_call_id`` so that - ``on_tool_end`` — which may run inside a different handler instance - when pydantic-ai splits the run across Temporal activities — can - close the same span without needing in-memory state. - """ - span_id = _tool_span_id(self._trace_id, tool_call_id) - await self._client.spans.create( - id=span_id, - trace_id=self._trace_id, - task_id=self._task_id, - parent_id=self._parent_span_id, - name=f"tool:{tool_name}" if tool_name else "tool", - start_time=datetime.now(UTC), - input={"arguments": arguments}, - data={"__span_type__": "CUSTOM"}, - ) - - async def on_tool_end(self, tool_call_id: str, result: Any) -> None: - """Close a child span by PATCHing its end_time and output. - - Re-derives the deterministic span ID from ``tool_call_id`` and updates - the existing span record directly. No in-memory span lookup, so this - works even when ``on_tool_start`` ran inside a different handler - instance (e.g. across pydantic-ai TemporalAgent activity boundaries). 
- """ - span_id = _tool_span_id(self._trace_id, tool_call_id) - await self._client.spans.update( - span_id, - end_time=datetime.now(UTC), - output={"result": result}, - ) - - async def on_tool_error(self, tool_call_id: str, error: BaseException | str) -> None: - """Close a child span with an error payload as output.""" - span_id = _tool_span_id(self._trace_id, tool_call_id) - await self._client.spans.update( - span_id, - end_time=datetime.now(UTC), - output={"error": str(error)}, - ) - - -def create_pydantic_ai_tracing_handler( - trace_id: str, - parent_span_id: str | None = None, - task_id: str | None = None, -) -> AgentexPydanticAITracingHandler: - """Create a tracing handler that records Agentex spans for pydantic-ai tool calls. - - .. deprecated:: - Superseded by ``UnifiedEmitter`` (``agentex.lib.core.harness``), which - derives tool and reasoning spans from the canonical ``StreamTaskMessage*`` - stream automatically when ``trace_id`` is provided. This function remains - fully functional but will be removed in a future release. New code should - construct a ``UnifiedEmitter`` with a trace context instead. - - Args: - trace_id: The trace ID. Typically the Agentex task ID. - parent_span_id: Optional parent span ID to nest tool spans under. If - omitted, the tool spans become trace-root spans. - task_id: Optional task ID stamped onto each span. Required for the - AgentEx UI's per-task spans dropdown to display the spans. - - Returns: - A handler suitable for passing to ``stream_pydantic_ai_events(..., tracing_handler=...)``. 
- """ - return AgentexPydanticAITracingHandler( - trace_id=trace_id, - parent_span_id=parent_span_id, - task_id=task_id, - ) diff --git a/src/agentex/lib/adk/_modules/_pydantic_ai_turn.py b/src/agentex/lib/adk/_modules/_pydantic_ai_turn.py index b06172e7f..4e9340d7a 100644 --- a/src/agentex/lib/adk/_modules/_pydantic_ai_turn.py +++ b/src/agentex/lib/adk/_modules/_pydantic_ai_turn.py @@ -15,7 +15,7 @@ from __future__ import annotations -from typing import TYPE_CHECKING, Any, AsyncIterator +from typing import Any, AsyncIterator from pydantic_ai.run import AgentRunResultEvent @@ -28,9 +28,6 @@ ) from agentex.lib.adk._modules._pydantic_ai_sync import convert_pydantic_ai_to_agentex_events -if TYPE_CHECKING: - from agentex.lib.adk._modules._pydantic_ai_tracing import AgentexPydanticAITracingHandler - StreamTaskMessage = StreamTaskMessageStart | StreamTaskMessageDelta | StreamTaskMessageFull | StreamTaskMessageDone @@ -83,19 +80,17 @@ class PydanticAITurn: ``events`` is identical to the bare ``convert_pydantic_ai_to_agentex_events`` output (tool calls stream as ``Start + ToolRequestDelta + Done``, preserving argument-token streaming on the sync/yield channel). The foundation - ``auto_send`` delivers the streamed tool-request shape natively (AGX1-377), - so no coalescing is needed on either channel. + ``auto_send`` delivers the streamed tool-request shape natively, so no + coalescing is needed on either channel. 
""" def __init__( self, stream: AsyncIterator[Any], model: str | None = None, - tracing_handler: "AgentexPydanticAITracingHandler | None" = None, ) -> None: self._stream = stream self._model = model - self._tracing_handler = tracing_handler self._usage = TurnUsage(model=model) @property @@ -119,7 +114,6 @@ def _capture(result_event: AgentRunResultEvent) -> None: raw_stream = convert_pydantic_ai_to_agentex_events( self._stream, - tracing_handler=self._tracing_handler, on_result=_capture, ) async for ev in raw_stream: @@ -132,3 +126,48 @@ def usage(self) -> TurnUsage: Before exhaustion the model field is set but token fields are None. """ return self._usage + + +async def stream_pydantic_ai_events( + stream, + task_id: str, +) -> str: + """Stream Pydantic AI events to Agentex via Redis. + + Consumes a Pydantic AI ``agent.run_stream_events(...)`` async iterator and + pushes Agentex streaming updates to Redis via the ``adk.streaming`` + contexts. For use with async ACP agents that stream via Redis rather than + HTTP yields. + + Text and thinking tokens stream as deltas inside coalesced streaming + contexts. Tool requests and tool results are posted as open+close pairs + on a streaming context (the unified surface persists ``initial_content`` + when a context is closed without deltas). This matches the ``auto_send`` + convention used by all other async/Temporal harnesses. + + Tracing is derived automatically from the event stream by the emitter when + a ``trace_id`` is provided to the ``UnifiedEmitter``. + + Args: + stream: Async iterator yielded by ``agent.run_stream_events(...)``. + task_id: The Agentex task ID to stream messages to. + + Returns: + The accumulated text content of the **last** text part in the run. + Multi-step runs (where the model emits text, then a tool call, then + more text) return only the final text segment, matching the + ``stream_langgraph_events`` convention. 
+ """ + from agentex.lib.core.harness.emitter import UnifiedEmitter + + turn = PydanticAITurn( + stream, + model=None, + ) + emitter = UnifiedEmitter( + task_id=task_id, + trace_id=None, + parent_span_id=None, + ) + result = await emitter.auto_send_turn(turn) + return result.final_text diff --git a/tests/lib/adk/test_pydantic_ai_async.py b/tests/lib/adk/test_pydantic_ai_async.py index 49cb6054c..4ab468152 100644 --- a/tests/lib/adk/test_pydantic_ai_async.py +++ b/tests/lib/adk/test_pydantic_ai_async.py @@ -36,7 +36,7 @@ from agentex.types.tool_request_content import ToolRequestContent from agentex.types.tool_response_content import ToolResponseContent from agentex.types.reasoning_content_delta import ReasoningContentDelta -from agentex.lib.adk._modules._pydantic_ai_async import stream_pydantic_ai_events +from agentex.lib.adk._modules._pydantic_ai_turn import stream_pydantic_ai_events TASK_ID = "task_test" @@ -262,8 +262,8 @@ async def test_tool_call_opens_streaming_context_with_identity( ) -> None: """Tool requests are delivered as a streaming context (Start+Delta+Done). - AGX1-377 fix: auto_send now delivers streamed tool-request messages - natively (Start+ToolRequestDelta+Done). The streaming context is opened + auto_send delivers streamed tool-request messages natively + (Start+ToolRequestDelta+Done). The streaming context is opened at the Start event with the initial ToolRequestContent (tool_call_id + name + empty arguments), argument tokens are streamed as deltas, and the context is closed on Done. 
@@ -304,7 +304,7 @@ async def test_tool_call_opens_streaming_context_with_identity( assert content.tool_call_id == "c1" assert content.name == "get_weather" assert content.author == "agent" - # AGX1-377 streamed shape: initial_content has empty args (args come via delta) + # Streamed shape: initial_content has empty args (args come via delta) assert content.arguments == {} # The arg delta is delivered as a stream_update assert len(ctx.updates) == 1 @@ -657,292 +657,6 @@ async def test_part_delta_without_matching_start_is_ignored( assert final == "" -class TestTracingHandler: - """Tracing handler hooks fire alongside streaming for each tool call.""" - - @dataclass - class _RecordingHandler: - starts: list[dict[str, Any]] = field(default_factory=list) - ends: list[dict[str, Any]] = field(default_factory=list) - - async def on_tool_start(self, tool_call_id: str, tool_name: str, arguments: Any) -> None: - self.starts.append({"tool_call_id": tool_call_id, "tool_name": tool_name, "arguments": arguments}) - - async def on_tool_end(self, tool_call_id: str, result: Any) -> None: - self.ends.append({"tool_call_id": tool_call_id, "result": result}) - - async def test_handler_records_start_and_end_for_each_tool_call( - self, fake_adk: tuple[FakeStreamingModule, FakeMessagesModule] - ) -> None: - streaming, messages = fake_adk - handler = self._RecordingHandler() - events = [ - PartStartEvent( - index=0, - part=ToolCallPart(tool_name="get_weather", args=None, tool_call_id="c1"), - ), - PartEndEvent( - index=0, - part=ToolCallPart(tool_name="get_weather", args='{"city":"Paris"}', tool_call_id="c1"), - ), - FunctionToolResultEvent( - part=ToolReturnPart(tool_name="get_weather", content="Sunny", tool_call_id="c1"), - ), - ] - await stream_pydantic_ai_events( - _aiter(events), - TASK_ID, - tracing_handler=handler, # type: ignore[arg-type] - ) - - # AGX1-373: tool messages arrive via streaming_task_message_context. 
- # Tracing is still additive — both messages are delivered AND hooks fire. - assert messages.created == [] - assert len(streaming.contexts) == 2 - assert isinstance(streaming.contexts[0].initial_content, ToolRequestContent) - assert isinstance(streaming.contexts[1].initial_content, ToolResponseContent) - # And both lifecycle hooks fired exactly once with the right payload. - assert handler.starts == [ - { - "tool_call_id": "c1", - "tool_name": "get_weather", - "arguments": {"city": "Paris"}, - } - ] - assert handler.ends == [{"tool_call_id": "c1", "result": "Sunny"}] - - async def test_handler_not_called_when_no_tool_calls_in_stream( - self, fake_adk: tuple[FakeStreamingModule, FakeMessagesModule] - ) -> None: - handler = self._RecordingHandler() - events = [ - PartStartEvent(index=0, part=TextPart(content="")), - PartDeltaEvent(index=0, delta=TextPartDelta(content_delta="Hello")), - PartEndEvent(index=0, part=TextPart(content="Hello")), - ] - await stream_pydantic_ai_events( - _aiter(events), - TASK_ID, - tracing_handler=handler, # type: ignore[arg-type] - ) - assert handler.starts == [] - assert handler.ends == [] - - async def test_handler_records_each_tool_in_multi_tool_run( - self, fake_adk: tuple[FakeStreamingModule, FakeMessagesModule] - ) -> None: - """A turn with two tool calls must produce two start/end pairs in order.""" - handler = self._RecordingHandler() - events = [ - PartStartEvent( - index=0, - part=ToolCallPart(tool_name="get_weather", args=None, tool_call_id="c1"), - ), - PartEndEvent( - index=0, - part=ToolCallPart(tool_name="get_weather", args="{}", tool_call_id="c1"), - ), - FunctionToolResultEvent( - part=ToolReturnPart(tool_name="get_weather", content="Sunny", tool_call_id="c1"), - ), - PartStartEvent( - index=0, - part=ToolCallPart(tool_name="lookup_city", args=None, tool_call_id="c2"), - ), - PartEndEvent( - index=0, - part=ToolCallPart(tool_name="lookup_city", args="{}", tool_call_id="c2"), - ), - FunctionToolResultEvent( - 
part=ToolReturnPart(tool_name="lookup_city", content="Paris, FR", tool_call_id="c2"), - ), - ] - await stream_pydantic_ai_events( - _aiter(events), - TASK_ID, - tracing_handler=handler, # type: ignore[arg-type] - ) - - assert [s["tool_call_id"] for s in handler.starts] == ["c1", "c2"] - assert [e["tool_call_id"] for e in handler.ends] == ["c1", "c2"] - assert handler.starts[0]["tool_name"] == "get_weather" - assert handler.starts[1]["tool_name"] == "lookup_city" - - async def test_omitting_handler_is_a_no_op_for_existing_behavior( - self, fake_adk: tuple[FakeStreamingModule, FakeMessagesModule] - ) -> None: - """Regression: passing no tracing handler preserves streaming behavior. - - AGX1-373: tool messages arrive via streaming_task_message_context - regardless of whether tracing_handler is passed. - """ - streaming, messages = fake_adk - events = [ - PartStartEvent( - index=0, - part=ToolCallPart(tool_name="get_weather", args=None, tool_call_id="c1"), - ), - PartEndEvent( - index=0, - part=ToolCallPart(tool_name="get_weather", args="{}", tool_call_id="c1"), - ), - FunctionToolResultEvent( - part=ToolReturnPart(tool_name="get_weather", content="Sunny", tool_call_id="c1"), - ), - ] - await stream_pydantic_ai_events(_aiter(events), TASK_ID) - # AGX1-373: tool messages via streaming_task_message_context. - assert messages.created == [] - assert len(streaming.contexts) == 2 - content_types = [type(ctx.initial_content).__name__ for ctx in streaming.contexts] - assert content_types == ["ToolRequestContent", "ToolResponseContent"] - - -class TestPydanticAITracingHandlerDeterministicIds: - """Regression coverage for ``AgentexPydanticAITracingHandler``. - - pydantic-ai's ``TemporalAgent`` splits a single agent run across several - Temporal activities. The event_stream_handler is invoked once per - activity, with a fresh handler instance each time. 
So ``on_tool_start`` - (during the model activity that issued the tool call) and ``on_tool_end`` - (during the next model activity, after the tool ran) end up in DIFFERENT - handler instances — an in-memory dict can't pair them. - - The fix is deterministic span IDs derived from ``(trace_id, tool_call_id)``. - These tests lock that in. - """ - - class _RecordingClient: - """Stand-in for ``AsyncAgentex`` capturing spans.create / spans.update calls.""" - - def __init__(self) -> None: - self.creates: list[dict[str, Any]] = [] - self.updates: list[tuple[str, dict[str, Any]]] = [] - self.spans = self # so .spans.create / .spans.update resolve back here - - async def create(self, **kwargs: Any) -> Any: - self.creates.append(kwargs) - return None - - async def update(self, span_id: str, **kwargs: Any) -> Any: - self.updates.append((span_id, kwargs)) - return None - - async def test_same_tool_call_id_yields_same_span_id_across_handler_instances( - self, - ) -> None: - """The whole point of the design: two handler instances with the same - trace_id and tool_call_id resolve to the same span ID — otherwise - ``on_tool_end`` patches a different (non-existent) record and the span - in the DB never gets ``end_time`` / ``output``.""" - from agentex.lib.adk._modules._pydantic_ai_tracing import ( - AgentexPydanticAITracingHandler, - ) - - client_a = self._RecordingClient() - client_b = self._RecordingClient() - - # Two independent handler instances — simulates the cross-activity - # invocation pattern in TemporalAgent. 
- handler_a = AgentexPydanticAITracingHandler( - trace_id="trace-1", - parent_span_id="parent-1", - task_id="task-1", - client=client_a, # type: ignore[arg-type] - ) - handler_b = AgentexPydanticAITracingHandler( - trace_id="trace-1", - parent_span_id="parent-1", - task_id="task-1", - client=client_b, # type: ignore[arg-type] - ) - - await handler_a.on_tool_start(tool_call_id="call_abc", tool_name="get_weather", arguments={"city": "Paris"}) - await handler_b.on_tool_end(tool_call_id="call_abc", result="Sunny, 72F") - - assert len(client_a.creates) == 1 - assert len(client_b.updates) == 1 - - created_span_id = client_a.creates[0]["id"] - updated_span_id = client_b.updates[0][0] - assert created_span_id == updated_span_id, ( - "on_tool_start and on_tool_end must address the same span across handler " - "instances; mismatch means tool spans will be left open and the AgentEx UI " - "will hide their trace." - ) - - async def test_different_tool_call_ids_yield_different_span_ids(self) -> None: - from agentex.lib.adk._modules._pydantic_ai_tracing import ( - AgentexPydanticAITracingHandler, - ) - - client = self._RecordingClient() - handler = AgentexPydanticAITracingHandler( - trace_id="trace-1", - client=client, # type: ignore[arg-type] - ) - - await handler.on_tool_start("call_a", "get_weather", {"city": "Paris"}) - await handler.on_tool_start("call_b", "get_weather", {"city": "Tokyo"}) - - ids = {c["id"] for c in client.creates} - assert len(ids) == 2, "Distinct tool_call_ids must map to distinct span IDs" - - async def test_same_tool_call_id_in_different_traces_yields_different_span_ids( - self, - ) -> None: - """Span IDs are namespaced by trace_id so two unrelated runs with the - same provider-issued tool_call_id don't collide.""" - from agentex.lib.adk._modules._pydantic_ai_tracing import ( - AgentexPydanticAITracingHandler, - ) - - client = self._RecordingClient() - handler_t1 = AgentexPydanticAITracingHandler(trace_id="trace-1", client=client) # type: 
ignore[arg-type] - handler_t2 = AgentexPydanticAITracingHandler(trace_id="trace-2", client=client) # type: ignore[arg-type] - - await handler_t1.on_tool_start("call_abc", "t", None) - await handler_t2.on_tool_start("call_abc", "t", None) - - ids = {c["id"] for c in client.creates} - assert len(ids) == 2 - - async def test_on_tool_end_patches_only_end_time_and_output(self) -> None: - """Don't overwrite start_time, name, parent_id, etc. on close — only patch - the fields we have new values for. Sending start_time again could clobber - what was set at create time.""" - from agentex.lib.adk._modules._pydantic_ai_tracing import ( - AgentexPydanticAITracingHandler, - ) - - client = self._RecordingClient() - handler = AgentexPydanticAITracingHandler(trace_id="trace-1", client=client) # type: ignore[arg-type] - - await handler.on_tool_end("call_abc", "Sunny") - - assert len(client.updates) == 1 - _, patch_kwargs = client.updates[0] - assert set(patch_kwargs.keys()) == {"end_time", "output"}, ( - f"Unexpected fields in tool span PATCH: {set(patch_kwargs.keys())}" - ) - assert patch_kwargs["output"] == {"result": "Sunny"} - - async def test_on_tool_error_patches_error_output(self) -> None: - from agentex.lib.adk._modules._pydantic_ai_tracing import ( - AgentexPydanticAITracingHandler, - ) - - client = self._RecordingClient() - handler = AgentexPydanticAITracingHandler(trace_id="trace-1", client=client) # type: ignore[arg-type] - - await handler.on_tool_error("call_abc", RuntimeError("boom")) - - assert len(client.updates) == 1 - _, patch_kwargs = client.updates[0] - assert "error" in patch_kwargs["output"] - assert "boom" in patch_kwargs["output"]["error"] - - class TestCleanupOnException: async def test_open_contexts_are_closed_on_iterator_failure( self, fake_adk: tuple[FakeStreamingModule, FakeMessagesModule] diff --git a/tests/lib/adk/test_pydantic_ai_sync.py b/tests/lib/adk/test_pydantic_ai_sync.py index 080bc5be8..ac9986f2b 100644 --- 
a/tests/lib/adk/test_pydantic_ai_sync.py +++ b/tests/lib/adk/test_pydantic_ai_sync.py @@ -1,4 +1,12 @@ -"""Tests for the Pydantic AI -> Agentex stream event converter.""" +"""Tests for the sync Pydantic AI -> Agentex path. + +Covers: +- The bare converter ``convert_pydantic_ai_to_agentex_events`` (text/thinking/ + tool-call streaming and arg-delta handling). +- The unified sync (HTTP ACP) path ``UnifiedEmitter.yield_turn(PydanticAITurn(...))``: + * Passthrough: yield_turn events equal PydanticAITurn(stream).events + * Span derivation (tool + reasoning) with a fake tracing backend +""" from __future__ import annotations @@ -25,6 +33,7 @@ FunctionToolResultEvent, ) +from agentex.lib.core.harness import UnifiedEmitter from agentex.types.reasoning_content import ReasoningContent from agentex.types.task_message_delta import TextDelta from agentex.types.tool_request_delta import ToolRequestDelta @@ -42,6 +51,9 @@ _args_delta_to_str, convert_pydantic_ai_to_agentex_events, ) +from agentex.lib.adk._modules._pydantic_ai_turn import PydanticAITurn + +from ..core.harness._fakes import FakeTracing async def _aiter(events: list[Any]) -> AsyncIterator[Any]: @@ -290,90 +302,6 @@ async def test_tool_retry_prompt_surfaces_as_response(self): assert out[0].content.content == "bad arguments" -class TestTracingHandlerSync: - """The sync converter has the same opt-in tracing-handler contract as the - async streamer: pass a handler and the converter calls ``on_tool_start`` / - ``on_tool_end`` for each tool call. 
Streaming yields are unchanged when - omitted.""" - - class _RecordingHandler: - def __init__(self) -> None: - self.starts: list[dict[str, Any]] = [] - self.ends: list[dict[str, Any]] = [] - - async def on_tool_start(self, tool_call_id: str, tool_name: str, arguments: Any) -> None: - self.starts.append({"tool_call_id": tool_call_id, "tool_name": tool_name, "arguments": arguments}) - - async def on_tool_end(self, tool_call_id: str, result: Any) -> None: - self.ends.append({"tool_call_id": tool_call_id, "result": result}) - - async def test_handler_records_start_and_end_for_a_tool_call(self): - handler = self._RecordingHandler() - events = [ - PartStartEvent( - index=0, - part=ToolCallPart(tool_name="get_weather", args=None, tool_call_id="c1"), - ), - PartEndEvent( - index=0, - part=ToolCallPart(tool_name="get_weather", args='{"city":"Paris"}', tool_call_id="c1"), - ), - FunctionToolResultEvent( - part=ToolReturnPart(tool_name="get_weather", content="Sunny", tool_call_id="c1"), - ), - ] - out = await _collect( - convert_pydantic_ai_to_agentex_events(_aiter(events), tracing_handler=handler) # type: ignore[arg-type] - ) - - # Streaming output is unchanged. 
- assert any(isinstance(e, StreamTaskMessageStart) for e in out) - assert any(isinstance(e, StreamTaskMessageFull) for e in out) - - assert handler.starts == [ - { - "tool_call_id": "c1", - "tool_name": "get_weather", - "arguments": {"city": "Paris"}, - } - ] - assert handler.ends == [{"tool_call_id": "c1", "result": "Sunny"}] - - async def test_handler_not_called_when_no_tool_calls(self): - handler = self._RecordingHandler() - events = [ - PartStartEvent(index=0, part=TextPart(content="")), - PartDeltaEvent(index=0, delta=TextPartDelta(content_delta="hi")), - PartEndEvent(index=0, part=TextPart(content="hi")), - ] - await _collect( - convert_pydantic_ai_to_agentex_events(_aiter(events), tracing_handler=handler) # type: ignore[arg-type] - ) - assert handler.starts == [] - assert handler.ends == [] - - async def test_omitting_handler_preserves_pre_tracing_behavior(self): - events = [ - PartStartEvent( - index=0, - part=ToolCallPart(tool_name="t", args=None, tool_call_id="c"), - ), - PartEndEvent( - index=0, - part=ToolCallPart(tool_name="t", args="{}", tool_call_id="c"), - ), - FunctionToolResultEvent( - part=ToolReturnPart(tool_name="t", content="ok", tool_call_id="c"), - ), - ] - out = await _collect(convert_pydantic_ai_to_agentex_events(_aiter(events))) - # Same emit shape as before: Start, Done, Full - types = [type(e).__name__ for e in out] - assert "StreamTaskMessageStart" in types - assert "StreamTaskMessageDone" in types - assert "StreamTaskMessageFull" in types - - class TestMultiStepRun: async def test_text_then_tool_then_text_assigns_distinct_indices(self): """A multi-step run: model emits text + tool call → tool runs → model emits more text. 
@@ -555,3 +483,157 @@ async def on_result_async(event: AgentRunResultEvent) -> None: assert len(awaited) == 1 assert awaited[0].result.output == "async_output" + + +# --------------------------------------------------------------------------- +# Unified sync path: PydanticAITurn + UnifiedEmitter.yield_turn +# +# Exercises the path documented in _pydantic_ai_sync.py under +# "Recommended: unified surface": +# - events forwarded by yield_turn equal PydanticAITurn(stream).events (passthrough) +# - with a trace context + fake tracing backend, tool / reasoning spans are derived +# --------------------------------------------------------------------------- + + +class TestUnifiedSyncPathPassthrough: + """The events forwarded by yield_turn are identical to PydanticAITurn.events.""" + + async def test_text_stream_passthrough(self): + raw_events = [ + PartStartEvent(index=0, part=TextPart(content="")), + PartDeltaEvent(index=0, delta=TextPartDelta(content_delta="hello")), + PartEndEvent(index=0, part=TextPart(content="hello")), + ] + + turn_a = PydanticAITurn(_aiter(raw_events), model="openai:gpt-4o") + direct = await _collect(turn_a.events) + + turn_b = PydanticAITurn(_aiter(raw_events), model="openai:gpt-4o") + emitter = UnifiedEmitter(task_id="t", trace_id=None, parent_span_id=None) + via_emitter = await _collect(emitter.yield_turn(turn_b)) + + assert len(via_emitter) == len(direct) + for a, b in zip(via_emitter, direct): + assert type(a) is type(b) + assert a.model_dump() == b.model_dump() + + async def test_tool_call_stream_passthrough(self): + raw_events = [ + PartStartEvent(index=0, part=ToolCallPart(tool_name="Bash", args=None, tool_call_id="c1")), + PartDeltaEvent(index=0, delta=ToolCallPartDelta(args_delta='{"cmd":"ls"}')), + PartEndEvent( + index=0, + part=ToolCallPart(tool_name="Bash", args='{"cmd":"ls"}', tool_call_id="c1"), + ), + ] + + turn_a = PydanticAITurn(_aiter(raw_events), model="openai:gpt-4o") + direct = await _collect(turn_a.events) + + turn_b = 
PydanticAITurn(_aiter(raw_events), model="openai:gpt-4o") + emitter = UnifiedEmitter(task_id="t", trace_id=None, parent_span_id=None) + via_emitter = await _collect(emitter.yield_turn(turn_b)) + + assert len(via_emitter) == len(direct) + for a, b in zip(via_emitter, direct): + assert type(a) is type(b) + assert a.model_dump() == b.model_dump() + + +class TestUnifiedSyncPathSpanDerivation: + """With trace context + fake tracing, spans are derived from the stream.""" + + async def test_tool_span_opened_and_closed(self): + """A tool call produces start_span + end_span on the fake tracing backend.""" + tool_events = [ + PartStartEvent( + index=0, + part=ToolCallPart(tool_name="Bash", args={"cmd": "ls"}, tool_call_id="call_1"), + ), + PartEndEvent( + index=0, + part=ToolCallPart(tool_name="Bash", args='{"cmd":"ls"}', tool_call_id="call_1"), + ), + FunctionToolResultEvent( + part=ToolReturnPart(tool_name="Bash", content="files", tool_call_id="call_1"), + ), + ] + + fake = FakeTracing() + turn = PydanticAITurn(_aiter(tool_events), model="openai:gpt-4o") + emitter = UnifiedEmitter(task_id="t", trace_id="tr", parent_span_id="p", tracing=fake) + + events = await _collect(emitter.yield_turn(turn)) + + assert len(events) >= 2, "at least Start(tool) + Done + Full(response)" + assert len(fake.started) == 1, "one tool span opened" + assert len(fake.ended) == 1, "one tool span closed" + span_name, parent_id, span_input = fake.started[0] + assert span_name == "Bash" + assert parent_id == "p" + closed_name, closed_output = fake.ended[0] + assert closed_name == "Bash" + + async def test_reasoning_span_opened_and_closed(self): + """A thinking/reasoning block produces start_span + end_span.""" + reasoning_events = [ + PartStartEvent(index=0, part=ThinkingPart(content="")), + PartDeltaEvent(index=0, delta=ThinkingPartDelta(content_delta="let me think")), + PartEndEvent(index=0, part=ThinkingPart(content="let me think")), + ] + + fake = FakeTracing() + turn = 
PydanticAITurn(_aiter(reasoning_events), model="openai:gpt-4o") + emitter = UnifiedEmitter(task_id="t", trace_id="tr", parent_span_id="p", tracing=fake) + + await _collect(emitter.yield_turn(turn)) + + assert len(fake.started) == 1, "one reasoning span opened" + assert len(fake.ended) == 1, "one reasoning span closed" + span_name, parent_id, _ = fake.started[0] + assert span_name == "reasoning" + assert parent_id == "p" + + async def test_no_trace_id_means_no_spans(self): + """When trace_id is None, no spans are derived even with a fake tracing backend.""" + raw_events = [ + PartStartEvent( + index=0, + part=ToolCallPart(tool_name="Bash", args={"cmd": "ls"}, tool_call_id="c2"), + ), + PartEndEvent( + index=0, + part=ToolCallPart(tool_name="Bash", args='{"cmd":"ls"}', tool_call_id="c2"), + ), + ] + + fake = FakeTracing() + turn = PydanticAITurn(_aiter(raw_events), model="openai:gpt-4o") + emitter = UnifiedEmitter(task_id="t", trace_id=None, parent_span_id=None, tracing=fake) + + await _collect(emitter.yield_turn(turn)) + + assert fake.started == [], "no spans when trace_id is absent" + assert fake.ended == [] + + async def test_tracer_false_suppresses_spans_even_with_trace_id(self): + """tracer=False disables span derivation regardless of trace_id.""" + raw_events = [ + PartStartEvent( + index=0, + part=ToolCallPart(tool_name="Bash", args={"cmd": "ls"}, tool_call_id="c3"), + ), + PartEndEvent( + index=0, + part=ToolCallPart(tool_name="Bash", args='{"cmd":"ls"}', tool_call_id="c3"), + ), + ] + + fake = FakeTracing() + turn = PydanticAITurn(_aiter(raw_events), model="openai:gpt-4o") + emitter = UnifiedEmitter(task_id="t", trace_id="tr", parent_span_id="p", tracer=False, tracing=fake) + + await _collect(emitter.yield_turn(turn)) + + assert fake.started == [] + assert fake.ended == [] diff --git a/tests/lib/adk/test_pydantic_ai_sync_unified.py b/tests/lib/adk/test_pydantic_ai_sync_unified.py deleted file mode 100644 index f920418de..000000000 --- 
a/tests/lib/adk/test_pydantic_ai_sync_unified.py +++ /dev/null @@ -1,209 +0,0 @@ -"""Tests for the unified sync (HTTP ACP) path: PydanticAITurn + UnifiedEmitter. - -Exercises the path documented in _pydantic_ai_sync.py under "Recommended: unified surface": -- events forwarded by yield_turn equal PydanticAITurn(stream).events (passthrough) -- with a trace context + fake tracing backend, tool spans are derived (start_span / end_span called) -- with a trace context + fake tracing backend, reasoning spans are derived -""" - -from __future__ import annotations - -from typing import Any, AsyncIterator - -from pydantic_ai.run import AgentRunResult, AgentRunResultEvent -from pydantic_ai.usage import RunUsage -from pydantic_ai.messages import ( - TextPart, - PartEndEvent, - ThinkingPart, - ToolCallPart, - TextPartDelta, - PartDeltaEvent, - PartStartEvent, - ThinkingPartDelta, - ToolCallPartDelta, -) - -from agentex.lib.core.harness import UnifiedEmitter -from agentex.lib.adk._modules._pydantic_ai_turn import PydanticAITurn - - -async def _aiter(events: list[Any]) -> AsyncIterator[Any]: - for e in events: - yield e - - -async def _collect(stream: AsyncIterator[Any]) -> list[Any]: - return [e async for e in stream] - - -class _FakeSpan: - def __init__(self, name: str): - self.name = name - self.output: Any = None - - -class _FakeTracing: - def __init__(self) -> None: - self.started: list[tuple[str, str | None, Any]] = [] - self.ended: list[tuple[str, Any]] = [] - - async def start_span(self, *, trace_id, name, input=None, parent_id=None, data=None, task_id=None): - self.started.append((name, parent_id, input)) - return _FakeSpan(name) - - async def end_span(self, *, trace_id, span): - self.ended.append((span.name, span.output)) - - -def _make_result_event(usage: RunUsage | None = None) -> AgentRunResultEvent: - result = AgentRunResult(output="done", _output_tool_name=None) - if usage is not None: - result._state.usage = usage - return AgentRunResultEvent(result=result) - - 
-class TestUnifiedSyncPathPassthrough: - """The events forwarded by yield_turn are identical to PydanticAITurn.events.""" - - async def test_text_stream_passthrough(self): - raw_events = [ - PartStartEvent(index=0, part=TextPart(content="")), - PartDeltaEvent(index=0, delta=TextPartDelta(content_delta="hello")), - PartEndEvent(index=0, part=TextPart(content="hello")), - ] - - turn_a = PydanticAITurn(_aiter(raw_events), model="openai:gpt-4o") - direct = await _collect(turn_a.events) - - turn_b = PydanticAITurn(_aiter(raw_events), model="openai:gpt-4o") - emitter = UnifiedEmitter(task_id="t", trace_id=None, parent_span_id=None) - via_emitter = await _collect(emitter.yield_turn(turn_b)) - - assert len(via_emitter) == len(direct) - for a, b in zip(via_emitter, direct): - assert type(a) is type(b) - assert a.model_dump() == b.model_dump() - - async def test_tool_call_stream_passthrough(self): - raw_events = [ - PartStartEvent(index=0, part=ToolCallPart(tool_name="Bash", args=None, tool_call_id="c1")), - PartDeltaEvent(index=0, delta=ToolCallPartDelta(args_delta='{"cmd":"ls"}')), - PartEndEvent( - index=0, - part=ToolCallPart(tool_name="Bash", args='{"cmd":"ls"}', tool_call_id="c1"), - ), - ] - - turn_a = PydanticAITurn(_aiter(raw_events), model="openai:gpt-4o") - direct = await _collect(turn_a.events) - - turn_b = PydanticAITurn(_aiter(raw_events), model="openai:gpt-4o") - emitter = UnifiedEmitter(task_id="t", trace_id=None, parent_span_id=None) - via_emitter = await _collect(emitter.yield_turn(turn_b)) - - assert len(via_emitter) == len(direct) - for a, b in zip(via_emitter, direct): - assert type(a) is type(b) - assert a.model_dump() == b.model_dump() - - -class TestUnifiedSyncPathSpanDerivation: - """With trace context + fake tracing, spans are derived from the stream.""" - - async def test_tool_span_opened_and_closed(self): - """A tool call produces start_span + end_span on the fake tracing backend.""" - from pydantic_ai.messages import ToolReturnPart, 
FunctionToolResultEvent - - tool_events = [ - PartStartEvent( - index=0, - part=ToolCallPart(tool_name="Bash", args={"cmd": "ls"}, tool_call_id="call_1"), - ), - PartEndEvent( - index=0, - part=ToolCallPart(tool_name="Bash", args='{"cmd":"ls"}', tool_call_id="call_1"), - ), - FunctionToolResultEvent( - part=ToolReturnPart(tool_name="Bash", content="files", tool_call_id="call_1"), - ), - ] - - fake = _FakeTracing() - turn = PydanticAITurn(_aiter(tool_events), model="openai:gpt-4o") - emitter = UnifiedEmitter(task_id="t", trace_id="tr", parent_span_id="p", tracing=fake) - - events = await _collect(emitter.yield_turn(turn)) - - assert len(events) >= 2, "at least Start(tool) + Done + Full(response)" - assert len(fake.started) == 1, "one tool span opened" - assert len(fake.ended) == 1, "one tool span closed" - span_name, parent_id, span_input = fake.started[0] - assert span_name == "Bash" - assert parent_id == "p" - closed_name, closed_output = fake.ended[0] - assert closed_name == "Bash" - - async def test_reasoning_span_opened_and_closed(self): - """A thinking/reasoning block produces start_span + end_span.""" - reasoning_events = [ - PartStartEvent(index=0, part=ThinkingPart(content="")), - PartDeltaEvent(index=0, delta=ThinkingPartDelta(content_delta="let me think")), - PartEndEvent(index=0, part=ThinkingPart(content="let me think")), - ] - - fake = _FakeTracing() - turn = PydanticAITurn(_aiter(reasoning_events), model="openai:gpt-4o") - emitter = UnifiedEmitter(task_id="t", trace_id="tr", parent_span_id="p", tracing=fake) - - await _collect(emitter.yield_turn(turn)) - - assert len(fake.started) == 1, "one reasoning span opened" - assert len(fake.ended) == 1, "one reasoning span closed" - span_name, parent_id, _ = fake.started[0] - assert span_name == "reasoning" - assert parent_id == "p" - - async def test_no_trace_id_means_no_spans(self): - """When trace_id is None, no spans are derived even with a fake tracing backend.""" - raw_events = [ - PartStartEvent( - 
index=0, - part=ToolCallPart(tool_name="Bash", args={"cmd": "ls"}, tool_call_id="c2"), - ), - PartEndEvent( - index=0, - part=ToolCallPart(tool_name="Bash", args='{"cmd":"ls"}', tool_call_id="c2"), - ), - ] - - fake = _FakeTracing() - turn = PydanticAITurn(_aiter(raw_events), model="openai:gpt-4o") - emitter = UnifiedEmitter(task_id="t", trace_id=None, parent_span_id=None, tracing=fake) - - await _collect(emitter.yield_turn(turn)) - - assert fake.started == [], "no spans when trace_id is absent" - assert fake.ended == [] - - async def test_tracer_false_suppresses_spans_even_with_trace_id(self): - """tracer=False disables span derivation regardless of trace_id.""" - raw_events = [ - PartStartEvent( - index=0, - part=ToolCallPart(tool_name="Bash", args={"cmd": "ls"}, tool_call_id="c3"), - ), - PartEndEvent( - index=0, - part=ToolCallPart(tool_name="Bash", args='{"cmd":"ls"}', tool_call_id="c3"), - ), - ] - - fake = _FakeTracing() - turn = PydanticAITurn(_aiter(raw_events), model="openai:gpt-4o") - emitter = UnifiedEmitter(task_id="t", trace_id="tr", parent_span_id="p", tracer=False, tracing=fake) - - await _collect(emitter.yield_turn(turn)) - - assert fake.started == [] - assert fake.ended == [] diff --git a/tests/lib/adk/test_pydantic_ai_turn.py b/tests/lib/adk/test_pydantic_ai_turn.py index 46bf247a3..c57251db6 100644 --- a/tests/lib/adk/test_pydantic_ai_turn.py +++ b/tests/lib/adk/test_pydantic_ai_turn.py @@ -233,8 +233,8 @@ async def test_no_usage_event_leaves_default_usage(self): class TestToolRequestStreaming: """PydanticAITurn.events equals the bare converter output unconditionally. - The foundation auto_send delivers Start+ToolRequestDelta+Done natively - (AGX1-377), so no coalescing is needed on either channel. + The foundation auto_send delivers Start+ToolRequestDelta+Done natively, so + no coalescing is needed on either channel. 
""" async def test_events_match_bare_converter_for_streamed_tool_call(self): diff --git a/tests/lib/core/harness/test_harness_pydantic_ai_async.py b/tests/lib/core/harness/test_harness_pydantic_ai_async.py index 8bda7d020..4b6b86415 100644 --- a/tests/lib/core/harness/test_harness_pydantic_ai_async.py +++ b/tests/lib/core/harness/test_harness_pydantic_ai_async.py @@ -12,7 +12,7 @@ The async path uses the bare PydanticAITurn (no coalescing): the foundation auto_send delivers streamed tool-request Start+ToolRequestDelta+Done messages -natively (AGX1-377 fix), so no coalescing wrapper is needed. +natively, so no coalescing wrapper is needed. What is tested -------------- @@ -51,6 +51,8 @@ from agentex.types.tool_response_content import ToolResponseContent from agentex.lib.adk._modules._pydantic_ai_turn import PydanticAITurn +from ._fakes import FakeTracing + # --------------------------------------------------------------------------- # Minimal agent under test # --------------------------------------------------------------------------- @@ -120,39 +122,6 @@ def streaming_task_message_context( return _FakeCtx(self.sink, ctype, initial_content) -# --------------------------------------------------------------------------- -# Fake tracing backend -# --------------------------------------------------------------------------- - - -class _FakeSpan: - def __init__(self, name: str) -> None: - self.name = name - self.output: Any = None - - -class _FakeTracing: - def __init__(self) -> None: - self.started: list[tuple[str, str | None]] = [] - self.ended: list[tuple[str, Any]] = [] - - async def start_span( - self, - *, - trace_id: str, - name: str, - input: Any = None, - parent_id: Any = None, - data: Any = None, - task_id: Any = None, - ) -> _FakeSpan: - self.started.append((name, parent_id)) - return _FakeSpan(name) - - async def end_span(self, *, trace_id: str, span: _FakeSpan) -> None: - self.ended.append((span.name, span.output)) - - # 
--------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- @@ -163,7 +132,7 @@ async def _run_auto_send_turn( user_msg: str = "What is the weather in Paris?", trace_id: str | None = None, parent_span_id: str | None = None, - fake_tracing: _FakeTracing | None = None, + fake_tracing: FakeTracing | None = None, ) -> tuple[TurnResult, _FakeStreaming]: """Drive the async (auto_send) path and return the TurnResult + fake streaming state.""" fake_streaming = _FakeStreaming() @@ -304,9 +273,9 @@ async def test_context_lifecycle_open_then_close(self) -> None: class TestAsyncAutoSendSpanDerivation: """Span derivation on the async path now works for streamed tool requests. - The foundation auto_send delivers Start+ToolRequestDelta+Done natively - (AGX1-377 fix). The SpanDeriver opens a tool span on Done(tool_request), - so the async path now derives spans just like the sync path. + The foundation auto_send delivers Start+ToolRequestDelta+Done natively. + The SpanDeriver opens a tool span on Done(tool_request), so the async path + derives spans just like the sync path. 
""" async def test_tool_span_derived_on_async_path(self) -> None: @@ -314,7 +283,7 @@ async def test_tool_span_derived_on_async_path(self) -> None: on the async/auto_send path when auto_send delivers the streamed Start+ToolRequestDelta+Done sequence.""" agent = _make_agent() - fake_tracing = _FakeTracing() + fake_tracing = FakeTracing() tracer = SpanTracer( trace_id="trace1", parent_span_id="parent", diff --git a/tests/lib/core/harness/test_harness_pydantic_ai_sync.py b/tests/lib/core/harness/test_harness_pydantic_ai_sync.py index 1557d0dd1..04beea81d 100644 --- a/tests/lib/core/harness/test_harness_pydantic_ai_sync.py +++ b/tests/lib/core/harness/test_harness_pydantic_ai_sync.py @@ -49,6 +49,8 @@ from agentex.types.tool_response_content import ToolResponseContent from agentex.lib.adk._modules._pydantic_ai_turn import PydanticAITurn +from ._fakes import FakeTracing + # --------------------------------------------------------------------------- # Minimal agent under test # --------------------------------------------------------------------------- @@ -74,39 +76,6 @@ def get_weather(city: str) -> str: return agent -# --------------------------------------------------------------------------- -# Fake tracing backend (no network calls) -# --------------------------------------------------------------------------- - - -class _FakeSpan: - def __init__(self, name: str) -> None: - self.name = name - self.output: Any = None - - -class _FakeTracing: - def __init__(self) -> None: - self.started: list[tuple[str, str | None]] = [] - self.ended: list[tuple[str, Any]] = [] - - async def start_span( - self, - *, - trace_id: str, - name: str, - input: Any = None, - parent_id: Any = None, - data: Any = None, - task_id: Any = None, - ) -> _FakeSpan: - self.started.append((name, parent_id)) - return _FakeSpan(name) - - async def end_span(self, *, trace_id: str, span: _FakeSpan) -> None: - self.ended.append((span.name, span.output)) - - # 
--------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- @@ -117,7 +86,7 @@ async def _run_yield_turn( user_msg: str = "What is the weather in Paris?", trace_id: str | None = None, parent_span_id: str | None = None, - fake_tracing: _FakeTracing | None = None, + fake_tracing: FakeTracing | None = None, ) -> list[Any]: """Drive the sync (yield) path and collect all yielded events.""" tracer: SpanTracer | bool | None = None @@ -245,7 +214,7 @@ class TestSyncYieldSpanDerivation: async def test_tool_span_opened_and_closed(self) -> None: """One tool span is opened and closed per tool call.""" agent = _make_agent() - fake_tracing = _FakeTracing() + fake_tracing = FakeTracing() tracer = SpanTracer( trace_id="trace1", parent_span_id="parent-span", @@ -266,14 +235,14 @@ async def test_tool_span_opened_and_closed(self) -> None: assert len(fake_tracing.started) == 1, "Expected exactly one tool span opened" assert len(fake_tracing.ended) == 1, "Expected exactly one tool span closed" - span_name, parent_id = fake_tracing.started[0] + span_name, parent_id, _ = fake_tracing.started[0] assert span_name == "get_weather" assert parent_id == "parent-span" async def test_tool_span_output_is_tool_result(self) -> None: """The closed tool span's output equals the tool's return value.""" agent = _make_agent() - fake_tracing = _FakeTracing() + fake_tracing = FakeTracing() tracer = SpanTracer( trace_id="trace1", parent_span_id="parent-span", @@ -299,7 +268,7 @@ async def test_tool_span_output_is_tool_result(self) -> None: async def test_no_trace_id_means_no_spans(self) -> None: """With trace_id=None, no spans are derived (emitter disables tracing).""" agent = _make_agent() - fake_tracing = _FakeTracing() + fake_tracing = FakeTracing() async with agent.run_stream_events("What is the weather in Paris?") as stream: turn = PydanticAITurn(stream, model="test") @@ -317,7 +286,7 @@ async def 
test_no_trace_id_means_no_spans(self) -> None: async def test_tracer_false_suppresses_spans(self) -> None: """tracer=False disables span derivation regardless of trace_id.""" agent = _make_agent() - fake_tracing = _FakeTracing() + fake_tracing = FakeTracing() async with agent.run_stream_events("What is the weather in Paris?") as stream: turn = PydanticAITurn(stream, model="test") @@ -345,7 +314,7 @@ async def handle(self, signal: Any) -> None: received_signals.append(signal) await super().handle(signal) - fake_tracing = _FakeTracing() + fake_tracing = FakeTracing() tracer = _RecordingTracer( trace_id="trace1", parent_span_id="parent", From eb2efcc40324db15793100f55f180b120102152d Mon Sep 17 00:00:00 2001 From: Declan Brady Date: Tue, 23 Jun 2026 12:49:53 -0400 Subject: [PATCH 2/2] docs(changelog): record Pydantic-AI tracing handler removal This PR removes create_pydantic_ai_tracing_handler from the public adk surface; document it here (the previous PR's entry was narrowed to the LangGraph handler it actually removed). Co-Authored-By: Claude Opus 4.8 (1M context) --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index f758931b3..184bcd5c6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ ### ⚠ BREAKING CHANGES * **harness:** removed the deprecated bespoke LangGraph tracing handler `create_langgraph_tracing_handler` (and its `AgentexLangGraphTracingHandler` class) from the public `agentex.lib.adk` surface. Span tracing is now derived from the canonical `StreamTaskMessage*` stream by `UnifiedEmitter` — wrap your run in the harness `*Turn` and drive `UnifiedEmitter.yield_turn` / `auto_send_turn`. The `agentex init` templates were migrated accordingly. +* **harness:** removed the deprecated bespoke Pydantic-AI tracing handler `create_pydantic_ai_tracing_handler` (and its `AgentexPydanticAITracingHandler` class) from the public `agentex.lib.adk` surface. 
Span tracing is now derived from the canonical `StreamTaskMessage*` stream by `UnifiedEmitter` — wrap your run in `PydanticAITurn` and drive `UnifiedEmitter.yield_turn` / `auto_send_turn`. The `agentex init` templates were migrated accordingly. * **harness:** each harness now exposes exactly `_<harness>_sync.py` + `_<harness>_turn.py` under `agentex.lib.adk._modules`. The OpenAI harness `OpenAITurn` and `convert_openai_to_agentex_events` moved to `agentex.lib.adk._modules._openai_turn` / `_openai_sync`; back-compat shims remain at `agentex.lib.adk.providers._modules.{openai_turn,sync_provider}` for one release. Public facade names (`stream_pydantic_ai_events`, `stream_langgraph_events`, `emit_langgraph_messages`, etc.) are unchanged. ### Features