diff --git a/tests/unit/vertexai/genai/test_evals.py b/tests/unit/vertexai/genai/test_evals.py
index 5882d5a990..920868daa8 100644
--- a/tests/unit/vertexai/genai/test_evals.py
+++ b/tests/unit/vertexai/genai/test_evals.py
@@ -1247,6 +1247,143 @@ def test_run_inference_with_agent_engine_with_response_column_raises_error(
             "'intermediate_events' or 'response' columns"
         ) in str(excinfo.value)
 
+    @mock.patch.object(_evals_utils, "EvalDatasetLoader")
+    @mock.patch(
+        "vertexai._genai._evals_common.InMemorySessionService"
+    )
+    @mock.patch("vertexai._genai._evals_common.Runner")
+    @mock.patch("vertexai._genai._evals_common.LlmAgent")
+    def test_run_inference_with_local_agent(
+        self,
+        mock_llm_agent,
+        mock_runner,
+        mock_session_service,
+        mock_eval_dataset_loader,
+    ):
+        mock_df = pd.DataFrame(
+            {
+                "prompt": ["agent prompt", "agent prompt 2"],
+                "session_inputs": [
+                    {
+                        "user_id": "123",
+                        "state": {"a": "1"},
+                    },
+                    {
+                        "user_id": "456",
+                        "state": {"b": "2"},
+                    },
+                ],
+            }
+        )
+        mock_eval_dataset_loader.return_value.load.return_value = mock_df.to_dict(
+            orient="records"
+        )
+
+        mock_agent_instance = mock.Mock()
+        mock_llm_agent.return_value = mock_agent_instance
+        mock_session_service.return_value.create_session = mock.AsyncMock()
+        mock_runner_instance = mock_runner.return_value
+        stream_run_return_value_1 = [
+            mock.Mock(
+                model_dump=lambda: {
+                    "id": "1",
+                    "content": {"parts": [{"text": "intermediate1"}]},
+                    "timestamp": 123,
+                    "author": "model",
+                }
+            ),
+            mock.Mock(
+                model_dump=lambda: {
+                    "id": "2",
+                    "content": {"parts": [{"text": "agent response"}]},
+                    "timestamp": 124,
+                    "author": "model",
+                }
+            ),
+        ]
+        stream_run_return_value_2 = [
+            mock.Mock(
+                model_dump=lambda: {
+                    "id": "3",
+                    "content": {"parts": [{"text": "intermediate2"}]},
+                    "timestamp": 125,
+                    "author": "model",
+                }
+            ),
+            mock.Mock(
+                model_dump=lambda: {
+                    "id": "4",
+                    "content": {"parts": [{"text": "agent response 2"}]},
+                    "timestamp": 126,
+                    "author": "model",
+                }
+            ),
+        ]
+
+        async def async_iterator(items):
+            for item in items:
+                yield item
+
+        mock_runner_instance.run_async.side_effect = [
+            async_iterator(stream_run_return_value_1),
+            async_iterator(stream_run_return_value_2),
+        ]
+
+        inference_result = self.client.evals.run_inference(
+            agent=mock_agent_instance,
+            src=mock_df,
+        )
+
+        mock_eval_dataset_loader.return_value.load.assert_called_once_with(mock_df)
+        assert mock_session_service.call_count == 2
+        mock_runner.assert_called_with(
+            agent=mock_agent_instance,
+            app_name="local agent run",
+            session_service=mock_session_service.return_value,
+        )
+        assert mock_runner.call_count == 2
+        assert mock_runner_instance.run_async.call_count == 2
+
+        pd.testing.assert_frame_equal(
+            inference_result.eval_dataset_df,
+            pd.DataFrame(
+                {
+                    "prompt": ["agent prompt", "agent prompt 2"],
+                    "session_inputs": [
+                        {
+                            "user_id": "123",
+                            "state": {"a": "1"},
+                        },
+                        {
+                            "user_id": "456",
+                            "state": {"b": "2"},
+                        },
+                    ],
+                    "intermediate_events": [
+                        [
+                            {
+                                "event_id": "1",
+                                "content": {"parts": [{"text": "intermediate1"}]},
+                                "creation_timestamp": 123,
+                                "author": "model",
+                            }
+                        ],
+                        [
+                            {
+                                "event_id": "3",
+                                "content": {"parts": [{"text": "intermediate2"}]},
+                                "creation_timestamp": 125,
+                                "author": "model",
+                            }
+                        ],
+                    ],
+                    "response": ["agent response", "agent response 2"],
+                }
+            ),
+        )
+        assert inference_result.candidate_name is None
+        assert inference_result.gcs_source is None
+
     def test_run_inference_with_litellm_string_prompt_format(
         self,
         mock_api_client_fixture,
@@ -1599,6 +1736,7 @@ def test_run_agent_internal_success(self, mock_run_agent):
         result_df = _evals_common._run_agent_internal(
             api_client=mock_api_client,
             agent_engine=mock_agent_engine,
+            agent=None,
             prompt_dataset=prompt_dataset,
         )
 
@@ -1629,6 +1767,7 @@ def test_run_agent_internal_error_response(self, mock_run_agent):
         result_df = _evals_common._run_agent_internal(
             api_client=mock_api_client,
             agent_engine=mock_agent_engine,
+            agent=None,
             prompt_dataset=prompt_dataset,
         )
 
@@ -1655,6 +1794,7 @@ def test_run_agent_internal_malformed_event(self, mock_run_agent):
         result_df = _evals_common._run_agent_internal(
             api_client=mock_api_client,
             agent_engine=mock_agent_engine,
+            agent=None,
             prompt_dataset=prompt_dataset,
         )
         assert "response" in result_df.columns
@@ -4984,7 +5124,9 @@ def test_execute_evaluation_adds_creation_timestamp(
         frozenset(["summarization_quality"]),
     )
     @mock.patch("time.sleep", return_value=None)
-    @mock.patch("vertexai._genai.evals.Evals._evaluate_instances")
+    @mock.patch(
+        "vertexai._genai.evals.Evals._evaluate_instances"
+    )
     def test_predefined_metric_retry_on_resource_exhausted(
         self,
         mock_private_evaluate_instances,
@@ -5037,7 +5179,9 @@ def test_predefined_metric_retry_on_resource_exhausted(
         frozenset(["summarization_quality"]),
     )
     @mock.patch("time.sleep", return_value=None)
-    @mock.patch("vertexai._genai.evals.Evals._evaluate_instances")
+    @mock.patch(
+        "vertexai._genai.evals.Evals._evaluate_instances"
+    )
     def test_predefined_metric_retry_fail_on_resource_exhausted(
         self,
         mock_private_evaluate_instances,
diff --git a/vertexai/_genai/_evals_common.py b/vertexai/_genai/_evals_common.py
index ed85715986..a7ff4a432c 100644
--- a/vertexai/_genai/_evals_common.py
+++ b/vertexai/_genai/_evals_common.py
@@ -17,12 +17,14 @@
 import base64
 import collections
 import concurrent.futures
+import contextlib
 import datetime
 import json
 import logging
 import os
 import threading
 import time
+import uuid
 from typing import Any, Callable, Literal, Optional, Union
 
 from google.api_core import exceptions as api_exceptions
@@ -48,12 +50,38 @@
 except ImportError:
     litellm = None  # type: ignore[assignment]
 
+try:
+    from google.adk.agents import LlmAgent
+    from google.adk.runners import Runner
+    from google.adk.sessions import InMemorySessionService
+    from google.adk.plugins.reflect_retry_tool_plugin import ReflectAndRetryToolPlugin
+except ImportError:
+    LlmAgent = None  # type: ignore[assignment]
+    Runner = None  # type: ignore[assignment]
+    InMemorySessionService = None  # type: ignore[assignment]
+    ReflectAndRetryToolPlugin = None  # type: ignore[assignment]
+
 logger = logging.getLogger(__name__)
 
 _thread_local_data = threading.local()
 
 MAX_WORKERS = 100
 AGENT_MAX_WORKERS = 10
+CONTENT = _evals_constant.CONTENT
+PARTS = _evals_constant.PARTS
+USER_AUTHOR = _evals_constant.USER_AUTHOR
+
+
+@contextlib.contextmanager
+def _temp_logger_level(logger_name: str, level: int):
+    """Temporarily sets the level of a logger."""
+    logger_instance = logging.getLogger(logger_name)
+    original_level = logger_instance.getEffectiveLevel()
+    logger_instance.setLevel(level)
+    try:
+        yield
+    finally:
+        logger_instance.setLevel(original_level)
 
 
 def _get_agent_engine_instance(
@@ -216,6 +244,7 @@ def _execute_inference_concurrently(
     gemini_config: Optional[genai_types.GenerateContentConfig] = None,
     inference_fn: Optional[Callable[..., Any]] = None,
     agent_engine: Optional[Union[str, types.AgentEngine]] = None,
+    agent: Optional[LlmAgent] = None,
 ) -> list[
     Union[genai_types.GenerateContentResponse, dict[str, Any], list[dict[str, Any]]]
 ]:
@@ -244,7 +273,7 @@ def _execute_inference_concurrently(
             f" Found: {prompt_dataset.columns.tolist()}"
         )
 
-    max_workers = AGENT_MAX_WORKERS if agent_engine else MAX_WORKERS
+    max_workers = AGENT_MAX_WORKERS if agent_engine or agent else MAX_WORKERS
     with tqdm(total=len(prompt_dataset), desc=progress_desc) as pbar:
         with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
             for index, row in prompt_dataset.iterrows():
@@ -261,35 +290,42 @@
                     pbar.update(1)
                     continue
 
-                if agent_engine:
+                if agent_engine or agent:
 
                     def agent_run_wrapper(  # type: ignore[no-untyped-def]
                         row_arg,
                         contents_arg,
-                        agent_engine,
+                        agent_engine_arg,
+                        agent_arg,
                         inference_fn_arg,
                         api_client_arg,
                     ) -> Any:
-                        if isinstance(agent_engine, str):
-                            agent_engine_instance = _get_agent_engine_instance(
-                                agent_engine, api_client_arg
+                        if agent_engine_arg:
+                            if isinstance(agent_engine_arg, str):
+                                agent_engine_instance = _get_agent_engine_instance(
+                                    agent_engine_arg, api_client_arg
+                                )
+                            else:
+                                agent_engine_instance = agent_engine_arg
+
+                            return inference_fn_arg(
+                                row=row_arg,
+                                contents=contents_arg,
+                                agent_engine=agent_engine_instance,
+                            )
+                        elif agent_arg:
+                            return inference_fn_arg(
+                                row=row_arg,
+                                contents=contents_arg,
+                                agent=agent_arg,
                             )
-                        elif (
-                            hasattr(agent_engine, "api_client")
-                            and type(agent_engine).__name__ == "AgentEngine"
-                        ):
-                            agent_engine_instance = agent_engine
-                        return inference_fn_arg(
-                            row=row_arg,
-                            contents=contents_arg,
-                            agent_engine=agent_engine_instance,
-                        )
 
                     future = executor.submit(
                         agent_run_wrapper,
                         row,
                         contents,
                         agent_engine,
+                        agent,
                         inference_fn,
                         api_client,
                     )
@@ -378,14 +414,14 @@ def _convert_prompt_row_to_litellm_messages(row: pd.Series) -> list[dict[str, An
         request_body.get("contents"), list
     ):
         for content in request_body["contents"]:
-            role = content.get("role", "user")
+            role = content.get("role", USER_AUTHOR)
             text_parts = [part.get("text", "") for part in content.get("parts", [])]
             messages.append({"role": role, "content": " ".join(text_parts)})
         return messages
 
     # Case 4: Fallback to a simple 'prompt' key with a raw string.
     elif "prompt" in row_dict and isinstance(row_dict.get("prompt"), str):
-        return [{"role": "user", "content": row_dict["prompt"]}]
+        return [{"role": USER_AUTHOR, "content": row_dict["prompt"]}]
 
     raise ValueError(
         "Could not determine prompt/messages format from input row. "
@@ -712,6 +748,7 @@ def _execute_inference(
     src: Union[str, pd.DataFrame],
     model: Optional[Union[Callable[[Any], Any], str]] = None,
     agent_engine: Optional[Union[str, types.AgentEngine]] = None,
+    agent: Optional[LlmAgent] = None,
    dest: Optional[str] = None,
     config: Optional[genai_types.GenerateContentConfig] = None,
     prompt_template: Optional[Union[str, types.PromptTemplateOrDict]] = None,
@@ -726,6 +763,7 @@
             string representing a model.
         agent_engine: The agent engine to use for inference. Can be a resource
             name string or an `AgentEngine` instance.
+        agent: The local agent to use for inference. Can be an ADK agent instance.
         dest: The destination to save the inference results. Can be a string
             representing a file path or a GCS URI.
         config: The generation configuration for the model.
@@ -736,6 +774,12 @@
     """
     if not api_client:
         raise ValueError("'api_client' instance must be provided.")
+
+    if sum(x is not None for x in [model, agent_engine, agent]) != 1:
+        raise ValueError(
+            "Exactly one of model, agent_engine, or agent must be provided."
+        )
+
     prompt_dataset = _load_dataframe(api_client, src)
     if prompt_template:
         logger.info("Applying prompt template...")
@@ -768,10 +812,14 @@ def _execute_inference(
             eval_dataset_df=results_df,
             candidate_name=candidate_name,
         )
-    elif agent_engine:
-        if not isinstance(agent_engine, str) and not (
-            hasattr(agent_engine, "api_client")
-            and type(agent_engine).__name__ == "AgentEngine"
+    elif agent_engine or agent:
+        if (
+            agent_engine
+            and not isinstance(agent_engine, str)
+            and not (
+                hasattr(agent_engine, "api_client")
+                and type(agent_engine).__name__ == "AgentEngine"
+            )
         ):
             raise TypeError(
                 f"Unsupported agent_engine type: {type(agent_engine)}. Expecting a"
@@ -794,6 +842,7 @@ def _execute_inference(
         results_df = _run_agent_internal(
             api_client=api_client,
             agent_engine=agent_engine,
+            agent=agent,
             prompt_dataset=prompt_dataset,
         )
         end_time = time.time()
@@ -803,7 +852,7 @@ def _execute_inference(
             eval_dataset_df=results_df,
         )
     else:
-        raise ValueError("Either model or agent_engine must be provided.")
+        raise ValueError("Either model, agent_engine or agent must be provided.")
 
     if dest:
         file_name = "inference_results.jsonl" if model else "agent_run_results.jsonl"
@@ -1175,14 +1224,35 @@ def _execute_evaluation(  # type: ignore[no-untyped-def]
     return evaluation_result
 
 
+def _get_session_inputs(row: pd.Series) -> types.evals.SessionInput:
+    """Parses session inputs from a row."""
+    if isinstance(row["session_inputs"], str):
+        return types.evals.SessionInput.model_validate(
+            json.loads(row["session_inputs"])
+        )
+    elif isinstance(row["session_inputs"], dict):
+        return types.evals.SessionInput.model_validate(row["session_inputs"])
+    elif isinstance(row["session_inputs"], types.evals.SessionInput):
+        return row["session_inputs"]
+    else:
+        raise TypeError(
+            f"Unsupported session_inputs type: {type(row['session_inputs'])}. "
+            "Expecting string or dict in types.evals.SessionInput format."
+        )
+
+
 def _run_agent_internal(
     api_client: BaseApiClient,
-    agent_engine: Union[str, types.AgentEngine],
+    agent_engine: Optional[Union[str, types.AgentEngine]],
+    agent: Optional[LlmAgent],
     prompt_dataset: pd.DataFrame,
 ) -> pd.DataFrame:
     """Runs an agent."""
     raw_responses = _run_agent(
-        api_client=api_client, agent_engine=agent_engine, prompt_dataset=prompt_dataset
+        api_client=api_client,
+        agent_engine=agent_engine,
+        agent=agent,
+        prompt_dataset=prompt_dataset,
     )
     processed_intermediate_events = []
     processed_responses = []
@@ -1251,20 +1321,33 @@
 
 def _run_agent(
     api_client: BaseApiClient,
-    agent_engine: Union[str, types.AgentEngine],
+    agent_engine: Optional[Union[str, types.AgentEngine]],
+    agent: Optional[LlmAgent],
     prompt_dataset: pd.DataFrame,
 ) -> list[
     Union[list[dict[str, Any]], dict[str, Any], genai_types.GenerateContentResponse]
 ]:
     """Internal helper to run inference using Gemini model with concurrency."""
-    return _execute_inference_concurrently(
-        api_client=api_client,
-        agent_engine=agent_engine,
-        prompt_dataset=prompt_dataset,
-        progress_desc="Agent Run",
-        gemini_config=None,
-        inference_fn=_execute_agent_run_with_retry,
-    )
+    if agent_engine:
+        return _execute_inference_concurrently(
+            api_client=api_client,
+            agent_engine=agent_engine,
+            prompt_dataset=prompt_dataset,
+            progress_desc="Agent Run",
+            gemini_config=None,
+            inference_fn=_execute_agent_run_with_retry,
+        )
+    elif agent:
+        return _execute_inference_concurrently(
+            api_client=api_client,
+            agent=agent,
+            prompt_dataset=prompt_dataset,
+            progress_desc="Local Agent Run",
+            gemini_config=None,
+            inference_fn=_execute_local_agent_run_with_retry,
+        )
+    else:
+        raise ValueError("Neither agent_engine nor agent is provided.")
 
 
 def _execute_agent_run_with_retry(
@@ -1273,23 +1356,9 @@
     agent_engine: types.AgentEngine,
     max_retries: int = 3,
 ) -> Union[list[dict[str, Any]], dict[str, Any]]:
-    """Executes agent run for a single prompt."""
+    """Executes agent run over agent engine for a single prompt."""
     try:
-        if isinstance(row["session_inputs"], str):
-            session_inputs = types.evals.SessionInput.model_validate(
-                json.loads(row["session_inputs"])
-            )
-        elif isinstance(row["session_inputs"], dict):
-            session_inputs = types.evals.SessionInput.model_validate(
-                row["session_inputs"]
-            )
-        elif isinstance(row["session_inputs"], types.evals.SessionInput):
-            session_inputs = row["session_inputs"]
-        else:
-            raise TypeError(
-                f"Unsupported session_inputs type: {type(row['session_inputs'])}. "
-                "Expecting string or dict in types.evals.SessionInput format."
-            )
+        session_inputs = _get_session_inputs(row)
         user_id = session_inputs.user_id
         session_state = session_inputs.state
         session = agent_engine.create_session(  # type: ignore[attr-defined]
@@ -1308,7 +1377,7 @@
             session_id=session["id"],
             message=contents,
         ):
-            if event and "content" in event and "parts" in event["content"]:
+            if event and CONTENT in event and PARTS in event[CONTENT]:
                 responses.append(event)
         return responses
     except api_exceptions.ResourceExhausted as e:
@@ -1337,6 +1406,73 @@ def _execute_agent_run_with_retry(
     return {"error": f"Failed to get agent run results after {max_retries} retries"}
 
 
+def _execute_local_agent_run_with_retry(
+    row: pd.Series,
+    contents: Union[genai_types.ContentListUnion, genai_types.ContentListUnionDict],
+    agent: LlmAgent,
+    max_retries: int = 3,
+) -> Union[list[dict[str, Any]], dict[str, Any]]:
+    """Executes agent run locally for a single prompt synchronously."""
+    return asyncio.run(
+        _execute_local_agent_run_with_retry_async(row, contents, agent, max_retries)
+    )
+
+
+async def _execute_local_agent_run_with_retry_async(
+    row: pd.Series,
+    contents: Union[genai_types.ContentListUnion, genai_types.ContentListUnionDict],
+    agent: LlmAgent,
+    max_retries: int = 3,
+) -> Union[list[dict[str, Any]], dict[str, Any]]:
+    """Executes agent run locally for a single prompt asynchronously."""
+    session_inputs = _get_session_inputs(row)
+    user_id = session_inputs.user_id
+    session_id = str(uuid.uuid4())
+    app_name = session_inputs.app_name or "local agent run"
+    # TODO: Enable user to set up session service.
+    session_service = InMemorySessionService()
+    await session_service.create_session(
+        app_name=app_name, user_id=user_id, session_id=session_id
+    )
+    plugins = []
+    if ReflectAndRetryToolPlugin:
+        plugins.append(ReflectAndRetryToolPlugin(name="reflect_retry_tool_plugin"))
+    agent_runner = Runner(
+        agent=agent,
+        app_name=app_name,
+        session_service=session_service,
+        plugins=plugins,
+    )
+    # Avoid printing out warning from agent_runner.run()
+    # WARNING:google_genai.types:Warning: there are non-text parts in the
+    # response: ['function_call'], returning concatenated text result from
+    # text parts. Check the full candidates.content.parts accessor to get
+    # the full model response.
+    with _temp_logger_level("google_genai.types", logging.ERROR):
+        try:
+            events = []
+            new_message_content = genai_types.Content(
+                role=USER_AUTHOR,
+                parts=[genai_types.Part(text=contents)],
+            )
+            async for event in agent_runner.run_async(
+                user_id=user_id,
+                session_id=session_id,
+                new_message=new_message_content,
+            ):
+                if event:
+                    event = event.model_dump()
+                if event and CONTENT in event and PARTS in event[CONTENT]:
+                    events.append(event)
+            return events
+        except Exception as e:  # pylint: disable=broad-exception-caught
+            logger.error(
+                "Unexpected error during local agent run: %s",
+                e,
+            )
+            return {"error": f"Failed during local agent run: {e}"}
+
+
 def _convert_gcs_to_evaluation_item_result(
     api_client: BaseApiClient,
     gcs_uri: str,
@@ -1678,8 +1814,8 @@ def _create_evaluation_set_from_dataframe(
                 and len(row[_evals_constant.INTERMEDIATE_EVENTS]) > 0
             ):
                 for event in row[_evals_constant.INTERMEDIATE_EVENTS]:
-                    if "content" in event:
-                        intermediate_events.append(event["content"])
+                    if CONTENT in event:
+                        intermediate_events.append(event[CONTENT])
             eval_item_requests.append(
                 types.EvaluationItemRequest(
                     prompt=(
diff --git a/vertexai/_genai/_evals_constant.py b/vertexai/_genai/_evals_constant.py
index a92c8a70dc..e2eadb4469 100644
--- a/vertexai/_genai/_evals_constant.py
+++ b/vertexai/_genai/_evals_constant.py
@@ -50,6 +50,9 @@
 REFERENCE = "reference"
 SESSION_INPUT = "session_inputs"
 CONTEXT = "context"
+CONTENT = "content"
+PARTS = "parts"
+USER_AUTHOR = "user"
 
 COMMON_DATASET_COLUMNS = frozenset(
     {
@@ -58,5 +61,7 @@
         REFERENCE,
         SESSION_INPUT,
         CONTEXT,
+        CONTENT,
+        PARTS,
     }
 )
diff --git a/vertexai/_genai/_evals_utils.py b/vertexai/_genai/_evals_utils.py
index 36c94f02a4..1ffad4b078 100644
--- a/vertexai/_genai/_evals_utils.py
+++ b/vertexai/_genai/_evals_utils.py
@@ -19,6 +19,7 @@
 import os
 from typing import Any, Optional, Union
 
+from google.genai import types as genai_types
 from google.genai._api_client import BaseApiClient
 from google.genai._common import get_value_by_path as getv
 from google.genai._common import set_value_by_path as setv
diff --git a/vertexai/_genai/evals.py b/vertexai/_genai/evals.py
index 4d328df784..57b63da9eb 100644
--- a/vertexai/_genai/evals.py
+++ b/vertexai/_genai/evals.py
@@ -33,6 +33,11 @@
 from . import _transformers as t
 from . import types
 
+try:
+    from google.adk.agents import LlmAgent
+except ImportError:
+    LlmAgent = None  # type: ignore[assignment]
+
 logger = logging.getLogger("vertexai_genai.evals")
 
 
@@ -903,7 +908,7 @@ def run_inference(
         *,
         src: Union[str, pd.DataFrame, types.EvaluationDataset],
         model: Optional[Union[str, Callable[[Any], Any]]] = None,
-        agent: Optional[Union[str, types.AgentEngine]] = None,
+        agent: Optional[Union[str, types.AgentEngine, LlmAgent]] = None,
         config: Optional[types.EvalRunInferenceConfigOrDict] = None,
     ) -> types.EvaluationDataset:
         """Runs inference on a dataset for evaluation.
@@ -923,11 +928,12 @@ def run_inference(
               - For custom logic, provide a callable function that accepts a
                 prompt and returns a response.
             agent: This field is experimental and may change in future versions
-              The agent engine used to run agent, optional for non-agent evaluations.
+              The agent engine or local agent used to run the agent; optional for non-agent evaluations.
              - agent engine resource name in str type, with format
              `projects/{project}/locations/{location}/reasoningEngines/{reasoning_engine_id}`,
              run_inference will fetch the agent engine from the resource name.
              - Or `types.AgentEngine` object.
+             - Or an ADK agent of `LlmAgent` type.
             config: The optional configuration for the inference run. Must be a
                 dict or `types.EvalRunInferenceConfig` type.
                 - dest: The destination path for storage of the inference results.
@@ -949,10 +955,19 @@
             )
             src = src.eval_dataset_df
 
+        agent_engine_instance = None
+        agent_instance = None
+        if agent:
+            if isinstance(agent, str) or isinstance(agent, types.AgentEngine):
+                agent_engine_instance = agent
+            else:
+                agent_instance = agent
+
         return _evals_common._execute_inference(  # type: ignore[no-any-return]
             api_client=self._api_client,
             model=model,
-            agent_engine=agent,
+            agent_engine=agent_engine_instance,
+            agent=agent_instance,
             src=src,
             dest=config.dest,
             config=config.generate_content_config,
diff --git a/vertexai/_genai/types/evals.py b/vertexai/_genai/types/evals.py
index b301270b3c..ce554d3110 100644
--- a/vertexai/_genai/types/evals.py
+++ b/vertexai/_genai/types/evals.py
@@ -420,6 +420,10 @@ class SessionInput(_common.BaseModel):
     state: Optional[dict[str, str]] = Field(
         default=None, description="""The state of the session."""
     )
+    app_name: Optional[str] = Field(
+        default=None,
+        description="""The name of the app, used for local ADK agent run Runner and Session.""",
+    )
 
 
 class SessionInputDict(TypedDict, total=False):
@@ -434,6 +438,9 @@ class SessionInputDict(TypedDict, total=False):
     state: Optional[dict[str, str]]
     """The state of the session."""
 
+    app_name: Optional[str]
+    """The name of the app, used for local ADK agent run Runner and Session."""
+
 
 SessionInputOrDict = Union[SessionInput, SessionInputDict]
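
Usage note (outside the patch): a minimal sketch of how the new local-agent path is meant to be driven, assuming the google-adk package is installed and the preview `vertexai.Client` entry point is available. The project, model id, agent definition, and dataset values below are illustrative placeholders, not part of this change.

    import pandas as pd
    import vertexai
    from google.adk.agents import LlmAgent

    client = vertexai.Client(project="my-project", location="us-central1")

    # Any local ADK LlmAgent can be passed via the (experimental) `agent` argument.
    agent = LlmAgent(
        name="qa_agent",
        model="gemini-2.0-flash",
        instruction="Answer the user's question concisely.",
    )

    src = pd.DataFrame(
        {
            "prompt": ["What is 2 + 2?"],
            # SessionInput payload; app_name is the optional field added in types/evals.py.
            "session_inputs": [{"user_id": "user-1", "state": {}, "app_name": "qa-eval"}],
        }
    )

    # run_inference routes an LlmAgent to the local Runner-based path added in
    # _evals_common.py; strings and types.AgentEngine instances still go to Agent Engine.
    result = client.evals.run_inference(agent=agent, src=src)
    print(result.eval_dataset_df[["response", "intermediate_events"]])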