diff --git a/contributing/samples/gepa/experiment.py b/contributing/samples/gepa/experiment.py index f3751206a8..2710c3894c 100644 --- a/contributing/samples/gepa/experiment.py +++ b/contributing/samples/gepa/experiment.py @@ -43,7 +43,6 @@ from tau_bench.types import EnvRunResult from tau_bench.types import RunConfig import tau_bench_agent as tau_bench_agent_lib - import utils diff --git a/contributing/samples/gepa/run_experiment.py b/contributing/samples/gepa/run_experiment.py index d857da9635..e31db15788 100644 --- a/contributing/samples/gepa/run_experiment.py +++ b/contributing/samples/gepa/run_experiment.py @@ -25,7 +25,6 @@ from absl import flags import experiment from google.genai import types - import utils _OUTPUT_DIR = flags.DEFINE_string( diff --git a/src/google/adk/agents/parallel_agent.py b/src/google/adk/agents/parallel_agent.py index cb8b09f655..dbc8cbec38 100644 --- a/src/google/adk/agents/parallel_agent.py +++ b/src/google/adk/agents/parallel_agent.py @@ -145,6 +145,7 @@ async def process_an_agent(events_for_one_agent): finally: for task in tasks: task.cancel() + await asyncio.gather(*tasks, return_exceptions=True) class ParallelAgent(BaseAgent): diff --git a/src/google/adk/flows/llm_flows/functions.py b/src/google/adk/flows/llm_flows/functions.py index eda8474c01..f9393c4986 100644 --- a/src/google/adk/flows/llm_flows/functions.py +++ b/src/google/adk/flows/llm_flows/functions.py @@ -68,6 +68,13 @@ _TOOL_THREAD_POOLS: dict[int, ThreadPoolExecutor] = {} _TOOL_THREAD_POOL_LOCK = threading.Lock() +# Sentinel object used to distinguish a FunctionTool that legitimately returns +# None from a non-FunctionTool sync tool that skips thread-pool execution. +# Using None as a sentinel would cause tools whose underlying function has no +# explicit return statement (implicit None) to fall through to the async +# fallback path and execute a second time. +_SYNC_TOOL_RESULT_UNSET = object() + def _is_live_request_queue_annotation(param: inspect.Parameter) -> bool: """Check whether a parameter is annotated as LiveRequestQueue. @@ -159,13 +166,14 @@ def run_sync_tool(): } return tool.func(**args_to_call) else: - # For other sync tool types, we can't easily run them in thread pool - return None + # For other sync tool types, we can't easily run them in thread pool. + # Return the sentinel so the caller knows to fall back to run_async. + return _SYNC_TOOL_RESULT_UNSET result = await loop.run_in_executor( executor, lambda: ctx.run(run_sync_tool) ) - if result is not None: + if result is not _SYNC_TOOL_RESULT_UNSET: return result else: # For async tools, run them in a new event loop in a background thread. diff --git a/tests/unittests/agents/test_parallel_agent.py b/tests/unittests/agents/test_parallel_agent.py index cad1ce3a83..305911c100 100644 --- a/tests/unittests/agents/test_parallel_agent.py +++ b/tests/unittests/agents/test_parallel_agent.py @@ -20,6 +20,7 @@ from google.adk.agents.base_agent import BaseAgent from google.adk.agents.base_agent import BaseAgentState from google.adk.agents.invocation_context import InvocationContext +from google.adk.agents.parallel_agent import _merge_agent_run_pre_3_11 from google.adk.agents.parallel_agent import ParallelAgent from google.adk.agents.sequential_agent import SequentialAgent from google.adk.agents.sequential_agent import SequentialAgentState @@ -373,3 +374,38 @@ async def test_stop_agent_if_sub_agent_fails( async for _ in agen: # The infinite agent could iterate a few times depending on scheduling. pass + + +async def _slow_agent_with_cleanup_delay(): + """Async generator that sleeps in its finally block to simulate cleanup.""" + try: + await asyncio.sleep(10) + yield 'slow-event' + finally: + await asyncio.sleep(0.05) + + +async def _failing_agent(): + """Async generator that raises after a short delay.""" + await asyncio.sleep(0.01) + raise ValueError('simulated sub-agent failure') + yield # pragma: no cover + + +@pytest.mark.asyncio +async def test_merge_agent_run_pre_3_11_no_aclose_error_on_failure(): + """Regression test for Python 3.10 RuntimeError: aclose() already running. + + _merge_agent_run_pre_3_11 must await all cancelled tasks before returning so + that generators are fully released before the caller invokes aclose() on them. + """ + agent_runs = [_slow_agent_with_cleanup_delay(), _failing_agent()] + + with pytest.raises(ValueError, match='simulated sub-agent failure'): + async for _ in _merge_agent_run_pre_3_11(agent_runs): + pass + + # If tasks were not properly awaited, aclose() on a still-running generator + # would raise RuntimeError here. + for agen in agent_runs: + await agen.aclose() diff --git a/tests/unittests/flows/llm_flows/test_functions_thread_pool.py b/tests/unittests/flows/llm_flows/test_functions_thread_pool.py index e134a555c1..f129d7c3c3 100644 --- a/tests/unittests/flows/llm_flows/test_functions_thread_pool.py +++ b/tests/unittests/flows/llm_flows/test_functions_thread_pool.py @@ -277,6 +277,56 @@ def blocking_sleep() -> dict: event_loop_ticks >= 5 ), f'Event loop should have ticked at least 5 times, got {event_loop_ticks}' + @pytest.mark.asyncio + @pytest.mark.parametrize( + 'return_value,use_implicit_return', + [ + (None, True), # implicit None (no return statement) + (None, False), # explicit `return None` + (0, False), # falsy int + ('', False), # falsy str + ({}, False), # falsy dict + (False, False), # falsy bool + ], + ) + async def test_sync_tool_falsy_return_executes_exactly_once( + self, return_value, use_implicit_return + ): + """FunctionTools returning None or other falsy values must execute exactly once. + + Regression test for https://github.com/google/adk-python/issues/5284. + Previously, a None return was mistaken for the internal sentinel used to + signal 'non-FunctionTool, fall back to run_async', causing a second + invocation. The fix uses an identity-based sentinel so that None and other + falsy values (0, '', {}, False) are treated as valid results. + """ + call_count = 0 + + def sync_func(): + nonlocal call_count + call_count += 1 + if not use_implicit_return: + return return_value + # implicit None — no return statement + + tool = FunctionTool(sync_func) + model = testing_utils.MockModel.create(responses=[]) + agent = Agent(name='test_agent', model=model, tools=[tool]) + invocation_context = await testing_utils.create_invocation_context( + agent=agent, user_content='' + ) + tool_context = ToolContext( + invocation_context=invocation_context, + function_call_id='test_id', + ) + + result = await _call_tool_in_thread_pool(tool, {}, tool_context) + + assert result == return_value + assert ( + call_count == 1 + ), f'Tool function executed {call_count} time(s); expected exactly 1.' + @pytest.mark.asyncio async def test_sync_tool_exception_propagates(self): """Test that exceptions from sync tools propagate correctly."""