Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
77 commits
Select commit Hold shift + click to select a range
cbe98dc
Initial asyncio commit
yuce Sep 19, 2025
162fd17
Updates
yuce Sep 19, 2025
9931956
Updates
yuce Sep 19, 2025
856e3df
Merge branch 'master' into asyncio-module
yuce Sep 19, 2025
35384bf
Black
yuce Sep 19, 2025
fdda120
Updates
yuce Sep 19, 2025
fee5b45
Updates
yuce Sep 19, 2025
fc2c38b
Updates
yuce Sep 19, 2025
1772031
Removed docs, include HazelcastClient/Map in public API
yuce Sep 19, 2025
170cf89
Updates
yuce Sep 19, 2025
539c904
Merge branch 'master' into asyncio-module
yuce Sep 22, 2025
22449a8
black
yuce Sep 22, 2025
5406bc6
Ignore incorrect mypy errors
yuce Sep 22, 2025
a417a4a
Updates
yuce Sep 24, 2025
d00c480
Updates
yuce Sep 25, 2025
baa3bc1
Annotate optional params
yuce Sep 29, 2025
ebfc9e2
black
yuce Sep 29, 2025
6928837
Remove update to test util
yuce Sep 30, 2025
3e03cbf
black
yuce Sep 30, 2025
51ced7a
black
yuce Sep 30, 2025
e635b94
update
yuce Sep 30, 2025
4f103f6
Added support for SSL
yuce Sep 30, 2025
042cc58
Added SSL tests
yuce Sep 30, 2025
265a2b4
Added mutual authentication test
yuce Sep 30, 2025
293975d
Added hostname verification tests
yuce Oct 1, 2025
2718478
black
yuce Oct 1, 2025
58783dc
Ported more integration tests
yuce Oct 1, 2025
3cf9982
Ported hazelcast json value test
yuce Oct 2, 2025
7e97ec7
Merge branch 'master' into asyncio-module-integration-tests1
yuce Oct 2, 2025
6a558e8
Merge branch 'master' into asyncio-module-ssl
yuce Oct 2, 2025
a630706
Merge branch 'master' into asyncio-module
yuce Oct 2, 2025
c1798ea
Ported heart beat test
yuce Oct 2, 2025
e92936a
Ported more tests
yuce Oct 20, 2025
6ced889
Merge branch 'master' into asyncio-module
yuce Oct 20, 2025
c313bfa
Merge branch 'master' into asyncio-module-ssl
yuce Oct 20, 2025
6222c6b
Merge branch 'master' into asyncio-module-integration-tests1
yuce Oct 20, 2025
120a58a
black
yuce Oct 22, 2025
80880b8
Fixed type hints
yuce Oct 30, 2025
6431acc
type hints
yuce Nov 14, 2025
e9a9b5e
Ported more tests
yuce Nov 17, 2025
5334cd1
Added near cache, statistics, statistics tests
yuce Nov 17, 2025
a14290a
Black
yuce Nov 17, 2025
492ccc1
Fixed getting local address
yuce Nov 18, 2025
e8a2600
Fixed getting local address, take 2
yuce Nov 18, 2025
24eb6bf
Added nearcache tests
yuce Nov 18, 2025
6ab9365
Ported missing nearcache test
yuce Nov 18, 2025
3f3a9c5
Ported VectorCollection and its tests
yuce Nov 19, 2025
bfb805d
Black
yuce Nov 19, 2025
91bf1d1
Addressed review comment
yuce Nov 21, 2025
2128f5e
Removed unnecessary code
yuce Nov 21, 2025
62697e3
Add BETA warning
yuce Nov 21, 2025
a87a5c6
Black
yuce Nov 21, 2025
8d7eede
Merge branch 'asyncio-module' into asyncio-module-ssl
yuce Nov 24, 2025
00a2d12
Merge branch 'asyncio-module-ssl' into asyncio-module-integration-tests1
yuce Nov 24, 2025
539466b
Merge branch 'asyncio-module-integration-tests1' into asyncio-module-…
yuce Nov 24, 2025
2aff5e4
Merge branch 'asyncio-module-integration-tests2' into asyncio-module-…
yuce Nov 24, 2025
767bfd5
Fix test_map_smart_listener_local_only
yuce Nov 24, 2025
e673679
Updated test_heartbeat_stopped_and_restored
yuce Nov 25, 2025
ab4a746
Merge branch 'asyncio-module-integration-tests1' into asyncio-module-…
yuce Nov 25, 2025
319bb35
Fixed tests
yuce Nov 25, 2025
8e325ea
Linter
yuce Nov 25, 2025
eed53b3
Test updates
yuce Nov 25, 2025
a6d5949
Merge branch 'asyncio-module-integration-tests2' into asyncio-module-…
yuce Nov 26, 2025
f61ec8e
Test updates
yuce Nov 26, 2025
1ca7fd6
Addressed review comment
yuce Nov 26, 2025
d9acede
updates
yuce Nov 28, 2025
bd23f41
Merge branch 'asyncio-module-ssl' into asyncio-module-integration-tests1
yuce Nov 28, 2025
76759ec
Merge branch 'asyncio-module-integration-tests1' into asyncio-module-…
yuce Nov 28, 2025
74a9aca
Merge branch 'asyncio-module-integration-tests2' into asyncio-module-…
yuce Nov 28, 2025
2a2d6e8
Merge branch 'master' into asyncio-module-integration-tests1
yuce Dec 1, 2025
6da4226
Merge branch 'asyncio-module-integration-tests1' into asyncio-module-…
yuce Dec 1, 2025
0a829d4
Merge branch 'asyncio-module-integration-tests2' into asyncio-module-…
yuce Dec 2, 2025
550a006
Merge branch 'master' into asyncio-module-integration-tests2
yuce Dec 4, 2025
2c8f10e
Prevent deadlock
yuce Dec 4, 2025
7f92a93
Merge branch 'asyncio-module-integration-tests2' into asyncio-module-…
yuce Dec 4, 2025
4a741ae
Merge branch 'master' into asyncio-module-vc-support
yuce Dec 4, 2025
df561e1
Refactored Proxy.destroy to clarify
yuce Dec 4, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 35 additions & 1 deletion hazelcast/asyncio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@

