Skip to content
Draft
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class ToolCallRequestPart:

name: str
id: str | None = None
arguments: dict[str, object] | list[object] | None = None
arguments: dict[str, object] | list[object] | str | None = None
type: str = field(default="tool_call", init=False)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,5 +55,5 @@ microsoft_agents_a365/observability/extensions/agentframework/

## Dependencies

- `agent-framework-azure-ai` - Microsoft Agents SDK
- `agent-framework` - Microsoft Agents SDK
- `microsoft-agents-a365-observability-core` - Core observability
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

"""Maps Agent Framework span tag messages to A365 versioned message format.

Agent Framework sets ``gen_ai.input.messages`` / ``gen_ai.output.messages`` as span
tags containing JSON arrays of ``{role, parts[{type, content}], finish_reason?}``.
This mapper converts them to :class:`InputMessages` / :class:`OutputMessages`.
"""

from __future__ import annotations

import json
import logging
from typing import Any

from microsoft_agents_a365.observability.core.message_utils import serialize_messages
from microsoft_agents_a365.observability.core.models.messages import (
BlobPart,
ChatMessage,
FilePart,
GenericPart,
InputMessages,
MessagePart,
MessageRole,
OutputMessage,
OutputMessages,
ReasoningPart,
TextPart,
ToolCallRequestPart,
ToolCallResponsePart,
UriPart,
)

logger = logging.getLogger(__name__)

_ROLE_MAP: dict[str, MessageRole] = {
"system": MessageRole.SYSTEM,
"user": MessageRole.USER,
"assistant": MessageRole.ASSISTANT,
"tool": MessageRole.TOOL,
}


def map_input_messages(messages_json: str) -> str | None:
"""Map a ``gen_ai.input.messages`` tag value to a serialized A365 JSON string.

Args:
messages_json: The raw JSON string from the span attribute.

Returns:
Serialized :class:`InputMessages` JSON string, or ``None`` if the
input is empty or cannot be parsed.
"""
try:
raw = json.loads(messages_json)
except (json.JSONDecodeError, TypeError):
logger.debug("Failed to parse input messages JSON: %s", messages_json[:200])
return None

if not isinstance(raw, list):
return None

chat_messages: list[ChatMessage] = []
for msg in raw:
if not isinstance(msg, dict):
continue
role = _map_role(msg.get("role"), MessageRole.USER)
parts = _map_parts(msg)
if parts:
chat_messages.append(ChatMessage(role=role, parts=parts, name=msg.get("name")))

if not chat_messages:
return None

return serialize_messages(InputMessages(messages=chat_messages))


def map_output_messages(messages_json: str) -> str | None:
"""Map a ``gen_ai.output.messages`` tag value to a serialized A365 JSON string.

Args:
messages_json: The raw JSON string from the span attribute.

Returns:
Serialized :class:`OutputMessages` JSON string, or ``None`` if the
input is empty or cannot be parsed.
"""
try:
raw = json.loads(messages_json)
except (json.JSONDecodeError, TypeError):
logger.debug("Failed to parse output messages JSON: %s", messages_json[:200])
return None

if not isinstance(raw, list):
return None

output_messages: list[OutputMessage] = []
for msg in raw:
if not isinstance(msg, dict):
continue
role = _map_role(msg.get("role"), MessageRole.ASSISTANT)
parts = _map_parts(msg)
finish_reason = msg.get("finish_reason")
if parts:
output_messages.append(
OutputMessage(role=role, parts=parts, finish_reason=finish_reason)
)

if not output_messages:
return None

return serialize_messages(OutputMessages(messages=output_messages))


# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------


def _map_role(role: str | None, default: MessageRole) -> MessageRole:
"""Map a raw role string to a :class:`MessageRole` enum."""
if not role:
return default
return _ROLE_MAP.get(role.lower(), default)


def _map_parts(msg: dict[str, Any]) -> list[MessagePart]:
"""Map all parts in a raw message dict."""
parts_data = msg.get("parts", [])
if not isinstance(parts_data, list):
return []
mapped = [_map_single_part(p) for p in parts_data if isinstance(p, dict)]
return [p for p in mapped if p is not None]


def _map_single_part(part: dict[str, Any]) -> MessagePart | None:
"""Map a single raw part dict to the appropriate A365 message part."""
part_type = part.get("type", "")

if part_type == "text":
content = part.get("content", "")
return TextPart(content=content) if content else None

if part_type == "reasoning":
content = part.get("content", "")
return ReasoningPart(content=content) if content else None

if part_type == "tool_call":
name = part.get("name")
if not name:
return None
return ToolCallRequestPart(
name=name,
id=part.get("id"),
arguments=_parse_arguments(part.get("arguments")),
)

if part_type == "tool_call_response":
return ToolCallResponsePart(
id=part.get("id"),
response=part.get("response"),
)

if part_type == "blob":
modality = part.get("modality", "")
content = part.get("content", "")
if not modality or not content:
return None
return BlobPart(modality=modality, content=content, mime_type=part.get("mime_type"))

if part_type == "file":
modality = part.get("modality", "")
file_id = part.get("file_id", "")
if not modality or not file_id:
return None
return FilePart(modality=modality, file_id=file_id, mime_type=part.get("mime_type"))

if part_type == "uri":
modality = part.get("modality", "")
uri = part.get("uri", "")
if not modality or not uri:
return None
return UriPart(modality=modality, uri=uri, mime_type=part.get("mime_type"))

# Fallback: GenericPart for unknown/future part types
data = {k: v for k, v in part.items() if k != "type"}
return GenericPart(type=part_type, data=data) if part_type else None


