From 33b3d2b05e232be9aa8080e1ad8919454ddd0eca Mon Sep 17 00:00:00 2001 From: A Vertex SDK engineer Date: Thu, 30 Oct 2025 11:39:16 -0700 Subject: [PATCH] chore: Enable default-on telemetry for ADK agents. PiperOrigin-RevId: 826121066 --- .../test_agent_engine_templates_adk.py | 228 ------------------ .../vertex_langchain/test_agent_engines.py | 2 +- .../unit/vertexai/genai/test_agent_engines.py | 47 ---- vertexai/_genai/_agent_engines_utils.py | 47 ---- vertexai/_genai/agent_engines.py | 2 - vertexai/agent_engines/_agent_engines.py | 81 +------ 6 files changed, 2 insertions(+), 405 deletions(-) diff --git a/tests/unit/vertex_adk/test_agent_engine_templates_adk.py b/tests/unit/vertex_adk/test_agent_engine_templates_adk.py index bd80f2ef97..4dfcf5acdd 100644 --- a/tests/unit/vertex_adk/test_agent_engine_templates_adk.py +++ b/tests/unit/vertex_adk/test_agent_engine_templates_adk.py @@ -16,26 +16,15 @@ import importlib import json import os -import cloudpickle -import sys from unittest import mock from typing import Optional import dataclasses from google import auth -from google.auth import credentials as auth_credentials -from google.cloud import storage import vertexai -from google.cloud import aiplatform -from google.cloud.aiplatform_v1 import types as aip_types -from google.cloud.aiplatform_v1.services import reasoning_engine_service -from google.cloud.aiplatform import base from google.cloud.aiplatform import initializer from vertexai.agent_engines import _utils from vertexai import agent_engines -from vertexai.agent_engines.templates import adk as adk_template -from vertexai.agent_engines import _agent_engines -from google.api_core import operation as ga_operation from google.genai import types import pytest import uuid @@ -87,52 +76,6 @@ def __init__(self, name: str, model: str): "streaming_mode": "sse", "max_llm_calls": 500, } -_TEST_STAGING_BUCKET = "gs://test-bucket" -_TEST_CREDENTIALS = mock.Mock(spec=auth_credentials.AnonymousCredentials()) -_TEST_PARENT = f"projects/{_TEST_PROJECT}/locations/{_TEST_LOCATION}" -_TEST_RESOURCE_ID = "1028944691210842416" -_TEST_AGENT_ENGINE_RESOURCE_NAME = ( - f"{_TEST_PARENT}/reasoningEngines/{_TEST_RESOURCE_ID}" -) -_TEST_AGENT_ENGINE_DISPLAY_NAME = "Agent Engine Display Name" -_TEST_GCS_DIR_NAME = _agent_engines._DEFAULT_GCS_DIR_NAME -_TEST_BLOB_FILENAME = _agent_engines._BLOB_FILENAME -_TEST_REQUIREMENTS_FILE = _agent_engines._REQUIREMENTS_FILE -_TEST_EXTRA_PACKAGES_FILE = _agent_engines._EXTRA_PACKAGES_FILE -_TEST_AGENT_ENGINE_GCS_URI = "{}/{}/{}".format( - _TEST_STAGING_BUCKET, - _TEST_GCS_DIR_NAME, - _TEST_BLOB_FILENAME, -) -_TEST_AGENT_ENGINE_DEPENDENCY_FILES_GCS_URI = "{}/{}/{}".format( - _TEST_STAGING_BUCKET, - _TEST_GCS_DIR_NAME, - _TEST_EXTRA_PACKAGES_FILE, -) -_TEST_AGENT_ENGINE_REQUIREMENTS_GCS_URI = "{}/{}/{}".format( - _TEST_STAGING_BUCKET, - _TEST_GCS_DIR_NAME, - _TEST_REQUIREMENTS_FILE, -) -_TEST_AGENT_ENGINE_PACKAGE_SPEC = aip_types.ReasoningEngineSpec.PackageSpec( - python_version=f"{sys.version_info.major}.{sys.version_info.minor}", - pickle_object_gcs_uri=_TEST_AGENT_ENGINE_GCS_URI, - dependency_files_gcs_uri=_TEST_AGENT_ENGINE_DEPENDENCY_FILES_GCS_URI, - requirements_gcs_uri=_TEST_AGENT_ENGINE_REQUIREMENTS_GCS_URI, -) -_ADK_AGENT_FRAMEWORK = adk_template.AdkApp.agent_framework -_TEST_AGENT_ENGINE_OBJ = aip_types.ReasoningEngine( - name=_TEST_AGENT_ENGINE_RESOURCE_NAME, - display_name=_TEST_AGENT_ENGINE_DISPLAY_NAME, - spec=aip_types.ReasoningEngineSpec( - package_spec=_TEST_AGENT_ENGINE_PACKAGE_SPEC, - agent_framework=_ADK_AGENT_FRAMEWORK, - ), -) - -GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY = ( - "GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY" -) @pytest.fixture(scope="module") @@ -809,174 +752,3 @@ async def test_async_stream_query_invalid_message_type(self): ): async for _ in app.async_stream_query(user_id=_TEST_USER_ID, message=123): pass - - -@pytest.fixture(scope="module") -def create_agent_engine_mock(): - with mock.patch.object( - reasoning_engine_service.ReasoningEngineServiceClient, - "create_reasoning_engine", - ) as create_agent_engine_mock: - create_agent_engine_lro_mock = mock.Mock(ga_operation.Operation) - create_agent_engine_lro_mock.result.return_value = _TEST_AGENT_ENGINE_OBJ - create_agent_engine_mock.return_value = create_agent_engine_lro_mock - yield create_agent_engine_mock - - -@pytest.fixture(scope="module") -def get_agent_engine_mock(): - with mock.patch.object( - reasoning_engine_service.ReasoningEngineServiceClient, - "get_reasoning_engine", - ) as get_agent_engine_mock: - api_client_mock = mock.Mock() - api_client_mock.get_reasoning_engine.return_value = _TEST_AGENT_ENGINE_OBJ - get_agent_engine_mock.return_value = api_client_mock - yield get_agent_engine_mock - - -@pytest.fixture(scope="module") -def cloud_storage_create_bucket_mock(): - with mock.patch.object(storage, "Client") as cloud_storage_mock: - bucket_mock = mock.Mock(spec=storage.Bucket) - bucket_mock.blob.return_value.open.return_value = "blob_file" - bucket_mock.blob.return_value.upload_from_filename.return_value = None - bucket_mock.blob.return_value.upload_from_string.return_value = None - - cloud_storage_mock.get_bucket = mock.Mock( - side_effect=ValueError("bucket not found") - ) - cloud_storage_mock.bucket.return_value = bucket_mock - cloud_storage_mock.create_bucket.return_value = bucket_mock - - yield cloud_storage_mock - - -@pytest.fixture(scope="module") -def cloudpickle_dump_mock(): - with mock.patch.object(cloudpickle, "dump") as cloudpickle_dump_mock: - yield cloudpickle_dump_mock - - -@pytest.fixture(scope="module") -def cloudpickle_load_mock(): - with mock.patch.object(cloudpickle, "load") as cloudpickle_load_mock: - yield cloudpickle_load_mock - - -@pytest.fixture(scope="function") -def get_gca_resource_mock(): - with mock.patch.object( - base.VertexAiResourceNoun, - "_get_gca_resource", - ) as get_gca_resource_mock: - get_gca_resource_mock.return_value = _TEST_AGENT_ENGINE_OBJ - yield get_gca_resource_mock - - -# Function scope is required for the pytest parameterized tests. -@pytest.fixture(scope="function") -def update_agent_engine_mock(): - with mock.patch.object( - reasoning_engine_service.ReasoningEngineServiceClient, - "update_reasoning_engine", - ) as update_agent_engine_mock: - yield update_agent_engine_mock - - -@pytest.mark.usefixtures("google_auth_mock") -class TestAgentEngines: - def setup_method(self): - importlib.reload(initializer) - importlib.reload(aiplatform) - aiplatform.init( - project=_TEST_PROJECT, - location=_TEST_LOCATION, - credentials=_TEST_CREDENTIALS, - staging_bucket=_TEST_STAGING_BUCKET, - ) - - def teardown_method(self): - initializer.global_pool.shutdown(wait=True) - - @pytest.mark.parametrize( - "env_vars,expected_env_vars", - [ - ({}, {GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY: "true"}), - (None, {GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY: "true"}), - ( - {"some_env": "some_val"}, - { - "some_env": "some_val", - GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY: "true", - }, - ), - ( - {GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY: "false"}, - {GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY: "false"}, - ), - ], - ) - def test_create_default_telemetry_enablement( - self, - create_agent_engine_mock: mock.Mock, - cloud_storage_create_bucket_mock: mock.Mock, - cloudpickle_dump_mock: mock.Mock, - cloudpickle_load_mock: mock.Mock, - get_gca_resource_mock: mock.Mock, - env_vars: dict[str, str], - expected_env_vars: dict[str, str], - ): - agent_engines.create( - agent_engine=agent_engines.AdkApp(agent=_TEST_AGENT), - env_vars=env_vars, - ) - create_agent_engine_mock.assert_called_once() - deployment_spec = create_agent_engine_mock.call_args.kwargs[ - "reasoning_engine" - ].spec.deployment_spec - assert _utils.to_dict(deployment_spec)["env"] == [ - {"name": key, "value": value} for key, value in expected_env_vars.items() - ] - - @pytest.mark.parametrize( - "env_vars,expected_env_vars", - [ - ({}, {GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY: "true"}), - (None, {GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY: "true"}), - ( - {"some_env": "some_val"}, - { - "some_env": "some_val", - GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY: "true", - }, - ), - ( - {GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY: "false"}, - {GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY: "false"}, - ), - ], - ) - def test_update_default_telemetry_enablement( - self, - update_agent_engine_mock: mock.Mock, - cloud_storage_create_bucket_mock: mock.Mock, - cloudpickle_dump_mock: mock.Mock, - cloudpickle_load_mock: mock.Mock, - get_gca_resource_mock: mock.Mock, - get_agent_engine_mock: mock.Mock, - env_vars: dict[str, str], - expected_env_vars: dict[str, str], - ): - agent_engines.update( - resource_name=_TEST_AGENT_ENGINE_RESOURCE_NAME, - description="foobar", # avoid "At least one of ... must be specified" errors. - env_vars=env_vars, - ) - update_agent_engine_mock.assert_called_once() - deployment_spec = update_agent_engine_mock.call_args.kwargs[ - "request" - ].reasoning_engine.spec.deployment_spec - assert _utils.to_dict(deployment_spec)["env"] == [ - {"name": key, "value": value} for key, value in expected_env_vars.items() - ] diff --git a/tests/unit/vertex_langchain/test_agent_engines.py b/tests/unit/vertex_langchain/test_agent_engines.py index 8c3610577e..133b2d3b19 100644 --- a/tests/unit/vertex_langchain/test_agent_engines.py +++ b/tests/unit/vertex_langchain/test_agent_engines.py @@ -3260,7 +3260,7 @@ def test_create_agent_engine_with_invalid_type_env_var( "TEST_ENV_VAR": 0.01, # should be a string or dict or SecretRef }, ) - with pytest.raises(TypeError, match="env_vars must be a list, tuple or a dict"): + with pytest.raises(TypeError, match="env_vars must be a list or a dict"): agent_engines.create( self.test_agent, display_name=_TEST_AGENT_ENGINE_DISPLAY_NAME, diff --git a/tests/unit/vertexai/genai/test_agent_engines.py b/tests/unit/vertexai/genai/test_agent_engines.py index 37e678f7f6..bc204d5f79 100644 --- a/tests/unit/vertexai/genai/test_agent_engines.py +++ b/tests/unit/vertexai/genai/test_agent_engines.py @@ -31,7 +31,6 @@ from google.cloud import aiplatform import vertexai from google.cloud.aiplatform import initializer -from vertexai.agent_engines.templates import adk from vertexai._genai import _agent_engines_utils from vertexai._genai import agent_engines from vertexai._genai import types as _genai_types @@ -41,9 +40,6 @@ _TEST_AGENT_FRAMEWORK = "test-agent-framework" -GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY = ( - "GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY" -) class CapitalizeEngine: @@ -852,49 +848,6 @@ def test_create_agent_engine_config_lightweight(self, mock_prepare): "description": _TEST_AGENT_ENGINE_DESCRIPTION, } - @mock.patch.object(_agent_engines_utils, "_prepare") - @pytest.mark.parametrize( - "env_vars,expected_env_vars", - [ - ({}, {GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY: "true"}), - (None, {GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY: "true"}), - ( - {"some_env": "some_val"}, - { - "some_env": "some_val", - GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY: "true", - }, - ), - ( - {GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY: "false"}, - {GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY: "false"}, - ), - ], - ) - def test_agent_engine_adk_telemetry_enablement( - self, - mock_prepare: mock.Mock, - env_vars: dict[str, str], - expected_env_vars: dict[str, str], - ): - agent = mock.Mock(spec=adk.AdkApp) - agent.clone = lambda: agent - agent.register_operations = lambda: {} - - config = self.client.agent_engines._create_config( - mode="create", - agent=agent, - staging_bucket=_TEST_STAGING_BUCKET, - display_name=_TEST_AGENT_ENGINE_DISPLAY_NAME, - description=_TEST_AGENT_ENGINE_DESCRIPTION, - env_vars=env_vars, - ) - assert config["display_name"] == _TEST_AGENT_ENGINE_DISPLAY_NAME - assert config["description"] == _TEST_AGENT_ENGINE_DESCRIPTION - assert config["spec"]["deployment_spec"]["env"] == [ - {"name": key, "value": value} for key, value in expected_env_vars.items() - ] - @mock.patch.object(_agent_engines_utils, "_prepare") def test_create_agent_engine_config_full(self, mock_prepare): config = self.client.agent_engines._create_config( diff --git a/vertexai/_genai/_agent_engines_utils.py b/vertexai/_genai/_agent_engines_utils.py index 8364212528..52cb009df2 100644 --- a/vertexai/_genai/_agent_engines_utils.py +++ b/vertexai/_genai/_agent_engines_utils.py @@ -1845,50 +1845,3 @@ def _validate_resource_limits_or_raise(resource_limits: dict[str, str]) -> None: f"Memory size of {memory_str} requires at least {min_cpu} CPUs." f" Got {cpu}" ) - - -def _is_adk_agent(agent_engine: _AgentEngineInterface) -> bool: - """Checks if the agent engine is an ADK agent. - - Args: - agent_engine: The agent engine to check. - - Returns: - True if the agent engine is an ADK agent, False otherwise. - """ - - from vertexai.agent_engines.templates import adk - - return isinstance(agent_engine, adk.AdkApp) - - -def _add_telemetry_enablement_env( - env_vars: Optional[Dict[str, Union[str, Any]]] -) -> Optional[Dict[str, Union[str, Any]]]: - """Adds telemetry enablement env var to the env vars. - - This is in order to achieve default-on telemetry. - If the telemetry enablement env var is already set, we do not override it. - - Args: - env_vars: The env vars to add the telemetry enablement env var to. - - Returns: - The env vars with the telemetry enablement env var added. - """ - - GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY = ( - "GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY" - ) - env_to_add = {GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY: "true"} - - if env_vars is None: - return env_to_add - - if not isinstance(env_vars, dict): - raise TypeError(f"env_vars must be a dict, but got {type(env_vars)}.") - - if GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY in env_vars: - return env_vars - - return env_vars | env_to_add diff --git a/vertexai/_genai/agent_engines.py b/vertexai/_genai/agent_engines.py index 2b3ad56d8f..98367a9421 100644 --- a/vertexai/_genai/agent_engines.py +++ b/vertexai/_genai/agent_engines.py @@ -1032,8 +1032,6 @@ def _create_config( raise ValueError("location must be set using `vertexai.Client`.") gcs_dir_name = gcs_dir_name or _agent_engines_utils._DEFAULT_GCS_DIR_NAME agent = _agent_engines_utils._validate_agent_or_raise(agent=agent) - if _agent_engines_utils._is_adk_agent(agent): - env_vars = _agent_engines_utils._add_telemetry_enablement_env(env_vars) staging_bucket = _agent_engines_utils._validate_staging_bucket_or_raise( staging_bucket=staging_bucket, ) diff --git a/vertexai/agent_engines/_agent_engines.py b/vertexai/agent_engines/_agent_engines.py index 596056b15a..9173883d3d 100644 --- a/vertexai/agent_engines/_agent_engines.py +++ b/vertexai/agent_engines/_agent_engines.py @@ -514,13 +514,9 @@ def create( _validate_sys_version_or_raise(sys_version) gcs_dir_name = gcs_dir_name or _DEFAULT_GCS_DIR_NAME staging_bucket = initializer.global_config.staging_bucket - if agent_engine is not None: agent_engine = _validate_agent_engine_or_raise(agent_engine) staging_bucket = _validate_staging_bucket_or_raise(staging_bucket) - if _is_adk_agent(None, agent_engine): - env_vars = _add_telemetry_enablement_env(env_vars=env_vars) - if agent_engine is None: if requirements is not None: raise ValueError("requirements must be None if agent_engine is None.") @@ -537,7 +533,6 @@ def create( sdk_resource = cls.__new__(cls) base.VertexAiResourceNounWithFutureManager.__init__(sdk_resource) - # Prepares the Agent Engine for creation in Vertex AI. # This involves packaging and uploading the artifacts for # agent_engine, requirements and extra_packages to @@ -803,9 +798,6 @@ def update( if agent_engine is not None: agent_engine = _validate_agent_engine_or_raise(agent_engine) - if _is_adk_agent(self, agent_engine): - env_vars = _add_telemetry_enablement_env(env_vars=env_vars) - # Prepares the Agent Engine for update in Vertex AI. This involves # packaging and uploading the artifacts for agent_engine, requirements # and extra_packages to `staging_bucket/gcs_dir_name`. @@ -1064,77 +1056,6 @@ def _validate_agent_engine_or_raise( return agent_engine -def _is_adk_agent( - agent_engine_to_update: Optional[AgentEngine], - new_agent_engine: Optional[_AgentEngineInterface], -) -> bool: - """Checks if the agent engine is an ADK agent. - - Args: - agent_engine_to_update: Existing agent engine, None if creating new one. - new_agent_engine: The new agent engine to deploy. Can be None during an update, if the Python agent implementation is not provided, and should remain unchanged. - - Returns: - True if the agent after the create/update operation, will be an ADK agent. - """ - - from vertexai.agent_engines.templates import adk - - if new_agent_engine is not None: - return ( - getattr(new_agent_engine, "agent_framework", None) - == adk.AdkApp.agent_framework - ) - if agent_engine_to_update is not None: - return ( - agent_engine_to_update.gca_resource.spec.agent_framework - == adk.AdkApp.agent_framework - ) - return False - - -EnvVars = Optional[Union[Sequence[str], Dict[str, Union[str, aip_types.SecretRef]]]] - - -def _add_telemetry_enablement_env(*, env_vars: EnvVars) -> EnvVars: - """Adds telemetry enablement env var to the env vars. - - This is in order to achieve default-on telemetry. - If the telemetry enablement env var is already set, we do not override it. - - Args: - env_vars: The env vars to add the telemetry enablement env var to. - - Returns: - The env vars with the telemetry enablement env var added. - """ - - GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY = ( - "GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY" - ) - - if env_vars is None: - return {GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY: "true"} - if isinstance(env_vars, dict): - return ( - env_vars - if GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY in env_vars - else env_vars | {GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY: "true"} - ) - if isinstance(env_vars, list) or isinstance(env_vars, tuple): - if GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY not in os.environ: - os.environ[GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY] = "true" - - if isinstance(env_vars, list): - return env_vars + [GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY] - else: - return env_vars + (GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY,) - - raise TypeError( - f"env_vars must be a list, tuple or a dict, but got {type(env_vars)}." - ) - - def _validate_requirements_or_raise( *, agent_engine: _AgentEngineInterface, @@ -1388,7 +1309,7 @@ def _generate_deployment_spec_or_raise( ) else: raise TypeError( - f"env_vars must be a list, tuple or a dict, but got {type(env_vars)}." + f"env_vars must be a list or a dict, but got {type(env_vars)}." ) if deployment_spec.env: update_masks.append("spec.deployment_spec.env")