From 95eb10f7a9572553f2f238e95252619b8a340736 Mon Sep 17 00:00:00 2001 From: A Vertex SDK engineer Date: Wed, 3 Dec 2025 10:31:59 -0800 Subject: [PATCH] feat: GenAI SDK client(memory): Add PurgeMemories PiperOrigin-RevId: 839815069 --- .../test_generate_agent_engine_memories.py | 8 - .../test_purge_agent_engine_memories.py | 157 ++++++++++++ vertexai/_genai/memories.py | 234 +++++++++++++++++- vertexai/_genai/types/__init__.py | 20 ++ vertexai/_genai/types/common.py | 136 ++++++++++ 5 files changed, 545 insertions(+), 10 deletions(-) create mode 100644 tests/unit/vertexai/genai/replays/test_purge_agent_engine_memories.py diff --git a/tests/unit/vertexai/genai/replays/test_generate_agent_engine_memories.py b/tests/unit/vertexai/genai/replays/test_generate_agent_engine_memories.py index a22be6c44c..dbd6cc855e 100644 --- a/tests/unit/vertexai/genai/replays/test_generate_agent_engine_memories.py +++ b/tests/unit/vertexai/genai/replays/test_generate_agent_engine_memories.py @@ -23,10 +23,6 @@ def test_generate_and_rollback_memories(client): - # TODO(): Use prod endpoint once experiment is fully rolled out. - client._api_client._http_options.base_url = ( - "https://us-central1-autopush-aiplatform.sandbox.googleapis.com/" - ) agent_engine = client.agent_engines.create() assert not list( client.agent_engines.memories.list( @@ -161,10 +157,6 @@ def test_generate_memories_direct_memories_source(client): @pytest.mark.asyncio async def test_generate_and_rollback_memories_async(client): - # TODO(): Use prod endpoint once revisions experiment is fully rolled out. - client._api_client._http_options.base_url = ( - "https://us-central1-autopush-aiplatform.sandbox.googleapis.com/" - ) agent_engine = client.agent_engines.create() await client.aio.agent_engines.memories.generate( name=agent_engine.api_resource.name, diff --git a/tests/unit/vertexai/genai/replays/test_purge_agent_engine_memories.py b/tests/unit/vertexai/genai/replays/test_purge_agent_engine_memories.py new file mode 100644 index 0000000000..b4615b6b92 --- /dev/null +++ b/tests/unit/vertexai/genai/replays/test_purge_agent_engine_memories.py @@ -0,0 +1,157 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# pylint: disable=protected-access,bad-continuation,missing-function-docstring + +import pytest + + +from tests.unit.vertexai.genai.replays import pytest_helper + + +def test_purge_memories(client): + """Tests purging memories.""" + agent_engine = client.agent_engines.create() + try: + client.agent_engines.memories.create( + name=agent_engine.api_resource.name, + fact="memory_fact_1", + scope={"user_id": "123"}, + config={"wait_for_completion": True}, + ) + client.agent_engines.memories.create( + name=agent_engine.api_resource.name, + fact="memory_fact_2", + scope={"user_id": "123"}, + config={"wait_for_completion": True}, + ) + client.agent_engines.memories.create( + name=agent_engine.api_resource.name, + fact="memory_fact_3", + scope={"user_id": "456"}, + config={"wait_for_completion": True}, + ) + operation = client.agent_engines.memories.purge( + name=agent_engine.api_resource.name, + filter="scope.user_id=123", + config={"wait_for_completion": True}, + ) + assert operation.done + assert operation.response.purge_count == 2 + # Memories were not actually purged, because `force` was False. + assert ( + len( + list( + client.agent_engines.memories.list( + name=agent_engine.api_resource.name + ) + ) + ) + == 3 + ) + # Now, actually purge the memories. + operation = client.agent_engines.memories.purge( + name=agent_engine.api_resource.name, + filter="scope.user_id=123", + force=True, + config={"wait_for_completion": True}, + ) + assert operation.done + assert operation.response.purge_count == 2 + assert ( + len( + list( + client.agent_engines.memories.list( + name=agent_engine.api_resource.name + ) + ) + ) + == 1 + ) + finally: + client.agent_engines.delete(name=agent_engine.api_resource.name, force=True) + + +pytestmark = pytest_helper.setup( + file=__file__, + globals_for_file=globals(), + test_method="agent_engines.memories.purge", +) + + +pytest_plugins = ("pytest_asyncio",) + + +@pytest.mark.asyncio +async def test_purge_memories_async(client): + agent_engine = client.agent_engines.create() + try: + client.agent_engines.memories.create( + name=agent_engine.api_resource.name, + fact="memory_fact_1", + scope={"user_id": "123"}, + config={"wait_for_completion": True}, + ) + client.agent_engines.memories.create( + name=agent_engine.api_resource.name, + fact="memory_fact_2", + scope={"user_id": "123"}, + config={"wait_for_completion": True}, + ) + client.agent_engines.memories.create( + name=agent_engine.api_resource.name, + fact="memory_fact_3", + scope={"user_id": "456"}, + config={"wait_for_completion": True}, + ) + + operation = await client.aio.agent_engines.memories.purge( + name=agent_engine.api_resource.name, + filter="scope.user_id=123", + config={"wait_for_completion": True}, + ) + assert operation.done + assert operation.response.purge_count == 2 + # Memories were not actually purged, because `force` was False. + assert ( + len( + list( + client.agent_engines.memories.list( + name=agent_engine.api_resource.name + ) + ) + ) + == 3 + ) + # Now, actually purge the memories. + operation = await client.aio.agent_engines.memories.purge( + name=agent_engine.api_resource.name, + filter="scope.user_id=123", + force=True, + config={"wait_for_completion": True}, + ) + assert operation.done + assert operation.response.purge_count == 2 + assert ( + len( + list( + client.agent_engines.memories.list( + name=agent_engine.api_resource.name + ) + ) + ) + == 1 + ) + finally: + client.agent_engines.delete(name=agent_engine.api_resource.name, force=True) diff --git a/vertexai/_genai/memories.py b/vertexai/_genai/memories.py index 6e4e5f3b8c..c710e7a68e 100644 --- a/vertexai/_genai/memories.py +++ b/vertexai/_genai/memories.py @@ -287,6 +287,26 @@ def _ListAgentEngineMemoryRequestParameters_to_vertex( return to_object +def _PurgeAgentEngineMemoriesRequestParameters_to_vertex( + from_object: Union[dict[str, Any], object], + parent_object: Optional[dict[str, Any]] = None, +) -> dict[str, Any]: + to_object: dict[str, Any] = {} + if getv(from_object, ["name"]) is not None: + setv(to_object, ["_url", "name"], getv(from_object, ["name"])) + + if getv(from_object, ["filter"]) is not None: + setv(to_object, ["filter"], getv(from_object, ["filter"])) + + if getv(from_object, ["force"]) is not None: + setv(to_object, ["force"], getv(from_object, ["force"])) + + if getv(from_object, ["config"]) is not None: + setv(to_object, ["config"], getv(from_object, ["config"])) + + return to_object + + def _RetrieveAgentEngineMemoriesConfig_to_vertex( from_object: Union[dict[str, Any], object], parent_object: Optional[dict[str, Any]] = None, @@ -1019,6 +1039,65 @@ def _update( self._api_client._verify_response(return_value) return return_value + def _purge( + self, + *, + name: str, + filter: str, + force: Optional[bool] = None, + config: Optional[types.PurgeAgentEngineMemoriesConfigOrDict] = None, + ) -> types.AgentEnginePurgeMemoriesOperation: + """ + Purges memories from an Agent Engine. + """ + + parameter_model = types._PurgeAgentEngineMemoriesRequestParameters( + name=name, + filter=filter, + force=force, + config=config, + ) + + request_url_dict: Optional[dict[str, str]] + if not self._api_client.vertexai: + raise ValueError("This method is only supported in the Vertex AI client.") + else: + request_dict = _PurgeAgentEngineMemoriesRequestParameters_to_vertex( + parameter_model + ) + request_url_dict = request_dict.get("_url") + if request_url_dict: + path = "{name}/memories:purge".format_map(request_url_dict) + else: + path = "{name}/memories:purge" + + query_params = request_dict.get("_query") + if query_params: + path = f"{path}?{urlencode(query_params)}" + # TODO: remove the hack that pops config. + request_dict.pop("config", None) + + http_options: Optional[types.HttpOptions] = None + if ( + parameter_model.config is not None + and parameter_model.config.http_options is not None + ): + http_options = parameter_model.config.http_options + + request_dict = _common.convert_to_dict(request_dict) + request_dict = _common.encode_unserializable_types(request_dict) + + response = self._api_client.request("post", path, request_dict, http_options) + + response_dict = {} if not response.body else json.loads(response.body) + + return_value = types.AgentEnginePurgeMemoriesOperation._from_response( + response=response_dict, kwargs=parameter_model.model_dump() + ) + + self._api_client._verify_response(return_value) + return return_value + _revisions = None @property @@ -1265,13 +1344,58 @@ def rollback( if config.wait_for_completion and not operation.done: operation = _agent_engines_utils._await_operation( operation_name=operation.name, - get_operation_fn=self._get_rollback_memory_operation, + get_operation_fn=self._get_memory_operation, poll_interval_seconds=0.5, ) if operation.error: raise RuntimeError(f"Failed to rollback memory: {operation.error}") return operation + def purge( + self, + *, + name: str, + filter: str, + force: bool = False, + config: Optional[types.PurgeAgentEngineMemoriesConfigOrDict] = None, + ) -> types.AgentEnginePurgeMemoriesOperation: + """Purges memories from an Agent Engine. + + Args: + name (str): + Required. The name of the Agent Engine to purge memories from. + filter (str): + Required. The standard list filter to determine which memories to purge. + force (bool): + Optional. Whether to force the purge operation. If false, the + operation will be staged but not executed. + config (PurgeAgentEngineMemoriesConfig): + Optional. The configuration for the purge operation. + + Returns: + AgentEnginePurgeMemoriesOperation: + The operation for purging the memories. + """ + if config is None: + config = types.PurgeAgentEngineMemoriesConfig() + elif isinstance(config, dict): + config = types.PurgeAgentEngineMemoriesConfig.model_validate(config) + operation = self._purge( + name=name, + filter=filter, + force=force, + config=config, + ) + if config.wait_for_completion and not operation.done: + operation = _agent_engines_utils._await_operation( + operation_name=operation.name, + get_operation_fn=self._get_memory_operation, + poll_interval_seconds=0.5, + ) + if operation.error: + raise RuntimeError(f"Failed to purge memories: {operation.error}") + return operation + class AsyncMemories(_api_module.BaseModule): @@ -1885,6 +2009,67 @@ async def _update( self._api_client._verify_response(return_value) return return_value + async def _purge( + self, + *, + name: str, + filter: str, + force: Optional[bool] = None, + config: Optional[types.PurgeAgentEngineMemoriesConfigOrDict] = None, + ) -> types.AgentEnginePurgeMemoriesOperation: + """ + Purges memories from an Agent Engine. + """ + + parameter_model = types._PurgeAgentEngineMemoriesRequestParameters( + name=name, + filter=filter, + force=force, + config=config, + ) + + request_url_dict: Optional[dict[str, str]] + if not self._api_client.vertexai: + raise ValueError("This method is only supported in the Vertex AI client.") + else: + request_dict = _PurgeAgentEngineMemoriesRequestParameters_to_vertex( + parameter_model + ) + request_url_dict = request_dict.get("_url") + if request_url_dict: + path = "{name}/memories:purge".format_map(request_url_dict) + else: + path = "{name}/memories:purge" + + query_params = request_dict.get("_query") + if query_params: + path = f"{path}?{urlencode(query_params)}" + # TODO: remove the hack that pops config. + request_dict.pop("config", None) + + http_options: Optional[types.HttpOptions] = None + if ( + parameter_model.config is not None + and parameter_model.config.http_options is not None + ): + http_options = parameter_model.config.http_options + + request_dict = _common.convert_to_dict(request_dict) + request_dict = _common.encode_unserializable_types(request_dict) + + response = await self._api_client.async_request( + "post", path, request_dict, http_options + ) + + response_dict = {} if not response.body else json.loads(response.body) + + return_value = types.AgentEnginePurgeMemoriesOperation._from_response( + response=response_dict, kwargs=parameter_model.model_dump() + ) + + self._api_client._verify_response(return_value) + return return_value + _revisions = None @property @@ -2131,9 +2316,54 @@ async def rollback( if config.wait_for_completion and not operation.done: operation = await _agent_engines_utils._await_async_operation( operation_name=operation.name, - get_operation_fn=self._get_rollback_memory_operation, + get_operation_fn=self._get_memory_operation, poll_interval_seconds=0.5, ) if operation.error: raise RuntimeError(f"Failed to rollback memory: {operation.error}") return operation + + async def purge( + self, + *, + name: str, + filter: str, + force: bool = False, + config: Optional[types.PurgeAgentEngineMemoriesConfigOrDict] = None, + ) -> types.AgentEnginePurgeMemoriesOperation: + """Purges memories from an Agent Engine. + + Args: + name (str): + Required. The name of the Agent Engine to purge memories from. + filter (str): + Required. The standard list filter to determine which memories to purge. + force (bool): + Optional. Whether to force the purge operation. If false, the + operation will be staged but not executed. + config (PurgeAgentEngineMemoriesConfig): + Optional. The configuration for the purge operation. + + Returns: + AgentEnginePurgeMemoriesOperation: + The operation for purging the memories. + """ + if config is None: + config = types.PurgeAgentEngineMemoriesConfig() + elif isinstance(config, dict): + config = types.PurgeAgentEngineMemoriesConfig.model_validate(config) + operation = await self._purge( + name=name, + filter=filter, + force=force, + config=config, + ) + if config.wait_for_completion and not operation.done: + operation = await _agent_engines_utils._await_async_operation( + operation_name=operation.name, + get_operation_fn=self._get_memory_operation, + poll_interval_seconds=0.5, + ) + if operation.error: + raise RuntimeError(f"Failed to purge memories: {operation.error}") + return operation diff --git a/vertexai/_genai/types/__init__.py b/vertexai/_genai/types/__init__.py index 8582001816..e460024dc5 100644 --- a/vertexai/_genai/types/__init__.py +++ b/vertexai/_genai/types/__init__.py @@ -75,6 +75,7 @@ from .common import _ListDatasetVersionsRequestParameters from .common import _ListMultimodalDatasetsRequestParameters from .common import _OptimizeRequestParameters +from .common import _PurgeAgentEngineMemoriesRequestParameters from .common import _QueryAgentEngineRequestParameters from .common import _RestoreVersionRequestParameters from .common import _RetrieveAgentEngineMemoriesRequestParameters @@ -103,6 +104,9 @@ from .common import AgentEngineOperationDict from .common import AgentEngineOperationOrDict from .common import AgentEngineOrDict +from .common import AgentEnginePurgeMemoriesOperation +from .common import AgentEnginePurgeMemoriesOperationDict +from .common import AgentEnginePurgeMemoriesOperationOrDict from .common import AgentEngineRollbackMemoryOperation from .common import AgentEngineRollbackMemoryOperationDict from .common import AgentEngineRollbackMemoryOperationOrDict @@ -668,6 +672,12 @@ from .common import PscInterfaceConfig from .common import PscInterfaceConfigDict from .common import PscInterfaceConfigOrDict +from .common import PurgeAgentEngineMemoriesConfig +from .common import PurgeAgentEngineMemoriesConfigDict +from .common import PurgeAgentEngineMemoriesConfigOrDict +from .common import PurgeMemoriesResponse +from .common import PurgeMemoriesResponseDict +from .common import PurgeMemoriesResponseOrDict from .common import PythonPackageSpec from .common import PythonPackageSpecDict from .common import PythonPackageSpecOrDict @@ -1517,6 +1527,15 @@ "UpdateAgentEngineMemoryConfig", "UpdateAgentEngineMemoryConfigDict", "UpdateAgentEngineMemoryConfigOrDict", + "PurgeAgentEngineMemoriesConfig", + "PurgeAgentEngineMemoriesConfigDict", + "PurgeAgentEngineMemoriesConfigOrDict", + "PurgeMemoriesResponse", + "PurgeMemoriesResponseDict", + "PurgeMemoriesResponseOrDict", + "AgentEnginePurgeMemoriesOperation", + "AgentEnginePurgeMemoriesOperationDict", + "AgentEnginePurgeMemoriesOperationOrDict", "GetAgentEngineMemoryRevisionConfig", "GetAgentEngineMemoryRevisionConfigDict", "GetAgentEngineMemoryRevisionConfigOrDict", @@ -1940,6 +1959,7 @@ "_RetrieveAgentEngineMemoriesRequestParameters", "_RollbackAgentEngineMemoryRequestParameters", "_UpdateAgentEngineMemoryRequestParameters", + "_PurgeAgentEngineMemoriesRequestParameters", "_GetAgentEngineMemoryRevisionRequestParameters", "_ListAgentEngineMemoryRevisionsRequestParameters", "_CreateAgentEngineSandboxRequestParameters", diff --git a/vertexai/_genai/types/common.py b/vertexai/_genai/types/common.py index a2a3c497b5..03b97aa203 100644 --- a/vertexai/_genai/types/common.py +++ b/vertexai/_genai/types/common.py @@ -7851,6 +7851,142 @@ class _UpdateAgentEngineMemoryRequestParametersDict(TypedDict, total=False): ] +class PurgeAgentEngineMemoriesConfig(_common.BaseModel): + """Config for purging memories.""" + + http_options: Optional[genai_types.HttpOptions] = Field( + default=None, description="""Used to override HTTP request options.""" + ) + wait_for_completion: Optional[bool] = Field( + default=True, + description="""Waits for the operation to complete before returning.""", + ) + + +class PurgeAgentEngineMemoriesConfigDict(TypedDict, total=False): + """Config for purging memories.""" + + http_options: Optional[genai_types.HttpOptionsDict] + """Used to override HTTP request options.""" + + wait_for_completion: Optional[bool] + """Waits for the operation to complete before returning.""" + + +PurgeAgentEngineMemoriesConfigOrDict = Union[ + PurgeAgentEngineMemoriesConfig, PurgeAgentEngineMemoriesConfigDict +] + + +class _PurgeAgentEngineMemoriesRequestParameters(_common.BaseModel): + """Parameters for purging agent engine memories.""" + + name: Optional[str] = Field( + default=None, description="""Name of the Agent Engine to purge memories from.""" + ) + filter: Optional[str] = Field( + default=None, + description="""The standard list filter to determine which memories to purge. + More detail in [AIP-160](https://google.aip.dev/160).""", + ) + force: Optional[bool] = Field( + default=None, + description="""If true, the memories will actually be purged. If false, the purge request will be validated but not executed.""", + ) + config: Optional[PurgeAgentEngineMemoriesConfig] = Field( + default=None, description="""""" + ) + + +class _PurgeAgentEngineMemoriesRequestParametersDict(TypedDict, total=False): + """Parameters for purging agent engine memories.""" + + name: Optional[str] + """Name of the Agent Engine to purge memories from.""" + + filter: Optional[str] + """The standard list filter to determine which memories to purge. + More detail in [AIP-160](https://google.aip.dev/160).""" + + force: Optional[bool] + """If true, the memories will actually be purged. If false, the purge request will be validated but not executed.""" + + config: Optional[PurgeAgentEngineMemoriesConfigDict] + """""" + + +_PurgeAgentEngineMemoriesRequestParametersOrDict = Union[ + _PurgeAgentEngineMemoriesRequestParameters, + _PurgeAgentEngineMemoriesRequestParametersDict, +] + + +class PurgeMemoriesResponse(_common.BaseModel): + """The response for purging memories.""" + + purge_count: Optional[int] = Field( + default=None, description="""The number of memories that were purged.""" + ) + + +class PurgeMemoriesResponseDict(TypedDict, total=False): + """The response for purging memories.""" + + purge_count: Optional[int] + """The number of memories that were purged.""" + + +PurgeMemoriesResponseOrDict = Union[PurgeMemoriesResponse, PurgeMemoriesResponseDict] + + +class AgentEnginePurgeMemoriesOperation(_common.BaseModel): + """Operation that purges memories from an agent engine.""" + + name: Optional[str] = Field( + default=None, + description="""The server-assigned name, which is only unique within the same service that originally returns it. If you use the default HTTP mapping, the `name` should be a resource name ending with `operations/{unique_id}`.""", + ) + metadata: Optional[dict[str, Any]] = Field( + default=None, + description="""Service-specific metadata associated with the operation. It typically contains progress information and common metadata such as create time. Some services might not provide such metadata. Any method that returns a long-running operation should document the metadata type, if any.""", + ) + done: Optional[bool] = Field( + default=None, + description="""If the value is `false`, it means the operation is still in progress. If `true`, the operation is completed, and either `error` or `response` is available.""", + ) + error: Optional[dict[str, Any]] = Field( + default=None, + description="""The error result of the operation in case of failure or cancellation.""", + ) + response: Optional[PurgeMemoriesResponse] = Field( + default=None, description="""The response for purging memories.""" + ) + + +class AgentEnginePurgeMemoriesOperationDict(TypedDict, total=False): + """Operation that purges memories from an agent engine.""" + + name: Optional[str] + """The server-assigned name, which is only unique within the same service that originally returns it. If you use the default HTTP mapping, the `name` should be a resource name ending with `operations/{unique_id}`.""" + + metadata: Optional[dict[str, Any]] + """Service-specific metadata associated with the operation. It typically contains progress information and common metadata such as create time. Some services might not provide such metadata. Any method that returns a long-running operation should document the metadata type, if any.""" + + done: Optional[bool] + """If the value is `false`, it means the operation is still in progress. If `true`, the operation is completed, and either `error` or `response` is available.""" + + error: Optional[dict[str, Any]] + """The error result of the operation in case of failure or cancellation.""" + + response: Optional[PurgeMemoriesResponseDict] + """The response for purging memories.""" + + +AgentEnginePurgeMemoriesOperationOrDict = Union[ + AgentEnginePurgeMemoriesOperation, AgentEnginePurgeMemoriesOperationDict +] + + class GetAgentEngineMemoryRevisionConfig(_common.BaseModel): """Config for getting an Agent Engine Memory Revision."""