
Commit bbd5a21

Asyncio Module Ported more tests [4/6] (#750)
1 parent c72eafa commit bbd5a21

16 files changed (+2162 −70 lines)

hazelcast/asyncio/client.py

Lines changed: 2 additions & 2 deletions
@@ -30,7 +30,7 @@
 from hazelcast.internal.asyncio_reactor import AsyncioReactor
 from hazelcast.serialization import SerializationServiceV1
 from hazelcast.sql import SqlService, _InternalSqlService
-from hazelcast.statistics import Statistics
+from hazelcast.internal.asyncio_statistics import Statistics
 from hazelcast.types import KeyType, ValueType, ItemType, MessageType
 from hazelcast.util import AtomicInteger, RoundRobinLB

@@ -176,7 +176,7 @@ async def _start(self):
             self._listener_service.start()
             await self._invocation_service.add_backup_listener()
             self._load_balancer.init(self._cluster_service)
-            self._statistics.start()
+            await self._statistics.start()
         except Exception:
             await self.shutdown()
             raise
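
The awaited `Statistics.start()` implies the statistics collector is now a coroutine. Below is a minimal sketch of one way an awaitable start can schedule a periodic collection task; the names (`period`, `_run`, `_collect`) are illustrative assumptions, not the actual `hazelcast.internal.asyncio_statistics` API.

import asyncio


class StatisticsSketch:
    """Illustrative only: a periodic collector whose start() must be awaited."""

    def __init__(self, period: float = 5.0):
        self._period = period
        self._task: asyncio.Task | None = None

    async def start(self) -> None:
        # Being a coroutine guarantees a running loop exists, so the collector
        # task can be scheduled before client startup continues.
        self._task = asyncio.get_running_loop().create_task(self._run())

    async def _run(self) -> None:
        while True:
            await asyncio.sleep(self._period)
            self._collect()

    def _collect(self) -> None:
        # Placeholder for gathering and sending client-side metrics.
        pass

    async def shutdown(self) -> None:
        if self._task is not None:
            self._task.cancel()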

hazelcast/internal/asyncio_connection.py

Lines changed: 40 additions & 37 deletions
@@ -650,47 +650,50 @@ async def _handle_successful_auth(self, response, connection):
         connection.remote_uuid = remote_uuid

         existing = self.active_connections.get(remote_uuid, None)
-        if existing:
-            await connection.close_connection(
-                "Duplicate connection to same member with UUID: %s" % remote_uuid, None
-            )
-            return existing
-
-        new_cluster_id = response["cluster_id"]
-        changed_cluster = self._cluster_id is not None and self._cluster_id != new_cluster_id
-        if changed_cluster:
-            await self._check_client_state_on_cluster_change(connection)
-            _logger.warning(
-                "Switching from current cluster: %s to new cluster: %s",
-                self._cluster_id,
-                new_cluster_id,
-            )
-            self._on_cluster_restart()

+        if existing:
+            await connection.close_connection(
+                "Duplicate connection to same member with UUID: %s" % remote_uuid, None
+            )
+            return existing
+
+        new_cluster_id = response["cluster_id"]
+        changed_cluster = self._cluster_id is not None and self._cluster_id != new_cluster_id
+        if changed_cluster:
+            await self._check_client_state_on_cluster_change(connection)
+            _logger.warning(
+                "Switching from current cluster: %s to new cluster: %s",
+                self._cluster_id,
+                new_cluster_id,
+            )
+            self._on_cluster_restart()
+
+        async with self._lock:
             is_initial_connection = not self.active_connections
             self.active_connections[remote_uuid] = connection
             fire_connected_lifecycle_event = False
-            if is_initial_connection:
-                self._cluster_id = new_cluster_id
-                # In split brain, the client might connect to the one half
-                # of the cluster, and then later might reconnect to the
-                # other half, after the half it was connected to is
-                # completely dead. Since the cluster id is preserved in
-                # split brain scenarios, it is impossible to distinguish
-                # reconnection to the same cluster vs reconnection to the
-                # other half of the split brain. However, in the latter,
-                # we might need to send some state to the other half of
-                # the split brain (like Compact schemas). That forces us
-                # to send the client state to the cluster after the first
-                # cluster connection, regardless the cluster id is
-                # changed or not.
-                if self._established_initial_cluster_connection:
-                    self._client_state = ClientState.CONNECTED_TO_CLUSTER
-                    await self._initialize_on_cluster(new_cluster_id)
-                else:
-                    fire_connected_lifecycle_event = True
-                    self._established_initial_cluster_connection = True
-                    self._client_state = ClientState.INITIALIZED_ON_CLUSTER
+
+            if is_initial_connection:
+                self._cluster_id = new_cluster_id
+                # In split brain, the client might connect to the one half
+                # of the cluster, and then later might reconnect to the
+                # other half, after the half it was connected to is
+                # completely dead. Since the cluster id is preserved in
+                # split brain scenarios, it is impossible to distinguish
+                # reconnection to the same cluster vs reconnection to the
+                # other half of the split brain. However, in the latter,
+                # we might need to send some state to the other half of
+                # the split brain (like Compact schemas). That forces us
+                # to send the client state to the cluster after the first
+                # cluster connection, regardless the cluster id is
+                # changed or not.
+                if self._established_initial_cluster_connection:
+                    self._client_state = ClientState.CONNECTED_TO_CLUSTER
+                    await self._initialize_on_cluster(new_cluster_id)
+                else:
+                    fire_connected_lifecycle_event = True
+                    self._established_initial_cluster_connection = True
+                    self._client_state = ClientState.INITIALIZED_ON_CLUSTER

         if fire_connected_lifecycle_event:
             self._lifecycle_service.fire_lifecycle_event(LifecycleState.CONNECTED)
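
If the reconstruction above is right, the reshuffled hunk keeps the await-heavy duplicate/cluster-change handling outside the lock and only mutates shared bookkeeping while holding an `asyncio.Lock`. A stripped-down sketch of that pattern follows; the registry and connection classes are stand-ins, not the real connection manager.

import asyncio


class FakeConnection:
    """Stand-in for a client connection."""

    async def close_connection(self, reason, cause):
        await asyncio.sleep(0)  # pretend to flush and close


class RegistrySketch:
    def __init__(self):
        self._lock = asyncio.Lock()
        self.active_connections = {}

    async def register(self, remote_uuid, connection):
        # Await-heavy work (closing a duplicate connection) happens before
        # the lock so other coroutines are not blocked behind network I/O.
        existing = self.active_connections.get(remote_uuid)
        if existing:
            await connection.close_connection("duplicate", None)
            return existing

        async with self._lock:
            # Only the shared bookkeeping is touched while holding the lock.
            self.active_connections[remote_uuid] = connection
        return connection


async def main():
    registry = RegistrySketch()
    await registry.register("uuid-1", FakeConnection())
    print(len(registry.active_connections))


asyncio.run(main())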

hazelcast/internal/asyncio_listener.py

Lines changed: 14 additions & 22 deletions
@@ -175,10 +175,8 @@ async def _register_on_connection(
         self, user_registration_id, listener_registration, connection
     ):
         registration_map = listener_registration.connection_registrations
-
         if connection in registration_map:
             return
-
         registration_request = listener_registration.registration_request.copy()
         invocation = Invocation(
             registration_request,

@@ -187,26 +185,20 @@
             response_handler=lambda m: m,
             urgent=True,
         )
-        self._invocation_service.invoke(invocation)
-
-        def callback(f):
-            try:
-                response = f.result()
-                server_registration_id = listener_registration.decode_register_response(response)
-                correlation_id = registration_request.get_correlation_id()
-                registration = _EventRegistration(server_registration_id, correlation_id)
-                registration_map[connection] = registration
-            except Exception as e:
-                if connection.live:
-                    _logger.exception(
-                        "Listener %s can not be added to a new connection: %s",
-                        user_registration_id,
-                        connection,
-                    )
-                raise e
-
-        invocation.future.add_done_callback(callback)
-        return await invocation.future
+        response = await self._invocation_service.ainvoke(invocation)
+        try:
+            server_registration_id = listener_registration.decode_register_response(response)
+            correlation_id = registration_request.get_correlation_id()
+            registration = _EventRegistration(server_registration_id, correlation_id)
+            registration_map[connection] = registration
+        except Exception as e:
+            if connection.live:
+                _logger.exception(
+                    "Listener %s can not be added to a new connection: %s",
+                    user_registration_id,
+                    connection,
+                )
+            raise e

     async def _connection_added(self, connection):
         async with self._registration_lock:
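
The rewrite drops the `add_done_callback` chain in favour of awaiting the invocation and decoding the response in a plain try/except. A generic before/after illustration of that transformation, using a dummy coroutine rather than the Hazelcast invocation service:

import asyncio


async def invoke(value):
    await asyncio.sleep(0)  # stand-in for the network round trip
    return value * 2


async def register_with_callback():
    # Old shape: decode inside a done-callback attached to the future.
    future = asyncio.ensure_future(invoke(21))

    def callback(f):
        print("decoded:", f.result())

    future.add_done_callback(callback)
    return await future


async def register_with_await():
    # New shape: await directly; failures surface as ordinary exceptions.
    try:
        response = await invoke(21)
        print("decoded:", response)
    except Exception:
        raise


asyncio.run(register_with_await())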

hazelcast/internal/asyncio_proxy/base.py

Lines changed: 6 additions & 3 deletions
@@ -41,10 +41,13 @@ async def destroy(self) -> bool:
             ``True`` if this proxy is destroyed successfully, ``False``
             otherwise.
         """
-        self._on_destroy()
-        return await self._context.proxy_manager.destroy_proxy(self.service_name, self.name)
+        async with asyncio.TaskGroup() as tg:  # type: ignore[attr-defined]
+            tg.create_task(self._on_destroy())
+            return await tg.create_task(
+                self._context.proxy_manager.destroy_proxy(self.service_name, self.name)
+            )

-    def _on_destroy(self):
+    async def _on_destroy(self):
         pass

     def __repr__(self) -> str:
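
`asyncio.TaskGroup` is only available on Python 3.11+, which presumably explains the `type: ignore` for older stubs; tasks in a group run concurrently, and if one fails the others are cancelled. A self-contained sketch with the same shape as the new `destroy()`, using placeholder coroutines for `_on_destroy` and the proxy-manager call:

import asyncio


async def on_destroy():
    await asyncio.sleep(0)  # placeholder for local, per-proxy cleanup


async def destroy_on_remote() -> bool:
    await asyncio.sleep(0)  # placeholder for the remote destroy invocation
    return True


async def destroy() -> bool:
    # Python 3.11+: both tasks run concurrently; if either raises, the group
    # cancels the other and an ExceptionGroup propagates when the block exits.
    async with asyncio.TaskGroup() as tg:
        tg.create_task(on_destroy())
        remote = tg.create_task(destroy_on_remote())
    return remote.result()


print(asyncio.run(destroy()))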

hazelcast/internal/asyncio_proxy/manager.py

Lines changed: 5 additions & 2 deletions
@@ -8,7 +8,10 @@

 MAP_SERVICE = "hz:impl:mapService"

-_proxy_init: typing.Dict[str, typing.Callable[[str, str, typing.Any], Proxy]] = {
+_proxy_init: typing.Dict[
+    str,
+    typing.Callable[[str, str, typing.Any], typing.Coroutine[typing.Any, typing.Any, typing.Any]],
+] = {
     MAP_SERVICE: create_map_proxy,
 }

@@ -34,7 +37,7 @@ async def _create_proxy(self, service_name, name, create_on_remote) -> Proxy:
         invocation_service = self._context.invocation_service
         await invocation_service.ainvoke(invocation)

-        return _proxy_init[service_name](service_name, name, self._context)
+        return await _proxy_init[service_name](service_name, name, self._context)

     async def destroy_proxy(self, service_name, name, destroy_on_remote=True):
         ns = (service_name, name)
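
With the factory values now typed as coroutine functions, the lookup result has to be awaited, as the second hunk shows. A toy registry demonstrating the same shape (the service name and factory below are made up):

import asyncio
import typing


async def create_demo_proxy(service_name: str, name: str, context: typing.Any) -> str:
    await asyncio.sleep(0)  # a real factory might register listeners here
    return f"{service_name}/{name}"


# Values are coroutine functions, so callers must await the result of the lookup.
_factories: typing.Dict[
    str,
    typing.Callable[[str, str, typing.Any], typing.Coroutine[typing.Any, typing.Any, typing.Any]],
] = {"demo:service": create_demo_proxy}


async def create(service_name: str, name: str, context: typing.Any = None):
    return await _factories[service_name](service_name, name, context)


print(asyncio.run(create("demo:service", "my-proxy")))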

hazelcast/internal/asyncio_proxy/map.py

Lines changed: 172 additions & 2 deletions
@@ -65,6 +65,7 @@
     map_set_with_max_idle_codec,
     map_remove_interceptor_codec,
     map_remove_all_codec,
+    map_add_near_cache_invalidation_listener_codec,
 )
 from hazelcast.internal.asyncio_proxy.base import (
     Proxy,

@@ -971,8 +972,177 @@ def handler(message):
         return self._invoke_on_key(request, key_data, handler)


-def create_map_proxy(service_name, name, context):
+class MapFeatNearCache(Map[KeyType, ValueType]):
+    """Map proxy implementation featuring Near Cache"""
+
+    def __init__(self, service_name, name, context):
+        super(MapFeatNearCache, self).__init__(service_name, name, context)
+        self._invalidation_listener_id = None
+        self._near_cache = context.near_cache_manager.get_or_create_near_cache(name)
+
+    async def clear(self):
+        self._near_cache._clear()
+        return await super(MapFeatNearCache, self).clear()
+
+    async def evict_all(self):
+        self._near_cache.clear()
+        return await super(MapFeatNearCache, self).evict_all()
+
+    async def load_all(self, keys=None, replace_existing_values=True):
+        if keys is None and replace_existing_values:
+            self._near_cache.clear()
+        return await super(MapFeatNearCache, self).load_all(keys, replace_existing_values)
+
+    async def _on_destroy(self):
+        await self._remove_near_cache_invalidation_listener()
+        self._near_cache.clear()
+        await super(MapFeatNearCache, self)._on_destroy()
+
+    async def _add_near_cache_invalidation_listener(self):
+        codec = map_add_near_cache_invalidation_listener_codec
+        request = codec.encode_request(self.name, EntryEventType.INVALIDATION, self._is_smart)
+        self._invalidation_listener_id = await self._register_listener(
+            request,
+            lambda r: codec.decode_response(r),
+            lambda reg_id: map_remove_entry_listener_codec.encode_request(self.name, reg_id),
+            lambda m: codec.handle(m, self._handle_invalidation, self._handle_batch_invalidation),
+        )
+
+    async def _remove_near_cache_invalidation_listener(self):
+        if self._invalidation_listener_id:
+            await self.remove_entry_listener(self._invalidation_listener_id)
+
+    def _handle_invalidation(self, key, source_uuid, partition_uuid, sequence):
+        # key is always ``Data``
+        # null key means near cache has to remove all entries in it.
+        # see MapAddNearCacheEntryListenerMessageTask.
+        if key is None:
+            self._near_cache._clear()
+        else:
+            self._invalidate_cache(key)
+
+    def _handle_batch_invalidation(self, keys, source_uuids, partition_uuids, sequences):
+        # key_list is always list of ``Data``
+        for key_data in keys:
+            self._invalidate_cache(key_data)
+
+    def _invalidate_cache(self, key_data):
+        self._near_cache._invalidate(key_data)
+
+    def _invalidate_cache_batch(self, key_data_list):
+        for key_data in key_data_list:
+            self._near_cache._invalidate(key_data)
+
+    # internals
+    async def _contains_key_internal(self, key_data):
+        try:
+            return self._near_cache[key_data]
+        except KeyError:
+            return await super(MapFeatNearCache, self)._contains_key_internal(key_data)
+
+    async def _get_internal(self, key_data):
+        try:
+            return self._near_cache[key_data]
+        except KeyError:
+            value = await super(MapFeatNearCache, self)._get_internal(key_data)
+            self._near_cache.__setitem__(key_data, value)
+            return value
+
+    async def _get_all_internal(self, partition_to_keys, tasks=None):
+        tasks = tasks or []
+        for key_dic in partition_to_keys.values():
+            for key in list(key_dic.keys()):
+                try:
+                    key_data = key_dic[key]
+                    value = self._near_cache[key_data]
+                    future = asyncio.Future()
+                    future.set_result((key, value))
+                    tasks.append(future)
+                    del key_dic[key]
+                except KeyError:
+                    pass
+        return await super(MapFeatNearCache, self)._get_all_internal(partition_to_keys, tasks)
+
+    def _try_remove_internal(self, key_data, timeout):
+        self._invalidate_cache(key_data)
+        return super(MapFeatNearCache, self)._try_remove_internal(key_data, timeout)
+
+    def _try_put_internal(self, key_data, value_data, timeout):
+        self._invalidate_cache(key_data)
+        return super(MapFeatNearCache, self)._try_put_internal(key_data, value_data, timeout)
+
+    def _set_internal(self, key_data, value_data, ttl, max_idle):
+        self._invalidate_cache(key_data)
+        return super(MapFeatNearCache, self)._set_internal(key_data, value_data, ttl, max_idle)
+
+    def _set_ttl_internal(self, key_data, ttl):
+        self._invalidate_cache(key_data)
+        return super(MapFeatNearCache, self)._set_ttl_internal(key_data, ttl)
+
+    def _replace_internal(self, key_data, value_data):
+        self._invalidate_cache(key_data)
+        return super(MapFeatNearCache, self)._replace_internal(key_data, value_data)
+
+    def _replace_if_same_internal(self, key_data, old_value_data, new_value_data):
+        self._invalidate_cache(key_data)
+        return super(MapFeatNearCache, self)._replace_if_same_internal(
+            key_data, old_value_data, new_value_data
+        )
+
+    def _remove_internal(self, key_data):
+        self._invalidate_cache(key_data)
+        return super(MapFeatNearCache, self)._remove_internal(key_data)
+
+    def _remove_all_internal(self, predicate_data):
+        self._near_cache.clear()
+        return super(MapFeatNearCache, self)._remove_all_internal(predicate_data)
+
+    def _remove_if_same_internal_(self, key_data, value_data):
+        self._invalidate_cache(key_data)
+        return super(MapFeatNearCache, self)._remove_if_same_internal_(key_data, value_data)
+
+    def _put_transient_internal(self, key_data, value_data, ttl, max_idle):
+        self._invalidate_cache(key_data)
+        return super(MapFeatNearCache, self)._put_transient_internal(
+            key_data, value_data, ttl, max_idle
+        )
+
+    def _put_internal(self, key_data, value_data, ttl, max_idle):
+        self._invalidate_cache(key_data)
+        return super(MapFeatNearCache, self)._put_internal(key_data, value_data, ttl, max_idle)
+
+    def _put_if_absent_internal(self, key_data, value_data, ttl, max_idle):
+        self._invalidate_cache(key_data)
+        return super(MapFeatNearCache, self)._put_if_absent_internal(
+            key_data, value_data, ttl, max_idle
+        )
+
+    def _load_all_internal(self, key_data_list, replace_existing_values):
+        self._invalidate_cache_batch(key_data_list)
+        return super(MapFeatNearCache, self)._load_all_internal(
+            key_data_list, replace_existing_values
+        )
+
+    def _execute_on_key_internal(self, key_data, entry_processor_data):
+        self._invalidate_cache(key_data)
+        return super(MapFeatNearCache, self)._execute_on_key_internal(
+            key_data, entry_processor_data
+        )
+
+    def _evict_internal(self, key_data):
+        self._invalidate_cache(key_data)
+        return super(MapFeatNearCache, self)._evict_internal(key_data)
+
+    def _delete_internal(self, key_data):
+        self._invalidate_cache(key_data)
+        return super(MapFeatNearCache, self)._delete_internal(key_data)
+
+
+async def create_map_proxy(service_name, name, context):
     near_cache_config = context.config.near_caches.get(name, None)
     if near_cache_config is None:
         return Map(service_name, name, context)
-    raise InvalidConfigurationError("near cache is not supported")
+    nc = MapFeatNearCache(service_name, name, context)
+    if nc._near_cache.invalidate_on_change:
+        await nc._add_near_cache_invalidation_listener()
+    return nc
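
`MapFeatNearCache` layers a local near cache over the map proxy: reads try the cache and fall back to the remote call, every write path invalidates the affected key, and a server-side invalidation listener keeps the cache honest when `invalidate_on_change` is set. A stripped-down read-through/invalidate-on-write sketch of that idea, using plain dicts instead of the Hazelcast near cache and codecs:

import asyncio


class NearCachedStoreSketch:
    """Toy read-through cache; async callables stand in for the remote map."""

    def __init__(self, remote_get, remote_put):
        self._remote_get = remote_get
        self._remote_put = remote_put
        self._cache = {}

    async def get(self, key):
        try:
            return self._cache[key]              # hit: served locally
        except KeyError:
            value = await self._remote_get(key)  # miss: remote round trip
            self._cache[key] = value
            return value

    async def put(self, key, value):
        self._cache.pop(key, None)               # writes invalidate, never populate
        await self._remote_put(key, value)

    def invalidate(self, key):
        # What a server-pushed invalidation event would trigger.
        self._cache.pop(key, None)


async def main():
    remote = {"a": 1}

    async def remote_get(key):
        return remote[key]

    async def remote_put(key, value):
        remote[key] = value

    cache = NearCachedStoreSketch(remote_get, remote_put)
    print(await cache.get("a"))  # remote read, then cached
    await cache.put("a", 2)      # invalidates the cached entry
    print(await cache.get("a"))  # re-reads the new value


asyncio.run(main())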

hazelcast/internal/asyncio_reactor.py

Lines changed: 8 additions & 1 deletion
@@ -97,7 +97,14 @@ async def _create_connection(self, config, address):
             ssl=ssl_context,
             server_hostname=server_hostname,
         )
-        _sock, self._proto = res
+        sock, self._proto = res
+        if hasattr(sock, "_ssl_protocol"):
+            sock = sock._ssl_protocol._transport._sock
+        else:
+            sock = sock._sock
+        sockname = sock.getsockname()
+        host, port = sockname[0], sockname[1]
+        self.local_address = Address(host, port)

     def _write(self, buf):
         self._proto.write(buf)
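
The new lines derive the client's local address by reaching into private transport attributes (`_ssl_protocol`, `_sock`), presumably to cover both plain and TLS transports. For reference, the documented way to read the local endpoint of an asyncio transport is `get_extra_info("sockname")`, sketched below; the host and port are placeholders and the example needs a reachable listener.

import asyncio


async def local_address_of(host: str, port: int):
    loop = asyncio.get_running_loop()
    transport, _protocol = await loop.create_connection(asyncio.Protocol, host, port)
    try:
        # get_extra_info("sockname") returns the local (host, port, ...) tuple
        # for both plain TCP and TLS transports.
        return transport.get_extra_info("sockname")[:2]
    finally:
        transport.close()


# Example (requires something listening on the address, e.g. a Hazelcast member):
# print(asyncio.run(local_address_of("127.0.0.1", 5701)))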
