Skip to content

Commit 9f0b518

Browse files
dpkpjeffwidman
authored andcommitted
Reduce client poll timeout when no ifrs
1 parent 5bb1abd commit 9f0b518

File tree

2 files changed

+15
-0
lines changed

2 files changed

+15
-0
lines changed

kafka/client_async.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -588,6 +588,9 @@ def poll(self, timeout_ms=None, future=None):
588588
metadata_timeout_ms,
589589
idle_connection_timeout_ms,
590590
self.config['request_timeout_ms'])
591+
# if there are no requests in flight, do not block longer than the retry backoff
592+
if self.in_flight_request_count() == 0:
593+
timeout = min(timeout, self.config['retry_backoff_ms'])
591594
timeout = max(0, timeout / 1000) # avoid negative timeouts
592595

593596
self._poll(timeout)

test/test_client_async.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,8 @@ def test_send(cli, conn):
229229
def test_poll(mocker):
230230
metadata = mocker.patch.object(KafkaClient, '_maybe_refresh_metadata')
231231
_poll = mocker.patch.object(KafkaClient, '_poll')
232+
ifrs = mocker.patch.object(KafkaClient, 'in_flight_request_count')
233+
ifrs.return_value = 1
232234
cli = KafkaClient(api_version=(0, 9))
233235

234236
# metadata timeout wins
@@ -245,6 +247,11 @@ def test_poll(mocker):
245247
cli.poll()
246248
_poll.assert_called_with(cli.config['request_timeout_ms'] / 1000.0)
247249

250+
# If no in-flight-requests, drop timeout to retry_backoff_ms
251+
ifrs.return_value = 0
252+
cli.poll()
253+
_poll.assert_called_with(cli.config['retry_backoff_ms'] / 1000.0)
254+
248255

249256
def test__poll():
250257
pass
@@ -300,12 +307,14 @@ def client(mocker):
300307

301308
def test_maybe_refresh_metadata_ttl(mocker, client):
302309
client.cluster.ttl.return_value = 1234
310+
mocker.patch.object(KafkaClient, 'in_flight_request_count', return_value=1)
303311

304312
client.poll(timeout_ms=12345678)
305313
client._poll.assert_called_with(1.234)
306314

307315

308316
def test_maybe_refresh_metadata_backoff(mocker, client):
317+
mocker.patch.object(KafkaClient, 'in_flight_request_count', return_value=1)
309318
now = time.time()
310319
t = mocker.patch('time.time')
311320
t.return_value = now
@@ -316,6 +325,7 @@ def test_maybe_refresh_metadata_backoff(mocker, client):
316325

317326
def test_maybe_refresh_metadata_in_progress(mocker, client):
318327
client._metadata_refresh_in_progress = True
328+
mocker.patch.object(KafkaClient, 'in_flight_request_count', return_value=1)
319329

320330
client.poll(timeout_ms=12345678)
321331
client._poll.assert_called_with(9999.999) # request_timeout_ms
@@ -324,6 +334,7 @@ def test_maybe_refresh_metadata_in_progress(mocker, client):
324334
def test_maybe_refresh_metadata_update(mocker, client):
325335
mocker.patch.object(client, 'least_loaded_node', return_value='foobar')
326336
mocker.patch.object(client, '_can_send_request', return_value=True)
337+
mocker.patch.object(KafkaClient, 'in_flight_request_count', return_value=1)
327338
send = mocker.patch.object(client, 'send')
328339

329340
client.poll(timeout_ms=12345678)
@@ -338,6 +349,7 @@ def test_maybe_refresh_metadata_cant_send(mocker, client):
338349
mocker.patch.object(client, '_can_connect', return_value=True)
339350
mocker.patch.object(client, '_maybe_connect', return_value=True)
340351
mocker.patch.object(client, 'maybe_connect', return_value=True)
352+
mocker.patch.object(KafkaClient, 'in_flight_request_count', return_value=1)
341353

342354
now = time.time()
343355
t = mocker.patch('time.time')

0 commit comments

Comments
 (0)