Skip to content

Commit 91daea3

Browse files
authored
Fix dpkp#1985: fix consumer deadlock when heartbeat thread request timeout (dpkp#2064)
1 parent bd557da commit 91daea3

File tree

1 file changed

+12
-8
lines changed

1 file changed

+12
-8
lines changed

kafka/coordinator/base.py

+12-8
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,7 @@ def ensure_coordinator_ready(self):
242242
"""Block until the coordinator for this group is known
243243
(and we have an active connection -- java client uses unsent queue).
244244
"""
245-
with self._lock:
245+
with self._client._lock, self._lock:
246246
while self.coordinator_unknown():
247247

248248
# Prior to 0.8.2 there was no group coordinator
@@ -345,7 +345,7 @@ def _handle_join_failure(self, _):
345345

346346
def ensure_active_group(self):
347347
"""Ensure that the group is active (i.e. joined and synced)"""
348-
with self._lock:
348+
with self._client._lock, self._lock:
349349
if self._heartbeat_thread is None:
350350
self._start_heartbeat_thread()
351351

@@ -763,7 +763,7 @@ def close(self):
763763

764764
def maybe_leave_group(self):
765765
"""Leave the current group and reset local generation/memberId."""
766-
with self._lock:
766+
with self._client._lock, self._lock:
767767
if (not self.coordinator_unknown()
768768
and self.state is not MemberState.UNJOINED
769769
and self._generation is not Generation.NO_GENERATION):
@@ -946,6 +946,15 @@ def run(self):
946946
log.debug('Heartbeat thread closed')
947947

948948
def _run_once(self):
949+
with self.coordinator._client._lock, self.coordinator._lock:
950+
if self.enabled and self.coordinator.state is MemberState.STABLE:
951+
# TODO: When consumer.wakeup() is implemented, we need to
952+
# disable here to prevent propagating an exception to this
953+
# heartbeat thread
954+
# must acquire client._lock first, or we may deadlock in the heartbeat
955+
# failure callback in consumer poll
956+
self.coordinator._client.poll(timeout_ms=0)
957+
949958
with self.coordinator._lock:
950959
if not self.enabled:
951960
log.debug('Heartbeat disabled. Waiting')
@@ -961,11 +970,6 @@ def _run_once(self):
961970
self.disable()
962971
return
963972

964-
# TODO: When consumer.wakeup() is implemented, we need to
965-
# disable here to prevent propagating an exception to this
966-
# heartbeat thread
967-
self.coordinator._client.poll(timeout_ms=0)
968-
969973
if self.coordinator.coordinator_unknown():
970974
future = self.coordinator.lookup_coordinator()
971975
if not future.is_done or future.failed():

0 commit comments

Comments
 (0)