from hazelcast.internal.asyncio_cluster import ClusterService, _InternalClusterService
from hazelcast.internal.asyncio_compact import CompactSchemaService
from hazelcast.config import Config
from hazelcast.config import Config, IndexConfig
from hazelcast.internal.asyncio_connection import ConnectionManager, DefaultAddressProvider
from hazelcast.core import DistributedObjectEvent, DistributedObjectInfo
from hazelcast.cp import CPSubsystem, ProxySessionManager
from hazelcast.discovery import HazelcastCloudAddressProvider
from hazelcast.errors import IllegalStateError, InvalidConfigurationError
from hazelcast.internal.asyncio_invocation import InvocationService, Invocation
from hazelcast.internal.asyncio_proxy.vector_collection import VectorCollection
from hazelcast.lifecycle import LifecycleService, LifecycleState, _InternalLifecycleService
from hazelcast.internal.asyncio_listener import ClusterViewListenerService, ListenerService
from hazelcast.near_cache import NearCacheManager
Expand All @@ -20,10 +21,12 @@
client_add_distributed_object_listener_codec,
client_get_distributed_objects_codec,
client_remove_distributed_object_listener_codec,
dynamic_config_add_vector_collection_config_codec,
)
from hazelcast.internal.asyncio_proxy.manager import (
MAP_SERVICE,
ProxyManager,
VECTOR_SERVICE,
)
from hazelcast.internal.asyncio_proxy.base import Proxy
from hazelcast.internal.asyncio_proxy.map import Map
Expand Down Expand Up @@ -185,6 +188,37 @@ async def _start(self):
async def get_map(self, name: str) -> Map[KeyType, ValueType]:
return await self._proxy_manager.get_or_create(MAP_SERVICE, name)

async def create_vector_collection_config(
self,
name: str,
indexes: typing.List[IndexConfig],
backup_count: int = 1,
async_backup_count: int = 0,
split_brain_protection_name: typing.Optional[str] = None,
merge_policy: str = "PutIfAbsentMergePolicy",
merge_batch_size: int = 100,
) -> None:
# check that indexes have different names
if indexes:
index_names = set(index.name for index in indexes)
if len(index_names) != len(indexes):
raise AssertionError("index names must be unique")

request = dynamic_config_add_vector_collection_config_codec.encode_request(
name,
indexes,
backup_count,
async_backup_count,
split_brain_protection_name,
merge_policy,
merge_batch_size,
)
invocation = Invocation(request, response_handler=lambda m: m)
await self._invocation_service.ainvoke(invocation)

async def get_vector_collection(self, name: str) -> VectorCollection:
return await self._proxy_manager.get_or_create(VECTOR_SERVICE, name)

