Skip to content

Commit 73fc765

Browse files
danielmillerpclaude
andcommitted
feat(sdk): add webhook helper for forward-route handlers
Add agentex.lib.sdk.utils.webhooks.handle_webhook — a reusable helper an agent calls from a forward-route handler (@acp.post on the route the server's /agents/forward/name/{agent}/{path} ingress proxies to). It shapes the payload (generic or GitHub PR), resolves task params (inline or fetched from a config resolve URL for config-by-id), get-or-creates a task on a stable session key so repeat events fold into one task, drives the turn (sync message / async event), and returns/polls the reply. This keeps webhook triggering on the supported forward mechanism + its built-in GitHub/Slack signature auth, instead of a parallel ingress. Config-by-id is ingress-independent: point params_source at the platform's config-resolve endpoint. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent 7e5be61 commit 73fc765

3 files changed

Lines changed: 487 additions & 2 deletions

File tree

Lines changed: 311 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,311 @@
1+
"""Drive an agent turn from an inbound webhook, inside a forward-route handler.
2+
3+
The Agentex server already exposes a webhook ingress: a request to
4+
``/agents/forward/name/{agent}/{path}`` is signature-verified (GitHub ``sha256=`` /
5+
Slack ``v0:`` HMAC via the agent's registered keys) and proxied to the agent's own
6+
HTTP route. This helper is what that route handler calls to turn the inbound payload
7+
into an agent turn — without each agent re-implementing payload shaping, config
8+
resolution, session continuity, and reply handling.
9+
10+
Typical use inside an agent::
11+
12+
from fastapi import Request
13+
from agentex.lib.sdk.utils.webhooks import handle_webhook
14+
15+
16+
@acp.post("/github-pr")
17+
async def github_pr(request: Request):
18+
body = await request.json()
19+
result = await handle_webhook(
20+
agent_name="my-agent",
21+
payload=body,
22+
acp_type="sync",
23+
shaper="github_pr",
24+
params_source="https://<host>/public/v5/agent_configs/<id>/resolve",
25+
params_source_headers={"x-api-key": ..., "x-selected-account-id": ...},
26+
wait=True,
27+
)
28+
return {"task_id": result.task_id, "reply": result.reply}
29+
30+
Config-by-id: pass ``params_source`` pointing at the platform's config-resolve
31+
endpoint; the resolved params (e.g. system_prompt / harness / model / tools) are
32+
forwarded opaquely to ``task/create``. Or pass inline ``params`` for a one-off.
33+
"""
34+
35+
from __future__ import annotations
36+
37+
import json
38+
import hashlib
39+
from typing import Any, Literal
40+
from dataclasses import field, dataclass
41+
from collections.abc import Callable, Awaitable
42+
43+
from agentex.lib import adk
44+
from agentex.lib.utils.logging import make_logger
45+
from agentex.types.task_message_content import TextContent
46+
47+
logger = make_logger(__name__)
48+
49+
# Injectable params fetcher (url -> JSON). Default uses httpx; tests inject a fake.
50+
ParamsFetcher = Callable[[str], Awaitable[dict[str, Any]]]
51+
52+
MAX_BODY_CHARS = 4000
53+
MAX_DIFF_CHARS = 30000
54+
55+
56+
class WebhookError(RuntimeError):
57+
"""Raised when a webhook turn cannot be driven (e.g. params resolution failed)."""
58+
59+
60+
@dataclass
61+
class WebhookResult:
62+
task_id: str
63+
# Sync agents reply inline. For async agents, ``reply`` is None unless ``wait`` was
64+
# set, in which case it is the polled reply (or None if it didn't settle in time).
65+
reply: str | None = None
66+
task_metadata: dict[str, str] = field(default_factory=dict)
67+
68+
69+
# --------------------------------------------------------------------------- shaping
70+
71+
72+
def session_key(agent_name: str, channel: str, peer_id: str) -> str:
73+
"""Stable per-conversation task name → reused for get-or-create on task/create, so
74+
repeat events from the same source fold into one task instead of spawning new ones."""
75+
basis = peer_id or "main"
76+
digest = hashlib.sha1(f"{agent_name}:{channel}:{basis}".encode()).hexdigest()[:16]
77+
return f"wh-{channel}-{digest}"
78+
79+
80+
def render_generic(body: dict[str, Any]) -> str:
81+
"""Generic payload → prompt text: first of text/message/goal/prompt, else raw JSON."""
82+
for key in ("text", "message", "goal", "prompt"):
83+
value = body.get(key)
84+
if isinstance(value, str) and value.strip():
85+
return value.strip()
86+
return json.dumps(body, indent=2)[:8000]
87+
88+
89+
def shape_github_pr(body: dict[str, Any]) -> tuple[str, str | None, str]:
90+
"""Shape a GitHub/Gitea pull-request webhook into (prompt, peer_id, sender).
91+
92+
``peer_id`` is ``repo#number`` so repeated events for the same PR (opened,
93+
synchronize, ...) fold into one task. Falls back to generic rendering for non-PR
94+
payloads (ping, issue, ...).
95+
"""
96+
pull_request = body.get("pull_request")
97+
if not isinstance(pull_request, dict):
98+
return render_generic(body), None, _github_actor(body)
99+
100+
repo = _repo_full_name(body)
101+
number = pull_request.get("number")
102+
title = (pull_request.get("title") or "").strip()
103+
action = (body.get("action") or "").strip()
104+
description = (pull_request.get("body") or "").strip()
105+
html_url = pull_request.get("html_url") or pull_request.get("url")
106+
107+
header = "Pull request"
108+
if repo and number is not None:
109+
header = f"Pull request {repo}#{number}"
110+
elif number is not None:
111+
header = f"Pull request #{number}"
112+
113+
lines = [f"{header}: {title}" if title else header]
114+
if action:
115+
lines.append(f"Action: {action}")
116+
if html_url:
117+
lines.append(f"URL: {html_url}")
118+
if description:
119+
lines.extend(["", "Description:", description[:MAX_BODY_CHARS]])
120+
121+
diff = _inline_diff(body, pull_request)
122+
if diff:
123+
lines.extend(["", "Diff:", diff[:MAX_DIFF_CHARS]])
124+
125+
peer_id = None
126+
if repo and number is not None:
127+
peer_id = f"{repo}#{number}"
128+
elif number is not None:
129+
peer_id = f"pr#{number}"
130+
return "\n".join(lines), peer_id, _github_actor(body)
131+
132+
133+
def _repo_full_name(body: dict[str, Any]) -> str | None:
134+
repo = body.get("repository")
135+
if isinstance(repo, dict) and isinstance(repo.get("full_name"), str):
136+
return repo["full_name"] or None
137+
return None
138+
139+
140+
def _github_actor(body: dict[str, Any]) -> str:
141+
sender = body.get("sender")
142+
if isinstance(sender, dict) and isinstance(sender.get("login"), str) and sender["login"]:
143+
return sender["login"]
144+
return "webhook"
145+
146+
147+
def _inline_diff(body: dict[str, Any], pull_request: dict[str, Any]) -> str | None:
148+
for source in (body, pull_request):
149+
diff = source.get("diff")
150+
if isinstance(diff, str) and diff.strip():
151+
return diff.strip()
152+
return None
153+
154+
155+
# ------------------------------------------------------------------- params resolution
156+
157+
158+
async def _default_fetch(url: str, headers: dict[str, str]) -> dict[str, Any]:
159+
"""GET a params source over HTTP. Imported lazily so callers that only pass inline
160+
params carry no httpx dependency."""
161+
import httpx
162+
163+
request_headers = {"accept": "application/json", **headers}
164+
try:
165+
async with httpx.AsyncClient(timeout=30.0) as client:
166+
response = await client.get(url, headers=request_headers)
167+
response.raise_for_status()
168+
return response.json()
169+
except httpx.HTTPError as exc:
170+
raise WebhookError(f"params source request failed: {exc}") from exc
171+
except ValueError as exc: # json.JSONDecodeError subclasses ValueError
172+
raise WebhookError(f"params source returned invalid JSON: {exc}") from exc
173+
174+
175+
async def resolve_remote_params(
176+
url: str,
177+
headers: dict[str, str] | None = None,
178+
*,
179+
fetch: ParamsFetcher | None = None,
180+
) -> tuple[dict[str, Any], dict[str, str]]:
181+
"""Fetch params (+ optional task_metadata) from a config-resolve URL.
182+
183+
Response shape (lenient)::
184+
185+
{"params": {...}, "task_metadata": {...}}
186+
187+
A bare object with no ``params`` key is treated as the params dict itself (minus a
188+
top-level ``task_metadata``, which is returned separately for stamping).
189+
"""
190+
do_fetch = fetch or (lambda u: _default_fetch(u, headers or {}))
191+
payload = await do_fetch(url)
192+
if not isinstance(payload, dict):
193+
raise WebhookError("params source returned a non-object response")
194+
195+
metadata_raw = payload.get("task_metadata")
196+
task_metadata = {str(k): str(v) for k, v in metadata_raw.items()} if isinstance(metadata_raw, dict) else {}
197+
params = payload.get("params")
198+
if not isinstance(params, dict):
199+
params = {k: v for k, v in payload.items() if k != "task_metadata"}
200+
return params, task_metadata
201+
202+
203+
# ------------------------------------------------------------------------- dispatch
204+
205+
206+
def _agent_reply_text(messages: object) -> str | None:
207+
"""Join agent-authored text from a message list (sync result or polled stream)."""
208+
if not isinstance(messages, list):
209+
return None
210+
parts = []
211+
for message in messages:
212+
content = getattr(message, "content", None)
213+
if (
214+
content is not None
215+
and getattr(content, "author", None) == "agent"
216+
and getattr(content, "type", None) == "text"
217+
):
218+
text = (getattr(content, "content", "") or "").strip()
219+
if text:
220+
parts.append(text)
221+
return "\n\n".join(parts) if parts else None
222+
223+
224+
async def handle_webhook(
225+
*,
226+
agent_name: str,
227+
payload: dict[str, Any],
228+
acp_type: Literal["sync", "async"] = "sync",
229+
shaper: Literal["generic", "github_pr"] = "generic",
230+
channel: str | None = None,
231+
params: dict[str, Any] | None = None,
232+
params_source: str | None = None,
233+
params_source_headers: dict[str, str] | None = None,
234+
peer_id: str | None = None,
235+
extra_task_metadata: dict[str, str] | None = None,
236+
wait: bool = False,
237+
fetch: ParamsFetcher | None = None,
238+
) -> WebhookResult:
239+
"""Drive an agent turn from a webhook payload, agent-side, via the ADK client.
240+
241+
- Shapes the payload (generic or GitHub PR) into a prompt + conversation scope.
242+
- Resolves task params: inline ``params``, or fetched from ``params_source``
243+
(config-by-id). The platform never interprets params — they're forwarded to the
244+
agent as ``task/create`` params.
245+
- Get-or-creates a task keyed on a stable session key, so repeat events fold in.
246+
- Sends the turn (sync → message/send returns the reply inline; async → event/send,
247+
with optional ``wait`` to poll for the reply).
248+
"""
249+
channel = channel or shaper
250+
if shaper == "github_pr":
251+
text, derived_peer, sender = shape_github_pr(payload)
252+
peer_id = peer_id or derived_peer
253+
else:
254+
text, sender = render_generic(payload), "webhook"
255+
256+
task_metadata: dict[str, str] = {"channel": channel, "sender_id": sender}
257+
if peer_id:
258+
task_metadata["peer_id"] = peer_id
259+
260+
resolved_params = dict(params) if params else {}
261+
if params_source:
262+
resolved_params, source_metadata = await resolve_remote_params(
263+
params_source, params_source_headers, fetch=fetch
264+
)
265+
# Source metadata + caller extras never override the canonical fields above.
266+
for key, value in {**source_metadata, **(extra_task_metadata or {})}.items():
267+
task_metadata.setdefault(key, str(value))
268+
elif extra_task_metadata:
269+
for key, value in extra_task_metadata.items():
270+
task_metadata.setdefault(key, str(value))
271+
272+
name = session_key(agent_name, channel, peer_id or "")
273+
task = await adk.acp.create_task(
274+
name=name,
275+
agent_name=agent_name,
276+
params=resolved_params or None,
277+
request={"task_metadata": task_metadata} if task_metadata else None,
278+
)
279+
280+
content = TextContent(author="user", content=text, format="markdown")
281+
282+
if acp_type == "sync":
283+
messages = await adk.acp.send_message(task_id=task.id, agent_name=agent_name, content=content)
284+
return WebhookResult(task_id=task.id, reply=_agent_reply_text(messages), task_metadata=task_metadata)
285+
286+
await adk.acp.send_event(task_id=task.id, agent_name=agent_name, content=content)
287+
reply = await _await_reply(task.id) if wait else None
288+
return WebhookResult(task_id=task.id, reply=reply, task_metadata=task_metadata)
289+
290+
291+
async def _await_reply(
292+
task_id: str, *, timeout_s: float = 120.0, interval_s: float = 2.0, quiescence_s: float = 6.0
293+
) -> str | None:
294+
"""Poll a task's messages for the agent's reply until it settles or times out."""
295+
import asyncio
296+
297+
waited = 0.0
298+
last: str | None = None
299+
stable_for = 0.0
300+
while waited < timeout_s:
301+
await asyncio.sleep(interval_s)
302+
waited += interval_s
303+
messages = await adk.messages.list(task_id=task_id)
304+
text = _agent_reply_text(messages)
305+
if text and text == last:
306+
stable_for += interval_s
307+
if stable_for >= quiescence_s:
308+
return text
309+
elif text:
310+
last, stable_for = text, 0.0
311+
return last

0 commit comments

Comments
 (0)