diff --git a/src/google/adk/cli/cli_deploy.py b/src/google/adk/cli/cli_deploy.py index bda96497b0..15fd7af44f 100644 --- a/src/google/adk/cli/cli_deploy.py +++ b/src/google/adk/cli/cli_deploy.py @@ -99,33 +99,7 @@ def _ensure_agent_engine_dependency(requirements_txt_path: str) -> None: EXPOSE {port} -CMD adk {command} --port={port} {host_option} {service_option} {trace_to_cloud_option} {otel_to_cloud_option} {allow_origins_option} {a2a_option} {trigger_sources_option} "/app/agents" -""" - -_AGENT_ENGINE_APP_TEMPLATE: Final[str] = """ -import os -import vertexai -from vertexai.agent_engines import AdkApp - -if {is_config_agent}: - from google.adk.agents import config_agent_utils - config_path = os.path.join(os.path.dirname(__file__), "root_agent.yaml") - root_agent = config_agent_utils.from_config(config_path) -else: - from .agent import {adk_app_object} - -if {express_mode}: # Whether or not to use Express Mode - vertexai.init(api_key=os.environ.get("GOOGLE_API_KEY")) -else: - vertexai.init( - project=os.environ.get("GOOGLE_CLOUD_PROJECT"), - location=os.environ.get("GOOGLE_CLOUD_LOCATION"), - ) - -adk_app = AdkApp( - {adk_app_type}={adk_app_object}, - enable_tracing={trace_to_cloud_option}, -) +CMD adk {command} --port={port} {host_option} {service_option} {trace_to_cloud_option} {otel_to_cloud_option} {allow_origins_option} {a2a_option} {trigger_sources_option} {gemini_enterprise_option} "/app/agents" """ _AGENT_ENGINE_CLASS_METHODS = [ @@ -408,6 +382,13 @@ def _ensure_agent_engine_dependency(requirements_txt_path: str) -> None: ] +def _resolve_adk_version() -> str: + """Returns the default ADK version.""" + from google.adk.version import __version__ + + return __version__ + + def _resolve_project(project_in_option: Optional[str]) -> str: if project_in_option: return project_in_option @@ -740,6 +721,7 @@ def to_cloud_run( host_option=host_option, a2a_option=a2a_option, trigger_sources_option=trigger_sources_option, + gemini_enterprise_option='', ) dockerfile_path = os.path.join(temp_folder, 'Dockerfile') os.makedirs(temp_folder, exist_ok=True) @@ -830,7 +812,7 @@ def to_agent_engine( *, agent_folder: str, temp_folder: Optional[str] = None, - adk_app: str, + adk_app: Optional[str] = None, staging_bucket: Optional[str] = None, trace_to_cloud: Optional[bool] = None, otel_to_cloud: Optional[bool] = None, @@ -846,6 +828,9 @@ def to_agent_engine( env_file: Optional[str] = None, agent_engine_config_file: Optional[str] = None, skip_agent_import_validation: bool = True, + trigger_sources: Optional[str] = None, + artifact_service_uri: Optional[str] = None, + adk_version: Optional[str] = None, ): """Deploys an agent to Vertex AI Agent Engine. @@ -853,44 +838,31 @@ def to_agent_engine( - __init__.py - agent.py - - .py (optional, for customization; will be autogenerated otherwise) - requirements.txt (optional, for additional dependencies) - .env (optional, for environment variables) - ... (other required source files) - The contents of `adk_app` should look something like: - - ``` - from agent import - from vertexai.agent_engines import AdkApp - - adk_app = AdkApp( - agent=, # or `app=` - ) - ``` - Args: agent_folder (str): The folder (absolute path) containing the agent source code. temp_folder (str): The temp folder for the generated Agent Engine source files. It will be replaced with the generated files if it already exists. - adk_app (str): The name of the file (without .py) containing the AdkApp - instance. + adk_app (str): Deprecated. This argument is no longer required or used. staging_bucket (str): Deprecated. This argument is no longer required or used. - trace_to_cloud (bool): Whether to enable Cloud Trace. + trace_to_cloud (bool): Deprecated. This argument is no longer required or + used. otel_to_cloud (bool): Whether to enable exporting OpenTelemetry signals to Google Cloud. api_key (str): Optional. The API key to use for Express Mode. If not provided, the API key from the GOOGLE_API_KEY environment variable will be used. It will only be used if GOOGLE_GENAI_USE_VERTEXAI is true. - adk_app_object (str): Optional. The Python object corresponding to the root - ADK agent or app. Defaults to `root_agent` if not specified. + adk_app_object (str): Deprecated. This argument is no longer required or + used. agent_engine_id (str): Optional. The ID of the Agent Engine instance to update. If not specified, a new Agent Engine instance will be created. - absolutize_imports (bool): Optional. Default is True. Whether to absolutize - imports. If True, all relative imports will be converted to absolute - import statements. + absolutize_imports (bool): Deprecated. This argument is no longer required + or used. project (str): Optional. Google Cloud project id for the deployed agent. If not specified, the project from the `GOOGLE_CLOUD_PROJECT` environment variable will be used. It will be ignored if `api_key` is specified. @@ -899,9 +871,8 @@ def to_agent_engine( variable will be used. It will be ignored if `api_key` is specified. display_name (str): Optional. The display name of the Agent Engine. description (str): Optional. The description of the Agent Engine. - requirements_file (str): Optional. The filepath to the `requirements.txt` - file to use. If not specified, the `requirements.txt` file in the - `agent_folder` will be used. + requirements_file (str): Deprecated. This argument is no longer required or + used. env_file (str): Optional. The filepath to the `.env` file for environment variables. If not specified, the `.env` file in the `agent_folder` will be used. The values of `GOOGLE_CLOUD_PROJECT` and `GOOGLE_CLOUD_LOCATION` @@ -909,21 +880,33 @@ def to_agent_engine( agent_engine_config_file (str): The filepath to the agent engine config file to use. If not specified, the `.agent_engine_config.json` file in the `agent_folder` will be used. - skip_agent_import_validation (bool): Optional. Default is True. If True, - skip the pre-deployment import validation of `agent.py`. This can be - useful when the local environment does not have the same dependencies as - the deployment environment. + skip_agent_import_validation (bool): Deprecated. This argument is no longer + required or used. + trigger_sources (str): Optional. Comma-separated list of trigger sources to + enable (e.g., 'pubsub,eventarc'). Registers /trigger/* endpoints for + batch and event-driven agent invocations. + artifact_service_uri (str): Optional. The URI of the artifact service. + adk_version (str): Optional. The ADK version to use in Agent Engine + deployment. If not specified, the version in the dev environment will be + used. """ app_name = os.path.basename(agent_folder) display_name = display_name or app_name parent_folder = os.path.dirname(agent_folder) - adk_app_object = adk_app_object or 'root_agent' - if adk_app_object not in ['root_agent', 'app']: - click.echo( - f'Invalid adk_app_object: {adk_app_object}. Please use "root_agent"' - ' or "app".' + if adk_app_object: + warnings.warn( + 'WARNING: `--adk_app_object` is deprecated and will be removed in the' + ' future. Please drop it from the list of arguments.', + DeprecationWarning, + stacklevel=2, + ) + if adk_app: + warnings.warn( + 'WARNING: `adk_app` is deprecated and will be removed in a future' + ' release. Please drop it from the list of arguments.', + DeprecationWarning, + stacklevel=2, ) - return if staging_bucket: warnings.warn( 'WARNING: `staging_bucket` is deprecated and will be removed in a' @@ -931,6 +914,9 @@ def to_agent_engine( DeprecationWarning, stacklevel=2, ) + if not adk_version: + adk_version = _resolve_adk_version() + click.echo(f'Using default ADK version: {adk_version}') original_cwd = os.getcwd() did_change_cwd = False @@ -943,7 +929,7 @@ def to_agent_engine( did_change_cwd = True tmp_app_name = app_name + '_tmp' + datetime.now().strftime('%Y%m%d_%H%M%S') temp_folder = temp_folder or tmp_app_name - agent_src_path = os.path.join(parent_folder, temp_folder) + agent_src_path = os.path.join(parent_folder, temp_folder, 'agents', app_name) click.echo(f'Staging all files in: {agent_src_path}') # remove agent_src_path if it exists if os.path.exists(agent_src_path): @@ -966,6 +952,7 @@ def to_agent_engine( ignore=ignore_patterns, dirs_exist_ok=True, ) + os.chdir(os.path.join(parent_folder, temp_folder)) click.echo('Copying agent source code complete.') project = _resolve_project(project) @@ -1002,30 +989,20 @@ def to_agent_engine( ) agent_config['description'] = description - requirements_txt_path = os.path.join(agent_src_path, 'requirements.txt') if requirements_file: - if os.path.exists(requirements_txt_path): - click.echo( - f'Overwriting {requirements_txt_path} with {requirements_file}' - ) - shutil.copyfile(requirements_file, requirements_txt_path) - elif 'requirements_file' in agent_config: - if os.path.exists(requirements_txt_path): - click.echo( - f'Overwriting {requirements_txt_path} with' - f' {agent_config["requirements_file"]}' - ) - shutil.copyfile(agent_config['requirements_file'], requirements_txt_path) - else: - # Attempt to read requirements from requirements.txt in the dir (if any). - if not os.path.exists(requirements_txt_path): - click.echo(f'Creating {requirements_txt_path}...') - with open(requirements_txt_path, 'w', encoding='utf-8') as f: - f.write(_AGENT_ENGINE_REQUIREMENT + '\n') - click.echo(f'Created {requirements_txt_path}') - _ensure_agent_engine_dependency(requirements_txt_path) - agent_config['requirements_file'] = f'{temp_folder}/requirements.txt' - + warnings.warn( + 'WARNING: `--requirements_file` is deprecated and will be removed in' + ' the future. Please define `requirements.txt` in the agent folder.', + DeprecationWarning, + stacklevel=2, + ) + if trace_to_cloud: + warnings.warn( + 'WARNING: `--trace_to_cloud` is deprecated and will be removed in the' + ' future. Please use `--otel_to_cloud` instead.', + DeprecationWarning, + stacklevel=2, + ) env_vars = {} if not env_file: # Attempt to read the env variables from .env in the dir (if any). @@ -1094,92 +1071,98 @@ def to_agent_engine( from ..utils._google_client_headers import get_tracking_headers + http_options = {'headers': get_tracking_headers()} if project and region: - click.echo('Initializing Vertex AI...') + click.echo('Initializing Client with project and region...') client = vertexai.Client( project=project, location=region, - http_options={'headers': get_tracking_headers()}, + http_options=http_options, ) elif api_key: - click.echo('Initializing Vertex AI in Express Mode with API key...') - client = vertexai.Client( - api_key=api_key, http_options={'headers': get_tracking_headers()} - ) + click.echo('Initializing Client with Express Mode API key...') + client = vertexai.Client(api_key=api_key, http_options=http_options) else: click.echo( 'No project/region or api_key provided. ' 'Please specify either project/region or api_key.' ) return - click.echo('Vertex AI initialized.') - - is_config_agent = False - config_root_agent_file = os.path.join(agent_src_path, 'root_agent.yaml') - if os.path.exists(config_root_agent_file): - click.echo(f'Config agent detected: {config_root_agent_file}') - is_config_agent = True - - # Validate that the agent module can be imported before deployment. - if not skip_agent_import_validation: - click.echo('Validating agent module...') - _validate_agent_import(agent_src_path, adk_app_object, is_config_agent) - - adk_app_file = os.path.join(temp_folder, f'{adk_app}.py') - if adk_app_object == 'root_agent': - adk_app_type = 'agent' - elif adk_app_object == 'app': - adk_app_type = 'app' - else: - click.echo( - f'Invalid adk_app_object: {adk_app_object}. Please use "root_agent"' - ' or "app".' + + if skip_agent_import_validation: + warnings.warn( + 'WARNING: `--skip-agent-import-validation` is deprecated and will be' + ' removed in the future. Please drop it from the list of arguments.', + DeprecationWarning, + stacklevel=2, ) - return - with open(adk_app_file, 'w', encoding='utf-8') as f: - f.write( - _AGENT_ENGINE_APP_TEMPLATE.format( - app_name=app_name, - trace_to_cloud_option=trace_to_cloud, - is_config_agent=is_config_agent, - agent_folder=f'./{temp_folder}', - adk_app_object=adk_app_object, - adk_app_type=adk_app_type, - express_mode=api_key is not None, - ) + + def create_dockerfile_for_agent_engine(resource_name: str): + requirements_txt_path = os.path.join(agent_src_path, 'requirements.txt') + install_agent_deps = ( + f'RUN pip install -r "/app/agents/{app_name}/requirements.txt"' + if os.path.exists(requirements_txt_path) + else '# No requirements.txt found.' ) - click.echo(f'Created {adk_app_file}') - click.echo('Files and dependencies resolved') + trigger_sources_option = ( + f'--trigger_sources={trigger_sources}' if trigger_sources else '' + ) + agent_engine_uri = f'agentengine://{resource_name}' + dockerfile_content = _DOCKERFILE_TEMPLATE.format( + gcp_project_id=project, + gcp_region=region, + app_name=app_name, + port=8080, + command='api_server', + install_agent_deps=install_agent_deps, + service_option=_get_service_option_by_adk_version( + adk_version, + agent_engine_uri, # session_service_uri + artifact_service_uri, + agent_engine_uri, # memory_service_uri + False, # use_local_storage + ), + trace_to_cloud_option='--trace_to_cloud' if trace_to_cloud else '', + otel_to_cloud_option='--otel_to_cloud' if otel_to_cloud else '', + allow_origins_option='', # Not supported for now. + adk_version=adk_version, + host_option='--host=0.0.0.0', + a2a_option='--a2a', + trigger_sources_option=trigger_sources_option, + gemini_enterprise_option=f'--gemini_enterprise_app_name={app_name}', + ) + with open('Dockerfile', 'w', encoding='utf-8') as f: + f.write(dockerfile_content) + if absolutize_imports: - click.echo( - 'Agent Engine deployments have switched to source-based deployment, ' - 'so it is no longer necessary to absolutize imports.' + warnings.warn( + 'WARNING: `--absolutize_imports` is deprecated and will be removed' + ' in the future. Please drop it from the list of arguments.', + DeprecationWarning, + stacklevel=2, ) click.echo('Deploying to agent engine...') - agent_config['entrypoint_module'] = f'{temp_folder}.{adk_app}' - agent_config['entrypoint_object'] = 'adk_app' - agent_config['source_packages'] = [temp_folder] + agent_config['source_packages'] = [f'agents/{app_name}', 'Dockerfile'] + agent_config['image_spec'] = {} # Use the Dockerfile agent_config['class_methods'] = _AGENT_ENGINE_CLASS_METHODS agent_config['agent_framework'] = 'google-adk' if not agent_engine_id: - agent_engine = client.agent_engines.create(config=agent_config) - click.secho( - f'✅ Created agent engine: {agent_engine.api_resource.name}', - fg='green', - ) - _print_agent_engine_url(agent_engine.api_resource.name) - else: - if project and region and not agent_engine_id.startswith('projects/'): - agent_engine_id = f'projects/{project}/locations/{region}/reasoningEngines/{agent_engine_id}' - client.agent_engines.update(name=agent_engine_id, config=agent_config) - click.secho(f'✅ Updated agent engine: {agent_engine_id}', fg='green') - _print_agent_engine_url(agent_engine_id) + agent_engine = client.agent_engines.create() + agent_engine_id = agent_engine.api_resource.name + click.secho(f'Created a new agent engine: {agent_engine_id}', fg='green') + elif project and region and not agent_engine_id.startswith('projects/'): + agent_engine_id = f'projects/{project}/locations/{region}/reasoningEngines/{agent_engine_id}' + click.echo('Creating Dockerfile...') + create_dockerfile_for_agent_engine(agent_engine_id) + click.echo(f'Dockerfile created at {os.getcwd()}/Dockerfile.') + client.agent_engines.update(name=agent_engine_id, config=agent_config) + click.secho(f'Deployed to agent engine: {agent_engine_id}', fg='green') + _print_agent_engine_url(agent_engine_id) finally: click.echo(f'Cleaning up the temp folder: {temp_folder}') - shutil.rmtree(agent_src_path) - if did_change_cwd: - os.chdir(original_cwd) + os.chdir(original_cwd) + shutil.rmtree(temp_folder) def to_gke( @@ -1304,6 +1287,7 @@ def to_gke( trigger_sources_option=( f'--trigger_sources={trigger_sources}' if trigger_sources else '' ), + gemini_enterprise_option='', ) dockerfile_path = os.path.join(temp_folder, 'Dockerfile') os.makedirs(temp_folder, exist_ok=True) diff --git a/src/google/adk/cli/cli_tools_click.py b/src/google/adk/cli/cli_tools_click.py index 07ccc15892..6b42cf5f1e 100644 --- a/src/google/adk/cli/cli_tools_click.py +++ b/src/google/adk/cli/cli_tools_click.py @@ -1398,7 +1398,7 @@ def wrapper(*args, **kwargs): return decorator -def _deprecate_staging_bucket(ctx, param, value): +def _deprecate_parameter(ctx, param, value): if value: click.echo( click.style( @@ -1411,6 +1411,19 @@ def _deprecate_staging_bucket(ctx, param, value): return value +def _deprecate_trace_to_cloud(ctx, param, value): + if value: + click.echo( + click.style( + f"WARNING: --{param} is deprecated and will be removed. Please" + " use --otel_to_cloud instead.", + fg="yellow", + ), + err=True, + ) + return value + + def deprecated_adk_services_options(): """Deprecated ADK services options.""" @@ -1720,6 +1733,15 @@ async def _lifespan(app: FastAPI): "Automatically create a session if it doesn't exist when calling /run." ), ) +@click.option( + "--gemini_enterprise_app_name", + type=str, + default=None, + help=( + "The app_name to register with Gemini Enterprise via" + " https://docs.cloud.google.com/gemini/enterprise/docs/register-and-manage-an-adk-agent" + ), +) def cli_api_server( agents_dir: str, eval_storage_uri: Optional[str] = None, @@ -1742,6 +1764,7 @@ def cli_api_server( extra_plugins: Optional[list[str]] = None, auto_create_session: bool = False, trigger_sources: Optional[list[str]] = None, + gemini_enterprise_app_name: Optional[str] = None, ): """Starts a FastAPI server for agents. @@ -1776,6 +1799,7 @@ def cli_api_server( extra_plugins=extra_plugins, auto_create_session=auto_create_session, trigger_sources=trigger_sources, + gemini_enterprise_app_name=gemini_enterprise_app_name, ), host=host, port=port, @@ -2123,7 +2147,7 @@ def cli_migrate_session( type=str, default=None, help="Deprecated. This argument is no longer required or used.", - callback=_deprecate_staging_bucket, + callback=_deprecate_parameter, ) @click.option( "--agent_engine_id", @@ -2145,7 +2169,8 @@ def cli_migrate_session( is_flag=True, show_default=True, default=None, - help="Optional. Whether to enable Cloud Trace for Agent Engine.", + help=" NOTE: This flag is deprecated and will be removed in the future.", + callback=_deprecate_trace_to_cloud, ) @click.option( "--otel_to_cloud", @@ -2172,11 +2197,9 @@ def cli_migrate_session( @click.option( "--adk_app", type=str, - default="agent_engine_app", - help=( - "Optional. Python file for defining the ADK application" - " (default: a file named agent_engine_app.py)" - ), + default=None, + help=" NOTE: This flag is deprecated and will be removed in the future.", + callback=_deprecate_parameter, ) @click.option( "--temp_folder", @@ -2192,35 +2215,29 @@ def cli_migrate_session( "--adk_app_object", type=str, default=None, - help=( - "Optional. Python object corresponding to the root ADK agent or app." - " It can only be `root_agent` or `app`. (default: `root_agent`)" - ), + help=" NOTE: This flag is deprecated and will be removed in the future.", + callback=_deprecate_parameter, ) @click.option( "--env_file", type=str, default="", - help=( - "Optional. The filepath to the `.env` file for environment variables." - " (default: the `.env` file in the `agent` directory, if any.)" - ), + help=" NOTE: This flag is deprecated and will be removed in the future.", + callback=_deprecate_parameter, ) @click.option( "--requirements_file", type=str, default="", - help=( - "Optional. The filepath to the `requirements.txt` file to use." - " (default: the `requirements.txt` file in the `agent` directory, if" - " any.)" - ), + help=" NOTE: This flag is deprecated and will be removed in the future.", + callback=_deprecate_parameter, ) @click.option( "--absolutize_imports", type=bool, default=False, help=" NOTE: This flag is deprecated and will be removed in the future.", + callback=_deprecate_parameter, ) @click.option( "--agent_engine_config_file", @@ -2236,21 +2253,52 @@ def cli_migrate_session( @click.option( "--validate-agent-import/--no-validate-agent-import", default=False, - help=( - "Optional. Validate that the agent module can be imported before" - " deployment. This requires your local environment to have the same" - " dependencies as the deployment environment. (default: disabled)" - ), + help=" NOTE: This flag is deprecated and will be removed in the future.", + callback=_deprecate_parameter, ) @click.option( "--skip-agent-import-validation", "skip_agent_import_validation_alias", is_flag=True, default=False, + help=" NOTE: This flag is deprecated and will be removed in the future.", + callback=_deprecate_parameter, +) +# Kept as raw str (not parsed to list) — interpolated directly into Dockerfile CMD. +@click.option( + "--trigger_sources", + type=str, help=( - "Optional. Skip pre-deployment import validation of `agent.py`. This is" - " the default; use --validate-agent-import to enable validation." + "Optional. Comma-separated list of trigger sources to enable" + " (e.g., 'pubsub,eventarc'). Registers /trigger/* endpoints" + " for batch and event-driven agent invocations." ), + default=None, +) +@click.option( + "--adk_version", + type=str, + default=version.__version__, + show_default=True, + help=( + "Optional. The ADK version used in Agent Engine deployment. (default: " + " the version in the dev environment)" + ), +) +@click.option( + "--artifact_service_uri", + type=str, + help=textwrap.dedent( + """\ + Optional. The URI of the artifact service. If set, ADK uses this service. + + \b + If unset, ADK chooses a default artifact service. + - Use 'gs://' to connect to the GCS artifact service. + - Use 'memory://' to force the in-memory artifact service. + - Use 'file://' to store artifacts in a custom local directory.""" + ), + default=None, ) @click.argument( "agent", @@ -2269,7 +2317,7 @@ def cli_deploy_agent_engine( api_key: Optional[str], display_name: str, description: str, - adk_app: str, + adk_app: Optional[str], adk_app_object: Optional[str], temp_folder: Optional[str], env_file: str, @@ -2278,6 +2326,9 @@ def cli_deploy_agent_engine( agent_engine_config_file: str, validate_agent_import: bool = False, skip_agent_import_validation_alias: bool = False, + adk_version: Optional[str] = None, + trigger_sources: Optional[str] = None, + artifact_service_uri: Optional[str] = None, ): """Deploys an agent to Agent Engine. @@ -2317,6 +2368,9 @@ def cli_deploy_agent_engine( absolutize_imports=absolutize_imports, agent_engine_config_file=agent_engine_config_file, skip_agent_import_validation=not validate_agent_import, + trigger_sources=trigger_sources, + artifact_service_uri=artifact_service_uri, + adk_version=adk_version, ) except Exception as e: click.secho(f"Deploy failed: {e}", fg="red", err=True) diff --git a/src/google/adk/cli/fast_api.py b/src/google/adk/cli/fast_api.py index fa1948d4e2..35e8037f97 100644 --- a/src/google/adk/cli/fast_api.py +++ b/src/google/adk/cli/fast_api.py @@ -29,11 +29,18 @@ import click from fastapi import FastAPI from fastapi import HTTPException +from fastapi import Request from fastapi import UploadFile +from fastapi.encoders import jsonable_encoder from fastapi.responses import FileResponse +from fastapi.responses import JSONResponse from fastapi.responses import PlainTextResponse +from fastapi.responses import StreamingResponse +from opentelemetry import context +from opentelemetry import trace from opentelemetry.sdk.trace import export from opentelemetry.sdk.trace import TracerProvider +from pydantic import BaseModel from starlette.types import Lifespan from watchdog.observers import Observer @@ -43,6 +50,7 @@ from ..runners import Runner from .adk_web_server import AdkWebServer from .service_registry import load_services_module +from .utils import _telemetry as telemetry_utils from .utils import envs from .utils import evals from .utils.agent_change_handler import AgentChangeEventHandler @@ -52,6 +60,12 @@ from .utils.service_factory import create_memory_service_from_options from .utils.service_factory import create_session_service_from_options + +class _QueryRequest(BaseModel): + input: dict | None = None + class_method: str | None = None + + logger = logging.getLogger("google_adk." + __name__) _LAZY_SERVICE_IMPORTS: dict[str, str] = { @@ -97,6 +111,7 @@ def get_fast_api_app( logo_image_url: Optional[str] = None, auto_create_session: bool = False, trigger_sources: Optional[list[Literal["pubsub", "eventarc"]]] = None, + gemini_enterprise_app_name: Optional[str] = None, ) -> FastAPI: """Constructs and returns a FastAPI application for serving ADK agents. @@ -143,6 +158,8 @@ def get_fast_api_app( trigger_sources: List of trigger sources to enable (e.g. ["pubsub", "eventarc"]). When set, registers /trigger/* endpoints for batch and event-driven agent invocations. None disables all trigger endpoints. + gemini_enterprise_app_name: The app_name to register with Gemini Enterprise + via https://docs.cloud.google.com/gemini/enterprise/docs/register-and-manage-an-adk-agent Returns: The configured FastAPI application instance. @@ -656,5 +673,139 @@ async def _get_a2a_runner_async() -> Runner: except Exception as e: logger.error("Failed to setup A2A agent %s: %s", app_name, e) # Continue with other agents even if one fails + if gemini_enterprise_app_name: + if gemini_enterprise_app_name not in agent_loader.list_agents(): + raise ValueError( + f"App {gemini_enterprise_app_name} not found in dir: {agents_dir}" + ) + + import inspect + import json + + from google.adk.agents import Agent + import vertexai + from vertexai import agent_engines + + project = os.environ.get("GOOGLE_CLOUD_PROJECT", None) + location = os.environ.get( + "GOOGLE_CLOUD_AGENT_ENGINE_LOCATION", + os.environ.get("GOOGLE_CLOUD_LOCATION", None), + ) + api_key = os.environ.get("GOOGLE_API_KEY", None) + if project: + vertexai.init(project=project, location=location) + elif api_key: + vertexai.init(api_key=api_key) + else: + raise ValueError( + "No GOOGLE_CLOUD_PROJECT or GOOGLE_API_KEY found in environment" + " variables." + ) + # The tmp agent will be replaced by the adk server's runner and services. + adk_app = agent_engines.AdkApp( + agent=Agent(name="tmp"), + enable_tracing=trace_to_cloud, + ) + adk_app.set_up() # for the (existing) necessary instrumentation setup. + adk_app._tmpl_attrs["runner"] = None + adk_app._tmpl_attrs["app_name"] = gemini_enterprise_app_name + adk_app._tmpl_attrs["session_service"] = session_service + adk_app._tmpl_attrs["memory_service"] = memory_service + adk_app._tmpl_attrs["artifact_service"] = artifact_service + + def _encode_chunk_to_json(chunk): + """Encodes a chunk to a JSON string with a newline.""" + try: + json_chunk = jsonable_encoder(chunk) + return f"{json.dumps(json_chunk)}\n" + except Exception: + logging.exception("Failed to encode chunk") + return None + + async def json_generator(output): + async for chunk in output: + encoded_chunk = _encode_chunk_to_json(chunk) # type: ignore[no-untyped-call] + if encoded_chunk is None: + break + yield encoded_chunk + + async def _invoke_callable_or_raise( + invocation_callable, invocation_payload + ): + if inspect.iscoroutinefunction(invocation_callable): + return await invocation_callable(**invocation_payload) + else: + return invocation_callable(**invocation_payload) + + # Implement a FastAPI middleware to extract and attach OpenTelemetry trace + # context from a custom Google-Agent-Engine-Traceparent header in incoming + # requests. This enables distributed tracing. + tracer_provider = trace.get_tracer_provider() + if isinstance(tracer_provider, TracerProvider): + tracer_provider.add_span_processor(telemetry_utils.TopSpanProcessor()) + else: + logging.warning( + "OpenTelemetry tracing is not enabled. Please set the" + " `OTEL_PYTHON_TRACER_PROVIDER` environment variable to enable" + " tracing." + ) + + @app.middleware("context_propagation") + async def context_propagation(request: Request, call_next): + ctx = telemetry_utils.get_propagated_context(request) + context.attach(ctx) + response = await call_next(request) + return response + + @app.post( + "/api/reasoning_engine", + response_model_exclude_none=True, + response_class=JSONResponse, + ) + async def query(request: _QueryRequest): + if not adk_app._tmpl_attrs.get("runner"): + adk_app._tmpl_attrs["runner"] = await adk_web_server.get_runner_async( + app_name=gemini_enterprise_app_name + ) + if request.class_method is None: + raise HTTPException( + status_code=400, detail="class_method cannot be None" + ) + method = getattr(adk_app, request.class_method) + output = await _invoke_callable_or_raise(method, request.input or {}) # type: ignore[no-untyped-call] + + try: + json_serialized_content = jsonable_encoder({"output": output}) + except ValueError as encoding_error: + logging.exception( + "FastAPI could not JSON-encode the response from invocation method" + " %s. Error: %s. Invocation method's original response: %r", + request.class_method, + encoding_error, + output, + ) + raise encoding_error + return JSONResponse(content=json_serialized_content) + + @app.post( + "/api/stream_reasoning_engine", + response_model_exclude_none=True, + response_class=StreamingResponse, + ) + async def stream_query(request: _QueryRequest): + if not adk_app._tmpl_attrs.get("runner"): + adk_app._tmpl_attrs["runner"] = await adk_web_server.get_runner_async( + app_name=gemini_enterprise_app_name + ) + if request.class_method is None: + raise HTTPException( + status_code=400, detail="class_method cannot be None" + ) + method = getattr(adk_app, request.class_method) + output = await _invoke_callable_or_raise(method, request.input or {}) # type: ignore[no-untyped-call] + return StreamingResponse( + content=json_generator(output), # type: ignore[no-untyped-call] + media_type="application/json", + ) return app diff --git a/src/google/adk/cli/utils/_telemetry.py b/src/google/adk/cli/utils/_telemetry.py new file mode 100644 index 0000000000..070cb45526 --- /dev/null +++ b/src/google/adk/cli/utils/_telemetry.py @@ -0,0 +1,106 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +from typing import Mapping +from typing import Optional + +import fastapi +from opentelemetry import baggage +from opentelemetry import context +from opentelemetry.sdk import trace +from opentelemetry.trace.propagation import tracecontext + +_GOOGLE_AE_TRACEPARENT_HEADER = "Google-Agent-Engine-Traceparent" +_TRACEPARENT_BAGGAGE_KEY = "traceparent" +_GOOGLE_TRACEPARENT_HEADER = "traceparent" +_GOOGLE_TRACEPARENT_BAGGAGE_KEY = "google_traceparent" +_GOOGLE_TRACEPARENT_SUPPORT_ATTRIBUTE_KEY = "supportID" + + +def get_propagated_context(request: fastapi.Request) -> context.Context: + """Propagates context from the request headers.""" + ctx = context.get_current() + + if _GOOGLE_TRACEPARENT_HEADER in request.headers: + original_traceparent = request.headers[_GOOGLE_TRACEPARENT_HEADER] + ctx = baggage.set_baggage( + _GOOGLE_TRACEPARENT_BAGGAGE_KEY, + original_traceparent, + context=ctx, + ) + + if _GOOGLE_AE_TRACEPARENT_HEADER in request.headers: + carrier = {"traceparent": request.headers[_GOOGLE_AE_TRACEPARENT_HEADER]} + ctx = baggage.set_baggage( + _TRACEPARENT_BAGGAGE_KEY, + request.headers[_GOOGLE_AE_TRACEPARENT_HEADER], + context=ctx, + ) + ctx = tracecontext.TraceContextTextMapPropagator().extract( + carrier=carrier, context=ctx + ) + + return ctx + + +class TopSpanProcessor(trace.SpanProcessor): + """Top span processor.""" + + def on_start( + self, span: trace.Span, parent_context: Optional[context.Context] = None + ): + """Adds support ID to the top span.""" + baggage_items = baggage.get_all(context=parent_context) + if self._is_top_span(span, baggage_items) and ( + baggage_trace_header := baggage_items.get( + _GOOGLE_TRACEPARENT_BAGGAGE_KEY + ) + ): + span.set_attribute( + _GOOGLE_TRACEPARENT_SUPPORT_ATTRIBUTE_KEY, baggage_trace_header + ) + + def on_end(self, span: trace.ReadableSpan) -> None: + pass + + def shutdown(self) -> None: + pass + + def force_flush(self, timeout_millis: int = 30000) -> bool: + return True + + def _is_top_span( + self, span: trace.Span, baggage_items: Mapping[str, object] + ) -> bool: + """Returns true if the span is a top span. + + Args: + span: The span to check. + baggage_items: The baggage items that carry the context. + + Top span (e.g. "Invocation" span) is defined as the first span generated in + trace generation. + Top span could have an empty parent or the parent could be the span + provided by traceparent propagation. + """ + if span.parent is None or span.parent.span_id == 0: + return True + if _TRACEPARENT_BAGGAGE_KEY in baggage_items: + parent_id_hex = str(baggage_items[_TRACEPARENT_BAGGAGE_KEY]).split("-")[2] + parent_id_int = int(parent_id_hex, 16) + if span.parent.span_id == parent_id_int: + return True + return False diff --git a/tests/unittests/cli/test_fast_api.py b/tests/unittests/cli/test_fast_api.py index 3e63f31222..c1a061bc59 100755 --- a/tests/unittests/cli/test_fast_api.py +++ b/tests/unittests/cli/test_fast_api.py @@ -22,6 +22,7 @@ from typing import Any from typing import Optional from unittest.mock import AsyncMock +from unittest.mock import call from unittest.mock import MagicMock from unittest.mock import patch @@ -889,6 +890,67 @@ def test_app_with_a2a( yield client +@pytest.fixture +def test_app_with_gemini_enterprise( + mock_session_service, + mock_artifact_service, + mock_memory_service, + mock_agent_loader, + mock_eval_sets_manager, + mock_eval_set_results_manager, + monkeypatch, +): + """Create a TestClient with gemini_enterprise_app_name set.""" + monkeypatch.setenv("GOOGLE_CLOUD_PROJECT", "test-project") + mock_agent_loader.list_agents = MagicMock( + return_value=["test_app", "gemini_app"] + ) + + mock_adk_app_instance = MagicMock() + mock_adk_app_instance._tmpl_attrs = {} + + async def my_method_impl(**kwargs): + return {"result": "success", "kwargs": kwargs} + + mock_adk_app_instance.my_method = my_method_impl + + async def my_stream_method_impl(**kwargs): + yield {"chunk": 1, "kwargs": kwargs} + await asyncio.sleep(0) + yield {"chunk": 2, "kwargs": kwargs} + + mock_adk_app_instance.my_stream_method = my_stream_method_impl + + with ( + patch("vertexai.init", new_callable=MagicMock) as mock_vertexai_init, + patch( + "vertexai.agent_engines.AdkApp", return_value=mock_adk_app_instance + ) as mock_adk_app_cls, + patch("google.adk.agents.Agent", new_callable=MagicMock), + patch( + "google.adk.cli.utils._telemetry.TopSpanProcessor", + new_callable=MagicMock, + ), + patch( + "google.adk.cli.utils._telemetry.get_propagated_context", + new_callable=MagicMock, + ), + ): + client = _create_test_client( + mock_session_service, + mock_artifact_service, + mock_memory_service, + mock_agent_loader, + mock_eval_sets_manager, + mock_eval_set_results_manager, + gemini_enterprise_app_name="gemini_app", + ) + client.mock_vertexai_init = mock_vertexai_init + client.mock_adk_app_cls = mock_adk_app_cls + client.mock_adk_app_instance = mock_adk_app_instance + yield client + + ################################################# # Test Cases ################################################# @@ -2438,5 +2500,203 @@ async def run_async_capture( assert captured_visual_builder_values.get("yaml_app_after_sleep") == True +################################################# +# Gemini Enterprise Tests +################################################# + + +def test_gemini_app_not_found_raises( + mock_session_service, + mock_artifact_service, + mock_memory_service, + mock_agent_loader, + mock_eval_sets_manager, + mock_eval_set_results_manager, + monkeypatch, +): + """Test get_fast_api_app raises ValueError if gemini_enterprise_app_name not found.""" + monkeypatch.setenv("GOOGLE_CLOUD_PROJECT", "test-project") + mock_agent_loader.list_agents = MagicMock(return_value=["test_app"]) + with pytest.raises(ValueError, match="not found in dir"): + _create_test_client( + mock_session_service, + mock_artifact_service, + mock_memory_service, + mock_agent_loader, + mock_eval_sets_manager, + mock_eval_set_results_manager, + gemini_enterprise_app_name="nonexistent_app", + ) + + +def test_gemini_missing_credentials_raises( + mock_session_service, + mock_artifact_service, + mock_memory_service, + mock_agent_loader, + mock_eval_sets_manager, + mock_eval_set_results_manager, + monkeypatch, +): + """Test get_fast_api_app raises ValueError if no credentials are provided.""" + monkeypatch.delenv("GOOGLE_CLOUD_PROJECT", raising=False) + monkeypatch.delenv("GOOGLE_API_KEY", raising=False) + mock_agent_loader.list_agents = MagicMock(return_value=["gemini_app"]) + with pytest.raises( + ValueError, match="No GOOGLE_CLOUD_PROJECT or GOOGLE_API_KEY" + ): + with ( + patch("vertexai.init"), + patch("vertexai.agent_engines.AdkApp"), + patch("google.adk.agents.Agent"), + patch( + "google.adk.cli.utils._telemetry.TopSpanProcessor", + new_callable=MagicMock, + ), + patch( + "google.adk.cli.utils._telemetry.get_propagated_context", + new_callable=MagicMock, + ), + ): + _create_test_client( + mock_session_service, + mock_artifact_service, + mock_memory_service, + mock_agent_loader, + mock_eval_sets_manager, + mock_eval_set_results_manager, + gemini_enterprise_app_name="gemini_app", + ) + + +def test_gemini_init_with_project_id( + mock_session_service, + mock_artifact_service, + mock_memory_service, + mock_agent_loader, + mock_eval_sets_manager, + mock_eval_set_results_manager, + monkeypatch, +): + """Test vertexai.init is called with project_id.""" + monkeypatch.setenv("GOOGLE_CLOUD_PROJECT", "test-project") + monkeypatch.setenv("GOOGLE_CLOUD_LOCATION", "test-location") + monkeypatch.delenv("GOOGLE_API_KEY", raising=False) + mock_agent_loader.list_agents = MagicMock(return_value=["gemini_app"]) + with ( + patch("vertexai.init") as mock_init, + patch("vertexai.agent_engines.AdkApp"), + patch("google.adk.agents.Agent"), + patch( + "google.adk.cli.utils._telemetry.TopSpanProcessor", + new_callable=MagicMock, + ), + patch( + "google.adk.cli.utils._telemetry.get_propagated_context", + new_callable=MagicMock, + ), + ): + _create_test_client( + mock_session_service, + mock_artifact_service, + mock_memory_service, + mock_agent_loader, + mock_eval_sets_manager, + mock_eval_set_results_manager, + gemini_enterprise_app_name="gemini_app", + ) + mock_init.assert_called_once_with( + project="test-project", + location="test-location", + ) + + +def test_gemini_init_with_api_key( + mock_session_service, + mock_artifact_service, + mock_memory_service, + mock_agent_loader, + mock_eval_sets_manager, + mock_eval_set_results_manager, + monkeypatch, +): + """Test vertexai.init is called with api_key.""" + monkeypatch.delenv("GOOGLE_CLOUD_PROJECT", raising=False) + monkeypatch.setenv("GOOGLE_API_KEY", "test-api-key") + mock_agent_loader.list_agents = MagicMock(return_value=["gemini_app"]) + with ( + patch("vertexai.init") as mock_init, + patch("vertexai.agent_engines.AdkApp"), + patch("google.adk.agents.Agent"), + patch( + "google.adk.cli.utils._telemetry.TopSpanProcessor", + new_callable=MagicMock, + ), + patch( + "google.adk.cli.utils._telemetry.get_propagated_context", + new_callable=MagicMock, + ), + ): + _create_test_client( + mock_session_service, + mock_artifact_service, + mock_memory_service, + mock_agent_loader, + mock_eval_sets_manager, + mock_eval_set_results_manager, + gemini_enterprise_app_name="gemini_app", + ) + mock_init.assert_called_once_with(api_key="test-api-key") + + +def test_gemini_reasoning_engine_success(test_app_with_gemini_enterprise): + """Test POST /api/reasoning_engine success case.""" + response = test_app_with_gemini_enterprise.post( + "/api/reasoning_engine", + json={"class_method": "my_method", "input": {"arg1": 1}}, + ) + assert response.status_code == 200 + assert response.json() == { + "output": {"result": "success", "kwargs": {"arg1": 1}} + } + + +def test_gemini_reasoning_engine_missing_class_method( + test_app_with_gemini_enterprise, +): + """Test POST /api/reasoning_engine with missing class_method.""" + response = test_app_with_gemini_enterprise.post( + "/api/reasoning_engine", + json={"input": {"arg1": 1}}, + ) + assert response.status_code == 400 + + +def test_gemini_stream_reasoning_engine_success( + test_app_with_gemini_enterprise, +): + """Test POST /api/stream_reasoning_engine success case.""" + response = test_app_with_gemini_enterprise.post( + "/api/stream_reasoning_engine", + json={"class_method": "my_stream_method", "input": {"arg1": 1}}, + ) + assert response.status_code == 200 + lines = response.text.strip().split("\n") + assert len(lines) == 2 + assert json.loads(lines[0]) == {"chunk": 1, "kwargs": {"arg1": 1}} + assert json.loads(lines[1]) == {"chunk": 2, "kwargs": {"arg1": 1}} + + +def test_gemini_stream_reasoning_engine_missing_class_method( + test_app_with_gemini_enterprise, +): + """Test POST /api/stream_reasoning_engine with missing class_method.""" + response = test_app_with_gemini_enterprise.post( + "/api/stream_reasoning_engine", + json={"input": {"arg1": 1}}, + ) + assert response.status_code == 400 + + if __name__ == "__main__": pytest.main(["-xvs", __file__]) diff --git a/tests/unittests/cli/utils/test_cli_deploy.py b/tests/unittests/cli/utils/test_cli_deploy.py index b79a70238e..a75e8946ec 100644 --- a/tests/unittests/cli/utils/test_cli_deploy.py +++ b/tests/unittests/cli/utils/test_cli_deploy.py @@ -226,19 +226,6 @@ def test_get_service_option_by_adk_version( assert actual.rstrip() == expected.rstrip() -def test_agent_engine_app_template_compiles_with_windows_paths() -> None: - """It should not emit invalid Python when paths contain `\\u` segments.""" - rendered = cli_deploy._AGENT_ENGINE_APP_TEMPLATE.format( - is_config_agent=True, - agent_folder=r".\user_agent_tmp20260101_000000", - adk_app_object="root_agent", - adk_app_type="agent", - trace_to_cloud_option=False, - express_mode=False, - ) - compile(rendered, "", "exec") - - def test_print_agent_engine_url() -> None: """It should print the correct URL for a fully-qualified resource name.""" with mock.patch("click.secho") as mocked_secho: @@ -268,8 +255,8 @@ def test_to_agent_engine_happy_path( class _FakeAgentEngines: - def create(self, *, config: Dict[str, Any]) -> Any: - create_recorder(config=config) + def create(self, **kwargs: Any) -> Any: + create_recorder(**kwargs) return types.SimpleNamespace( api_resource=types.SimpleNamespace( name="projects/p/locations/l/reasoningEngines/e" @@ -294,29 +281,17 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: cli_deploy.to_agent_engine( agent_folder=str(src_dir), temp_folder="tmp", - adk_app="my_adk_app", trace_to_cloud=True, project="my-gcp-project", region="us-central1", display_name="My Test Agent", description="A test agent.", + adk_version="1.2.0", ) - agent_file = tmp_dir / "agent.py" + agent_file = tmp_dir / "Dockerfile" assert agent_file.is_file() - init_file = tmp_dir / "__init__.py" - assert init_file.is_file() - adk_app_file = tmp_dir / "my_adk_app.py" - assert adk_app_file.is_file() - content = adk_app_file.read_text() - assert "from .agent import root_agent" in content - assert "adk_app = AdkApp(" in content - assert "agent=root_agent" in content - assert "enable_tracing=True" in content - reqs_path = tmp_dir / "requirements.txt" - assert reqs_path.is_file() - assert "google-cloud-aiplatform[adk,agent_engines]" in reqs_path.read_text() assert len(create_recorder.calls) == 1 - assert str(rmtree_recorder.get_last_call_args()[0]) == str(tmp_dir) + assert str(rmtree_recorder.get_last_call_args()[0]) == "tmp" def test_to_agent_engine_raises_when_explicit_config_file_missing( @@ -341,116 +316,13 @@ def test_to_agent_engine_raises_when_explicit_config_file_missing( display_name="My Test Agent", description="A test agent.", agent_engine_config_file=str(missing_config), + adk_version="1.2.0", ) assert "Agent engine config file not found" in str(exc_info.value) assert expected_abs in str(exc_info.value) -def test_to_agent_engine_skips_agent_import_validation_by_default( - monkeypatch: pytest.MonkeyPatch, - agent_dir: Callable[[bool, bool], Path], -) -> None: - """It should skip agent.py import validation by default.""" - validate_recorder = _Recorder() - - def _validate_agent_import(*args: Any, **kwargs: Any) -> None: - validate_recorder(*args, **kwargs) - raise AssertionError("_validate_agent_import should not be called") - - monkeypatch.setattr( - cli_deploy, "_validate_agent_import", _validate_agent_import - ) - - fake_vertexai = types.ModuleType("vertexai") - - class _FakeAgentEngines: - - def create(self, *, config: Dict[str, Any]) -> Any: - del config - return types.SimpleNamespace( - api_resource=types.SimpleNamespace( - name="projects/p/locations/l/reasoningEngines/e" - ) - ) - - class _FakeVertexClient: - - def __init__(self, *args: Any, **kwargs: Any) -> None: - del args - del kwargs - self.agent_engines = _FakeAgentEngines() - - fake_vertexai.Client = _FakeVertexClient - monkeypatch.setitem(sys.modules, "vertexai", fake_vertexai) - - src_dir = agent_dir(False, False) - cli_deploy.to_agent_engine( - agent_folder=str(src_dir), - temp_folder="tmp", - adk_app="my_adk_app", - trace_to_cloud=True, - project="my-gcp-project", - region="us-central1", - display_name="My Test Agent", - description="A test agent.", - ) - - assert validate_recorder.calls == [] - - -def test_to_agent_engine_validates_agent_import_when_enabled( - monkeypatch: pytest.MonkeyPatch, - agent_dir: Callable[[bool, bool], Path], -) -> None: - """It should run agent.py import validation when enabled.""" - validate_recorder = _Recorder() - - def _validate_agent_import(*args: Any, **kwargs: Any) -> None: - validate_recorder(*args, **kwargs) - - monkeypatch.setattr( - cli_deploy, "_validate_agent_import", _validate_agent_import - ) - - fake_vertexai = types.ModuleType("vertexai") - - class _FakeAgentEngines: - - def create(self, *, config: Dict[str, Any]) -> Any: - del config - return types.SimpleNamespace( - api_resource=types.SimpleNamespace( - name="projects/p/locations/l/reasoningEngines/e" - ) - ) - - class _FakeVertexClient: - - def __init__(self, *args: Any, **kwargs: Any) -> None: - del args - del kwargs - self.agent_engines = _FakeAgentEngines() - - fake_vertexai.Client = _FakeVertexClient - monkeypatch.setitem(sys.modules, "vertexai", fake_vertexai) - - src_dir = agent_dir(False, False) - cli_deploy.to_agent_engine( - agent_folder=str(src_dir), - temp_folder="tmp", - adk_app="my_adk_app", - trace_to_cloud=True, - project="my-gcp-project", - region="us-central1", - display_name="My Test Agent", - description="A test agent.", - skip_agent_import_validation=False, - ) - - assert len(validate_recorder.calls) == 1 - - @pytest.mark.parametrize("include_requirements", [True, False]) def test_to_gke_happy_path( monkeypatch: pytest.MonkeyPatch,