
Commit 4a1d94a

Support Fetch Request/Response v6 in consumer
1 parent 837a600

3 files changed: 49 additions, 38 deletions

kafka/consumer/fetcher.py

Lines changed: 46 additions & 35 deletions
@@ -451,7 +451,7 @@ def _message_generator(self):
 
             self._next_partition_records = None
 
-    def _unpack_message_set(self, tp, records):
+    def _unpack_records(self, tp, records):
         try:
             batch = records.next_batch()
             while batch is not None:
@@ -673,6 +673,7 @@ def _create_fetch_requests(self):
         """
         # create the fetch info as a dict of lists of partition info tuples
         # which can be passed to FetchRequest() via .items()
+        version = self._client.api_version(FetchRequest, max_version=6)
         fetchable = collections.defaultdict(lambda: collections.defaultdict(list))
 
         for partition in self._fetchable_partitions():
@@ -697,52 +698,60 @@ def _create_fetch_requests(self):
                 self._client.cluster.request_update()
 
             elif self._client.in_flight_request_count(node_id) == 0:
-                partition_info = (
-                    partition.partition,
-                    position,
-                    self.config['max_partition_fetch_bytes']
-                )
+                if version < 5:
+                    partition_info = (
+                        partition.partition,
+                        position,
+                        self.config['max_partition_fetch_bytes']
+                    )
+                else:
+                    partition_info = (
+                        partition.partition,
+                        position,
+                        self.config['max_partition_fetch_bytes'],
+                        -1, # log_start_offset is used internally by brokers / replicas only
+                    )
                 fetchable[node_id][partition.topic].append(partition_info)
                 log.debug("Adding fetch request for partition %s at offset %d",
                           partition, position)
             else:
                 log.log(0, "Skipping fetch for partition %s because there is an inflight request to node %s",
                         partition, node_id)
 
-        version = self._client.api_version(FetchRequest, max_version=4)
         requests = {}
         for node_id, partition_data in six.iteritems(fetchable):
-            if version < 3:
+            # As of version == 3 partitions will be returned in order as
+            # they are requested, so to avoid starvation with
+            # `fetch_max_bytes` option we need this shuffle
+            # NOTE: we do have partition_data in random order due to usage
+            # of unordered structures like dicts, but that does not
+            # guarantee equal distribution, and starting in Python3.6
+            # dicts retain insert order.
+            partition_data = list(partition_data.items())
+            random.shuffle(partition_data)
+
+            if version <= 2:
+                requests[node_id] = FetchRequest[version](
+                    -1, # replica_id
+                    self.config['fetch_max_wait_ms'],
+                    self.config['fetch_min_bytes'],
+                    partition_data)
+            elif version == 3:
                 requests[node_id] = FetchRequest[version](
                     -1, # replica_id
                     self.config['fetch_max_wait_ms'],
                     self.config['fetch_min_bytes'],
-                    partition_data.items())
+                    self.config['fetch_max_bytes'],
+                    partition_data)
             else:
-                # As of version == 3 partitions will be returned in order as
-                # they are requested, so to avoid starvation with
-                # `fetch_max_bytes` option we need this shuffle
-                # NOTE: we do have partition_data in random order due to usage
-                # of unordered structures like dicts, but that does not
-                # guarantee equal distribution, and starting in Python3.6
-                # dicts retain insert order.
-                partition_data = list(partition_data.items())
-                random.shuffle(partition_data)
-                if version == 3:
-                    requests[node_id] = FetchRequest[version](
-                        -1, # replica_id
-                        self.config['fetch_max_wait_ms'],
-                        self.config['fetch_min_bytes'],
-                        self.config['fetch_max_bytes'],
-                        partition_data)
-                else:
-                    requests[node_id] = FetchRequest[version](
-                        -1, # replica_id
-                        self.config['fetch_max_wait_ms'],
-                        self.config['fetch_min_bytes'],
-                        self.config['fetch_max_bytes'],
-                        self._isolation_level,
-                        partition_data)
+                # through v6
+                requests[node_id] = FetchRequest[version](
+                    -1, # replica_id
+                    self.config['fetch_max_wait_ms'],
+                    self.config['fetch_min_bytes'],
+                    self.config['fetch_max_bytes'],
+                    self._isolation_level,
+                    partition_data)
         return requests
 
     def _handle_fetch_response(self, request, send_time, response):
@@ -821,7 +830,7 @@ def _parse_fetched_data(self, completed_fetch):
                 log.debug("Adding fetched record for partition %s with"
                           " offset %d to buffered record list", tp,
                           position)
-                unpacked = list(self._unpack_message_set(tp, records))
+                unpacked = list(self._unpack_records(tp, records))
                 parsed_records = self.PartitionRecords(fetch_offset, tp, unpacked)
                 if unpacked:
                     last_offset = unpacked[-1].offset
@@ -845,7 +854,9 @@ def _parse_fetched_data(self, completed_fetch):
                 self._sensors.record_topic_fetch_metrics(tp.topic, num_bytes, records_count)
 
             elif error_type in (Errors.NotLeaderForPartitionError,
-                                Errors.UnknownTopicOrPartitionError):
+                                Errors.UnknownTopicOrPartitionError,
+                                Errors.KafkaStorageError):
+                log.debug("Error fetching partition %s: %s", tp, error_type.__name__)
                 self._client.cluster.request_update()
             elif error_type is Errors.OffsetOutOfRangeError:
                 position = self._subscriptions.assignment[tp].position
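
For reference, the branching above maps the negotiated protocol version to the shape of the request. A minimal standalone sketch of that mapping (illustrative only; build_fetch_request is a hypothetical helper, not part of this commit):

from kafka.protocol.fetch import FetchRequest

def build_fetch_request(version, config, isolation_level, partition_data):
    """Hypothetical helper mirroring the version gating in _create_fetch_requests."""
    if version <= 2:
        # v0-v2: replica_id, max_wait_ms, min_bytes, topic/partition data
        return FetchRequest[version](
            -1,  # replica_id
            config['fetch_max_wait_ms'],
            config['fetch_min_bytes'],
            partition_data)
    elif version == 3:
        # v3 adds a request-level byte cap (fetch_max_bytes)
        return FetchRequest[version](
            -1,
            config['fetch_max_wait_ms'],
            config['fetch_min_bytes'],
            config['fetch_max_bytes'],
            partition_data)
    else:
        # v4-v6 add isolation_level; per-partition tuples for v5+ also
        # carry a log_start_offset, which consumers send as -1
        return FetchRequest[version](
            -1,
            config['fetch_max_wait_ms'],
            config['fetch_min_bytes'],
            config['fetch_max_bytes'],
            isolation_level,
            partition_data)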

kafka/protocol/fetch.py

Lines changed: 1 addition & 1 deletion
@@ -264,7 +264,7 @@ class FetchRequest_v6(Request):
 
 class FetchRequest_v7(Request):
     """
-    Add incremental fetch requests
+    Add incremental fetch requests (see KIP-227)
     """
     API_KEY = 1
     API_VERSION = 7
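
For context on this module: each fetch request version pins an API key, version number, response type, and wire schema. A rough sketch of how FetchRequest_v6 is typically defined (reconstructed from the module's usual pattern, not shown in this diff, so details may differ):

class FetchRequest_v6(Request):
    """
    The v6 request body is the same as v5; the version bump signals that
    the client can handle KafkaStorageError in fetch responses.
    """
    API_KEY = 1
    API_VERSION = 6
    RESPONSE_TYPE = FetchResponse_v6  # assumed pairing with the v6 response class
    SCHEMA = FetchRequest_v5.SCHEMA   # reuses the v5 schema unchanged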

test/test_fetcher.py

Lines changed: 2 additions & 2 deletions
@@ -399,7 +399,7 @@ def test__handle_fetch_error(fetcher, caplog, exception, log_level):
     assert caplog.records[0].levelname == logging.getLevelName(log_level)
 
 
-def test__unpack_message_set(fetcher):
+def test__unpack_records(fetcher):
     fetcher.config['check_crcs'] = False
     tp = TopicPartition('foo', 0)
     messages = [
@@ -408,7 +408,7 @@ def test__unpack_message_set(fetcher):
         (None, b"c", None),
     ]
     memory_records = MemoryRecords(_build_record_batch(messages))
-    records = list(fetcher._unpack_message_set(tp, memory_records))
+    records = list(fetcher._unpack_records(tp, memory_records))
     assert len(records) == 3
     assert all(map(lambda x: isinstance(x, ConsumerRecord), records))
     assert records[0].value == b'a'
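
The test relies on a _build_record_batch helper defined elsewhere in test_fetcher.py. A minimal sketch of what such a helper could look like, assuming kafka-python's MemoryRecordsBuilder API (the real helper is not shown in this diff and may differ):

from kafka.record.memory_records import MemoryRecordsBuilder

def _build_record_batch(messages, compression=0):
    # Hypothetical reconstruction: pack (key, value, timestamp) tuples into
    # a serialized record batch that MemoryRecords() can parse.
    builder = MemoryRecordsBuilder(
        magic=1, compression_type=compression, batch_size=1024 * 1024)
    for key, value, timestamp in messages:
        builder.append(key=key, value=value, timestamp=timestamp, headers=[])
    builder.close()
    return builder.buffer()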
