Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
228 changes: 228 additions & 0 deletions tests/unit/vertex_adk/test_agent_engine_templates_adk.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,25 @@
import importlib
import json
import os
import cloudpickle
import sys
from unittest import mock
from typing import Optional

from google import auth
from google.auth import credentials as auth_credentials
from google.cloud import storage
import vertexai
from google.cloud import aiplatform
from google.cloud.aiplatform_v1 import types as aip_types
from google.cloud.aiplatform_v1.services import reasoning_engine_service
from google.cloud.aiplatform import base
from google.cloud.aiplatform import initializer
from vertexai.agent_engines import _utils
from vertexai import agent_engines
from vertexai.agent_engines.templates import adk as adk_template
from vertexai.agent_engines import _agent_engines
from google.api_core import operation as ga_operation
from google.genai import types
import pytest
import uuid
Expand Down Expand Up @@ -75,6 +86,52 @@ def __init__(self, name: str, model: str):
"streaming_mode": "sse",
"max_llm_calls": 500,
}
_TEST_STAGING_BUCKET = "gs://test-bucket"
_TEST_CREDENTIALS = mock.Mock(spec=auth_credentials.AnonymousCredentials())
_TEST_PARENT = f"projects/{_TEST_PROJECT}/locations/{_TEST_LOCATION}"
_TEST_RESOURCE_ID = "1028944691210842416"
_TEST_AGENT_ENGINE_RESOURCE_NAME = (
f"{_TEST_PARENT}/reasoningEngines/{_TEST_RESOURCE_ID}"
)
_TEST_AGENT_ENGINE_DISPLAY_NAME = "Agent Engine Display Name"
_TEST_GCS_DIR_NAME = _agent_engines._DEFAULT_GCS_DIR_NAME
_TEST_BLOB_FILENAME = _agent_engines._BLOB_FILENAME
_TEST_REQUIREMENTS_FILE = _agent_engines._REQUIREMENTS_FILE
_TEST_EXTRA_PACKAGES_FILE = _agent_engines._EXTRA_PACKAGES_FILE
_TEST_AGENT_ENGINE_GCS_URI = "{}/{}/{}".format(
_TEST_STAGING_BUCKET,
_TEST_GCS_DIR_NAME,
_TEST_BLOB_FILENAME,
)
_TEST_AGENT_ENGINE_DEPENDENCY_FILES_GCS_URI = "{}/{}/{}".format(
_TEST_STAGING_BUCKET,
_TEST_GCS_DIR_NAME,
_TEST_EXTRA_PACKAGES_FILE,
)
_TEST_AGENT_ENGINE_REQUIREMENTS_GCS_URI = "{}/{}/{}".format(
_TEST_STAGING_BUCKET,
_TEST_GCS_DIR_NAME,
_TEST_REQUIREMENTS_FILE,
)
_TEST_AGENT_ENGINE_PACKAGE_SPEC = aip_types.ReasoningEngineSpec.PackageSpec(
python_version=f"{sys.version_info.major}.{sys.version_info.minor}",
pickle_object_gcs_uri=_TEST_AGENT_ENGINE_GCS_URI,
dependency_files_gcs_uri=_TEST_AGENT_ENGINE_DEPENDENCY_FILES_GCS_URI,
requirements_gcs_uri=_TEST_AGENT_ENGINE_REQUIREMENTS_GCS_URI,
)
_ADK_AGENT_FRAMEWORK = adk_template.AdkApp.agent_framework
_TEST_AGENT_ENGINE_OBJ = aip_types.ReasoningEngine(
name=_TEST_AGENT_ENGINE_RESOURCE_NAME,
display_name=_TEST_AGENT_ENGINE_DISPLAY_NAME,
spec=aip_types.ReasoningEngineSpec(
package_spec=_TEST_AGENT_ENGINE_PACKAGE_SPEC,
agent_framework=_ADK_AGENT_FRAMEWORK,
),
)

GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY = (
"GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY"
)


@pytest.fixture(scope="module")
Expand Down Expand Up @@ -727,3 +784,174 @@ async def test_async_stream_query_invalid_message_type(self):
):
async for _ in app.async_stream_query(user_id=_TEST_USER_ID, message=123):
pass


@pytest.fixture(scope="module")
def create_agent_engine_mock():
with mock.patch.object(
reasoning_engine_service.ReasoningEngineServiceClient,
"create_reasoning_engine",
) as create_agent_engine_mock:
create_agent_engine_lro_mock = mock.Mock(ga_operation.Operation)
create_agent_engine_lro_mock.result.return_value = _TEST_AGENT_ENGINE_OBJ
create_agent_engine_mock.return_value = create_agent_engine_lro_mock
yield create_agent_engine_mock


@pytest.fixture(scope="module")
def get_agent_engine_mock():
with mock.patch.object(
reasoning_engine_service.ReasoningEngineServiceClient,
"get_reasoning_engine",
) as get_agent_engine_mock:
api_client_mock = mock.Mock()
api_client_mock.get_reasoning_engine.return_value = _TEST_AGENT_ENGINE_OBJ
get_agent_engine_mock.return_value = api_client_mock
yield get_agent_engine_mock


