88def test_reconnect_on_connection_kill (env ):
99 """
1010 Test that memtier_benchmark can automatically reconnect when connections are killed.
11-
11+
1212 This test:
1313 1. Starts memtier_benchmark with --reconnect-on-error enabled
1414 2. Runs a background thread that periodically kills client connections using CLIENT KILL
1515 3. Verifies that memtier_benchmark successfully reconnects and completes the test
1616 """
1717 key_max = 10000
1818 key_min = 1
19-
19+
2020 # Configure memtier with reconnection enabled
2121 benchmark_specs = {
2222 "name" : env .testName ,
2323 "args" : [
24- ' --pipeline=1' ,
25- ' --ratio=1:1' ,
26- ' --key-pattern=R:R' ,
27- ' --key-minimum={}' .format (key_min ),
28- ' --key-maximum={}' .format (key_max ),
29- ' --reconnect-on-error' , # Enable automatic reconnection
30- ' --max-reconnect-attempts=10' , # Allow up to 10 reconnection attempts
31- ' --reconnect-backoff-factor=1.5' , # Backoff factor for delays
32- ' --connection-timeout=5' , # 5 second connection timeout
33- ]
24+ " --pipeline=1" ,
25+ " --ratio=1:1" ,
26+ " --key-pattern=R:R" ,
27+ " --key-minimum={}" .format (key_min ),
28+ " --key-maximum={}" .format (key_max ),
29+ " --reconnect-on-error" , # Enable automatic reconnection
30+ " --max-reconnect-attempts=10" , # Allow up to 10 reconnection attempts
31+ " --reconnect-backoff-factor=1.5" , # Backoff factor for delays
32+ " --connection-timeout=5" , # 5 second connection timeout
33+ ],
3434 }
3535 addTLSArgs (benchmark_specs , env )
36-
36+
3737 # Use fewer threads/clients and more requests to have a longer running test
3838 config = get_default_memtier_config (threads = 2 , clients = 2 , requests = 5000 )
3939 master_nodes_list = env .getMasterNodesList ()
40- overall_expected_request_count = get_expected_request_count (config , key_min , key_max )
41-
40+ overall_expected_request_count = get_expected_request_count (
41+ config , key_min , key_max
42+ )
43+
4244 add_required_env_arguments (benchmark_specs , config , env , master_nodes_list )
43-
45+
4446 # Create a temporary directory
4547 test_dir = tempfile .mkdtemp ()
4648 config = RunConfig (test_dir , env .testName , config , {})
4749 ensure_clean_benchmark_folder (config .results_dir )
48-
50+
4951 benchmark = Benchmark .from_json (config , benchmark_specs )
50-
52+
5153 # Get master connections for killing clients
5254 master_nodes_connections = env .getOSSMasterNodesConnectionList ()
53-
55+
5456 # Flag to stop the killer thread
5557 stop_killer = threading .Event ()
5658 kill_count = [0 ] # Use list to allow modification in nested function
57-
59+
5860 def client_killer ():
5961 """Background thread that kills client connections periodically"""
6062 while not stop_killer .is_set ():
@@ -63,75 +65,104 @@ def client_killer():
6365 for master_connection in master_nodes_connections :
6466 # Get list of clients
6567 clients = master_connection .execute_command ("CLIENT" , "LIST" )
66-
68+
69+ # CLIENT LIST may return bytes or string depending on Redis client version
70+ if isinstance (clients , bytes ):
71+ clients = clients .decode ('utf-8' )
72+
6773 # Parse client list and find memtier clients
6874 # CLIENT LIST returns a string with one client per line
69- for client_line in clients .split (' \n ' ):
75+ for client_line in clients .split (" \n " ):
7076 if not client_line .strip ():
7177 continue
72-
78+
7379 # Parse client info
7480 client_info = {}
75- for part in client_line .split ():
76- if '=' in part :
77- key , value = part .split ('=' , 1 )
81+ for part in client_line .split (' ' ):
82+ if "=" in part :
83+ key , value = part .split ("=" , 1 )
7884 client_info [key ] = value
79-
85+
8086 # Kill client if it has an ID and is not the current connection
8187 # (avoid killing our own connection)
82- if 'id' in client_info and ' cmd' in client_info :
88+ if "id" in client_info and " cmd" in client_info :
8389 # Don't kill connections running CLIENT LIST
84- if client_info [' cmd' ] != ' client' :
90+ if client_info [" cmd" ] != " client" :
8591 try :
86- master_connection .execute_command ("CLIENT" , "KILL" , "ID" , client_info ['id' ])
92+ master_connection .execute_command (
93+ "CLIENT" , "KILL" , "ID" , client_info ["id" ]
94+ )
8795 kill_count [0 ] += 1
88- env .debugPrint ("Killed client ID: {}" .format (client_info ['id' ]), True )
96+ env .debugPrint (
97+ "Killed client ID: {}" .format (
98+ client_info ["id" ]
99+ ),
100+ True ,
101+ )
89102 except Exception as e :
90103 # Client might have already disconnected
91- env .debugPrint ("Failed to kill client {}: {}" .format (client_info ['id' ], str (e )), True )
104+ env .debugPrint (
105+ "Failed to kill client {}: {}" .format (
106+ client_info ["id" ], str (e )
107+ ),
108+ True ,
109+ )
92110 except Exception as e :
93111 env .debugPrint ("Error in client_killer: {}" .format (str (e )), True )
94-
112+
95113 # Start the killer thread
96114 killer_thread = threading .Thread (target = client_killer )
97115 killer_thread .daemon = True
98116 killer_thread .start ()
99-
117+
100118 try :
101119 # Run memtier_benchmark
102120 memtier_ok = benchmark .run ()
103-
121+
104122 # Stop the killer thread
105123 stop_killer .set ()
106124 killer_thread .join (timeout = 5 )
107-
125+
108126 env .debugPrint ("Total clients killed: {}" .format (kill_count [0 ]), True )
109-
127+
110128 # Verify that we actually killed some connections
111- env .assertTrue (kill_count [0 ] > 0 , "Expected to kill at least one client connection" )
112-
129+ env .assertTrue (
130+ kill_count [0 ] > 0 , "Expected to kill at least one client connection"
131+ )
132+
113133 # Verify memtier completed successfully despite connection kills
114134 debugPrintMemtierOnError (config , env )
115- env .assertTrue (memtier_ok == True , "memtier_benchmark should complete successfully with reconnections" )
116-
135+ env .assertTrue (
136+ memtier_ok == True ,
137+ "memtier_benchmark should complete successfully with reconnections" ,
138+ )
139+
117140 # Verify output files exist
118- env .assertTrue (os .path .isfile (' {0}/mb.stdout' .format (config .results_dir )))
119- env .assertTrue (os .path .isfile (' {0}/mb.stderr' .format (config .results_dir )))
120- env .assertTrue (os .path .isfile (' {0}/mb.json' .format (config .results_dir )))
121-
141+ env .assertTrue (os .path .isfile (" {0}/mb.stdout" .format (config .results_dir )))
142+ env .assertTrue (os .path .isfile (" {0}/mb.stderr" .format (config .results_dir )))
143+ env .assertTrue (os .path .isfile (" {0}/mb.json" .format (config .results_dir )))
144+
122145 # Check stderr for reconnection messages
123- with open (' {0}/mb.stderr' .format (config .results_dir )) as stderr :
146+ with open (" {0}/mb.stderr" .format (config .results_dir )) as stderr :
124147 stderr_content = stderr .read ()
125148 # Should see reconnection attempt messages
126- env .assertTrue ('reconnection' in stderr_content .lower () or 'reconnect' in stderr_content .lower (),
127- "Expected to see reconnection messages in stderr" )
128-
149+ env .assertTrue (
150+ "reconnection" in stderr_content .lower ()
151+ or "reconnect" in stderr_content .lower (),
152+ "Expected to see reconnection messages in stderr" ,
153+ )
154+
129155 # Verify that some requests were completed
130156 # (we may not get the exact expected count due to reconnections, but should get some)
131- merged_command_stats = {'cmdstat_set' : {'calls' : 0 }, 'cmdstat_get' : {'calls' : 0 }}
132- overall_request_count = agg_info_commandstats (master_nodes_connections , merged_command_stats )
157+ merged_command_stats = {
158+ "cmdstat_set" : {"calls" : 0 },
159+ "cmdstat_get" : {"calls" : 0 },
160+ }
161+ overall_request_count = agg_info_commandstats (
162+ master_nodes_connections , merged_command_stats
163+ )
133164 env .assertTrue (overall_request_count > 0 , "Expected some requests to complete" )
134-
165+
135166 finally :
136167 # Make sure to stop the killer thread
137168 stop_killer .set ()
@@ -152,13 +183,13 @@ def test_reconnect_disabled_by_default(env):
152183 benchmark_specs = {
153184 "name" : env .testName ,
154185 "args" : [
155- ' --pipeline=1' ,
156- ' --ratio=1:1' ,
157- ' --key-pattern=R:R' ,
158- ' --key-minimum={}' .format (key_min ),
159- ' --key-maximum={}' .format (key_max ),
186+ " --pipeline=1" ,
187+ " --ratio=1:1" ,
188+ " --key-pattern=R:R" ,
189+ " --key-minimum={}" .format (key_min ),
190+ " --key-maximum={}" .format (key_max ),
160191 # Note: NO --reconnect-on-error flag
161- ]
192+ ],
162193 }
163194 addTLSArgs (benchmark_specs , env )
164195
@@ -180,11 +211,12 @@ def test_reconnect_disabled_by_default(env):
180211
181212 # Start memtier in background
182213 import subprocess
214+
183215 memtier_process = subprocess .Popen (
184- benchmark .command_line_args ,
185- stdout = open (' {0}/mb.stdout' .format (config .results_dir ), 'w' ),
186- stderr = open (' {0}/mb.stderr' .format (config .results_dir ), 'w' ),
187- cwd = config .results_dir
216+ benchmark .args ,
217+ stdout = open (" {0}/mb.stdout" .format (config .results_dir ), "w" ),
218+ stderr = open (" {0}/mb.stderr" .format (config .results_dir ), "w" ),
219+ cwd = config .results_dir ,
188220 )
189221
190222 # Wait a bit for connections to establish
@@ -194,21 +226,34 @@ def test_reconnect_disabled_by_default(env):
194226 killed = False
195227 for master_connection in master_nodes_connections :
196228 clients = master_connection .execute_command ("CLIENT" , "LIST" )
197- for client_line in clients .split ('\n ' ):
229+
230+ # CLIENT LIST may return bytes or string depending on Redis client version
231+ if isinstance (clients , bytes ):
232+ clients = clients .decode ('utf-8' )
233+
234+ for client_line in clients .split ("\n " ):
198235 if not client_line .strip ():
199236 continue
200237
201238 client_info = {}
202239 for part in client_line .split ():
203- if '=' in part :
204- key , value = part .split ('=' , 1 )
240+ if "=" in part :
241+ key , value = part .split ("=" , 1 )
205242 client_info [key ] = value
206243
207- if 'id' in client_info and 'cmd' in client_info and client_info ['cmd' ] != 'client' :
244+ if (
245+ "id" in client_info
246+ and "cmd" in client_info
247+ and client_info ["cmd" ] != "client"
248+ ):
208249 try :
209- master_connection .execute_command ("CLIENT" , "KILL" , "ID" , client_info ['id' ])
250+ master_connection .execute_command (
251+ "CLIENT" , "KILL" , "ID" , client_info ["id" ]
252+ )
210253 killed = True
211- env .debugPrint ("Killed client ID: {}" .format (client_info ['id' ]), True )
254+ env .debugPrint (
255+ "Killed client ID: {}" .format (client_info ["id" ]), True
256+ )
212257 break
213258 except :
214259 pass
@@ -223,4 +268,3 @@ def test_reconnect_disabled_by_default(env):
223268 # So we just verify the test completes one way or another
224269 env .debugPrint ("memtier exit code: {}" .format (return_code ), True )
225270 env .assertTrue (killed , "Expected to kill at least one connection" )
226-
0 commit comments