Skip to content

Commit d3aae84

Browse files
authored
Asyncio module soak tests (#755)
* Updated the code to get the raw socket in `AsyncioReactor._create_connection` to be compatible with `uvloop`. * Added soak tests for the asyncio client * Updated client/member shell scripts to terminate clients/members on termination. * Removed Python 3.7 support declaration (which was accidentally left).
1 parent 41d65f9 commit d3aae84

File tree

7 files changed

+319
-14
lines changed

7 files changed

+319
-14
lines changed

hazelcast/asyncio/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import warnings
22

3-
warnings.warn("Asyncio API for Hazelcast Python Client is in BETA. DO NOT use it in production.")
3+
warnings.warn("Asyncio API for Hazelcast Python Client is BETA. DO NOT use it in production.")
44
del warnings
55

66
__all__ = ["HazelcastClient", "Map"]

hazelcast/internal/asyncio_reactor.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -130,10 +130,7 @@ async def _create_connection(self, config, address):
130130
self._connected = True
131131

132132
sock, self._proto = res
133-
if hasattr(sock, "_ssl_protocol"):
134-
sock = sock._ssl_protocol._transport._sock
135-
else:
136-
sock = sock._sock
133+
sock = sock.get_extra_info("socket")
137134
sockname = sock.getsockname()
138135
host, port = sockname[0], sockname[1]
139136
self.local_address = Address(host, port)

setup.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@
3535
"Programming Language :: Python",
3636
"Programming Language :: Python :: 3",
3737
"Programming Language :: Python :: 3 :: Only",
38-
"Programming Language :: Python :: 3.7",
3938
"Programming Language :: Python :: 3.11",
4039
"Programming Language :: Python :: 3.12",
4140
"Programming Language :: Python :: 3.13",

tests/soak_test/hazelcast.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
<hazelcast xmlns="http://www.hazelcast.com/schema/config"
22
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
33
xsi:schemaLocation="http://www.hazelcast.com/schema/config
4-
https://www.hazelcast.com/schema/config/hazelcast-config-5.0.xsd">
4+
https://www.hazelcast.com/schema/config/hazelcast-config-5.6.xsd">
55
<network>
66
<join>
77
<multicast enabled="true">
Lines changed: 268 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,268 @@
1+
import argparse
2+
import asyncio
3+
import logging
4+
import os
5+
import random
6+
import sys
7+
import time
8+
import typing
9+
from collections import defaultdict
10+
11+
sys.path.append(os.path.join(os.path.dirname(__file__), "../../"))
12+
# Making sure that hazelcast directory is in the sys.path so we can import modules from there in the command line.
13+
14+
from hazelcast.asyncio import HazelcastClient, Map
15+
from hazelcast.predicate import between
16+
from hazelcast.serialization.api import IdentifiedDataSerializable
17+
18+
# Use the monotonic clock when the interpreter provides one and fall back to
# the wall clock otherwise; the value is only used to compute the deadline.
get_current_timestamp = getattr(time, "monotonic", time.time)


# Number of TestRunner tasks started concurrently by the coordinator.
TASK_COUNT = 256
# Inclusive upper bound of the random integers used as map keys and values.
ENTRY_COUNT = 10000
# Seconds the observer sleeps between two operation-count samples.
OBSERVATION_INTERVAL = 10.0
27+
28+
29+
class SoakTestCoordinator:
    """Owns the lifecycle of one soak test run.

    The coordinator starts the client, launches the runner and observer
    tasks, and exposes the shared "keep going?" state that those tasks poll.

    Args:
        test_duration: Length of the run in hours.
        address: ``host:port`` of a cluster member, passed to the client as
            its only entry in ``cluster_members``.
    """

    def __init__(self, test_duration, address):
        self.address = address
        # Snapshot taken before any test task exists, so it can be compared
        # with the task count after shutdown to spot leaked tasks.
        self._task_count_before = len(asyncio.all_tasks())
        self._deadline = get_current_timestamp() + test_duration * 60 * 60
        self._lock = asyncio.Lock()
        # the following are protected by the lock above
        self._reached_deadline = False
        self._tests_failed = False

    async def start_tests(self):
        """Run the soak test to completion and log a summary.

        The client is shut down on every exit path, including a failed run
        and an exception escaping from the task group.
        """
        client, test_map = await self._init_client_map()
        try:
            logging.info("Soak test operations are starting!")
            logging.info("* " * 20 + "\n")
            test_runners = [TestRunner(i, self) for i in range(TASK_COUNT)]
            observer = OperationCountObserver(self, test_runners)
            # Leaving the TaskGroup block waits for the observer and every
            # runner, so no extra await on the observer task is needed.
            async with asyncio.TaskGroup() as tg:
                tg.create_task(observer.run())
                for runner in test_runners:
                    tg.create_task(runner.run(test_map))

            logging.info("* " * 20)
            logging.info("Soak test has finished!")
            logging.info("-" * 40)

            if await self.tests_failed():
                logging.info("Soak test failed!")
                return

            hang_counts = await observer.hang_counts()
            if not hang_counts:
                logging.info("All threads worked without hanging")
            else:
                # hang_counts maps runner -> count; iterate the pairs, not
                # the bare keys.
                for runner, hang_count in hang_counts.items():
                    logging.info("Thread %s hanged %s times.", runner.id, hang_count)
        finally:
            await client.shutdown()

        # wait for canceled tasks to expire
        await asyncio.sleep(1)
        task_count_after = len(asyncio.all_tasks())
        logging.info("Task count before: %s, after: %s", self._task_count_before, task_count_after)

    async def notify_error(self):
        """Mark the run as failed; runners and the observer stop on their next check."""
        async with self._lock:
            self._tests_failed = True

    async def should_continue_tests(self):
        """Return True while neither the deadline nor a failure has been recorded."""
        async with self._lock:
            return not (self._reached_deadline or self._tests_failed)

    async def check_deadline(self):
        """Re-evaluate whether the deadline has passed and store the result."""
        async with self._lock:
            now = get_current_timestamp()
            self._reached_deadline = now >= self._deadline

    async def tests_failed(self) -> bool:
        """Return True if any runner reported an error."""
        async with self._lock:
            return self._tests_failed

    async def _init_client_map(self) -> typing.Tuple[HazelcastClient, Map]:
        """Start the client and return it with the ``test-map`` proxy.

        A no-op entry listener is registered on the map before it is
        returned. Any failure is logged and re-raised.
        """

        def no_op_listener(_):
            pass

        try:
            client = await HazelcastClient.create_and_start(
                cluster_members=[self.address],
                data_serializable_factories={
                    SimpleEntryProcessor.FACTORY_ID: {
                        SimpleEntryProcessor.CLASS_ID: SimpleEntryProcessor
                    }
                },
            )
            test_map = await client.get_map("test-map")
            await test_map.add_entry_listener(
                include_value=False,
                added_func=no_op_listener,
                removed_func=no_op_listener,
                updated_func=no_op_listener,
            )
            return client, test_map

        except Exception:
            logging.exception("Client failed to start")
            raise
115+
116+
117+
class TestRunner:
    """A single worker task that issues random map operations.

    Args:
        id: Identifier of this runner, used in log messages.
        coordinator: Coordinator polled before every operation and notified
            when an operation raises.
    """

    def __init__(self, id, coordinator: SoakTestCoordinator):
        self.id = id
        self.coordinator = coordinator
        self.counter = OperationCounter()

    def __repr__(self):
        # Keeps log lines that format a runner readable instead of showing
        # the default object address.
        return f"{type(self).__name__}(id={self.id!r})"

    async def run(self, test_map):
        """Issue random operations on ``test_map`` until told to stop.

        Each iteration picks one of get / put / values-with-predicate /
        execute_on_key and increments the operation counter on success.
        On any exception the coordinator is notified, the error is logged
        and the runner exits.
        """
        coordinator = self.coordinator
        processor = SimpleEntryProcessor("test")
        while await coordinator.should_continue_tests():
            key = str(random.randint(0, ENTRY_COUNT))
            value = str(random.randint(0, ENTRY_COUNT))
            operation = random.randint(0, 100)
            try:
                if operation < 30:
                    await test_map.get(key)
                elif operation < 60:
                    await test_map.put(key, value)
                elif operation < 80:
                    await test_map.values(between("this", 0, 10))
                else:
                    await test_map.execute_on_key(key, processor)

                await self.counter.increment()
            except Exception:
                await coordinator.notify_error()
                # Log the runner id rather than the runner object itself.
                logging.exception("Unexpected error occurred in thread %s", self.id)
                return
145+
146+
147+
class OperationCountObserver:
    """Periodically samples runner counters and tracks runners that stalled.

    Args:
        coordinator: Object providing ``check_deadline()`` and
            ``should_continue_tests()`` coroutines.
        test_runners: Runners whose ``counter`` is sampled on every round.
    """

    def __init__(self, coordinator, test_runners):
        self.coordinator = coordinator
        self.test_runners = test_runners
        self._lock = asyncio.Lock()
        # the lock above protects the fields below
        self._hang_counts = defaultdict(int)

    async def hang_counts(self):
        """Return the mapping of runner -> number of rounds with zero operations."""
        async with self._lock:
            return self._hang_counts

    async def _increment_hang_count(self, runner):
        async with self._lock:
            self._hang_counts[runner] += 1

    async def run(self):
        """Sample all runners every OBSERVATION_INTERVAL seconds until the test ends.

        Each round asks the coordinator to re-check the deadline, then reads
        and resets every runner's counter. A runner whose own counter is zero
        for the round is recorded as hanged; the summed count is logged as
        operations per second.
        """
        while True:
            await asyncio.sleep(OBSERVATION_INTERVAL)
            await self.coordinator.check_deadline()
            if not await self.coordinator.should_continue_tests():
                break

            logging.info("-" * 40)
            op_count = 0
            hanged_runners = []

            for test_runner in self.test_runners:
                op_count_per_runner = await test_runner.counter.get_and_reset()
                op_count += op_count_per_runner
                # Judge each runner by its own count; the running total stays
                # non-zero once any earlier runner has made progress.
                if op_count_per_runner == 0:
                    hanged_runners.append(test_runner)

            if not hanged_runners:
                logging.info("All threads worked without hanging")
            else:
                logging.info(
                    "%s threads hanged: %s",
                    len(hanged_runners),
                    [runner.id for runner in hanged_runners],
                )
                for hanged_runner in hanged_runners:
                    await self._increment_hang_count(hanged_runner)

            logging.info("-" * 40)
            logging.info("Operations Per Second: %s\n", op_count / OBSERVATION_INTERVAL)
189+
190+
191+
class OperationCounter:
    """Lock-guarded tally of completed operations that can be read and cleared."""

    def __init__(self):
        self._lock = asyncio.Lock()
        self._count = 0

    async def increment(self):
        """Add one to the tally."""
        async with self._lock:
            self._count = self._count + 1

    async def get_and_reset(self):
        """Return the current tally and set it back to zero."""
        async with self._lock:
            snapshot, self._count = self._count, 0
        return snapshot
205+
206+
207+
class SimpleEntryProcessor(IdentifiedDataSerializable):
    """Entry processor payload carrying a single string value.

    The factory and class ids below are the ones registered with the client
    in ``data_serializable_factories``.
    """

    FACTORY_ID = 66
    CLASS_ID = 1

    def __init__(self, value):
        self.value = value

    def get_factory_id(self):
        """Return the factory id of this serializable."""
        return self.FACTORY_ID

    def get_class_id(self):
        """Return the class id of this serializable."""
        return self.CLASS_ID

    def write_data(self, object_data_output):
        """Write ``value`` to the output as a string."""
        object_data_output.write_string(self.value)

    def read_data(self, object_data_input):
        """Read nothing from the input; ``value`` is left untouched."""
        # NOTE(review): presumably this object is only ever sent, never
        # received, by the client - confirm before relying on it.
        pass
225+
226+
227+
def parse_arguments():
    """Parse the soak test command line.

    Returns:
        The argparse namespace with ``duration`` (float, defaults to 48.0),
        ``address`` (required str) and ``log_file`` (required str).
    """
    # Declared in the order they should appear in the generated help text.
    option_table = (
        (
            "--duration",
            {"default": 48.0, "type": float, "help": "Duration of the test in hours"},
        ),
        (
            "--address",
            {
                "required": True,
                "type": str,
                "help": "host:port of the one of the cluster members",
            },
        ),
        (
            "--log-file",
            {"required": True, "type": str, "help": "Name of the log file"},
        ),
    )

    cli = argparse.ArgumentParser()
    for flag, settings in option_table:
        cli.add_argument(flag, **settings)
    return cli.parse_args()
248+
249+
250+
def setup_logging(log_file):
    """Configure root logging to write INFO and above to ``log_file``.

    The file is opened in ``"w"`` mode, and each record is formatted as an
    ``HH:MM:SS`` timestamp followed by the message.
    """
    settings = {
        "filename": log_file,
        "filemode": "w",
        "format": "%(asctime)s %(message)s",
        "datefmt": "%H:%M:%S",
        "level": logging.INFO,
    }
    logging.basicConfig(**settings)
258+
259+
260+
async def amain():
    """Async entry point: parse the CLI, set up logging and run the soak test."""
    args = parse_arguments()
    setup_logging(args.log_file)
    await SoakTestCoordinator(args.duration, args.address).start_tests()


if __name__ == "__main__":
    asyncio.run(amain())

tests/soak_test/start_clients.sh

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,35 @@ DURATION=${2:-48.0}
55

66
mkdir -p "client_logs"
77

8-
for i in {0..9}
9-
do
10-
python map_soak_test.py \
11-
--duration "$DURATION" \
12-
--address "$ADDRESS" \
13-
--log-file client_logs/client-"$i" &
8+
declare -a pids
9+
10+
# Stop every client started by this script that is still running.
# The EXIT trap also fires after a normal "wait", when the clients have
# already finished, so only signal pids that are still alive.
cleanup () {
    for pid in "${pids[@]}"; do
        if kill -0 "$pid" 2>/dev/null; then
            echo "Stopping $pid"
            kill "$pid"
        fi
    done
}
16+
17+
trap cleanup EXIT
18+
19+
for i in {1..5}; do
20+
python map_soak_test_asyncio.py \
21+
--duration "$DURATION" \
22+
--address "$ADDRESS" \
23+
--log-file client_logs/client-asyncio-"$i" &
24+
pid=$!
25+
echo "$pid running"
26+
pids+=("$pid")
1427
done
28+
29+
for i in {1..5}; do
30+
python map_soak_test.py \
31+
--duration "$DURATION" \
32+
--address "$ADDRESS" \
33+
--log-file client_logs/client-"$i" &
34+
pid=$!
35+
echo "$pid running"
36+
pids+=("$pid")
37+
done
38+
39+
wait

tests/soak_test/start_members.sh

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,20 @@
11
#!/bin/bash
22

3-
VERSION="5.0"
3+
VERSION="5.6.0"
44

55
mkdir -p "member_logs"
66

7+
declare -a pids
8+
9+
# Stop every member started by this script that is still running.
# The EXIT trap fires on every exit path, so skip pids whose process has
# already gone away instead of reporting a failed kill.
cleanup () {
    for pid in "${pids[@]}"; do
        if kill -0 "$pid" 2>/dev/null; then
            echo "Stopping $pid"
            kill "$pid"
        fi
    done
}
15+
16+
trap cleanup EXIT
17+
718
CLASSPATH="hazelcast-${VERSION}.jar:hazelcast-${VERSION}-tests.jar"
819
CMD_CONFIGS="-Djava.net.preferIPv4Stack=true"
920

@@ -13,4 +24,9 @@ do
1324
com.hazelcast.core.server.HazelcastMemberStarter \
1425
1> member_logs/hazelcast-err-"$i" \
1526
2> member_logs/hazelcast-out-"$i" &
27+
pid=$!
28+
echo "$pid running"
29+
pids+=("$pid")
1630
done
31+
32+
wait

0 commit comments

Comments
 (0)