@pytest.fixture(scope="module")
def cloud_storage_create_bucket_mock():
with mock.patch.object(storage, "Client") as cloud_storage_mock:
bucket_mock = mock.Mock(spec=storage.Bucket)
bucket_mock.blob.return_value.open.return_value = "blob_file"
bucket_mock.blob.return_value.upload_from_filename.return_value = None
bucket_mock.blob.return_value.upload_from_string.return_value = None

cloud_storage_mock.get_bucket = mock.Mock(
side_effect=ValueError("bucket not found")
)
cloud_storage_mock.bucket.return_value = bucket_mock
cloud_storage_mock.create_bucket.return_value = bucket_mock

yield cloud_storage_mock


@pytest.fixture(scope="module")
def cloudpickle_dump_mock():
with mock.patch.object(cloudpickle, "dump") as cloudpickle_dump_mock:
yield cloudpickle_dump_mock


@pytest.fixture(scope="module")
def cloudpickle_load_mock():
with mock.patch.object(cloudpickle, "load") as cloudpickle_load_mock:
yield cloudpickle_load_mock


@pytest.fixture(scope="function")
def get_gca_resource_mock():
with mock.patch.object(
base.VertexAiResourceNoun,
"_get_gca_resource",
) as get_gca_resource_mock:
get_gca_resource_mock.return_value = _TEST_AGENT_ENGINE_OBJ
yield get_gca_resource_mock


# Function scope is required for the pytest parameterized tests.
@pytest.fixture(scope="function")
def update_agent_engine_mock():
with mock.patch.object(
reasoning_engine_service.ReasoningEngineServiceClient,
"update_reasoning_engine",
) as update_agent_engine_mock:
yield update_agent_engine_mock


@pytest.mark.usefixtures("google_auth_mock")
class TestAgentEngines:
def setup_method(self):
importlib.reload(initializer)
importlib.reload(aiplatform)
aiplatform.init(
project=_TEST_PROJECT,
location=_TEST_LOCATION,
credentials=_TEST_CREDENTIALS,
staging_bucket=_TEST_STAGING_BUCKET,
)

def teardown_method(self):
initializer.global_pool.shutdown(wait=True)

@pytest.mark.parametrize(
"env_vars,expected_env_vars",
[
({}, {GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY: "true"}),
(None, {GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY: "true"}),
(
{"some_env": "some_val"},
{
"some_env": "some_val",
GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY: "true",
},
),
(
{GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY: "false"},
{GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY: "false"},
),
],
)
def test_create_default_telemetry_enablement(
self,
create_agent_engine_mock: mock.Mock,
cloud_storage_create_bucket_mock: mock.Mock,
cloudpickle_dump_mock: mock.Mock,
cloudpickle_load_mock: mock.Mock,
get_gca_resource_mock: mock.Mock,
env_vars: dict[str, str],
expected_env_vars: dict[str, str],
):
agent_engines.create(
agent_engine=agent_engines.AdkApp(agent=_TEST_AGENT),
env_vars=env_vars,
)
create_agent_engine_mock.assert_called_once()
deployment_spec = create_agent_engine_mock.call_args.kwargs[
"reasoning_engine"
].spec.deployment_spec
assert _utils.to_dict(deployment_spec)["env"] == [
{"name": key, "value": value} for key, value in expected_env_vars.items()
]

@pytest.mark.parametrize(
"env_vars,expected_env_vars",
[
({}, {GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY: "true"}),
(None, {GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY: "true"}),
(
{"some_env": "some_val"},
{
"some_env": "some_val",
GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY: "true",
},
),
(
{GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY: "false"},
{GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY: "false"},
),
],
)
def test_update_default_telemetry_enablement(
self,
update_agent_engine_mock: mock.Mock,
cloud_storage_create_bucket_mock: mock.Mock,
cloudpickle_dump_mock: mock.Mock,
cloudpickle_load_mock: mock.Mock,
get_gca_resource_mock: mock.Mock,
get_agent_engine_mock: mock.Mock,
env_vars: dict[str, str],
expected_env_vars: dict[str, str],
):
agent_engines.update(
resource_name=_TEST_AGENT_ENGINE_RESOURCE_NAME,
description="foobar", # avoid "At least one of ... must be specified" errors.
env_vars=env_vars,
)
update_agent_engine_mock.assert_called_once()
deployment_spec = update_agent_engine_mock.call_args.kwargs[
"request"
].reasoning_engine.spec.deployment_spec
assert _utils.to_dict(deployment_spec)["env"] == [
{"name": key, "value": value} for key, value in expected_env_vars.items()
]
2 changes: 1 addition & 1 deletion tests/unit/vertex_langchain/test_agent_engines.py
Original file line number Diff line number Diff line change
Expand Up @@ -3260,7 +3260,7 @@ def test_create_agent_engine_with_invalid_type_env_var(
"TEST_ENV_VAR": 0.01, # should be a string or dict or SecretRef
},
)
with pytest.raises(TypeError, match="env_vars must be a list or a dict"):
with pytest.raises(TypeError, match="env_vars must be a list, tuple or a dict"):
agent_engines.create(
self.test_agent,
display_name=_TEST_AGENT_ENGINE_DISPLAY_NAME,
Expand Down
47 changes: 47 additions & 0 deletions tests/unit/vertexai/genai/test_agent_engines.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from google.cloud import aiplatform
import vertexai
from google.cloud.aiplatform import initializer
from vertexai.agent_engines.templates import adk
from vertexai._genai import _agent_engines_utils
from vertexai._genai import agent_engines
from vertexai._genai import types as _genai_types
Expand All @@ -40,6 +41,9 @@


_TEST_AGENT_FRAMEWORK = "test-agent-framework"
GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY = (
"GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY"
)


class CapitalizeEngine:
Expand Down Expand Up @@ -848,6 +852,49 @@ def test_create_agent_engine_config_lightweight(self, mock_prepare):
"description": _TEST_AGENT_ENGINE_DESCRIPTION,
}

