Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 96 additions & 24 deletions src/agentex/lib/core/tracing/processors/agentex_tracing_processor.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import os
import asyncio
import weakref
from typing import TYPE_CHECKING, Any, Dict, override

from agentex import Agentex
from agentex.types.span import Span
from agentex.lib.types.tracing import AgentexTracingProcessorConfig
from agentex.lib.utils.logging import make_logger
from agentex.lib.adk.utils._modules.client import create_async_agentex_client
from agentex.lib.core.tracing.processors.tracing_processor_interface import (
SyncTracingProcessor,
Expand All @@ -14,28 +16,88 @@
if TYPE_CHECKING:
from agentex import AsyncAgentex

logger = make_logger(__name__)


# NOTE: This is the Agentex-backend toggle (writes to the agentex `spans`
# table via the Agentex API). It is intentionally SEPARATE from the SGP/EGP
# processor's ``AGENTEX_TRACING_SKIP_SPAN_START`` so the two backends can be
# controlled independently.
_SKIP_SPAN_START_ENV = "AGENTEX_TRACING_SKIP_AGENTEX_SPAN_START"


def _skip_span_start_enabled() -> bool:
    """Report whether the Agentex span-start write should be skipped.

    Without the skip, the Agentex processor persists every span twice: a
    ``spans.create`` on start (before ``end_time``/``output`` exist) followed
    by a ``spans.update`` on end. Because the end write overwrites the start
    row moments later, the start write doubles the per-span HTTP/DB write
    volume against the Agentex control plane — the load that timed out
    span-start activities and pressured the Agentex Postgres connection pool.

    With the skip enabled (the default), nothing is written on start and the
    END write is a single ``spans.create`` carrying the complete span: one
    INSERT per span instead of an INSERT + UPDATE. (A plain ``spans.update``
    on end would 404, since the row was never created.)

    The flag is ON unless ``AGENTEX_TRACING_SKIP_AGENTEX_SPAN_START`` is set
    to ``0``/``false``/``no``/``off`` (case-insensitive, surrounding
    whitespace ignored). Turn it off to restore the start write — e.g. when
    in-flight spans must be visible before they complete, or when spans that
    never end (process crash) should still be persisted.

    Returns:
        ``True`` when the start write should be skipped, ``False`` otherwise.
    """
    setting = os.environ.get(_SKIP_SPAN_START_ENV, "1")
    normalized = setting.strip().lower()
    # Only these explicit "off" spellings disable the skip; any other value
    # (including an empty string) leaves it enabled.
    opt_out_values = ("0", "false", "no", "off")
    return normalized not in opt_out_values


def _create_kwargs(span: Span) -> Dict[str, Any]:
    """Build the keyword arguments for ``spans.create`` from a full span.

    Shared by both write paths: the start write (skip disabled) and the end
    write (skip enabled, single-INSERT path).

    Args:
        span: The span whose attributes are copied verbatim.

    Returns:
        A dict mapping each ``spans.create`` keyword to the matching
        attribute of ``span``.
    """
    # Order matches the keyword order the create call has always used.
    field_names = (
        "name",
        "start_time",
        "end_time",
        "id",
        "trace_id",
        "parent_id",
        "input",
        "output",
        "data",
        "task_id",
    )
    return {field: getattr(span, field) for field in field_names}


class AgentexSyncTracingProcessor(SyncTracingProcessor):
def __init__(self, config: AgentexTracingProcessorConfig):  # noqa: ARG002
    """Create the sync Agentex client and freeze the span-start skip flag.

    Args:
        config: Processor configuration; not read by this constructor.
    """
    self.client = Agentex()
    # The skip decision is read exactly once, here. Start and end handling
    # of a span MUST agree on it: a skipped start followed by an end-update
    # targets a row that does not exist (404), and the reverse creates the
    # row twice. Reading the env per event would let a mid-span toggle
    # (tests, config reload) split the decision. It is a deploy-time flag,
    # so a single read is correct.
    skip_start = _skip_span_start_enabled()
    self._skip_span_start = skip_start
    if skip_start:
        start_write_state = "disabled — end-only ingest"
    else:
        start_write_state = "enabled"
    logger.info(
        "Agentex tracing span-start write %s (%s)",
        start_write_state,
        _SKIP_SPAN_START_ENV,
    )

@override
def on_span_start(self, span: Span) -> None:
    """Persist ``span`` on start, unless end-only ingest is active.

    When ``self._skip_span_start`` is true this is a no-op and the span is
    written once, on end. Otherwise the span is written immediately with
    ``spans.create``.

    Args:
        span: The span that has just started.
    """
    # The guard must come before ANY write: an unconditional create here
    # would defeat end-only ingest and, with the skip disabled, insert the
    # same span twice.
    if self._skip_span_start:
        return
    self.client.spans.create(**_create_kwargs(span))

@override
def on_span_end(self, span: Span) -> None:
# End-only ingest: the start create was skipped, so persist the complete
# span as a single INSERT here (a bare spans.update would 404 — no row).
if self._skip_span_start:
self.client.spans.create(**_create_kwargs(span))
return

update: Dict[str, Any] = {}
if span.trace_id:
update["trace_id"] = span.trace_id
Expand Down Expand Up @@ -82,6 +144,17 @@ def __init__(self, config: AgentexTracingProcessorConfig): # noqa: ARG002
self._clients_by_loop: weakref.WeakKeyDictionary[
asyncio.AbstractEventLoop, "AsyncAgentex"
] = weakref.WeakKeyDictionary()
# Capture the skip decision once at init: both halves of a span's
# lifecycle MUST agree, otherwise a start-skip + end-update lands on a
# non-existent row (404) — or the reverse double-creates. Re-reading the
# env per event would let a mid-span toggle (tests, config reload) split
# the decision. Deploy-time flag, so a single read is correct.
self._skip_span_start = _skip_span_start_enabled()
logger.info(
"Agentex tracing span-start write %s (%s)",
"disabled — end-only ingest" if self._skip_span_start else "enabled",
_SKIP_SPAN_START_ENV,
)

def _build_client(self) -> "AsyncAgentex":
import httpx
Expand Down Expand Up @@ -111,21 +184,20 @@ def client(self) -> "AsyncAgentex":
# https://linear.app/scale-epd/issue/AGX1-199/add-agentex-batch-endpoint-for-traces
@override
async def on_span_start(self, span: Span) -> None:
    """Persist ``span`` on start, unless end-only ingest is active.

    When ``self._skip_span_start`` is true this is a no-op and the span is
    written once, on end. Otherwise the span is written immediately with an
    awaited ``spans.create``.

    Args:
        span: The span that has just started.
    """
    # The guard must come before ANY write: an unconditional create here
    # would defeat end-only ingest and, with the skip disabled, insert the
    # same span twice.
    if self._skip_span_start:
        return
    await self.client.spans.create(**_create_kwargs(span))

@override
async def on_span_end(self, span: Span) -> None:
# End-only ingest: the start create was skipped, so persist the complete
# span as a single INSERT here (a bare spans.update would 404 — no row).
if self._skip_span_start:
await self.client.spans.create(**_create_kwargs(span))
return

update: Dict[str, Any] = {}
if span.trace_id:
update["trace_id"] = span.trace_id
Expand Down
155 changes: 154 additions & 1 deletion tests/lib/core/tracing/processors/test_agentex_tracing_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

import asyncio
import weakref
from unittest.mock import MagicMock, patch
from datetime import datetime, timezone
from unittest.mock import AsyncMock, MagicMock, patch

import pytest

Expand All @@ -24,11 +25,163 @@
MODULE = "agentex.lib.core.tracing.processors.agentex_tracing_processor"


SKIP_ENV = "AGENTEX_TRACING_SKIP_AGENTEX_SPAN_START"


def _make_config() -> MagicMock:
"""Empty config — AgentexTracingProcessorConfig is unused by __init__."""
return MagicMock()


def _make_span():
    """Build a fully-populated ``Span`` fixture with fixed ids and payloads.

    ``start_time`` and ``end_time`` share one timezone-aware UTC timestamp
    taken at call time.
    """
    from agentex.types.span import Span

    timestamp = datetime.now(timezone.utc)
    span_fields = {
        "id": "span-1",
        "trace_id": "trace-1",
        "name": "test-span",
        "start_time": timestamp,
        "end_time": timestamp,
        "input": {"in": 1},
        "output": {"out": 2},
    }
    return Span(**span_fields)


class TestAgentexSyncSkipSpanStart:
    """Span-start skip toggle on the sync Agentex processor.

    With the skip disabled the backend is written create-on-start +
    update-on-end. With end-only ingest (the default) the start write is
    skipped and the END becomes a single create — these tests check that the
    start is a no-op and that the end does an INSERT, not an UPDATE.
    """

    def test_start_skipped_and_end_creates_by_default(self, monkeypatch):
        from agentex.lib.core.tracing.processors.agentex_tracing_processor import (
            AgentexSyncTracingProcessor,
        )

        # Env var absent -> the skip defaults to ON.
        monkeypatch.delenv(SKIP_ENV, raising=False)
        with patch(f"{MODULE}.Agentex") as agentex_cls:
            proc = AgentexSyncTracingProcessor(_make_config())
            spans_api = agentex_cls.return_value.spans
            sample = _make_span()

            proc.on_span_start(sample)
            # Nothing is written on start.
            spans_api.create.assert_not_called()
            spans_api.update.assert_not_called()

            proc.on_span_end(sample)
            # Exactly one INSERT on end, and never a 404-prone UPDATE.
            spans_api.create.assert_called_once()
            spans_api.update.assert_not_called()

    def test_start_creates_and_end_updates_when_skip_disabled(self, monkeypatch):
        from agentex.lib.core.tracing.processors.agentex_tracing_processor import (
            AgentexSyncTracingProcessor,
        )

        monkeypatch.setenv(SKIP_ENV, "0")
        with patch(f"{MODULE}.Agentex") as agentex_cls:
            proc = AgentexSyncTracingProcessor(_make_config())
            spans_api = agentex_cls.return_value.spans
            sample = _make_span()

            proc.on_span_start(sample)
            # The start write is restored.
            spans_api.create.assert_called_once()

            proc.on_span_end(sample)
            # The end write is the UPDATE.
            spans_api.update.assert_called_once()

    def test_skip_decision_captured_at_init_not_per_call(self, monkeypatch):
        """Both halves of a span MUST share one skip decision.

        The decision is captured once at init, so flipping the flag after
        construction must not split it (start-skip + end-update would 404).
        """
        from agentex.lib.core.tracing.processors.agentex_tracing_processor import (
            AgentexSyncTracingProcessor,
        )

        # Construct with the skip ON (env var absent).
        monkeypatch.delenv(SKIP_ENV, raising=False)
        with patch(f"{MODULE}.Agentex") as agentex_cls:
            proc = AgentexSyncTracingProcessor(_make_config())
            spans_api = agentex_cls.return_value.spans
            sample = _make_span()

            # Start is skipped using the cached ON decision.
            proc.on_span_start(sample)
            # Toggle mid-span — the processor must ignore this.
            monkeypatch.setenv(SKIP_ENV, "0")
            proc.on_span_end(sample)

            # Still the end-only INSERT, NOT a 404-prone UPDATE.
            spans_api.create.assert_called_once()
            spans_api.update.assert_not_called()


class TestAgentexAsyncSkipSpanStart:
    """Span-start skip toggle on the async Agentex processor."""

    @staticmethod
    def _fake_async_client() -> MagicMock:
        """Return a mock client whose ``spans.create``/``spans.update`` are awaitable."""
        fake = MagicMock()
        fake.spans.create = AsyncMock()
        fake.spans.update = AsyncMock()
        return fake

    async def test_start_skipped_and_end_creates_by_default(self, monkeypatch):
        # Env var absent -> the skip defaults to ON.
        monkeypatch.delenv(SKIP_ENV, raising=False)
        with patch(f"{MODULE}.create_async_agentex_client") as client_factory:
            fake = self._fake_async_client()
            client_factory.return_value = fake

            from agentex.lib.core.tracing.processors.agentex_tracing_processor import (
                AgentexAsyncTracingProcessor,
            )

            proc = AgentexAsyncTracingProcessor(_make_config())
            sample = _make_span()

            await proc.on_span_start(sample)
            # Nothing is written on start.
            fake.spans.create.assert_not_called()
            fake.spans.update.assert_not_called()

            await proc.on_span_end(sample)
            # Exactly one INSERT on end, and no UPDATE.
            fake.spans.create.assert_awaited_once()
            fake.spans.update.assert_not_called()

    async def test_start_creates_and_end_updates_when_skip_disabled(self, monkeypatch):
        monkeypatch.setenv(SKIP_ENV, "0")
        with patch(f"{MODULE}.create_async_agentex_client") as client_factory:
            fake = self._fake_async_client()
            client_factory.return_value = fake

            from agentex.lib.core.tracing.processors.agentex_tracing_processor import (
                AgentexAsyncTracingProcessor,
            )

            proc = AgentexAsyncTracingProcessor(_make_config())
            sample = _make_span()

            await proc.on_span_start(sample)
            # The start write is restored.
            fake.spans.create.assert_awaited_once()

            await proc.on_span_end(sample)
            # The end write is the UPDATE.
            fake.spans.update.assert_awaited_once()

    async def test_skip_decision_captured_at_init_not_per_call(self, monkeypatch):
        """Flipping the flag after construction must not split a span's lifecycle."""
        # Construct with the skip ON (env var absent).
        monkeypatch.delenv(SKIP_ENV, raising=False)
        with patch(f"{MODULE}.create_async_agentex_client") as client_factory:
            fake = self._fake_async_client()
            client_factory.return_value = fake

            from agentex.lib.core.tracing.processors.agentex_tracing_processor import (
                AgentexAsyncTracingProcessor,
            )

            proc = AgentexAsyncTracingProcessor(_make_config())
            sample = _make_span()

            # Start is skipped using the cached ON decision.
            await proc.on_span_start(sample)
            # Toggle mid-span — the processor must ignore this.
            monkeypatch.setenv(SKIP_ENV, "0")
            await proc.on_span_end(sample)

            # Still the end-only INSERT, NOT a 404-prone UPDATE.
            fake.spans.create.assert_awaited_once()
            fake.spans.update.assert_not_called()


class TestAgentexAsyncTracingProcessor:
"""Coverage for the per-event-loop client cache. The SGP processor has
matching tests; mirror them here so a regression in the Agentex side
Expand Down
Loading