Skip to content

Commit 20e5761

Browse files
committed
Fix saveOffsets
1 parent 1def81c commit 20e5761

File tree

1 file changed

+7
-6
lines changed

1 file changed

+7
-6
lines changed

src/Consumer/OffsetManager.php

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -213,16 +213,17 @@ public function saveOffsets(int $partition, int $retry = 0): void
213213
$broker = $this->broker;
214214
for ($i = 0; $i <= $retry; ++$i) {
215215
/** @var OffsetCommitResponse $response */
216-
$response = $broker->getClientByBrokerId($broker->getBrokerIdByTopic($this->topic, $partition))->sendRecv($request);
216+
$response = $broker->getClientByBrokerId($this->coordinatorNodeId)->sendRecv($request);
217217
foreach ($response->getTopics() as $topic) {
218218
foreach ($topic->getPartitions() as $topicPartition) {
219219
$errorCode = $topicPartition->getErrorCode();
220-
if (!ErrorCode::success($errorCode)) {
221-
if ($retry > 0 && ErrorCode::canRetry($errorCode)) {
222-
continue 3;
223-
}
224-
ErrorCode::check($errorCode);
220+
if (ErrorCode::success($errorCode)) {
221+
return;
225222
}
223+
if (ErrorCode::canRetry($errorCode)) {
224+
continue 3;
225+
}
226+
ErrorCode::check($errorCode);
226227
}
227228
}
228229
}

0 commit comments

Comments
 (0)