Asyncio Module VectorCollection Support [5/6] #751
Merged
Changes from all commits (77 commits, all authored by yuce):
cbe98dc  Initial asyncio commit
162fd17  Updates
9931956  Updates
856e3df  Merge branch 'master' into asyncio-module
35384bf  Black
fdda120  Updates
fee5b45  Updates
fc2c38b  Updates
1772031  Removed docs, include HazelcastClient/Map in public API
170cf89  Updates
539c904  Merge branch 'master' into asyncio-module
22449a8  black
5406bc6  Ignore incorrect mypy errors
a417a4a  Updates
d00c480  Updates
baa3bc1  Annotate optional params
ebfc9e2  black
6928837  Remove update to test util
3e03cbf  black
51ced7a  black
e635b94  update
4f103f6  Added support for SSL
042cc58  Added SSL tests
265a2b4  Added mutual authentication test
293975d  Added hostname verification tests
2718478  black
58783dc  Ported more integration tests
3cf9982  Ported hazelcast json value test
7e97ec7  Merge branch 'master' into asyncio-module-integration-tests1
6a558e8  Merge branch 'master' into asyncio-module-ssl
a630706  Merge branch 'master' into asyncio-module
c1798ea  Ported heart beat test
e92936a  Ported more tests
6ced889  Merge branch 'master' into asyncio-module
c313bfa  Merge branch 'master' into asyncio-module-ssl
6222c6b  Merge branch 'master' into asyncio-module-integration-tests1
120a58a  black
80880b8  Fixed type hints
6431acc  type hints
e9a9b5e  Ported more tests
5334cd1  Added near cache, statistics, statistics tests
a14290a  Black
492ccc1  Fixed getting local address
e8a2600  Fixed getting local address, take 2
24eb6bf  Added nearcache tests
6ab9365  Ported missing nearcache test
3f3a9c5  Ported VectorCollection and its tests
bfb805d  Black
91bf1d1  Addressed review comment
2128f5e  Removed unnecessary code
62697e3  Add BETA warning
a87a5c6  Black
8d7eede  Merge branch 'asyncio-module' into asyncio-module-ssl
00a2d12  Merge branch 'asyncio-module-ssl' into asyncio-module-integration-tests1
539466b  Merge branch 'asyncio-module-integration-tests1' into asyncio-module-…
2aff5e4  Merge branch 'asyncio-module-integration-tests2' into asyncio-module-…
767bfd5  Fix test_map_smart_listener_local_only
e673679  Updated test_heartbeat_stopped_and_restored
ab4a746  Merge branch 'asyncio-module-integration-tests1' into asyncio-module-…
319bb35  Fixed tests
8e325ea  Linter
eed53b3  Test updates
a6d5949  Merge branch 'asyncio-module-integration-tests2' into asyncio-module-…
f61ec8e  Test updates
1ca7fd6  Addressed review comment
d9acede  updates
bd23f41  Merge branch 'asyncio-module-ssl' into asyncio-module-integration-tests1
76759ec  Merge branch 'asyncio-module-integration-tests1' into asyncio-module-…
74a9aca  Merge branch 'asyncio-module-integration-tests2' into asyncio-module-…
2a2d6e8  Merge branch 'master' into asyncio-module-integration-tests1
6da4226  Merge branch 'asyncio-module-integration-tests1' into asyncio-module-…
0a829d4  Merge branch 'asyncio-module-integration-tests2' into asyncio-module-…
550a006  Merge branch 'master' into asyncio-module-integration-tests2
2c8f10e  Prevent deadlock
7f92a93  Merge branch 'asyncio-module-integration-tests2' into asyncio-module-…
4a741ae  Merge branch 'master' into asyncio-module-vc-support
df561e1  Refactored Proxy.destroy to clarify
New file (+257 lines):

import asyncio
import copy
import typing
import uuid
from typing import Any, Dict, List, Optional, Tuple

from hazelcast.protocol.codec import (
    vector_collection_set_codec,
    vector_collection_get_codec,
    vector_collection_search_near_vector_codec,
    vector_collection_delete_codec,
    vector_collection_put_codec,
    vector_collection_put_if_absent_codec,
    vector_collection_remove_codec,
    vector_collection_put_all_codec,
    vector_collection_clear_codec,
    vector_collection_optimize_codec,
    vector_collection_size_codec,
)
from hazelcast.internal.asyncio_proxy.base import Proxy
from hazelcast.serialization.compact import SchemaNotReplicatedError
from hazelcast.serialization.data import Data
from hazelcast.types import KeyType, ValueType
from hazelcast.util import check_not_none
from hazelcast.vector import (
    Document,
    SearchResult,
    Vector,
    VectorType,
    VectorSearchOptions,
)

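# Asyncio proxy for Hazelcast VectorCollection: the public methods are
# coroutines, while the *_internal helpers below build codec requests and
# return the invocation futures they await.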
class VectorCollection(Proxy, typing.Generic[KeyType, ValueType]):
    def __init__(self, service_name, name, context):
        super(VectorCollection, self).__init__(service_name, name, context)

    async def get(self, key: Any) -> Document | None:
        check_not_none(key, "key can't be None")
        return await self._get_internal(key)

    async def set(self, key: Any, document: Document) -> None:
        check_not_none(key, "key can't be None")
        check_not_none(document, "document can't be None")
        check_not_none(document.value, "document value can't be None")
        return await self._set_internal(key, document)

    async def put(self, key: Any, document: Document) -> Document | None:
        check_not_none(key, "key can't be None")
        check_not_none(document, "document can't be None")
        check_not_none(document.value, "document value can't be None")
        return await self._put_internal(key, document)

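    # put_all serializes each entry, groups the entries by partition id, and sends
    # one put_all request per partition; the per-partition invocations run
    # concurrently inside an asyncio.TaskGroup (available since Python 3.11).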
    async def put_all(self, map: Dict[Any, Document]) -> None:
        check_not_none(map, "map can't be None")
        if not map:
            return None
        partition_service = self._context.partition_service
        partition_map: Dict[int, List[Tuple[Data, Document]]] = {}
        for key, doc in map.items():
            check_not_none(key, "key can't be None")
            check_not_none(doc, "value can't be None")
            doc = copy.copy(doc)
            try:
                entry = (self._to_data(key), doc)
                doc.value = self._to_data(doc.value)
            except SchemaNotReplicatedError as e:
                return await self._send_schema_and_retry(e, self.put_all, map)

            partition_id = partition_service.get_partition_id(entry[0])
            partition_map.setdefault(partition_id, []).append(entry)

        async with asyncio.TaskGroup() as tg:  # type: ignore[attr-defined]
            for partition_id, entry_list in partition_map.items():
                request = vector_collection_put_all_codec.encode_request(self.name, entry_list)
                tg.create_task(self._ainvoke_on_partition(request, partition_id))

        return None

    async def put_if_absent(self, key: Any, document: Document) -> Document | None:
        check_not_none(key, "key can't be None")
        check_not_none(document, "document can't be None")
        check_not_none(document.value, "document value can't be None")
        return await self._put_if_absent_internal(key, document)

    async def search_near_vector(
        self,
        vector: Vector,
        *,
        include_value: bool = False,
        include_vectors: bool = False,
        limit: int = 10,
        hints: Dict[str, str] = None
    ) -> List[SearchResult]:
        check_not_none(vector, "vector can't be None")
        if limit <= 0:
            raise AssertionError("limit must be positive")
        return await self._search_near_vector_internal(
            vector,
            include_value=include_value,
            include_vectors=include_vectors,
            limit=limit,
            hints=hints,
        )

    async def remove(self, key: Any) -> Document | None:
        check_not_none(key, "key can't be None")
        return await self._remove_internal(key)

    async def delete(self, key: Any) -> None:
        check_not_none(key, "key can't be None")
        return await self._delete_internal(key)

    async def optimize(self, index_name: str = None) -> None:
        request = vector_collection_optimize_codec.encode_request(
            self.name, index_name, uuid.uuid4()
        )
        return await self._invoke(request)

    async def clear(self) -> None:
        request = vector_collection_clear_codec.encode_request(self.name)
        return await self._invoke(request)

    async def size(self) -> int:
        request = vector_collection_size_codec.encode_request(self.name)
        return await self._invoke(request, vector_collection_size_codec.decode_response)

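    # The helpers below serialize arguments to Data, build the codec requests, and
    # return invocation futures; a SchemaNotReplicatedError from compact
    # serialization is handled by sending the schema and retrying through
    # _send_schema_and_retry.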
    def _set_internal(self, key: Any, document: Document) -> asyncio.Future[None]:
        try:
            key_data = self._to_data(key)
            value_data = self._to_data(document.value)
        except SchemaNotReplicatedError as e:
            return self._send_schema_and_retry(e, self.set, key, document)
        document = copy.copy(document)
        document.value = value_data
        request = vector_collection_set_codec.encode_request(
            self.name,
            key_data,
            document,
        )
        return self._invoke_on_key(request, key_data)

    def _get_internal(self, key: Any) -> asyncio.Future[Any]:
        def handler(message):
            doc = vector_collection_get_codec.decode_response(message)
            return self._transform_document(doc)

        try:
            key_data = self._to_data(key)
        except SchemaNotReplicatedError as e:
            return self._send_schema_and_retry(e, self.get, key)
        request = vector_collection_get_codec.encode_request(
            self.name,
            key_data,
        )
        return self._invoke_on_key(request, key_data, response_handler=handler)

    def _search_near_vector_internal(
        self,
        vector: Vector,
        *,
        include_value: bool = False,
        include_vectors: bool = False,
        limit: int = 10,
        hints: Dict[str, str] = None
    ) -> asyncio.Future[List[SearchResult]]:
        def handler(message):
            results: List[
                SearchResult
            ] = vector_collection_search_near_vector_codec.decode_response(message)
            for result in results:
                if result.key is not None:
                    result.key = self._to_object(result.key)
                if result.value is not None:
                    result.value = self._to_object(result.value)
                if result.vectors:
                    for vec in result.vectors:
                        vec.type = VectorType(vec.type)
            return results

        options = VectorSearchOptions(
            include_value=include_value,
            include_vectors=include_vectors,
            limit=limit,
            hints=hints or {},
        )
        request = vector_collection_search_near_vector_codec.encode_request(
            self.name,
            [vector],
            options,
        )
        return self._invoke(request, response_handler=handler)

    def _delete_internal(self, key: Any) -> asyncio.Future[None]:
        key_data = self._to_data(key)
        request = vector_collection_delete_codec.encode_request(self.name, key_data)
        return self._invoke_on_key(request, key_data)

    def _remove_internal(self, key: Any) -> asyncio.Future[Document | None]:
        def handler(message):
            doc = vector_collection_remove_codec.decode_response(message)
            return self._transform_document(doc)

        key_data = self._to_data(key)
        request = vector_collection_remove_codec.encode_request(self.name, key_data)
        return self._invoke_on_key(request, key_data, response_handler=handler)

    def _put_internal(self, key: Any, document: Document) -> asyncio.Future[Document | None]:
        def handler(message):
            doc = vector_collection_put_codec.decode_response(message)
            return self._transform_document(doc)

        try:
            key_data = self._to_data(key)
            value_data = self._to_data(document.value)
        except SchemaNotReplicatedError as e:
            return self._send_schema_and_retry(e, self.set, key, document)
        document = copy.copy(document)
        document.value = value_data
        request = vector_collection_put_codec.encode_request(
            self.name,
            key_data,
            document,
        )
        return self._invoke_on_key(request, key_data, response_handler=handler)

    def _put_if_absent_internal(
        self, key: Any, document: Document
    ) -> asyncio.Future[Document | None]:
        def handler(message):
            doc = vector_collection_put_if_absent_codec.decode_response(message)
            return self._transform_document(doc)

        try:
            key_data = self._to_data(key)
            value_data = self._to_data(document.value)
        except SchemaNotReplicatedError as e:
            return self._send_schema_and_retry(e, self.set, key, document)
        document.value = value_data
        request = vector_collection_put_if_absent_codec.encode_request(
            self.name,
            key_data,
            document,
        )
        return self._invoke_on_key(request, key_data, response_handler=handler)

    def _transform_document(self, doc: Optional[Document]) -> Optional[Document]:
        if doc is not None:
            if doc.value is not None:
                doc.value = self._to_object(doc.value)
            for vec in doc.vectors:
                vec.type = VectorType(vec.type)
        return doc


async def create_vector_collection_proxy(service_name, name, context):
    return VectorCollection(service_name, name, context)
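
For context, a minimal usage sketch of the coroutine API added above. It is not part of this diff: it assumes the VectorCollection proxy has already been obtained from the asyncio client (how the client exposes it is outside this file), and the Document/Vector constructor shapes and the VectorType.DENSE member are assumptions about hazelcast.vector rather than something shown here. Only set, get, and search_near_vector are taken from the proxy above.

    from hazelcast.vector import Document, Vector, VectorType


    async def demo(collection) -> None:
        # `collection` is an already-obtained asyncio VectorCollection proxy (see the class above).
        # Assumed constructor shapes (not shown in this diff): Document(value, *vectors)
        # and Vector(name, type, values); VectorType.DENSE is assumed to be the
        # dense-embedding member of the enum.
        embedding = Vector("default", VectorType.DENSE, [0.1, 0.2, 0.3])
        await collection.set("key-1", Document({"text": "hello"}, embedding))

        stored = await collection.get("key-1")  # Document | None
        results = await collection.search_near_vector(
            embedding,
            include_value=True,
            limit=5,
        )
        for result in results:
            print(result.key, result.value)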