@mock.patch.object(_agent_engines_utils, "_prepare")
@pytest.mark.parametrize(
"env_vars,expected_env_vars",
[
({}, {GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY: "true"}),
(None, {GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY: "true"}),
(
{"some_env": "some_val"},
{
"some_env": "some_val",
GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY: "true",
},
),
(
{GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY: "false"},
{GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY: "false"},
),
],
)
def test_agent_engine_adk_telemetry_enablement(
self,
mock_prepare: mock.Mock,
env_vars: dict[str, str],
expected_env_vars: dict[str, str],
):
agent = mock.Mock(spec=adk.AdkApp)
agent.clone = lambda: agent
agent.register_operations = lambda: {}

config = self.client.agent_engines._create_config(
mode="create",
agent=agent,
staging_bucket=_TEST_STAGING_BUCKET,
display_name=_TEST_AGENT_ENGINE_DISPLAY_NAME,
description=_TEST_AGENT_ENGINE_DESCRIPTION,
env_vars=env_vars,
)
assert config["display_name"] == _TEST_AGENT_ENGINE_DISPLAY_NAME
assert config["description"] == _TEST_AGENT_ENGINE_DESCRIPTION
assert config["spec"]["deployment_spec"]["env"] == [
{"name": key, "value": value} for key, value in expected_env_vars.items()
]

@mock.patch.object(_agent_engines_utils, "_prepare")
def test_create_agent_engine_config_full(self, mock_prepare):
config = self.client.agent_engines._create_config(
Expand Down
47 changes: 47 additions & 0 deletions vertexai/_genai/_agent_engines_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -1845,3 +1845,50 @@ def _validate_resource_limits_or_raise(resource_limits: dict[str, str]) -> None:
f"Memory size of {memory_str} requires at least {min_cpu} CPUs."
f" Got {cpu}"
)


def _is_adk_agent(agent_engine: _AgentEngineInterface) -> bool:
"""Checks if the agent engine is an ADK agent.

Args:
agent_engine: The agent engine to check.

Returns:
True if the agent engine is an ADK agent, False otherwise.
"""

from vertexai.agent_engines.templates import adk

return isinstance(agent_engine, adk.AdkApp)


def _add_telemetry_enablement_env(
env_vars: Optional[Dict[str, Union[str, Any]]]
) -> Optional[Dict[str, Union[str, Any]]]:
"""Adds telemetry enablement env var to the env vars.

This is in order to achieve default-on telemetry.
If the telemetry enablement env var is already set, we do not override it.

Args:
env_vars: The env vars to add the telemetry enablement env var to.

Returns:
The env vars with the telemetry enablement env var added.
"""

GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY = (
"GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY"
)
env_to_add = {GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY: "true"}

if env_vars is None:
return env_to_add

if not isinstance(env_vars, dict):
raise TypeError(f"env_vars must be a dict, but got {type(env_vars)}.")

if GOOGLE_CLOUD_AGENT_ENGINE_ENABLE_TELEMETRY in env_vars:
return env_vars

return env_vars | env_to_add
2 changes: 2 additions & 0 deletions vertexai/_genai/agent_engines.py
Original file line number Diff line number Diff line change
Expand Up @@ -1032,6 +1032,8 @@ def _create_config(
raise ValueError("location must be set using `vertexai.Client`.")
gcs_dir_name = gcs_dir_name or _agent_engines_utils._DEFAULT_GCS_DIR_NAME
agent = _agent_engines_utils._validate_agent_or_raise(agent=agent)
if _agent_engines_utils._is_adk_agent(agent):
env_vars = _agent_engines_utils._add_telemetry_enablement_env(env_vars)
staging_bucket = _agent_engines_utils._validate_staging_bucket_or_raise(
staging_bucket=staging_bucket,
)
Expand Down
Loading
Loading