diff --git a/tests/unit/vertex_adk/test_agent_engine_templates_adk.py b/tests/unit/vertex_adk/test_agent_engine_templates_adk.py index 2b5d8fab21..1eb91046bb 100644 --- a/tests/unit/vertex_adk/test_agent_engine_templates_adk.py +++ b/tests/unit/vertex_adk/test_agent_engine_templates_adk.py @@ -16,14 +16,25 @@ import importlib import json import os +import cloudpickle +import sys from unittest import mock from typing import Optional from google import auth +from google.auth import credentials as auth_credentials +from google.cloud import storage import vertexai +from google.cloud import aiplatform +from google.cloud.aiplatform_v1 import types as aip_types +from google.cloud.aiplatform_v1.services import reasoning_engine_service +from google.cloud.aiplatform import base from google.cloud.aiplatform import initializer from vertexai.agent_engines import _utils from vertexai import agent_engines +from vertexai.agent_engines.templates import adk as adk_template +from vertexai.agent_engines import _agent_engines +from google.api_core import operation as ga_operation from google.genai import types import pytest import uuid @@ -75,6 +86,52 @@ def __init__(self, name: str, model: str): "streaming_mode": "sse", "max_llm_calls": 500, } +_TEST_STAGING_BUCKET = "gs://test-bucket" +_TEST_CREDENTIALS = mock.Mock(spec=auth_credentials.AnonymousCredentials()) +_TEST_PARENT = f"projects/{_TEST_PROJECT}/locations/{_TEST_LOCATION}" +_TEST_RESOURCE_ID = "1028944691210842416" +_TEST_AGENT_ENGINE_RESOURCE_NAME = ( + f"{_TEST_PARENT}/reasoningEngines/{_TEST_RESOURCE_ID}" +) +_TEST_AGENT_ENGINE_DISPLAY_NAME = "Agent Engine Display Name" +_TEST_GCS_DIR_NAME = _agent_engines._DEFAULT_GCS_DIR_NAME +_TEST_BLOB_FILENAME = _agent_engines._BLOB_FILENAME +_TEST_REQUIREMENTS_FILE = _agent_engines._REQUIREMENTS_FILE +_TEST_EXTRA_PACKAGES_FILE = _agent_engines._EXTRA_PACKAGES_FILE +_TEST_AGENT_ENGINE_GCS_URI = "{}/{}/{}".format( + _TEST_STAGING_BUCKET, + _TEST_GCS_DIR_NAME, + _TEST_BLOB_FILENAME, +) +_TEST_AGENT_ENGINE_DEPENDENCY_FILES_GCS_URI = "{}/{}/{}".format( + _TEST_STAGING_BUCKET, + _TEST_GCS_DIR_NAME, + _TEST_EXTRA_PACKAGES_FILE, +) +_TEST_AGENT_ENGINE_REQUIREMENTS_GCS_URI = "{}/{}/{}".format( + _TEST_STAGING_BUCKET, + _TEST_GCS_DIR_NAME, + _TEST_REQUIREMENTS_FILE, +) +_TEST_AGENT_ENGINE_PACKAGE_SPEC = aip_types.ReasoningEngineSpec.PackageSpec( + python_version=f"{sys.version_info.major}.{sys.version_info.minor}", + pickle_object_gcs_uri=_TEST_AGENT_ENGINE_GCS_URI, + dependency_files_gcs_uri=_TEST_AGENT_ENGINE_DEPENDENCY_FILES_GCS_URI, + requirements_gcs_uri=_TEST_AGENT_ENGINE_REQUIREMENTS_GCS_URI, +) +_ADK_AGENT_FRAMEWORK = adk_template.AdkApp.agent_framework +_TEST_AGENT_ENGINE_OBJ = aip_types.ReasoningEngine( + name=_TEST_AGENT_ENGINE_RESOURCE_NAME, + display_name=_TEST_AGENT_ENGINE_DISPLAY_NAME, + spec=aip_types.ReasoningEngineSpec( + package_spec=_TEST_AGENT_ENGINE_PACKAGE_SPEC, + agent_framework=_ADK_AGENT_FRAMEWORK, + ), +) + +GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY = ( + "GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY" +) @pytest.fixture(scope="module") @@ -727,3 +784,174 @@ async def test_async_stream_query_invalid_message_type(self): ): async for _ in app.async_stream_query(user_id=_TEST_USER_ID, message=123): pass + + +@pytest.fixture(scope="module") +def create_agent_engine_mock(): + with mock.patch.object( + reasoning_engine_service.ReasoningEngineServiceClient, + "create_reasoning_engine", + ) as create_agent_engine_mock: + create_agent_engine_lro_mock = mock.Mock(ga_operation.Operation) + create_agent_engine_lro_mock.result.return_value = _TEST_AGENT_ENGINE_OBJ + create_agent_engine_mock.return_value = create_agent_engine_lro_mock + yield create_agent_engine_mock + + +@pytest.fixture(scope="module") +def get_agent_engine_mock(): + with mock.patch.object( + reasoning_engine_service.ReasoningEngineServiceClient, + "get_reasoning_engine", + ) as get_agent_engine_mock: + api_client_mock = mock.Mock() + api_client_mock.get_reasoning_engine.return_value = _TEST_AGENT_ENGINE_OBJ + get_agent_engine_mock.return_value = api_client_mock + yield get_agent_engine_mock + + +@pytest.fixture(scope="module") +def cloud_storage_create_bucket_mock(): + with mock.patch.object(storage, "Client") as cloud_storage_mock: + bucket_mock = mock.Mock(spec=storage.Bucket) + bucket_mock.blob.return_value.open.return_value = "blob_file" + bucket_mock.blob.return_value.upload_from_filename.return_value = None + bucket_mock.blob.return_value.upload_from_string.return_value = None + + cloud_storage_mock.get_bucket = mock.Mock( + side_effect=ValueError("bucket not found") + ) + cloud_storage_mock.bucket.return_value = bucket_mock + cloud_storage_mock.create_bucket.return_value = bucket_mock + + yield cloud_storage_mock + + +@pytest.fixture(scope="module") +def cloudpickle_dump_mock(): + with mock.patch.object(cloudpickle, "dump") as cloudpickle_dump_mock: + yield cloudpickle_dump_mock + + +@pytest.fixture(scope="module") +def cloudpickle_load_mock(): + with mock.patch.object(cloudpickle, "load") as cloudpickle_load_mock: + yield cloudpickle_load_mock + + +@pytest.fixture(scope="function") +def get_gca_resource_mock(): + with mock.patch.object( + base.VertexAiResourceNoun, + "_get_gca_resource", + ) as get_gca_resource_mock: + get_gca_resource_mock.return_value = _TEST_AGENT_ENGINE_OBJ + yield get_gca_resource_mock + + +# Function scope is required for the pytest parameterized tests. +@pytest.fixture(scope="function") +def update_agent_engine_mock(): + with mock.patch.object( + reasoning_engine_service.ReasoningEngineServiceClient, + "update_reasoning_engine", + ) as update_agent_engine_mock: + yield update_agent_engine_mock + + +@pytest.mark.usefixtures("google_auth_mock") +class TestAgentEngines: + def setup_method(self): + importlib.reload(initializer) + importlib.reload(aiplatform) + aiplatform.init( + project=_TEST_PROJECT, + location=_TEST_LOCATION, + credentials=_TEST_CREDENTIALS, + staging_bucket=_TEST_STAGING_BUCKET, + ) + + def teardown_method(self): + initializer.global_pool.shutdown(wait=True) + + @pytest.mark.parametrize( + "env_vars,expected_env_vars", + [ + ({}, {GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY: "true"}), + (None, {GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY: "true"}), + ( + {"some_env": "some_val"}, + { + "some_env": "some_val", + GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY: "true", + }, + ), + ( + {GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY: "false"}, + {GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY: "false"}, + ), + ], + ) + def test_create_default_telemetry_enablement( + self, + create_agent_engine_mock: mock.Mock, + cloud_storage_create_bucket_mock: mock.Mock, + cloudpickle_dump_mock: mock.Mock, + cloudpickle_load_mock: mock.Mock, + get_gca_resource_mock: mock.Mock, + env_vars: dict[str, str], + expected_env_vars: dict[str, str], + ): + agent_engines.create( + agent_engine=agent_engines.AdkApp(agent=_TEST_AGENT), + env_vars=env_vars, + ) + create_agent_engine_mock.assert_called_once() + deployment_spec = create_agent_engine_mock.call_args.kwargs[ + "reasoning_engine" + ].spec.deployment_spec + assert _utils.to_dict(deployment_spec)["env"] == [ + {"name": key, "value": value} for key, value in expected_env_vars.items() + ] + + @pytest.mark.parametrize( + "env_vars,expected_env_vars", + [ + ({}, {GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY: "true"}), + (None, {GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY: "true"}), + ( + {"some_env": "some_val"}, + { + "some_env": "some_val", + GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY: "true", + }, + ), + ( + {GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY: "false"}, + {GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY: "false"}, + ), + ], + ) + def test_update_default_telemetry_enablement( + self, + update_agent_engine_mock: mock.Mock, + cloud_storage_create_bucket_mock: mock.Mock, + cloudpickle_dump_mock: mock.Mock, + cloudpickle_load_mock: mock.Mock, + get_gca_resource_mock: mock.Mock, + get_agent_engine_mock: mock.Mock, + env_vars: dict[str, str], + expected_env_vars: dict[str, str], + ): + agent_engines.update( + resource_name=_TEST_AGENT_ENGINE_RESOURCE_NAME, + description="foobar", # avoid "At least one of ... must be specified" errors. + env_vars=env_vars, + ) + update_agent_engine_mock.assert_called_once() + deployment_spec = update_agent_engine_mock.call_args.kwargs[ + "request" + ].reasoning_engine.spec.deployment_spec + assert _utils.to_dict(deployment_spec)["env"] == [ + {"name": key, "value": value} for key, value in expected_env_vars.items() + ] diff --git a/tests/unit/vertex_langchain/test_agent_engines.py b/tests/unit/vertex_langchain/test_agent_engines.py index 133b2d3b19..8c3610577e 100644 --- a/tests/unit/vertex_langchain/test_agent_engines.py +++ b/tests/unit/vertex_langchain/test_agent_engines.py @@ -3260,7 +3260,7 @@ def test_create_agent_engine_with_invalid_type_env_var( "TEST_ENV_VAR": 0.01, # should be a string or dict or SecretRef }, ) - with pytest.raises(TypeError, match="env_vars must be a list or a dict"): + with pytest.raises(TypeError, match="env_vars must be a list, tuple or a dict"): agent_engines.create( self.test_agent, display_name=_TEST_AGENT_ENGINE_DISPLAY_NAME, diff --git a/tests/unit/vertexai/genai/test_agent_engines.py b/tests/unit/vertexai/genai/test_agent_engines.py index bc204d5f79..37e678f7f6 100644 --- a/tests/unit/vertexai/genai/test_agent_engines.py +++ b/tests/unit/vertexai/genai/test_agent_engines.py @@ -31,6 +31,7 @@ from google.cloud import aiplatform import vertexai from google.cloud.aiplatform import initializer +from vertexai.agent_engines.templates import adk from vertexai._genai import _agent_engines_utils from vertexai._genai import agent_engines from vertexai._genai import types as _genai_types @@ -40,6 +41,9 @@ _TEST_AGENT_FRAMEWORK = "test-agent-framework" +GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY = ( + "GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY" +) class CapitalizeEngine: @@ -848,6 +852,49 @@ def test_create_agent_engine_config_lightweight(self, mock_prepare): "description": _TEST_AGENT_ENGINE_DESCRIPTION, } + @mock.patch.object(_agent_engines_utils, "_prepare") + @pytest.mark.parametrize( + "env_vars,expected_env_vars", + [ + ({}, {GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY: "true"}), + (None, {GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY: "true"}), + ( + {"some_env": "some_val"}, + { + "some_env": "some_val", + GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY: "true", + }, + ), + ( + {GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY: "false"}, + {GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY: "false"}, + ), + ], + ) + def test_agent_engine_adk_telemetry_enablement( + self, + mock_prepare: mock.Mock, + env_vars: dict[str, str], + expected_env_vars: dict[str, str], + ): + agent = mock.Mock(spec=adk.AdkApp) + agent.clone = lambda: agent + agent.register_operations = lambda: {} + + config = self.client.agent_engines._create_config( + mode="create", + agent=agent, + staging_bucket=_TEST_STAGING_BUCKET, + display_name=_TEST_AGENT_ENGINE_DISPLAY_NAME, + description=_TEST_AGENT_ENGINE_DESCRIPTION, + env_vars=env_vars, + ) + assert config["display_name"] == _TEST_AGENT_ENGINE_DISPLAY_NAME + assert config["description"] == _TEST_AGENT_ENGINE_DESCRIPTION + assert config["spec"]["deployment_spec"]["env"] == [ + {"name": key, "value": value} for key, value in expected_env_vars.items() + ] + @mock.patch.object(_agent_engines_utils, "_prepare") def test_create_agent_engine_config_full(self, mock_prepare): config = self.client.agent_engines._create_config( diff --git a/vertexai/_genai/_agent_engines_utils.py b/vertexai/_genai/_agent_engines_utils.py index 52cb009df2..8364212528 100644 --- a/vertexai/_genai/_agent_engines_utils.py +++ b/vertexai/_genai/_agent_engines_utils.py @@ -1845,3 +1845,50 @@ def _validate_resource_limits_or_raise(resource_limits: dict[str, str]) -> None: f"Memory size of {memory_str} requires at least {min_cpu} CPUs." f" Got {cpu}" ) + + +def _is_adk_agent(agent_engine: _AgentEngineInterface) -> bool: + """Checks if the agent engine is an ADK agent. + + Args: + agent_engine: The agent engine to check. + + Returns: + True if the agent engine is an ADK agent, False otherwise. + """ + + from vertexai.agent_engines.templates import adk + + return isinstance(agent_engine, adk.AdkApp) + + +def _add_telemetry_enablement_env( + env_vars: Optional[Dict[str, Union[str, Any]]] +) -> Optional[Dict[str, Union[str, Any]]]: + """Adds telemetry enablement env var to the env vars. + + This is in order to achieve default-on telemetry. + If the telemetry enablement env var is already set, we do not override it. + + Args: + env_vars: The env vars to add the telemetry enablement env var to. + + Returns: + The env vars with the telemetry enablement env var added. + """ + + GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY = ( + "GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY" + ) + env_to_add = {GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY: "true"} + + if env_vars is None: + return env_to_add + + if not isinstance(env_vars, dict): + raise TypeError(f"env_vars must be a dict, but got {type(env_vars)}.") + + if GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY in env_vars: + return env_vars + + return env_vars | env_to_add diff --git a/vertexai/_genai/agent_engines.py b/vertexai/_genai/agent_engines.py index 98367a9421..2b3ad56d8f 100644 --- a/vertexai/_genai/agent_engines.py +++ b/vertexai/_genai/agent_engines.py @@ -1032,6 +1032,8 @@ def _create_config( raise ValueError("location must be set using `vertexai.Client`.") gcs_dir_name = gcs_dir_name or _agent_engines_utils._DEFAULT_GCS_DIR_NAME agent = _agent_engines_utils._validate_agent_or_raise(agent=agent) + if _agent_engines_utils._is_adk_agent(agent): + env_vars = _agent_engines_utils._add_telemetry_enablement_env(env_vars) staging_bucket = _agent_engines_utils._validate_staging_bucket_or_raise( staging_bucket=staging_bucket, ) diff --git a/vertexai/agent_engines/_agent_engines.py b/vertexai/agent_engines/_agent_engines.py index 9173883d3d..596056b15a 100644 --- a/vertexai/agent_engines/_agent_engines.py +++ b/vertexai/agent_engines/_agent_engines.py @@ -514,9 +514,13 @@ def create( _validate_sys_version_or_raise(sys_version) gcs_dir_name = gcs_dir_name or _DEFAULT_GCS_DIR_NAME staging_bucket = initializer.global_config.staging_bucket + if agent_engine is not None: agent_engine = _validate_agent_engine_or_raise(agent_engine) staging_bucket = _validate_staging_bucket_or_raise(staging_bucket) + if _is_adk_agent(None, agent_engine): + env_vars = _add_telemetry_enablement_env(env_vars=env_vars) + if agent_engine is None: if requirements is not None: raise ValueError("requirements must be None if agent_engine is None.") @@ -533,6 +537,7 @@ def create( sdk_resource = cls.__new__(cls) base.VertexAiResourceNounWithFutureManager.__init__(sdk_resource) + # Prepares the Agent Engine for creation in Vertex AI. # This involves packaging and uploading the artifacts for # agent_engine, requirements and extra_packages to @@ -798,6 +803,9 @@ def update( if agent_engine is not None: agent_engine = _validate_agent_engine_or_raise(agent_engine) + if _is_adk_agent(self, agent_engine): + env_vars = _add_telemetry_enablement_env(env_vars=env_vars) + # Prepares the Agent Engine for update in Vertex AI. This involves # packaging and uploading the artifacts for agent_engine, requirements # and extra_packages to `staging_bucket/gcs_dir_name`. @@ -1056,6 +1064,77 @@ def _validate_agent_engine_or_raise( return agent_engine +def _is_adk_agent( + agent_engine_to_update: Optional[AgentEngine], + new_agent_engine: Optional[_AgentEngineInterface], +) -> bool: + """Checks if the agent engine is an ADK agent. + + Args: + agent_engine_to_update: Existing agent engine, None if creating new one. + new_agent_engine: The new agent engine to deploy. Can be None during an update, if the Python agent implementation is not provided, and should remain unchanged. + + Returns: + True if the agent after the create/update operation, will be an ADK agent. + """ + + from vertexai.agent_engines.templates import adk + + if new_agent_engine is not None: + return ( + getattr(new_agent_engine, "agent_framework", None) + == adk.AdkApp.agent_framework + ) + if agent_engine_to_update is not None: + return ( + agent_engine_to_update.gca_resource.spec.agent_framework + == adk.AdkApp.agent_framework + ) + return False + + +EnvVars = Optional[Union[Sequence[str], Dict[str, Union[str, aip_types.SecretRef]]]] + + +def _add_telemetry_enablement_env(*, env_vars: EnvVars) -> EnvVars: + """Adds telemetry enablement env var to the env vars. + + This is in order to achieve default-on telemetry. + If the telemetry enablement env var is already set, we do not override it. + + Args: + env_vars: The env vars to add the telemetry enablement env var to. + + Returns: + The env vars with the telemetry enablement env var added. + """ + + GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY = ( + "GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY" + ) + + if env_vars is None: + return {GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY: "true"} + if isinstance(env_vars, dict): + return ( + env_vars + if GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY in env_vars + else env_vars | {GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY: "true"} + ) + if isinstance(env_vars, list) or isinstance(env_vars, tuple): + if GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY not in os.environ: + os.environ[GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY] = "true" + + if isinstance(env_vars, list): + return env_vars + [GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY] + else: + return env_vars + (GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY,) + + raise TypeError( + f"env_vars must be a list, tuple or a dict, but got {type(env_vars)}." + ) + + def _validate_requirements_or_raise( *, agent_engine: _AgentEngineInterface, @@ -1309,7 +1388,7 @@ def _generate_deployment_spec_or_raise( ) else: raise TypeError( - f"env_vars must be a list or a dict, but got {type(env_vars)}." + f"env_vars must be a list, tuple or a dict, but got {type(env_vars)}." ) if deployment_spec.env: update_masks.append("spec.deployment_spec.env")