diff --git a/contributing/samples/gepa/experiment.py b/contributing/samples/gepa/experiment.py index f3751206a8..2710c3894c 100644 --- a/contributing/samples/gepa/experiment.py +++ b/contributing/samples/gepa/experiment.py @@ -43,7 +43,6 @@ from tau_bench.types import EnvRunResult from tau_bench.types import RunConfig import tau_bench_agent as tau_bench_agent_lib - import utils diff --git a/contributing/samples/gepa/run_experiment.py b/contributing/samples/gepa/run_experiment.py index d857da9635..e31db15788 100644 --- a/contributing/samples/gepa/run_experiment.py +++ b/contributing/samples/gepa/run_experiment.py @@ -25,7 +25,6 @@ from absl import flags import experiment from google.genai import types - import utils _OUTPUT_DIR = flags.DEFINE_string( diff --git a/src/google/adk/agents/config_agent_utils.py b/src/google/adk/agents/config_agent_utils.py index b38e3fb3a6..2cf8d535c4 100644 --- a/src/google/adk/agents/config_agent_utils.py +++ b/src/google/adk/agents/config_agent_utils.py @@ -192,6 +192,13 @@ def resolve_code_reference(code_config: CodeConfig) -> Any: obj = getattr(module, obj_name) if code_config.args and callable(obj): + if not inspect.isclass(obj): + raise ValueError( + f"Code reference '{code_config.name}' is not a class constructor." + " Only class constructors may be invoked with 'args' in YAML config." + " Plain functions and built-ins cannot be called with args here." + " Remove 'args' from the config, or reference a class instead." + ) kwargs = {arg.name: arg.value for arg in code_config.args if arg.name} positional_args = [arg.value for arg in code_config.args if not arg.name] diff --git a/src/google/adk/cli/built_in_agents/utils/resolve_root_directory.py b/src/google/adk/cli/built_in_agents/utils/resolve_root_directory.py index ca7398733f..c840a96254 100644 --- a/src/google/adk/cli/built_in_agents/utils/resolve_root_directory.py +++ b/src/google/adk/cli/built_in_agents/utils/resolve_root_directory.py @@ -26,6 +26,33 @@ from .path_normalizer import sanitize_generated_file_path +def _validate_root_directory(root_directory: str) -> None: + """Validate that root_directory from session state is safe to use. + + Rejects values that could redirect file operations outside the project root. + + Args: + root_directory: The root_directory value from session state. + + Raises: + ValueError: If root_directory contains unsafe path components. + """ + if not root_directory: + return + if Path(root_directory).is_absolute(): + raise ValueError( + f'root_directory must be a relative path, got: {root_directory!r}' + ) + if any(c in root_directory for c in ['\x00', '\\']): + raise ValueError( + f'root_directory contains invalid characters: {root_directory!r}' + ) + if any(part == '..' for part in Path(root_directory).parts): + raise ValueError( + f"root_directory must not contain '..': {root_directory!r}" + ) + + def resolve_file_path( file_path: str, session_state: Optional[Dict[str, Any]] = None, @@ -43,32 +70,41 @@ def resolve_file_path( Returns: Resolved absolute Path object + + Raises: + ValueError: If the resolved path escapes the project root. """ normalized_path = sanitize_generated_file_path(file_path) file_path_obj = Path(normalized_path) - # If already absolute, use as-is - if file_path_obj.is_absolute(): - return file_path_obj - # Get root directory from session state, default to "./" - root_directory = "./" - if session_state and "root_directory" in session_state: - root_directory = session_state["root_directory"] + root_directory = './' + if session_state and 'root_directory' in session_state: + root_directory = session_state['root_directory'] + _validate_root_directory(root_directory) - # Use the same resolution logic as the main function + # Compute the resolved root as an absolute path root_path_obj = Path(root_directory) + if working_directory: + resolved_root = (Path(working_directory) / root_path_obj).resolve() + else: + resolved_root = (Path(os.getcwd()) / root_path_obj).resolve() - if root_path_obj.is_absolute(): - resolved_root = root_path_obj + # Resolve the candidate path + if file_path_obj.is_absolute(): + candidate = file_path_obj.resolve() else: - if working_directory: - resolved_root = Path(working_directory) / root_directory - else: - resolved_root = Path(os.getcwd()) / root_directory + candidate = (resolved_root / file_path_obj).resolve() + + # Enforce boundary: reject paths that escape the project root + try: + candidate.relative_to(resolved_root) + except ValueError as e: + raise ValueError( + f'Path {file_path!r} resolves outside project root {resolved_root!r}' + ) from e - # Resolve file path relative to root directory - return resolved_root / file_path_obj + return candidate def resolve_file_paths(