@@ -411,10 +411,10 @@ def _message_generator(self):
411411
412412 tp = self ._next_partition_records .topic_partition
413413
414- # We can ignore any prior signal to drop pending message sets
414+ # We can ignore any prior signal to drop pending record batches
415415 # because we are starting from a fresh one where fetch_offset == position
416416 # i.e., the user seek()'d to this position
417- self ._subscriptions .assignment [tp ].drop_pending_message_set = False
417+ self ._subscriptions .assignment [tp ].drop_pending_record_batch = False
418418
419419 for msg in self ._next_partition_records .take ():
420420
@@ -430,12 +430,12 @@ def _message_generator(self):
430430 break
431431
432432 # If there is a seek during message iteration,
433- # we should stop unpacking this message set and
433+ # we should stop unpacking this record batch and
434434 # wait for a new fetch response that aligns with the
435435 # new seek position
436- elif self ._subscriptions .assignment [tp ].drop_pending_message_set :
437- log .debug ("Skipping remainder of message set for partition %s" , tp )
438- self ._subscriptions .assignment [tp ].drop_pending_message_set = False
436+ elif self ._subscriptions .assignment [tp ].drop_pending_record_batch :
437+ log .debug ("Skipping remainder of record batch for partition %s" , tp )
438+ self ._subscriptions .assignment [tp ].drop_pending_record_batch = False
439439 self ._next_partition_records = None
440440 break
441441
@@ -451,16 +451,16 @@ def _message_generator(self):
451451
452452 self ._next_partition_records = None
453453
454- def _unpack_message_set (self , tp , records ):
454+ def _unpack_records (self , tp , records ):
455455 try :
456456 batch = records .next_batch ()
457457 while batch is not None :
458458
459459 # Try DefaultsRecordBatch / message log format v2
460460 # base_offset, last_offset_delta, and control batches
461461 try :
462- self ._subscriptions .assignment [tp ].last_offset_from_message_batch = batch .base_offset + \
463- batch .last_offset_delta
462+ self ._subscriptions .assignment [tp ].last_offset_from_record_batch = batch .base_offset + \
463+ batch .last_offset_delta
464464 # Control batches have a single record indicating whether a transaction
465465 # was aborted or committed.
466466 # When isolation_level is READ_COMMITTED (currently unsupported)
@@ -673,17 +673,18 @@ def _create_fetch_requests(self):
673673 """
674674 # create the fetch info as a dict of lists of partition info tuples
675675 # which can be passed to FetchRequest() via .items()
676+ version = self ._client .api_version (FetchRequest , max_version = 6 )
676677 fetchable = collections .defaultdict (lambda : collections .defaultdict (list ))
677678
678679 for partition in self ._fetchable_partitions ():
679680 node_id = self ._client .cluster .leader_for_partition (partition )
680681
681682 # advance position for any deleted compacted messages if required
682- if self ._subscriptions .assignment [partition ].last_offset_from_message_batch :
683- next_offset_from_batch_header = self ._subscriptions .assignment [partition ].last_offset_from_message_batch + 1
683+ if self ._subscriptions .assignment [partition ].last_offset_from_record_batch :
684+ next_offset_from_batch_header = self ._subscriptions .assignment [partition ].last_offset_from_record_batch + 1
684685 if next_offset_from_batch_header > self ._subscriptions .assignment [partition ].position :
685686 log .debug (
686- "Advance position for partition %s from %s to %s (last message batch location plus one)"
687+ "Advance position for partition %s from %s to %s (last record batch location plus one)"
687688 " to correct for deleted compacted messages and/or transactional control records" ,
688689 partition , self ._subscriptions .assignment [partition ].position , next_offset_from_batch_header )
689690 self ._subscriptions .assignment [partition ].position = next_offset_from_batch_header
@@ -697,52 +698,60 @@ def _create_fetch_requests(self):
697698 self ._client .cluster .request_update ()
698699
699700 elif self ._client .in_flight_request_count (node_id ) == 0 :
700- partition_info = (
701- partition .partition ,
702- position ,
703- self .config ['max_partition_fetch_bytes' ]
704- )
701+ if version < 5 :
702+ partition_info = (
703+ partition .partition ,
704+ position ,
705+ self .config ['max_partition_fetch_bytes' ]
706+ )
707+ else :
708+ partition_info = (
709+ partition .partition ,
710+ position ,
711+ - 1 , # log_start_offset is used internally by brokers / replicas only
712+ self .config ['max_partition_fetch_bytes' ],
713+ )
705714 fetchable [node_id ][partition .topic ].append (partition_info )
706715 log .debug ("Adding fetch request for partition %s at offset %d" ,
707716 partition , position )
708717 else :
709718 log .log (0 , "Skipping fetch for partition %s because there is an inflight request to node %s" ,
710719 partition , node_id )
711720
712- version = self ._client .api_version (FetchRequest , max_version = 4 )
713721 requests = {}
714722 for node_id , partition_data in six .iteritems (fetchable ):
715- if version < 3 :
723+ # As of version == 3 partitions will be returned in order as
724+ # they are requested, so to avoid starvation with
725+ # `fetch_max_bytes` option we need this shuffle
726+ # NOTE: we do have partition_data in random order due to usage
727+ # of unordered structures like dicts, but that does not
728+ # guarantee equal distribution, and starting in Python3.6
729+ # dicts retain insert order.
730+ partition_data = list (partition_data .items ())
731+ random .shuffle (partition_data )
732+
733+ if version <= 2 :
734+ requests [node_id ] = FetchRequest [version ](
735+ - 1 , # replica_id
736+ self .config ['fetch_max_wait_ms' ],
737+ self .config ['fetch_min_bytes' ],
738+ partition_data )
739+ elif version == 3 :
716740 requests [node_id ] = FetchRequest [version ](
717741 - 1 , # replica_id
718742 self .config ['fetch_max_wait_ms' ],
719743 self .config ['fetch_min_bytes' ],
720- partition_data .items ())
744+ self .config ['fetch_max_bytes' ],
745+ partition_data )
721746 else :
722- # As of version == 3 partitions will be returned in order as
723- # they are requested, so to avoid starvation with
724- # `fetch_max_bytes` option we need this shuffle
725- # NOTE: we do have partition_data in random order due to usage
726- # of unordered structures like dicts, but that does not
727- # guarantee equal distribution, and starting in Python3.6
728- # dicts retain insert order.
729- partition_data = list (partition_data .items ())
730- random .shuffle (partition_data )
731- if version == 3 :
732- requests [node_id ] = FetchRequest [version ](
733- - 1 , # replica_id
734- self .config ['fetch_max_wait_ms' ],
735- self .config ['fetch_min_bytes' ],
736- self .config ['fetch_max_bytes' ],
737- partition_data )
738- else :
739- requests [node_id ] = FetchRequest [version ](
740- - 1 , # replica_id
741- self .config ['fetch_max_wait_ms' ],
742- self .config ['fetch_min_bytes' ],
743- self .config ['fetch_max_bytes' ],
744- self ._isolation_level ,
745- partition_data )
747+ # through v6
748+ requests [node_id ] = FetchRequest [version ](
749+ - 1 , # replica_id
750+ self .config ['fetch_max_wait_ms' ],
751+ self .config ['fetch_min_bytes' ],
752+ self .config ['fetch_max_bytes' ],
753+ self ._isolation_level ,
754+ partition_data )
746755 return requests
747756
748757 def _handle_fetch_response (self , request , send_time , response ):
@@ -821,7 +830,7 @@ def _parse_fetched_data(self, completed_fetch):
821830 log .debug ("Adding fetched record for partition %s with"
822831 " offset %d to buffered record list" , tp ,
823832 position )
824- unpacked = list (self ._unpack_message_set (tp , records ))
833+ unpacked = list (self ._unpack_records (tp , records ))
825834 parsed_records = self .PartitionRecords (fetch_offset , tp , unpacked )
826835 if unpacked :
827836 last_offset = unpacked [- 1 ].offset
@@ -845,7 +854,9 @@ def _parse_fetched_data(self, completed_fetch):
845854 self ._sensors .record_topic_fetch_metrics (tp .topic , num_bytes , records_count )
846855
847856 elif error_type in (Errors .NotLeaderForPartitionError ,
848- Errors .UnknownTopicOrPartitionError ):
857+ Errors .UnknownTopicOrPartitionError ,
858+ Errors .KafkaStorageError ):
859+ log .debug ("Error fetching partition %s: %s" , tp , error_type .__name__ )
849860 self ._client .cluster .request_update ()
850861 elif error_type is Errors .OffsetOutOfRangeError :
851862 position = self ._subscriptions .assignment [tp ].position
@@ -862,8 +873,10 @@ def _parse_fetched_data(self, completed_fetch):
862873 elif error_type is Errors .TopicAuthorizationFailedError :
863874 log .warning ("Not authorized to read from topic %s." , tp .topic )
864875 raise Errors .TopicAuthorizationFailedError (set (tp .topic ))
865- elif error_type is Errors .UnknownError :
866- log .warning ("Unknown error fetching data for topic-partition %s" , tp )
876+ elif error_type .is_retriable :
877+ log .debug ("Retriable error fetching partition %s: %s" , tp , error_type ())
878+ if error_type .invalid_metadata :
879+ self ._client .cluster .request_update ()
867880 else :
868881 raise error_type ('Unexpected error while fetching data' )
869882
0 commit comments