[Refactor] Unify agent loop output protocol on AgentMessage #1904

Merged
hhaAndroid merged 2 commits into InternLM:main from Harold-lkk:lkk/agent-loop-refactor
Jun 11, 2026

Conversation

@Harold-lkk

@Harold-lkk Harold-lkk commented Jun 10, 2026

Member
  • Stage surfaces full AgentMessage dump as artifacts["response_message"]; judger / _fill_rollout_state read finish_info / content from it directly.
  • Move tool_turns counting into agent_in_localhost_loop (single source); trainer / evaluator just read extra_fields["agent_tool_turns"] and emit tool_turns/{mean,min,max} to tensorboard.
  • Trajectory dump: drop redundant fields (final_assistant_finish_reason, raw_prompt, lengths.input_tokens) and helper chain; messages/tools/tool_turns moved under agent.* sub-key. Switch to one-line JSONL records.
  • Drop _to_json_safe / _dump_jsonl_record wrappers; explicit model_dump only where needed (StageRecord). Remove dead isinstance(Tensor) branches.
  • Drop local _TokenBucket (use lagent.utils.rate_limiter.FairAsyncTokenBucket via API-level rate limiting in tools).
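
As a rough illustration of the second bullet, reading the per-sample count and reducing it to three scalars might look like the sketch below. The `summarize_tool_turns` helper and the plain-dict samples are hypothetical stand-ins, not code from this PR; only the `agent_tool_turns` key and the `tool_turns/{mean,min,max}` tag names come from the description.

```python
from statistics import mean
from typing import Any


def summarize_tool_turns(samples: list[dict[str, Any]]) -> dict[str, float]:
    """Reduce per-sample tool-turn counts to tensorboard-style scalars.

    Hypothetical helper: each sample is assumed to be a plain dict with an
    ``extra_fields`` mapping, standing in for the real rollout state.
    """
    counts = [
        s["extra_fields"]["agent_tool_turns"]
        for s in samples
        if s.get("extra_fields", {}).get("agent_tool_turns") is not None
    ]
    if not counts:
        return {}  # nothing to log for this batch
    return {
        "tool_turns/mean": float(mean(counts)),
        "tool_turns/min": float(min(counts)),
        "tool_turns/max": float(max(counts)),
    }


samples = [
    {"extra_fields": {"agent_tool_turns": 2}},
    {"extra_fields": {"agent_tool_turns": 5}},
    {"extra_fields": {}},  # a sample without a count is skipped
]
stats = summarize_tool_turns(samples)
```

The point of the single-source design is that the consumer only reads and aggregates; it never re-derives the count from the message list.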

@Harold-lkk Harold-lkk marked this pull request as ready for review June 10, 2026 15:13
…]; judger / _fill_rollout_state read finish_info / content from it directly.

- Move tool_turns counting into agent_in_localhost_loop (single source); trainer / evaluator just read extra_fields[agent_tool_turns] and emit tool_turns/{mean,min,max} to tensorboard.
- Trajectory dump: drop redundant fields (final_assistant_finish_reason, raw_prompt, lengths.input_tokens) and helper chain; messages/tools/tool_turns moved under agent.* sub-key. Switch to one-line JSONL records.
- Drop _to_json_safe / _dump_jsonl_record wrappers; explicit model_dump only where needed (StageRecord). Remove dead isinstance(Tensor) branches.
- Rollout: add EADDRINUSE retry for session_server and worker launch; add sample_timeout_s; disable replay buffer checkpoint by default.
- Add asyncio_diagnostics module (USR1 task dump + asyncio_run watchdog) for prod deadlock troubleshooting.
- Drop local _TokenBucket (use lagent.utils.rate_limiter.FairAsyncTokenBucket via API-level rate limiting in tools).
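
For readers unfamiliar with the "USR1 task dump" idea, a minimal sketch using only the standard library could look like this. It is not the `asyncio_diagnostics` module from this PR; the function names `dump_asyncio_tasks` and `install_usr1_dump` are hypothetical, and the signal handler is Unix-only.

```python
import asyncio
import io
import signal
import sys
from typing import TextIO


def dump_asyncio_tasks(loop: asyncio.AbstractEventLoop, out: TextIO = sys.stderr) -> int:
    """Write the name and current stack of every unfinished task on `loop`."""
    tasks = asyncio.all_tasks(loop)
    for task in tasks:
        print(f"--- task {task.get_name()!r}", file=out)
        task.print_stack(file=out)
    return len(tasks)


def install_usr1_dump(loop: asyncio.AbstractEventLoop) -> None:
    """Unix only: dump all tasks when the process receives SIGUSR1."""
    loop.add_signal_handler(signal.SIGUSR1, dump_asyncio_tasks, loop)


async def _demo() -> str:
    loop = asyncio.get_running_loop()
    worker = asyncio.create_task(asyncio.sleep(60), name="stuck-worker")
    await asyncio.sleep(0)  # give the worker a chance to start
    buf = io.StringIO()
    dump_asyncio_tasks(loop, out=buf)  # call directly instead of sending a signal
    worker.cancel()
    return buf.getvalue()


report = asyncio.run(_demo())
```

In a deployed process one would call `install_usr1_dump(loop)` once at startup and then run `kill -USR1 <pid>` to see where each task is suspended.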
@Harold-lkk Harold-lkk force-pushed the lkk/agent-loop-refactor branch from d6fcac7 to c21ce32 Compare June 10, 2026 15:14
@hhaAndroid

Collaborator

@claude review

Comment on lines 45 to 62
try:
# reward_model stays as-is (dataset-provided ground_truth/style etc.).
# Per-rollout artifacts (response message, agent trace) flow through extra_fields.
reward_model = dict(item.reward_model or {})
segment = item.artifacts["messages"][-1]
response_message = item.artifacts.get("response_message") or {}
content = response_message.get("content")
response = content if isinstance(content, str) else (str(content) if content is not None else "")

messages = item.artifacts["messages"][-1]["messages"]
tool_turns = sum(
1 for message in messages if isinstance(message.get("tool_calls"), list) and message["tool_calls"]
)
reward_model.setdefault("agent_trace", messages)
reward_model.setdefault("num_turns", tool_turns)

response = str(item.artifacts.get("response") or "")
rollout_state = RolloutState(
message=[{"role": "user", "content": item.instruction}],
response=response,
reward_model=reward_model,
extra_fields={
"agent_messages": segment["messages"],
"response_message": response_message,
},
status=Status.COMPLETED,

Contributor

Claude: Warning — Breaking change to judger contract.

Previously, reward_model.setdefault("agent_trace", messages) and reward_model.setdefault("num_turns", tool_turns) made conversation history available to custom judger implementations via rollout_state.reward_model. This PR moves that data to extra_fields["agent_messages"] and extra_fields["response_message"].

Any custom/external judger that reads agent_trace or num_turns from rollout_state.reward_model will no longer find them. A .get() lookup silently returns None, causing incorrect reward computation without raising an error, while direct indexing such as rollout_state.reward_model["agent_trace"] raises KeyError.

Consider preserving the setdefault calls alongside the new extra_fields plumbing, or documenting this as a breaking protocol change in the PR description.
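
One way a custom judger could stay compatible with both layouts is sketched below. The `read_agent_trace` helper is hypothetical and operates on plain dicts rather than the real `RolloutState`; the key names (`agent_messages`, `agent_tool_turns`, `agent_trace`, `num_turns`) are the ones mentioned in this PR.

```python
from __future__ import annotations

from typing import Any


def read_agent_trace(
    reward_model: dict[str, Any] | None,
    extra_fields: dict[str, Any] | None,
) -> tuple[list[dict[str, Any]], int]:
    """Return (messages, tool_turns), preferring the new extra_fields layout."""
    reward_model = reward_model or {}
    extra_fields = extra_fields or {}

    # New location first, then the legacy reward_model key.
    messages = extra_fields.get("agent_messages")
    if messages is None:
        messages = reward_model.get("agent_trace") or []

    tool_turns = extra_fields.get("agent_tool_turns")
    if tool_turns is None:
        tool_turns = reward_model.get("num_turns")
    if tool_turns is None:
        # Last resort: recount assistant messages with non-empty tool_calls.
        tool_turns = sum(
            1 for m in messages if isinstance(m.get("tool_calls"), list) and m["tool_calls"]
        )
    return messages, tool_turns


trace = [{"role": "assistant", "tool_calls": [{"name": "search"}]}, {"role": "tool"}]
new_style = read_agent_trace({}, {"agent_messages": trace, "agent_tool_turns": 1})
old_style = read_agent_trace({"agent_trace": trace, "num_turns": 1}, {})
```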

Comment on lines +155 to +157
)
return self._fail_rollout_state(
rollout_state,

Contributor

Claude: Warning - finish_reason semantic expansion affects ProduceBatchResult.

Previously, finish_reason was either "stop" or "error". This PR introduces "timeout" as a third value (from the new asyncio.TimeoutError handler) and also propagates arbitrary values from response_message.get("finish_reason") — effectively making finish_reason open-ended.

Downstream consumers that switch on finish_reason == "error" to classify failures will no longer catch timeouts. If ProduceBatchResult accounting or retry logic depends on enumerating finish reasons, this is a behavioral change.

ProduceBatchResult impact: finish_reason can now be "timeout" or any string from the LLM response, changing how failed/completed samples are classified by downstream consumers.
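
A consumer that wants to stay robust to an open-ended finish_reason could classify against explicit sets instead of comparing with "error" alone. The sketch below is an assumption about how such a consumer might be written; the set contents and the `classify_finish_reason` name are hypothetical and not taken from xtuner.

```python
from __future__ import annotations

# Assumed groupings: "stop", "error" and "timeout" are the values named in the
# review; anything else coming from the LLM response is bucketed as "other".
FAILURE_REASONS = frozenset({"error", "timeout"})
COMPLETED_REASONS = frozenset({"stop"})


def classify_finish_reason(finish_reason: str | None) -> str:
    """Map a finish_reason string onto a small closed set of buckets."""
    if finish_reason in FAILURE_REASONS:
        return "failed"
    if finish_reason in COMPLETED_REASONS:
        return "completed"
    return "other"  # unknown or missing values are surfaced, not dropped


buckets = {r: classify_finish_reason(r) for r in ("stop", "error", "timeout", "length")}
```

Keeping an explicit "other" bucket makes new values visible in accounting rather than silently counting them as successes.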

Comment on lines +65 to +77
)


def _extract_reward_payload(item: AgentRolloutItem) -> dict[str, Any] | None:
for record in item.judgers.values():
reward = record.metadata.get("reward")
if isinstance(reward, dict):
payload = dict(reward)
if item.reward is not None:
payload.setdefault("score", item.reward)
return payload
if item.reward is not None:
return {"score": item.reward}

Contributor

Claude: Warning - _extract_reward_payload changes the shape of rollout_state.reward.

Previously: {"score": item.reward} (always single key).
Now: may include additional keys from record.metadata["reward"] (the full judger payload), plus "score" set via setdefault.

Downstream code that does data.reward["score"] will still work. But if any consumer iterates over data.reward keys or serializes the full dict (e.g. trajectory files, metrics), it will now see extra judger-specific keys. The trajectory dump already writes "reward_payload": data.reward, which captures this expanded dict — verify this is intentional and that downstream JSONL parsers tolerate variable-width reward dicts.
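
To make the shape change concrete, the sketch below re-creates the logic of the snippet above with stand-in dataclasses. `FakeItem` and `FakeJudgerRecord` are hypothetical substitutes for `AgentRolloutItem` and its judger records, and the final `return None` is an assumption, since the quoted diff is cut off before the function ends.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any


@dataclass
class FakeJudgerRecord:  # hypothetical stand-in for a judger record
    metadata: dict[str, Any] = field(default_factory=dict)


@dataclass
class FakeItem:  # hypothetical stand-in for AgentRolloutItem
    judgers: dict[str, FakeJudgerRecord] = field(default_factory=dict)
    reward: float | None = None


def extract_reward_payload(item: FakeItem) -> dict[str, Any] | None:
    for record in item.judgers.values():
        reward = record.metadata.get("reward")
        if isinstance(reward, dict):
            payload = dict(reward)  # copy the full judger payload
            if item.reward is not None:
                payload.setdefault("score", item.reward)  # judger's own score wins
            return payload
    if item.reward is not None:
        return {"score": item.reward}
    return None  # assumed; not visible in the quoted diff


plain = extract_reward_payload(FakeItem(reward=1.0))
rich = extract_reward_payload(
    FakeItem(judgers={"math": FakeJudgerRecord({"reward": {"format_ok": True}})}, reward=0.5)
)
```

Here `plain` has only the `score` key, while `rich` also carries the judger-specific `format_ok` key, which is the variable-width case a JSONL parser would need to tolerate.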

Comment on lines +51 to +53
response_message = output.model_dump(mode="json") if hasattr(output, "model_dump") else None
if response_message is None:
raise TypeError("Agent forward must return an AgentMessage-like object.")

Contributor

Claude: Warning — Hard failure on non-AgentMessage output.

Previously, the stage was permissive — output.content if hasattr(output, "content") else output handled plain strings. Now anything without model_dump raises TypeError. If any configured agent returns a raw string or dict (e.g. a simpler agent or a mock in tests), this will crash the entire sample.

If backward-compatibility with non-AgentMessage outputs is needed, consider a softer fallback (e.g. constructing a minimal response_message dict from the raw output).
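
The softer fallback suggested here might look like the following sketch. `to_response_message` is a hypothetical helper, and wrapping a raw string as `{"content": ...}` is an assumption about what a minimal response_message would contain; the PR itself chose the strict TypeError.

```python
from typing import Any


def to_response_message(output: Any) -> dict[str, Any]:
    """Normalize an agent's return value into a response_message dict."""
    if hasattr(output, "model_dump"):
        # AgentMessage-like (pydantic-style) objects: keep the full dump.
        return output.model_dump(mode="json")
    if isinstance(output, dict):
        return dict(output)  # shallow copy so callers cannot mutate the original
    if isinstance(output, str):
        return {"content": output}  # minimal message for plain-string agents
    raise TypeError(
        f"Agent forward must return an AgentMessage-like object, dict, or str; got {type(output).__name__}."
    )


class FakeMessage:  # hypothetical stand-in exposing model_dump like a pydantic model
    def model_dump(self, mode: str = "python") -> dict[str, Any]:
        return {"content": "done", "finish_reason": "stop"}


from_model = to_response_message(FakeMessage())
from_text = to_response_message("plain answer")
```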

raise TypeError("Agent messages trace segment.messages must be a list.")
item.artifacts["messages"] = messages
result = StageResult(stdout=item.artifacts["response"], return_code=0)
content = response_message.get("content")

Contributor

Claude: Nit — The content extraction pattern content if isinstance(content, str) else (str(content) if content is not None else "") appears 4 times across this PR (here, agent_in_localhost_loop.py:249 and :278, and judger.py:50). Consider a small helper to keep the semantics in one place.
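
A helper along the lines suggested could be as small as the sketch below. The name `content_to_text` is hypothetical; the behavior mirrors the quoted expression exactly.

```python
from typing import Any


def content_to_text(content: Any) -> str:
    """Return message content as text: str as-is, None as "", anything else via str()."""
    if isinstance(content, str):
        return content
    return "" if content is None else str(content)


examples = [content_to_text(c) for c in ("hello", None, 0, ["a", "b"])]
```

Call sites would then read `response = content_to_text(response_message.get("content"))`.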

Comment thread xtuner/v1/train/rl_trainer.py Outdated
@claude

claude Bot commented Jun 11, 2026

Contributor

Claude: Summary

Solid refactoring that unifies the agent loop output protocol around the full AgentMessage dump. The tool_turns single-source-of-truth, richer trajectory format, and per-sample timeout are good additions. A few contract changes warrant attention before merge.

Issues

Warning

  • [judger.py:45-62] Removal of reward_model.setdefault("agent_trace", ...) and setdefault("num_turns", ...) is a silent breaking change for any custom judger that reads those keys.
  • [agent_in_localhost_loop.py:155-157] finish_reason expands from a closed set (stop/error) to open-ended values (timeout, plus whatever LLM response carries). Downstream consumers may not handle this.
  • [agent_in_localhost_loop.py:65-77] _extract_reward_payload can now include extra judger keys beyond just score — verify downstream parsers tolerate this.
  • [stage.py:51-53] Hard TypeError for non-AgentMessage outputs removes backward-compat with agents that return plain strings.

Nit

  • Content extraction pattern duplicated 4x — consider a helper.
  • Inconsistent .get(key) vs .get(key, None) style in trajectory dicts.

ProduceBatchResult impact: finish_reason now accepts "timeout" and arbitrary response_message values. Does not change which samples land in COMPLETED vs FAILED status.
RoutedExperts impact: not affected.

Verdict

COMMENT — The protocol changes are intentional per the PR description but warrant explicit documentation of the breaking judger contract change. No correctness bugs found.

Comment thread xtuner/v1/train/rl_trainer.py Outdated
ground_truth = None
if data.reward_model is not None:
ground_truth = data.reward_model.get("ground_truth")
response_text_len = len(self.tokenizer.encode(response or "", add_special_tokens=False))

Collaborator

What is the difference between response_text_len and response_train_len? Aren't both of them token-id lengths?

message=[{"role": "user", "content": item.instruction}],
response=response,
reward_model=reward_model,
extra_fields={

Collaborator

Does the judger need this information?

"total_len": len(rewards),
}
json.dump(summary, f, ensure_ascii=False, indent=2)
json.dump(summary, f, ensure_ascii=False, separators=(",", ":"))

Collaborator

Why change this? It looks much uglier after the change.
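
For context on what the two `json.dump` variants produce, here is a small comparison using `json.dumps` on a made-up summary. The `mean_reward` key and the values are hypothetical; only `total_len` appears in the snippet above.

```python
import json

summary = {"mean_reward": 0.5, "total_len": 2}  # illustrative values only

# indent=2: one key per line, easy to read by eye.
pretty = json.dumps(summary, ensure_ascii=False, indent=2)

# separators=(",", ":"): no whitespace at all, a single line.
compact = json.dumps(summary, ensure_ascii=False, separators=(",", ":"))
```

Both strings parse back to the same object. The compact form is what a one-record-per-line JSONL file requires, while the indented form is friendlier for a standalone summary file that people open directly.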

"status": data.status.value if hasattr(data.status, "value") else str(data.status),
"finish_reason": data.finish_reason,
"error_msg": data.error_msg,
"prompt": data.message,

Collaborator

raw_prompt should be kept; it is used to verify that the chat template is correct.

"total_len": len(rewards),
}
json.dump(summary, f, ensure_ascii=False, indent=2)
json.dump(summary, f, ensure_ascii=False, separators=(",", ":"))

Collaborator

Don't change this one either.

Comment thread xtuner/v1/train/rl_trainer.py Outdated
"response_len": response_len,
"lengths": {
"num_tokens": data.num_tokens,
"response_train_tokens": response_train_len,

Collaborator

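Since these are all under lengths now, the outer response_len should be removed. Also, what is the difference between response_train_tokens and response_text_tokens? I ran it and the two values came out the same.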

- Drop redundant `lengths` nested struct: response_train_tokens duplicates
  response_len, response_text_tokens is misleading in agent loops (single_turn
  matches response_train but localhost only covers the last turn). Keep one
  definition: response_len = len(response_ids).
- Promote prompt length to top-level as `prompt_len` (was `lengths.num_tokens`,
  which actually held len(prompt_ids) per local_run.py).
- Unify second trajectory dump path with the first (response_len = len(response_ids)
  instead of encode(response); behaviour matches in eval where response_ids is
  None and _get_trajectory_response_ids falls back to encode(data.response)).
- Drop dead extra_fields["raw_prompt"] write in agent_in_localhost_loop and
  the matching dead .get("raw_prompt") read in the trajectory item — no
  reader remained after the earlier trajectory cleanup.
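
The single length definition described in this commit message can be sketched as follows. `trajectory_response_len` and the `encode` callable are hypothetical stand-ins for the trainer's logic and the tokenizer; the whitespace tokenizer in the demo is only there to make the example self-contained.

```python
from __future__ import annotations

from typing import Callable, Sequence


def trajectory_response_len(
    response_ids: Sequence[int] | None,
    response: str | None,
    encode: Callable[[str], Sequence],
) -> int:
    """response_len = len(response_ids), falling back to encoding the text."""
    if response_ids is not None:
        return len(response_ids)
    # Eval path: no response_ids recorded, so tokenize the response text instead.
    return len(encode(response or ""))


def whitespace_encode(text: str) -> list[str]:
    return text.split()  # toy tokenizer, not a real one


train_len = trajectory_response_len([11, 12, 13], "text is ignored here", whitespace_encode)
eval_len = trajectory_response_len(None, "two tokens", whitespace_encode)
```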
@hhaAndroid hhaAndroid merged commit a4bea0d into InternLM:main Jun 11, 2026
6 of 7 checks passed
braisedpork1964 pushed a commit to braisedpork1964/xtuner that referenced this pull request Jun 11, 2026
…#1904)

* - Stage surfaces full AgentMessage dump as artifacts[response_message]; judger / _fill_rollout_state read finish_info / content from it directly.
- Move tool_turns counting into agent_in_localhost_loop (single source); trainer / evaluator just read extra_fields[agent_tool_turns] and emit tool_turns/{mean,min,max} to tensorboard.
- Trajectory dump: drop redundant fields (final_assistant_finish_reason, raw_prompt, lengths.input_tokens) and helper chain; messages/tools/tool_turns moved under agent.* sub-key. Switch to one-line JSONL records.
- Drop _to_json_safe / _dump_jsonl_record wrappers; explicit model_dump only where needed (StageRecord). Remove dead isinstance(Tensor) branches.
- Rollout: add EADDRINUSE retry for session_server and worker launch; add sample_timeout_s; disable replay buffer checkpoint by default.
- Add asyncio_diagnostics module (USR1 task dump + asyncio_run watchdog) for prod deadlock troubleshooting.
- Drop local _TokenBucket (use lagent.utils.rate_limiter.FairAsyncTokenBucket via API-level rate limiting in tools).

* [Refactor] Simplify trajectory dump length fields

- Drop redundant `lengths` nested struct: response_train_tokens duplicates
  response_len, response_text_tokens is misleading in agent loops (single_turn
  matches response_train but localhost only covers the last turn). Keep one
  definition: response_len = len(response_ids).
- Promote prompt length to top-level as `prompt_len` (was `lengths.num_tokens`,
  which actually held len(prompt_ids) per local_run.py).
- Unify second trajectory dump path with the first (response_len = len(response_ids)
  instead of encode(response); behaviour matches in eval where response_ids is
  None and _get_trajectory_response_ids falls back to encode(data.response)).
- Drop dead extra_fields["raw_prompt"] write in agent_in_localhost_loop and
  the matching dead .get("raw_prompt") read in the trajectory item — no
  reader remained after the earlier trajectory cleanup.

2 participants