def _parse_arguments(
raw: dict[str, object] | list[object] | str | None,
) -> dict[str, object] | list[object] | str | None:
"""Return structured arguments when possible, keeping strings that fail JSON parsing.

If ``raw`` is already a ``dict`` or ``list``, it is returned as-is.
If ``raw`` is a ``str``, an attempt is made to parse it with :func:`json.loads`.
A successfully parsed ``dict`` or ``list`` is returned; any other parsed
value (scalar) or a parse failure returns the original string.
``None`` is returned unchanged.
"""
if raw is None or isinstance(raw, (dict, list)):
return raw
if isinstance(raw, str):
try:
parsed = json.loads(raw)
except (json.JSONDecodeError, ValueError):
logger.debug("Failed to parse tool call arguments as JSON: %s", raw[:200])
return raw
if isinstance(parsed, (dict, list)):
return parsed
return raw
return raw # pragma: no cover — unexpected type, pass through unchanged
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
# Licensed under the MIT License.

from microsoft_agents_a365.observability.core.constants import (
CHAT_OPERATION_NAME,
EXECUTE_TOOL_OPERATION_NAME,
GEN_AI_INPUT_MESSAGES_KEY,
GEN_AI_OPERATION_NAME_KEY,
GEN_AI_OUTPUT_MESSAGES_KEY,
GEN_AI_TOOL_ARGS_KEY,
GEN_AI_TOOL_CALL_RESULT_KEY,
Expand All @@ -12,33 +14,50 @@
from microsoft_agents_a365.observability.core.exporters.enriched_span import EnrichedReadableSpan
from opentelemetry.sdk.trace import ReadableSpan

from .utils import extract_input_content, extract_output_content
from .message_mapper import map_input_messages, map_output_messages

# Agent Framework specific attribute keys
AF_TOOL_CALL_ARGUMENTS_KEY = "gen_ai.tool.call.arguments"
AF_TOOL_CALL_RESULT_KEY = "gen_ai.tool.call.result"

_MESSAGE_OPERATIONS = {INVOKE_AGENT_OPERATION_NAME, CHAT_OPERATION_NAME}


def enrich_agent_framework_span(span: ReadableSpan) -> ReadableSpan:
"""Enricher function for Agent Framework spans.

For ``invoke_agent`` and ``chat`` operations, maps the raw
``gen_ai.input.messages`` / ``gen_ai.output.messages`` JSON arrays
to the A365 versioned format.

For ``execute_tool`` operations, maps Agent Framework tool attribute
keys to the A365 standard keys.
"""
Enricher function for Agent Framework spans.
"""
extra_attributes = {}
extra_attributes: dict[str, str] = {}
attributes = span.attributes or {}
operation = attributes.get(GEN_AI_OPERATION_NAME_KEY, "")

is_message_span = operation in _MESSAGE_OPERATIONS or span.name.startswith(
INVOKE_AGENT_OPERATION_NAME
)
is_tool_span = operation == EXECUTE_TOOL_OPERATION_NAME or span.name.startswith(
EXECUTE_TOOL_OPERATION_NAME
)

# Only extract content for invoke_agent spans
if span.name.startswith(INVOKE_AGENT_OPERATION_NAME):
# Extract all text content from input messages
if is_message_span:
input_messages = attributes.get(GEN_AI_INPUT_MESSAGES_KEY)
if input_messages:
extra_attributes[GEN_AI_INPUT_MESSAGES_KEY] = extract_input_content(input_messages)
mapped = map_input_messages(input_messages)
if mapped is not None:
extra_attributes[GEN_AI_INPUT_MESSAGES_KEY] = mapped

output_messages = attributes.get(GEN_AI_OUTPUT_MESSAGES_KEY)
if output_messages:
extra_attributes[GEN_AI_OUTPUT_MESSAGES_KEY] = extract_output_content(output_messages)
mapped = map_output_messages(output_messages)
if mapped is not None:
extra_attributes[GEN_AI_OUTPUT_MESSAGES_KEY] = mapped

# Map tool attributes for execute_tool spans
elif span.name.startswith(EXECUTE_TOOL_OPERATION_NAME):
elif is_tool_span:
if AF_TOOL_CALL_ARGUMENTS_KEY in attributes:
extra_attributes[GEN_AI_TOOL_ARGS_KEY] = attributes[AF_TOOL_CALL_ARGUMENTS_KEY]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,11 @@
class AgentFrameworkSpanProcessor(SpanProcessor):
"""SpanProcessor for Agent Framework.

Note: The span processing logic was removed as GEN_AI_EVENT_CONTENT is no longer used.
This processor is kept for interface compatibility.
Attribute mutation happens in the enricher (via :class:`EnrichedReadableSpan`)
because OTel Python ``ReadableSpan`` is immutable after ``on_end``.
The enricher is invoked at export time by the ``EnrichingSpanProcessor``.
"""

TOOL_CALL_RESULT_TAG = "gen_ai.tool.call.result"

def __init__(self, service_name: str | None = None):
self.service_name = service_name
super().__init__()
Expand All @@ -22,5 +21,9 @@ def on_start(self, span, parent_context):
pass

def on_end(self, span):
"""Called when a span ends. Intentionally a no-op."""
"""Called when a span ends. Intentionally a no-op.

Message mapping is handled by the span enricher at export time
since ReadableSpan is immutable in the Python OTel SDK.
"""
pass
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
# -----------------------------
# 3) The Instrumentor class
# -----------------------------
_instruments = ("agent-framework-azure-ai >= 1.0.0",)
_instruments = ("agent-framework >= 1.0.0",)


class AgentFrameworkInstrumentor(BaseInstrumentor):
Expand Down
Loading
Loading