From c99b3245a79acee2a72db07b8920856c97abfe60 Mon Sep 17 00:00:00 2001 From: A Vertex SDK engineer Date: Fri, 14 Nov 2025 15:33:04 -0800 Subject: [PATCH] This CL introduces the convenience method to_a2a enabling easier deployment of agents as a2a agents. feat: to_a2a on Agent Engine PiperOrigin-RevId: 832480210 --- .../reasoning_engines/templates/a2a.py | 182 ++++++++++++++++++ .../reasoning_engines/templates/adk.py | 9 + .../reasoning_engines/templates/langgraph.py | 9 + 3 files changed, 200 insertions(+) diff --git a/vertexai/preview/reasoning_engines/templates/a2a.py b/vertexai/preview/reasoning_engines/templates/a2a.py index 48e59cd55e..cdbcab5e84 100644 --- a/vertexai/preview/reasoning_engines/templates/a2a.py +++ b/vertexai/preview/reasoning_engines/templates/a2a.py @@ -151,6 +151,188 @@ async def cancel( ) +def to_a2a( + agent_engine_app: object, + agent_card: "AgentCard", + user_id: str = None, + task_store_builder: Callable[..., "TaskStore"] = None, + task_store_kwargs: Optional[Mapping[str, Any]] = None, + agent_executor_kwargs: Optional[Mapping[str, Any]] = None, + agent_executor_builder: Optional[Callable[..., "AgentExecutor"]] = None, + request_handler_kwargs: Optional[Mapping[str, Any]] = None, + request_handler_builder: Optional[Callable[..., "RequestHandler"]] = None, + extended_agent_card: "AgentCard" = None, +): + """Converts an existing Agent Engine application to be compatible with A2A. + + This function wraps an `agent_engine_app` with A2A functionalities, allowing it + to handle A2A protocol requests. It augments the app's setup and operation + registration to include A2A-specific handlers. + + Args: + agent_engine_app (object): The Agent Engine application instance. + agent_card (AgentCard): The AgentCard describing the agent. + user_id (str): The user ID. + task_store_builder (Callable[..., TaskStore], optional): A callable to build the + TaskStore. + task_store_kwargs (Optional[Mapping[str, Any]], optional): + Keyword arguments for the TaskStore builder. + agent_executor_kwargs (Optional[Mapping[str, Any]], optional): + Keyword arguments for the AgentExecutor builder. + agent_executor_builder (Optional[Callable[..., AgentExecutor]], optional): + A callable to build the AgentExecutor. + If not provided, a default `AdkAgentExecutor` will be used. + request_handler_kwargs (Optional[Mapping[str, Any]], optional): + Keyword arguments for the RequestHandler builder. + request_handler_builder (Optional[Callable[..., RequestHandler]], optional): + A callable to build the RequestHandler. + extended_agent_card (AgentCard, optional): An extended AgentCard. + + Returns: + object: The augmented Agent Engine application instance. + """ + from a2a.server.agent_execution import AgentExecutor, RequestContext + from a2a.server.events import EventQueue + from a2a.server.tasks import TaskUpdater + from google.genai import types + from a2a.types import ( + TextPart, + FilePart, + FileWithBytes, + FileWithUri, + Part, + ) + + class DefaultAgentExecutor(AgentExecutor): + """Agent Executor for adapting an AE application to the A2A protocol.""" + + def __init__(self, adk_app_instance): + self.adk_app = adk_app_instance + + def convert_genai_parts_to_a2a( + self, parts: list[types.Part] + ) -> list[Part]: + """Convert a list of Google Gen AI Part types into a list of A2A Part types.""" + return [ + self.convert_genai_part_to_a2a(part) + for part in parts + if (part.text or part.file_data or part.inline_data) + ] + + def convert_genai_part_to_a2a(self, part: types.Part) -> Part: + """Convert a single Google Gen AI Part type into an A2A Part type.""" + if part.text: + return TextPart(text=part.text) + if part.file_data: + return FilePart( + file=FileWithUri( + uri=part.file_data.file_uri, + mime_type=part.file_data.mime_type, + ) + ) + if part.inline_data: + return Part( + root=FilePart( + file=FileWithBytes( + bytes=part.inline_data.data, + mime_type=part.inline_data.mime_type, + ) + ) + ) + raise ValueError(f"Unsupported part type: {part}") + + async def execute( + self, context: RequestContext, event_queue: EventQueue + ) -> None: + from google.adk.events.event import Event + + query = context.get_user_input() + + updater = TaskUpdater( + event_queue, context.task_id, context.context_id + ) + + if not context.current_task: + await updater.submit() + + await updater.start_work() + + parts = [] + for stream_event in self.adk_app.stream_query( + user_id=user_id, message=query + ): + event = Event(**stream_event) + parts.extend( + self.convert_genai_parts_to_a2a(event.content.parts) + ) + + await updater.add_artifact( + parts, + name="result", + ) + await updater.complete() + + async def cancel( + self, context: RequestContext, event_queue: EventQueue + ) -> None: + raise Exception("Cancel not supported for this ADK agent") + + if agent_executor_builder: + agent_executor_kwargs["adk_app_instance"] = agent_engine_app + a2a_mixin = A2aAgent( + agent_card=agent_card, + task_store_builder=task_store_builder, + task_store_kwargs=task_store_kwargs, + agent_executor_builder=agent_executor_builder, + agent_executor_kwargs=agent_executor_kwargs, + request_handler_builder=request_handler_builder, + request_handler_kwargs=request_handler_kwargs, + extended_agent_card=extended_agent_card, + ) + else: + a2a_mixin = A2aAgent( + agent_card=agent_card, + agent_executor_builder=DefaultAgentExecutor, + agent_executor_kwargs={"adk_app_instance": agent_engine_app}, + ) + + def augmented_register_operations(): + routes = agent_engine_app.register_operations() + a2a_routes = a2a_mixin.register_operations() + for group, ops in a2a_routes.items(): + if group not in routes: + routes[group] = [] + for op in ops: + if op not in routes[group]: + routes[group].append(op) + return routes + + original_setup = agent_engine_app.set_up + + def augmented_setup(): + original_setup() + a2a_mixin.set_up() + + agent_engine_app.set_up = augmented_setup + + agent_engine_app.register_operations = augmented_register_operations + agent_engine_app.a2a_mixin = a2a_mixin + + _original_getattr = agent_engine_app.__class__.__getattribute__ + + def __getattr__(instance, name): + try: + return _original_getattr(instance, name) + except AttributeError: + if hasattr(instance.a2a_mixin, name): + return getattr(instance.a2a_mixin, name) + raise + + agent_engine_app.__class__.__getattr__ = __getattr__ + + return agent_engine_app + + class A2aAgent: """A class to initialize and set up an Agent-to-Agent application.""" diff --git a/vertexai/preview/reasoning_engines/templates/adk.py b/vertexai/preview/reasoning_engines/templates/adk.py index 38065d1cbc..ac8c10b6fe 100644 --- a/vertexai/preview/reasoning_engines/templates/adk.py +++ b/vertexai/preview/reasoning_engines/templates/adk.py @@ -1534,3 +1534,12 @@ def _warn_if_telemetry_api_disabled(self): r = session.post("https://telemetry.googleapis.com/v1/traces", data=None) if "Telemetry API has not been used in project" in r.text: _warn(_TELEMETRY_API_DISABLED_WARNING % (project, project)) + + def to_a2a(self, agent_card: "AgentCard"): + """Converts an existing ADK application to be compatible with A2A.""" + + from vertexai.preview.reasoning_engines.templates import a2a + + return a2a.to_a2a( + agent_engine_app=self, agent_card=agent_card, user_id="123" + ) diff --git a/vertexai/preview/reasoning_engines/templates/langgraph.py b/vertexai/preview/reasoning_engines/templates/langgraph.py index dcc711c867..9daf4b4808 100644 --- a/vertexai/preview/reasoning_engines/templates/langgraph.py +++ b/vertexai/preview/reasoning_engines/templates/langgraph.py @@ -658,3 +658,12 @@ def register_operations(self) -> Mapping[str, Sequence[str]]: "": ["query", "get_state", "update_state"], "stream": ["stream_query", "get_state_history"], } + + def to_a2a(self, agent_card: "AgentCard"): + """Converts an existing Langraph application to be compatible with A2A.""" + + from vertexai.preview.reasoning_engines.templates import a2a + + return a2a.to_a2a( + agent_engine_app=self, agent_card=agent_card + )