From d6ee3e2d311c09cb08fdc3e656fdd4cec1ae2958 Mon Sep 17 00:00:00 2001 From: Nitesh Dhanpal Date: Tue, 23 Jun 2026 10:49:48 -0700 Subject: [PATCH 1/2] feat(tracing): skip Agentex span-start write by default (end-only ingest) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The Agentex tracing processor wrote every span twice — a `spans.create` on start and a `spans.update` on end — doubling per-span HTTP/DB writes against the Agentex control plane. Under load this is what timed out span-start activities and pressured the Agentex Postgres connection pool. Default to end-only ingest: skip the start write and persist each span once, as a single `spans.create` on end (a bare `spans.update` would 404 since the row was never created). Gated by a dedicated, default-ON env var `AGENTEX_TRACING_SKIP_AGENTEX_SPAN_START`, independent of the SGP/EGP processor's `AGENTEX_TRACING_SKIP_SPAN_START` so the EGP span path is unchanged. Set to 0/false/no/off to restore start writes. Co-Authored-By: Claude Opus 4.8 --- .../processors/agentex_tracing_processor.py | 108 +++++++++++++---- .../test_agentex_tracing_processor.py | 110 +++++++++++++++++- 2 files changed, 193 insertions(+), 25 deletions(-) diff --git a/src/agentex/lib/core/tracing/processors/agentex_tracing_processor.py b/src/agentex/lib/core/tracing/processors/agentex_tracing_processor.py index 98d50546b..f28609846 100644 --- a/src/agentex/lib/core/tracing/processors/agentex_tracing_processor.py +++ b/src/agentex/lib/core/tracing/processors/agentex_tracing_processor.py @@ -1,3 +1,4 @@ +import os import asyncio import weakref from typing import TYPE_CHECKING, Any, Dict, override @@ -5,6 +6,7 @@ from agentex import Agentex from agentex.types.span import Span from agentex.lib.types.tracing import AgentexTracingProcessorConfig +from agentex.lib.utils.logging import make_logger from agentex.lib.adk.utils._modules.client import create_async_agentex_client from agentex.lib.core.tracing.processors.tracing_processor_interface import ( SyncTracingProcessor, @@ -14,28 +16,82 @@ if TYPE_CHECKING: from agentex import AsyncAgentex +logger = make_logger(__name__) + + +# NOTE: This is the Agentex-backend toggle (writes to the agentex `spans` +# table via the Agentex API). It is intentionally SEPARATE from the SGP/EGP +# processor's ``AGENTEX_TRACING_SKIP_SPAN_START`` so the two backends can be +# controlled independently. +_SKIP_SPAN_START_ENV = "AGENTEX_TRACING_SKIP_AGENTEX_SPAN_START" + + +def _skip_span_start_enabled() -> bool: + """Whether to skip the Agentex span-start write and persist each span only on end. + + The Agentex processor otherwise writes every span twice: a ``spans.create`` + on start (no ``end_time``/``output`` yet) and a ``spans.update`` on end. + The start row is overwritten by the end write moments later, so persisting + it doubles the per-span HTTP/DB write volume against the Agentex control + plane — the load that timed out span-start activities and pressured the + Agentex Postgres connection pool under load. + + When enabled (the default), the start write is skipped and the END write + becomes a single ``spans.create`` carrying the complete span — one INSERT + per span instead of an INSERT + UPDATE. (A plain ``spans.update`` on end + would 404 because the row was never created.) + + Default ON. Set ``AGENTEX_TRACING_SKIP_AGENTEX_SPAN_START`` to + ``0``/``false``/``no``/``off`` to restore the start write — e.g. if you + need in-flight spans visible before they complete, or spans that never end + (process crash) to still be persisted. + """ + raw = os.environ.get(_SKIP_SPAN_START_ENV, "1").strip().lower() + return raw not in ("0", "false", "no", "off") + + +def _create_kwargs(span: Span) -> Dict[str, Any]: + """Full-span kwargs for ``spans.create`` — used on start (skip disabled) and + on end (skip enabled, single-INSERT path).""" + return { + "name": span.name, + "start_time": span.start_time, + "end_time": span.end_time, + "id": span.id, + "trace_id": span.trace_id, + "parent_id": span.parent_id, + "input": span.input, + "output": span.output, + "data": span.data, + "task_id": span.task_id, + } + class AgentexSyncTracingProcessor(SyncTracingProcessor): def __init__(self, config: AgentexTracingProcessorConfig): # noqa: ARG002 self.client = Agentex() + logger.info( + "Agentex tracing span-start write %s (%s)", + "disabled — end-only ingest" if _skip_span_start_enabled() else "enabled", + _SKIP_SPAN_START_ENV, + ) @override def on_span_start(self, span: Span) -> None: - self.client.spans.create( - name=span.name, - start_time=span.start_time, - end_time=span.end_time, - trace_id=span.trace_id, - id=span.id, - data=span.data, - input=span.input, - output=span.output, - parent_id=span.parent_id, - task_id=span.task_id, - ) + # End-only ingest: by default the start write is skipped (see + # _skip_span_start_enabled) so each span is persisted once, on end. + if _skip_span_start_enabled(): + return + self.client.spans.create(**_create_kwargs(span)) @override def on_span_end(self, span: Span) -> None: + # End-only ingest: the start create was skipped, so persist the complete + # span as a single INSERT here (a bare spans.update would 404 — no row). + if _skip_span_start_enabled(): + self.client.spans.create(**_create_kwargs(span)) + return + update: Dict[str, Any] = {} if span.trace_id: update["trace_id"] = span.trace_id @@ -82,6 +138,11 @@ def __init__(self, config: AgentexTracingProcessorConfig): # noqa: ARG002 self._clients_by_loop: weakref.WeakKeyDictionary[ asyncio.AbstractEventLoop, "AsyncAgentex" ] = weakref.WeakKeyDictionary() + logger.info( + "Agentex tracing span-start write %s (%s)", + "disabled — end-only ingest" if _skip_span_start_enabled() else "enabled", + _SKIP_SPAN_START_ENV, + ) def _build_client(self) -> "AsyncAgentex": import httpx @@ -111,21 +172,20 @@ def client(self) -> "AsyncAgentex": # https://linear.app/scale-epd/issue/AGX1-199/add-agentex-batch-endpoint-for-traces @override async def on_span_start(self, span: Span) -> None: - await self.client.spans.create( - name=span.name, - start_time=span.start_time, - end_time=span.end_time, - id=span.id, - trace_id=span.trace_id, - parent_id=span.parent_id, - input=span.input, - output=span.output, - data=span.data, - task_id=span.task_id, - ) + # End-only ingest: by default the start write is skipped (see + # _skip_span_start_enabled) so each span is persisted once, on end. + if _skip_span_start_enabled(): + return + await self.client.spans.create(**_create_kwargs(span)) @override async def on_span_end(self, span: Span) -> None: + # End-only ingest: the start create was skipped, so persist the complete + # span as a single INSERT here (a bare spans.update would 404 — no row). + if _skip_span_start_enabled(): + await self.client.spans.create(**_create_kwargs(span)) + return + update: Dict[str, Any] = {} if span.trace_id: update["trace_id"] = span.trace_id diff --git a/tests/lib/core/tracing/processors/test_agentex_tracing_processor.py b/tests/lib/core/tracing/processors/test_agentex_tracing_processor.py index ec1ed5e88..5c0ce641a 100644 --- a/tests/lib/core/tracing/processors/test_agentex_tracing_processor.py +++ b/tests/lib/core/tracing/processors/test_agentex_tracing_processor.py @@ -2,7 +2,8 @@ import asyncio import weakref -from unittest.mock import MagicMock, patch +from datetime import datetime, timezone +from unittest.mock import AsyncMock, MagicMock, patch import pytest @@ -24,11 +25,118 @@ MODULE = "agentex.lib.core.tracing.processors.agentex_tracing_processor" +SKIP_ENV = "AGENTEX_TRACING_SKIP_AGENTEX_SPAN_START" + + def _make_config() -> MagicMock: """Empty config — AgentexTracingProcessorConfig is unused by __init__.""" return MagicMock() +def _make_span(): + from agentex.types.span import Span + + now = datetime.now(timezone.utc) + return Span( + id="span-1", + trace_id="trace-1", + name="test-span", + start_time=now, + end_time=now, + input={"in": 1}, + output={"out": 2}, + ) + + +class TestAgentexSyncSkipSpanStart: + """The Agentex backend writes create-on-start + update-on-end by default. + End-only ingest (default) skips the start write and makes the END a single + create — verify the start is a no-op and end does an INSERT, not an UPDATE. + """ + + def test_start_skipped_and_end_creates_by_default(self, monkeypatch): + monkeypatch.delenv(SKIP_ENV, raising=False) # default ON + with patch(f"{MODULE}.Agentex") as MockAgentex: + from agentex.lib.core.tracing.processors.agentex_tracing_processor import ( + AgentexSyncTracingProcessor, + ) + + processor = AgentexSyncTracingProcessor(_make_config()) + client = MockAgentex.return_value + span = _make_span() + + processor.on_span_start(span) + client.spans.create.assert_not_called() # start skipped + client.spans.update.assert_not_called() + + processor.on_span_end(span) + client.spans.create.assert_called_once() # single INSERT on end + client.spans.update.assert_not_called() # never a 404-prone UPDATE + + def test_start_creates_and_end_updates_when_skip_disabled(self, monkeypatch): + monkeypatch.setenv(SKIP_ENV, "0") + with patch(f"{MODULE}.Agentex") as MockAgentex: + from agentex.lib.core.tracing.processors.agentex_tracing_processor import ( + AgentexSyncTracingProcessor, + ) + + processor = AgentexSyncTracingProcessor(_make_config()) + client = MockAgentex.return_value + span = _make_span() + + processor.on_span_start(span) + client.spans.create.assert_called_once() # start write restored + + processor.on_span_end(span) + client.spans.update.assert_called_once() # end is the UPDATE + + +class TestAgentexAsyncSkipSpanStart: + async def test_start_skipped_and_end_creates_by_default(self, monkeypatch): + monkeypatch.delenv(SKIP_ENV, raising=False) # default ON + with patch(f"{MODULE}.create_async_agentex_client") as mock_factory: + client = MagicMock() + client.spans.create = AsyncMock() + client.spans.update = AsyncMock() + mock_factory.return_value = client + + from agentex.lib.core.tracing.processors.agentex_tracing_processor import ( + AgentexAsyncTracingProcessor, + ) + + processor = AgentexAsyncTracingProcessor(_make_config()) + span = _make_span() + + await processor.on_span_start(span) + client.spans.create.assert_not_called() # start skipped + client.spans.update.assert_not_called() + + await processor.on_span_end(span) + client.spans.create.assert_awaited_once() # single INSERT on end + client.spans.update.assert_not_called() + + async def test_start_creates_and_end_updates_when_skip_disabled(self, monkeypatch): + monkeypatch.setenv(SKIP_ENV, "0") + with patch(f"{MODULE}.create_async_agentex_client") as mock_factory: + client = MagicMock() + client.spans.create = AsyncMock() + client.spans.update = AsyncMock() + mock_factory.return_value = client + + from agentex.lib.core.tracing.processors.agentex_tracing_processor import ( + AgentexAsyncTracingProcessor, + ) + + processor = AgentexAsyncTracingProcessor(_make_config()) + span = _make_span() + + await processor.on_span_start(span) + client.spans.create.assert_awaited_once() # start write restored + + await processor.on_span_end(span) + client.spans.update.assert_awaited_once() # end is the UPDATE + + class TestAgentexAsyncTracingProcessor: """Coverage for the per-event-loop client cache. The SGP processor has matching tests; mirror them here so a regression in the Agentex side From 8474cc026dfb4c104913875d44c181f2ec7d4fff Mon Sep 17 00:00:00 2001 From: Nitesh Dhanpal Date: Tue, 23 Jun 2026 11:11:06 -0700 Subject: [PATCH 2/2] fix(tracing): capture skip-span-start decision once at init MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Per Greptile review: re-reading the flag in both on_span_start and on_span_end risks splitting a span's lifecycle if the env toggles between them — start-skip + end-update lands on a non-existent row (404). Unlike the SGP processor (idempotent upsert_batch on both halves), the Agentex backend's create-on-start / update-on-end asymmetry makes this a real failure. Capture _skip_span_start once in __init__ for both sync and async processors and reference it in both handlers, so the decision is always consistent. Adds a regression test (toggle env mid-span -> still end-only INSERT). Co-Authored-By: Claude Opus 4.8 --- .../processors/agentex_tracing_processor.py | 24 +++++++--- .../test_agentex_tracing_processor.py | 45 +++++++++++++++++++ 2 files changed, 63 insertions(+), 6 deletions(-) diff --git a/src/agentex/lib/core/tracing/processors/agentex_tracing_processor.py b/src/agentex/lib/core/tracing/processors/agentex_tracing_processor.py index f28609846..448d013e9 100644 --- a/src/agentex/lib/core/tracing/processors/agentex_tracing_processor.py +++ b/src/agentex/lib/core/tracing/processors/agentex_tracing_processor.py @@ -70,9 +70,15 @@ def _create_kwargs(span: Span) -> Dict[str, Any]: class AgentexSyncTracingProcessor(SyncTracingProcessor): def __init__(self, config: AgentexTracingProcessorConfig): # noqa: ARG002 self.client = Agentex() + # Capture the skip decision once at init: both halves of a span's + # lifecycle MUST agree, otherwise a start-skip + end-update lands on a + # non-existent row (404) — or the reverse double-creates. Re-reading the + # env per event would let a mid-span toggle (tests, config reload) split + # the decision. Deploy-time flag, so a single read is correct. + self._skip_span_start = _skip_span_start_enabled() logger.info( "Agentex tracing span-start write %s (%s)", - "disabled — end-only ingest" if _skip_span_start_enabled() else "enabled", + "disabled — end-only ingest" if self._skip_span_start else "enabled", _SKIP_SPAN_START_ENV, ) @@ -80,7 +86,7 @@ def __init__(self, config: AgentexTracingProcessorConfig): # noqa: ARG002 def on_span_start(self, span: Span) -> None: # End-only ingest: by default the start write is skipped (see # _skip_span_start_enabled) so each span is persisted once, on end. - if _skip_span_start_enabled(): + if self._skip_span_start: return self.client.spans.create(**_create_kwargs(span)) @@ -88,7 +94,7 @@ def on_span_start(self, span: Span) -> None: def on_span_end(self, span: Span) -> None: # End-only ingest: the start create was skipped, so persist the complete # span as a single INSERT here (a bare spans.update would 404 — no row). - if _skip_span_start_enabled(): + if self._skip_span_start: self.client.spans.create(**_create_kwargs(span)) return @@ -138,9 +144,15 @@ def __init__(self, config: AgentexTracingProcessorConfig): # noqa: ARG002 self._clients_by_loop: weakref.WeakKeyDictionary[ asyncio.AbstractEventLoop, "AsyncAgentex" ] = weakref.WeakKeyDictionary() + # Capture the skip decision once at init: both halves of a span's + # lifecycle MUST agree, otherwise a start-skip + end-update lands on a + # non-existent row (404) — or the reverse double-creates. Re-reading the + # env per event would let a mid-span toggle (tests, config reload) split + # the decision. Deploy-time flag, so a single read is correct. + self._skip_span_start = _skip_span_start_enabled() logger.info( "Agentex tracing span-start write %s (%s)", - "disabled — end-only ingest" if _skip_span_start_enabled() else "enabled", + "disabled — end-only ingest" if self._skip_span_start else "enabled", _SKIP_SPAN_START_ENV, ) @@ -174,7 +186,7 @@ def client(self) -> "AsyncAgentex": async def on_span_start(self, span: Span) -> None: # End-only ingest: by default the start write is skipped (see # _skip_span_start_enabled) so each span is persisted once, on end. - if _skip_span_start_enabled(): + if self._skip_span_start: return await self.client.spans.create(**_create_kwargs(span)) @@ -182,7 +194,7 @@ async def on_span_start(self, span: Span) -> None: async def on_span_end(self, span: Span) -> None: # End-only ingest: the start create was skipped, so persist the complete # span as a single INSERT here (a bare spans.update would 404 — no row). - if _skip_span_start_enabled(): + if self._skip_span_start: await self.client.spans.create(**_create_kwargs(span)) return diff --git a/tests/lib/core/tracing/processors/test_agentex_tracing_processor.py b/tests/lib/core/tracing/processors/test_agentex_tracing_processor.py index 5c0ce641a..84f37b495 100644 --- a/tests/lib/core/tracing/processors/test_agentex_tracing_processor.py +++ b/tests/lib/core/tracing/processors/test_agentex_tracing_processor.py @@ -90,6 +90,28 @@ def test_start_creates_and_end_updates_when_skip_disabled(self, monkeypatch): processor.on_span_end(span) client.spans.update.assert_called_once() # end is the UPDATE + def test_skip_decision_captured_at_init_not_per_call(self, monkeypatch): + """The two halves of a span MUST use the same skip decision. A flag + toggled after construction must not split it (start-skip + end-update + would 404). The decision is captured once at init. + """ + monkeypatch.delenv(SKIP_ENV, raising=False) # construct with skip ON + with patch(f"{MODULE}.Agentex") as MockAgentex: + from agentex.lib.core.tracing.processors.agentex_tracing_processor import ( + AgentexSyncTracingProcessor, + ) + + processor = AgentexSyncTracingProcessor(_make_config()) + client = MockAgentex.return_value + span = _make_span() + + processor.on_span_start(span) # skipped (cached ON) + monkeypatch.setenv(SKIP_ENV, "0") # toggle mid-span — must be ignored + processor.on_span_end(span) + + client.spans.create.assert_called_once() # still end-only INSERT + client.spans.update.assert_not_called() # NOT a 404-prone UPDATE + class TestAgentexAsyncSkipSpanStart: async def test_start_skipped_and_end_creates_by_default(self, monkeypatch): @@ -136,6 +158,29 @@ async def test_start_creates_and_end_updates_when_skip_disabled(self, monkeypatc await processor.on_span_end(span) client.spans.update.assert_awaited_once() # end is the UPDATE + async def test_skip_decision_captured_at_init_not_per_call(self, monkeypatch): + """A flag toggled after construction must not split a span's lifecycle.""" + monkeypatch.delenv(SKIP_ENV, raising=False) # construct with skip ON + with patch(f"{MODULE}.create_async_agentex_client") as mock_factory: + client = MagicMock() + client.spans.create = AsyncMock() + client.spans.update = AsyncMock() + mock_factory.return_value = client + + from agentex.lib.core.tracing.processors.agentex_tracing_processor import ( + AgentexAsyncTracingProcessor, + ) + + processor = AgentexAsyncTracingProcessor(_make_config()) + span = _make_span() + + await processor.on_span_start(span) # skipped (cached ON) + monkeypatch.setenv(SKIP_ENV, "0") # toggle mid-span — must be ignored + await processor.on_span_end(span) + + client.spans.create.assert_awaited_once() # still end-only INSERT + client.spans.update.assert_not_called() # NOT a 404-prone UPDATE + class TestAgentexAsyncTracingProcessor: """Coverage for the per-event-loop client cache. The SGP processor has