Skip to content

Commit ae3c672

Browse files
dpkp88manpreet
authored andcommitted
Check for immediate failure when looking up coordinator in heartbeat thread (dpkp#1457)
1 parent c726adc commit ae3c672

File tree

1 file changed

+11
-15
lines changed

1 file changed

+11
-15
lines changed

kafka/coordinator/base.py

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -940,21 +940,17 @@ def _run_once(self):
940940
self.disable()
941941
return
942942

943-
# TODO: When consumer.wakeup() is implemented, we need to
944-
# disable here to prevent propagating an exception to this
945-
# heartbeat thread
946-
#
947-
# Release coordinator lock during client poll to avoid deadlocks
948-
# if/when connection errback needs coordinator lock
949-
self.coordinator._client.poll(timeout_ms=0)
950-
951-
if self.coordinator.coordinator_unknown():
952-
future = self.coordinator.lookup_coordinator()
953-
if not future.is_done or future.failed():
954-
# the immediate future check ensures that we backoff
955-
# properly in the case that no brokers are available
956-
# to connect to (and the future is automatically failed).
957-
with self.coordinator._lock:
943+
# TODO: When consumer.wakeup() is implemented, we need to
944+
# disable here to prevent propagating an exception to this
945+
# heartbeat thread
946+
self.coordinator._client.poll(timeout_ms=0)
947+
948+
if self.coordinator.coordinator_unknown():
949+
future = self.coordinator.lookup_coordinator()
950+
if not future.is_done or future.failed():
951+
# the immediate future check ensures that we backoff
952+
# properly in the case that no brokers are available
953+
# to connect to (and the future is automatically failed).
958954
self.coordinator._lock.wait(self.coordinator.config['retry_backoff_ms'] / 1000)
959955

960956
elif self.coordinator.heartbeat.session_timeout_expired():

0 commit comments

Comments
 (0)