Skip to content

Commit f598822

Browse files
authored
Fix #44 (#45)
* use currentLeaderEpoch when updateListOffsets #44 * Fix
1 parent 9fd1d95 commit f598822

File tree

1 file changed

+13
-1
lines changed

1 file changed

+13
-1
lines changed

src/Consumer/OffsetManager.php

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,11 +120,23 @@ public function updateListOffsets(array $partitions, int $retry = 0): void
120120
foreach ($partitions as $partition) {
121121
$brokerPartitionMap[$broker->getBrokerIdByTopic($topicName, $partition)][] = $partition;
122122
}
123+
$topicsMeta = $broker->getTopicsMeta($topicName);
123124
foreach ($brokerPartitionMap as $brokerId => $partitions) {
124125
$request = new ListOffsetRequest();
125126
$topicPartitions = [];
126127
foreach ($partitions as $partition) {
127128
$topicPartitions[] = $listOffsetPartition = new ListOffsetPartition();
129+
foreach ($topicsMeta as $topicMeta) {
130+
if ($topicMeta->getName() === $topicName) {
131+
foreach ($topicMeta->getPartitions() as $partitionObject) {
132+
if ($partition === $partitionObject->getPartitionIndex()) {
133+
$listOffsetPartition->setCurrentLeaderEpoch($partitionObject->getLeaderEpoch());
134+
break;
135+
}
136+
}
137+
break;
138+
}
139+
}
128140
$listOffsetPartition->setPartitionIndex($partition)->setTimestamp(-1);
129141
}
130142
$request->setTopics([
@@ -135,7 +147,7 @@ public function updateListOffsets(array $partitions, int $retry = 0): void
135147
$response = KafkaUtil::retry($client, $request, $retry, 0);
136148
foreach ($response->getTopics() as $topic) {
137149
foreach ($topic->getPartitions() as $partition) {
138-
$this->offsets[$partition->getPartitionIndex()] = $partition->getOffset();
150+
$this->offsets[$partition->getPartitionIndex()] = $partition->getOffset() - 1;
139151
}
140152
}
141153
}

0 commit comments

Comments
 (0)