@@ -261,7 +261,7 @@ def send_side_effect(node_id, timestamps):
261261 assert isinstance (fut .exception , UnknownTopicOrPartitionError )
262262
263263
264- def test__handle_offset_response (fetcher , mocker ):
264+ def test__handle_offset_response_v1 (fetcher , mocker ):
265265 # Broker returns UnsupportedForMessageFormatError, will omit partition
266266 fut = Future ()
267267 res = ListOffsetsResponse [1 ]([
@@ -304,6 +304,28 @@ def test__handle_offset_response(fetcher, mocker):
304304 assert isinstance (fut .exception , NotLeaderForPartitionError )
305305
306306
307+ def test__handle_offset_response_v2_v3 (fetcher , mocker ):
308+ # including a throttle_time shouldnt cause issues
309+ fut = Future ()
310+ res = ListOffsetsResponse [2 ](
311+ 123 , # throttle_time_ms
312+ [("topic" , [(0 , 0 , 1000 , 9999 )])
313+ ])
314+ fetcher ._handle_offset_response (fut , res )
315+ assert fut .succeeded ()
316+ assert fut .value == {TopicPartition ("topic" , 0 ): (9999 , 1000 )}
317+
318+ # v3 response is the same format
319+ fut = Future ()
320+ res = ListOffsetsResponse [3 ](
321+ 123 , # throttle_time_ms
322+ [("topic" , [(0 , 0 , 1000 , 9999 )])
323+ ])
324+ fetcher ._handle_offset_response (fut , res )
325+ assert fut .succeeded ()
326+ assert fut .value == {TopicPartition ("topic" , 0 ): (9999 , 1000 )}
327+
328+
307329def test_fetched_records (fetcher , topic , mocker ):
308330 fetcher .config ['check_crcs' ] = False
309331 tp = TopicPartition (topic , 0 )
0 commit comments