Skip to content

Commit 9e04d8e

Browse files
committed
discard leader_epoch data for now
1 parent fb61535 commit 9e04d8e

File tree

3 files changed

+10
-9
lines changed

3 files changed

+10
-9
lines changed

kafka/consumer/fetcher.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -253,10 +253,9 @@ def _retrieve_offsets(self, timestamps, timeout_ms=float("inf")):
253253
or ``timeout_ms`` passed.
254254
255255
Arguments:
256-
timestamps: {TopicPartition: (int, int)} dict with (timestamp, leader_epoch)
257-
tuples s to fetch offsets by. Timestamp is -1 for the latest available, and
258-
-2 for the earliest available. Otherwise timestamp is treated as epoch milliseconds.
259-
Leader epoch is -1 to ignore, otherwise last known epoch value from partition.
256+
timestamps: {TopicPartition: int} dict with timestamps to fetch
257+
offsets by. -1 for the latest available, -2 for the earliest
258+
available. Otherwise timestamp is treated as epoch milliseconds.
260259
261260
Returns:
262261
{TopicPartition: OffsetAndTimestamp}: Mapping of partition to
@@ -385,7 +384,8 @@ def _append(self, drained, part, max_records, update_offsets):
385384
drained[tp].append(record)
386385

387386
if update_offsets:
388-
self._subscriptions.assignment[tp].position = OffsetAndMetadata(next_offset, '', leader_epoch)
387+
# TODO: save leader_epoch
388+
self._subscriptions.assignment[tp].position = OffsetAndMetadata(next_offset, '', -1)
389389
return len(part_records)
390390

391391
else:
@@ -549,7 +549,7 @@ def _send_list_offsets_requests(self, timestamps):
549549
return Future().failure(
550550
Errors.LeaderNotAvailableError(partition))
551551
else:
552-
leader_epoch = self._client.cluster.leader_epoch_for_partition(partition)
552+
leader_epoch = -1
553553
timestamps_by_node[node_id][partition] = (timestamp, leader_epoch)
554554

555555
# Aggregate results until we have all

kafka/coordinator/consumer.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -835,7 +835,8 @@ def _handle_offset_fetch_response(self, future, response):
835835
elif offset >= 0:
836836
# record the position with the offset
837837
# (-1 indicates no committed offset to fetch)
838-
offsets[tp] = OffsetAndMetadata(offset, metadata, leader_epoch)
838+
# TODO: save leader_epoch
839+
offsets[tp] = OffsetAndMetadata(offset, metadata, -1)
839840
else:
840841
log.debug("Group %s has no committed offset for partition"
841842
" %s", self.group_id, tp)

test/test_fetcher.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -244,8 +244,8 @@ def send_side_effect(node_id, timestamps):
244244
else:
245245
second_future = f
246246
assert req_by_node == {
247-
0: {tp1: (0, 0), tp3: (0, 0)},
248-
1: {tp2: (0, 0), tp4: (0, 0)}
247+
0: {tp1: (0, -1), tp3: (0, -1)},
248+
1: {tp2: (0, -1), tp4: (0, -1)}
249249
}
250250

251251
# We only resolved 1 future so far, so result future is not yet ready

0 commit comments

Comments
 (0)