async def add_distributed_object_listener(
self, listener_func: typing.Callable[[DistributedObjectEvent], None]
) -> str:
Expand Down
4 changes: 3 additions & 1 deletion hazelcast/internal/asyncio_proxy/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,13 @@ async def destroy(self) -> bool:
``True`` if this proxy is destroyed successfully, ``False``
otherwise.
"""
destroyed = False
async with asyncio.TaskGroup() as tg: # type: ignore[attr-defined]
tg.create_task(self._on_destroy())
return await tg.create_task(
destroyed = await tg.create_task(
self._context.proxy_manager.destroy_proxy(self.service_name, self.name)
)
return destroyed

async def _on_destroy(self):
pass
Expand Down
6 changes: 6 additions & 0 deletions hazelcast/internal/asyncio_proxy/manager.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,24 @@
import typing

from hazelcast.internal.asyncio_proxy.vector_collection import (
VectorCollection,
create_vector_collection_proxy,
)
from hazelcast.protocol.codec import client_create_proxy_codec, client_destroy_proxy_codec
from hazelcast.internal.asyncio_invocation import Invocation
from hazelcast.internal.asyncio_proxy.base import Proxy
from hazelcast.internal.asyncio_proxy.map import create_map_proxy
from hazelcast.util import to_list

MAP_SERVICE = "hz:impl:mapService"
VECTOR_SERVICE = "hz:service:vector"

_proxy_init: typing.Dict[
str,
typing.Callable[[str, str, typing.Any], typing.Coroutine[typing.Any, typing.Any, typing.Any]],
] = {
MAP_SERVICE: create_map_proxy,
VECTOR_SERVICE: create_vector_collection_proxy,
}


Expand Down
1 change: 0 additions & 1 deletion hazelcast/internal/asyncio_proxy/map.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from hazelcast.aggregator import Aggregator
from hazelcast.config import IndexUtil, IndexType, IndexConfig
from hazelcast.core import SimpleEntryView
from hazelcast.errors import InvalidConfigurationError
from hazelcast.projection import Projection
from hazelcast.protocol import PagingPredicateHolder
from hazelcast.protocol.codec import (
Expand Down
257 changes: 257 additions & 0 deletions hazelcast/internal/asyncio_proxy/vector_collection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,257 @@
import asyncio
import copy
import typing
import uuid
from typing import Any, Dict, List, Optional, Tuple

from hazelcast.protocol.codec import (
vector_collection_set_codec,
vector_collection_get_codec,
vector_collection_search_near_vector_codec,
vector_collection_delete_codec,
vector_collection_put_codec,
vector_collection_put_if_absent_codec,
vector_collection_remove_codec,
vector_collection_put_all_codec,
vector_collection_clear_codec,
vector_collection_optimize_codec,
vector_collection_size_codec,
)
from hazelcast.internal.asyncio_proxy.base import Proxy
from hazelcast.serialization.compact import SchemaNotReplicatedError
from hazelcast.serialization.data import Data
from hazelcast.types import KeyType, ValueType
from hazelcast.util import check_not_none
from hazelcast.vector import (
Document,
SearchResult,
Vector,
VectorType,
VectorSearchOptions,
)


class VectorCollection(Proxy, typing.Generic[KeyType, ValueType]):
def __init__(self, service_name, name, context):
super(VectorCollection, self).__init__(service_name, name, context)

async def get(self, key: Any) -> Document | None:
check_not_none(key, "key can't be None")
return await self._get_internal(key)

async def set(self, key: Any, document: Document) -> None:
check_not_none(key, "key can't be None")
check_not_none(document, "document can't be None")
check_not_none(document.value, "document value can't be None")
return await self._set_internal(key, document)

async def put(self, key: Any, document: Document) -> Document | None:
check_not_none(key, "key can't be None")
check_not_none(document, "document can't be None")
check_not_none(document.value, "document value can't be None")
return await self._put_internal(key, document)

async def put_all(self, map: Dict[Any, Document]) -> None:
check_not_none(map, "map can't be None")
if not map:
return None
partition_service = self._context.partition_service
partition_map: Dict[int, List[Tuple[Data, Document]]] = {}
for key, doc in map.items():
check_not_none(key, "key can't be None")
check_not_none(doc, "value can't be None")
doc = copy.copy(doc)
try:
entry = (self._to_data(key), doc)
doc.value = self._to_data(doc.value)
except SchemaNotReplicatedError as e:
return await self._send_schema_and_retry(e, self.put_all, map)

partition_id = partition_service.get_partition_id(entry[0])
partition_map.setdefault(partition_id, []).append(entry)

async with asyncio.TaskGroup() as tg: # type: ignore[attr-defined]
for partition_id, entry_list in partition_map.items():
request = vector_collection_put_all_codec.encode_request(self.name, entry_list)
tg.create_task(self._ainvoke_on_partition(request, partition_id))

return None

async def put_if_absent(self, key: Any, document: Document) -> Document | None:
check_not_none(key, "key can't be None")
check_not_none(document, "document can't be None")
check_not_none(document.value, "document value can't be None")
return await self._put_if_absent_internal(key, document)

async def search_near_vector(
self,
vector: Vector,
*,
include_value: bool = False,
include_vectors: bool = False,
limit: int = 10,
hints: Dict[str, str] = None
) -> List[SearchResult]:
check_not_none(vector, "vector can't be None")
if limit <= 0:
raise AssertionError("limit must be positive")
return await self._search_near_vector_internal(
vector,
include_value=include_value,
include_vectors=include_vectors,
limit=limit,
hints=hints,
)

async def remove(self, key: Any) -> Document | None:
check_not_none(key, "key can't be None")
return await self._remove_internal(key)

async def delete(self, key: Any) -> None:
check_not_none(key, "key can't be None")
return await self._delete_internal(key)

async def optimize(self, index_name: str = None) -> None:
request = vector_collection_optimize_codec.encode_request(
self.name, index_name, uuid.uuid4()
)
return await self._invoke(request)

async def clear(self) -> None:
request = vector_collection_clear_codec.encode_request(self.name)
return await self._invoke(request)

async def size(self) -> int:
request = vector_collection_size_codec.encode_request(self.name)
return await self._invoke(request, vector_collection_size_codec.decode_response)

def _set_internal(self, key: Any, document: Document) -> asyncio.Future[None]:
try:
key_data = self._to_data(key)
value_data = self._to_data(document.value)
except SchemaNotReplicatedError as e:
return self._send_schema_and_retry(e, self.set, key, document)
document = copy.copy(document)
document.value = value_data
request = vector_collection_set_codec.encode_request(
self.name,
key_data,
document,
)
return self._invoke_on_key(request, key_data)

def _get_internal(self, key: Any) -> asyncio.Future[Any]:
def handler(message):
doc = vector_collection_get_codec.decode_response(message)
return self._transform_document(doc)

try:
key_data = self._to_data(key)
except SchemaNotReplicatedError as e:
return self._send_schema_and_retry(e, self.get, key)
request = vector_collection_get_codec.encode_request(
self.name,
key_data,
)
return self._invoke_on_key(request, key_data, response_handler=handler)

def _search_near_vector_internal(
self,
vector: Vector,
*,
include_value: bool = False,
include_vectors: bool = False,
limit: int = 10,
hints: Dict[str, str] = None
) -> asyncio.Future[List[SearchResult]]:
def handler(message):
results: List[
SearchResult
] = vector_collection_search_near_vector_codec.decode_response(message)
for result in results:
if result.key is not None:
result.key = self._to_object(result.key)
if result.value is not None:
result.value = self._to_object(result.value)
if result.vectors:
for vec in result.vectors:
vec.type = VectorType(vec.type)
return results

options = VectorSearchOptions(
include_value=include_value,
include_vectors=include_vectors,
limit=limit,
hints=hints or {},
)
request = vector_collection_search_near_vector_codec.encode_request(
self.name,
[vector],
options,
)
return self._invoke(request, response_handler=handler)

def _delete_internal(self, key: Any) -> asyncio.Future[None]:
key_data = self._to_data(key)
request = vector_collection_delete_codec.encode_request(self.name, key_data)
return self._invoke_on_key(request, key_data)

def _remove_internal(self, key: Any) -> asyncio.Future[Document | None]:
def handler(message):
doc = vector_collection_remove_codec.decode_response(message)
return self._transform_document(doc)

key_data = self._to_data(key)
request = vector_collection_remove_codec.encode_request(self.name, key_data)
return self._invoke_on_key(request, key_data, response_handler=handler)

def _put_internal(self, key: Any, document: Document) -> asyncio.Future[Document | None]:
def handler(message):
doc = vector_collection_put_codec.decode_response(message)
return self._transform_document(doc)

try:
key_data = self._to_data(key)
value_data = self._to_data(document.value)
except SchemaNotReplicatedError as e:
return self._send_schema_and_retry(e, self.set, key, document)
document = copy.copy(document)
document.value = value_data
request = vector_collection_put_codec.encode_request(
self.name,
key_data,
document,
)
return self._invoke_on_key(request, key_data, response_handler=handler)

def _put_if_absent_internal(
self, key: Any, document: Document
) -> asyncio.Future[Document | None]:
def handler(message):
doc = vector_collection_put_if_absent_codec.decode_response(message)
return self._transform_document(doc)

try:
key_data = self._to_data(key)
value_data = self._to_data(document.value)
except SchemaNotReplicatedError as e:
return self._send_schema_and_retry(e, self.set, key, document)
document.value = value_data
request = vector_collection_put_if_absent_codec.encode_request(
self.name,
key_data,
document,
)
return self._invoke_on_key(request, key_data, response_handler=handler)

def _transform_document(self, doc: Optional[Document]) -> Optional[Document]:
if doc is not None:
if doc.value is not None:
doc.value = self._to_object(doc.value)
for vec in doc.vectors:
vec.type = VectorType(vec.type)
return doc


async def create_vector_collection_proxy(service_name, name, context):
return VectorCollection(service_name, name, context)
Loading