@@ -149,9 +149,9 @@ def test__reset_offset(fetcher, mocker):
149149 assert fetcher ._subscriptions .assignment [tp ].position == 1001
150150
151151
152- def test__send_offset_requests (fetcher , mocker ):
153- tp = TopicPartition ("topic_send_offset " , 1 )
154- mocked_send = mocker .patch .object (fetcher , "_send_offset_request " )
152+ def test__send_list_offsets_requests (fetcher , mocker ):
153+ tp = TopicPartition ("topic_send_list_offsets " , 1 )
154+ mocked_send = mocker .patch .object (fetcher , "_send_list_offsets_request " )
155155 send_futures = []
156156
157157 def send_side_effect (* args , ** kw ):
@@ -168,19 +168,19 @@ def send_side_effect(*args, **kw):
168168 [None , - 1 ], itertools .cycle ([0 ]))
169169
170170 # Leader == None
171- fut = fetcher ._send_offset_requests ({tp : 0 })
171+ fut = fetcher ._send_list_offsets_requests ({tp : 0 })
172172 assert fut .failed ()
173173 assert isinstance (fut .exception , StaleMetadata )
174174 assert not mocked_send .called
175175
176176 # Leader == -1
177- fut = fetcher ._send_offset_requests ({tp : 0 })
177+ fut = fetcher ._send_list_offsets_requests ({tp : 0 })
178178 assert fut .failed ()
179179 assert isinstance (fut .exception , LeaderNotAvailableError )
180180 assert not mocked_send .called
181181
182182 # Leader == 0, send failed
183- fut = fetcher ._send_offset_requests ({tp : 0 })
183+ fut = fetcher ._send_list_offsets_requests ({tp : 0 })
184184 assert not fut .is_done
185185 assert mocked_send .called
186186 # Check that we bound the futures correctly to chain failure
@@ -189,7 +189,7 @@ def send_side_effect(*args, **kw):
189189 assert isinstance (fut .exception , NotLeaderForPartitionError )
190190
191191 # Leader == 0, send success
192- fut = fetcher ._send_offset_requests ({tp : 0 })
192+ fut = fetcher ._send_list_offsets_requests ({tp : 0 })
193193 assert not fut .is_done
194194 assert mocked_send .called
195195 # Check that we bound the futures correctly to chain success
@@ -198,12 +198,12 @@ def send_side_effect(*args, **kw):
198198 assert fut .value == {tp : (10 , 10000 )}
199199
200200
201- def test__send_offset_requests_multiple_nodes (fetcher , mocker ):
202- tp1 = TopicPartition ("topic_send_offset " , 1 )
203- tp2 = TopicPartition ("topic_send_offset " , 2 )
204- tp3 = TopicPartition ("topic_send_offset " , 3 )
205- tp4 = TopicPartition ("topic_send_offset " , 4 )
206- mocked_send = mocker .patch .object (fetcher , "_send_offset_request " )
201+ def test__send_list_offsets_requests_multiple_nodes (fetcher , mocker ):
202+ tp1 = TopicPartition ("topic_send_list_offsets " , 1 )
203+ tp2 = TopicPartition ("topic_send_list_offsets " , 2 )
204+ tp3 = TopicPartition ("topic_send_list_offsets " , 3 )
205+ tp4 = TopicPartition ("topic_send_list_offsets " , 4 )
206+ mocked_send = mocker .patch .object (fetcher , "_send_list_offsets_request " )
207207 send_futures = []
208208
209209 def send_side_effect (node_id , timestamps ):
@@ -218,7 +218,7 @@ def send_side_effect(node_id, timestamps):
218218
219219 # -- All node succeeded case
220220 tss = OrderedDict ([(tp1 , 0 ), (tp2 , 0 ), (tp3 , 0 ), (tp4 , 0 )])
221- fut = fetcher ._send_offset_requests (tss )
221+ fut = fetcher ._send_list_offsets_requests (tss )
222222 assert not fut .is_done
223223 assert mocked_send .call_count == 2
224224
@@ -244,7 +244,7 @@ def send_side_effect(node_id, timestamps):
244244
245245 # -- First succeeded second not
246246 del send_futures [:]
247- fut = fetcher ._send_offset_requests (tss )
247+ fut = fetcher ._send_list_offsets_requests (tss )
248248 assert len (send_futures ) == 2
249249 send_futures [0 ][2 ].success ({tp1 : (11 , 1001 )})
250250 send_futures [1 ][2 ].failure (UnknownTopicOrPartitionError (tp1 ))
@@ -253,22 +253,22 @@ def send_side_effect(node_id, timestamps):
253253
254254 # -- First fails second succeeded
255255 del send_futures [:]
256- fut = fetcher ._send_offset_requests (tss )
256+ fut = fetcher ._send_list_offsets_requests (tss )
257257 assert len (send_futures ) == 2
258258 send_futures [0 ][2 ].failure (UnknownTopicOrPartitionError (tp1 ))
259259 send_futures [1 ][2 ].success ({tp1 : (11 , 1001 )})
260260 assert fut .failed ()
261261 assert isinstance (fut .exception , UnknownTopicOrPartitionError )
262262
263263
264- def test__handle_offset_response_v1 (fetcher , mocker ):
264+ def test__handle_list_offsets_response_v1 (fetcher , mocker ):
265265 # Broker returns UnsupportedForMessageFormatError, will omit partition
266266 fut = Future ()
267267 res = ListOffsetsResponse [1 ]([
268268 ("topic" , [(0 , 43 , - 1 , - 1 )]),
269269 ("topic" , [(1 , 0 , 1000 , 9999 )])
270270 ])
271- fetcher ._handle_offset_response (fut , res )
271+ fetcher ._handle_list_offsets_response (fut , res )
272272 assert fut .succeeded ()
273273 assert fut .value == {TopicPartition ("topic" , 1 ): (9999 , 1000 )}
274274
@@ -277,7 +277,7 @@ def test__handle_offset_response_v1(fetcher, mocker):
277277 res = ListOffsetsResponse [1 ]([
278278 ("topic" , [(0 , 6 , - 1 , - 1 )]),
279279 ])
280- fetcher ._handle_offset_response (fut , res )
280+ fetcher ._handle_list_offsets_response (fut , res )
281281 assert fut .failed ()
282282 assert isinstance (fut .exception , NotLeaderForPartitionError )
283283
@@ -286,7 +286,7 @@ def test__handle_offset_response_v1(fetcher, mocker):
286286 res = ListOffsetsResponse [1 ]([
287287 ("topic" , [(0 , 3 , - 1 , - 1 )]),
288288 ])
289- fetcher ._handle_offset_response (fut , res )
289+ fetcher ._handle_list_offsets_response (fut , res )
290290 assert fut .failed ()
291291 assert isinstance (fut .exception , UnknownTopicOrPartitionError )
292292
@@ -299,19 +299,19 @@ def test__handle_offset_response_v1(fetcher, mocker):
299299 ("topic" , [(2 , 3 , - 1 , - 1 )]),
300300 ("topic" , [(3 , 0 , 1000 , 9999 )])
301301 ])
302- fetcher ._handle_offset_response (fut , res )
302+ fetcher ._handle_list_offsets_response (fut , res )
303303 assert fut .failed ()
304304 assert isinstance (fut .exception , NotLeaderForPartitionError )
305305
306306
307- def test__handle_offset_response_v2_v3 (fetcher , mocker ):
307+ def test__handle_list_offsets_response_v2_v3 (fetcher , mocker ):
308308 # including a throttle_time shouldnt cause issues
309309 fut = Future ()
310310 res = ListOffsetsResponse [2 ](
311311 123 , # throttle_time_ms
312312 [("topic" , [(0 , 0 , 1000 , 9999 )])
313313 ])
314- fetcher ._handle_offset_response (fut , res )
314+ fetcher ._handle_list_offsets_response (fut , res )
315315 assert fut .succeeded ()
316316 assert fut .value == {TopicPartition ("topic" , 0 ): (9999 , 1000 )}
317317
@@ -321,7 +321,7 @@ def test__handle_offset_response_v2_v3(fetcher, mocker):
321321 123 , # throttle_time_ms
322322 [("topic" , [(0 , 0 , 1000 , 9999 )])
323323 ])
324- fetcher ._handle_offset_response (fut , res )
324+ fetcher ._handle_list_offsets_response (fut , res )
325325 assert fut .succeeded ()
326326 assert fut .value == {TopicPartition ("topic" , 0 ): (9999 , 1000 )}
327327
0 commit comments