Skip to content

Commit ba5154f

Browse files
committed
Fixup FetchRequestData init; to_forget encoding
1 parent e49b8dc commit ba5154f

File tree

1 file changed

+8
-3
lines changed

1 file changed

+8
-3
lines changed

kafka/consumer/fetcher.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1017,7 +1017,7 @@ def build_next(self, next_partitions):
10171017
log.debug("Built incremental fetch %s for node %s. Added %s, altered %s, removed %s out of %s",
10181018
self.next_metadata, self.node_id, added, altered, removed, self.session_partitions.keys())
10191019
to_send = {tp: next_partitions[tp] for tp in (added | altered)}
1020-
return FetchRequestData(to_send, removed, self.session_partitions.copy(), self.next_metadata)
1020+
return FetchRequestData(to_send, removed, self.next_metadata)
10211021

10221022
def handle_response(self, response):
10231023
if response.error_code != Errors.NoError.errno:
@@ -1127,7 +1127,7 @@ class FetchRequestData(object):
11271127

11281128
def __init__(self, to_send, to_forget, metadata):
11291129
self._to_send = to_send # {TopicPartition: (partition, ...)}
1130-
self._to_forget = to_forget
1130+
self._to_forget = to_forget # {TopicPartition}
11311131
self._metadata = metadata
11321132

11331133
@property
@@ -1153,7 +1153,12 @@ def to_send(self):
11531153

11541154
@property
11551155
def to_forget(self):
1156-
return self._to_forget
1156+
# Return as list of [(topic, (partiiton, ...)), ...]
1157+
# so it an be passed directly to encoder
1158+
partition_data = collections.defaultdict(list)
1159+
for tp in six.iteritems(self._to_forget):
1160+
partition_data[tp.topic].append(tp.partition)
1161+
return list(partition_data.items())
11571162

11581163

11591164
class FetchResponseMetricAggregator(object):

0 commit comments

Comments
 (0)