From a92dd037712b5b7b1622f4f9d83157d095c90910 Mon Sep 17 00:00:00 2001 From: Arnaud Morin Date: Tue, 28 Feb 2023 11:42:31 +0100 Subject: [PATCH] Send heartbeat frames more often The AMQP protocol is saying that we should send "two" "heartbeat frames" during the "heartbeat timeout" (see [1] and rabbit implement this in [2]). The "two" value is the "rate" parameter in the current implementation. The current implementation was sending only one frame during the "heartbeat timeout", which is wrong. [1] https://www.amqp.org/specification/0-9-1/amqp-org-download [2] https://www.rabbitmq.com/heartbeats.html#heartbeats-interval Signed-off-by: Arnaud Morin --- amqp/connection.py | 9 +++++++-- t/unit/test_connection.py | 34 +++++++++++++++++++++++++++++++++- 2 files changed, 40 insertions(+), 3 deletions(-) diff --git a/amqp/connection.py b/amqp/connection.py index fe594013..07a817d6 100644 --- a/amqp/connection.py +++ b/amqp/connection.py @@ -724,13 +724,18 @@ def heartbeat_tick(self, rate=2): once per second. Keyword Arguments: - rate (int): Previously used, but ignored now. + rate (int): Number of heartbeat frames to send during the heartbeat + timeout """ AMQP_HEARTBEAT_LOGGER.debug('heartbeat_tick : for connection %s', self._connection_id) if not self.heartbeat: return + # If rate is wrong, let's use 2 as default + if rate <= 0: + rate = 2 + # treat actual data exchange in either direction as a heartbeat sent_now = self.bytes_sent recv_now = self.bytes_recv @@ -755,7 +760,7 @@ def heartbeat_tick(self, rate=2): self.prev_sent, self.prev_recv = sent_now, recv_now # send a heartbeat if it's time to do so - if now > self.last_heartbeat_sent + self.heartbeat: + if now > self.last_heartbeat_sent + self.heartbeat / rate: AMQP_HEARTBEAT_LOGGER.debug( 'heartbeat_tick: sending heartbeat for connection %s', self._connection_id) diff --git a/t/unit/test_connection.py b/t/unit/test_connection.py index 13eb0b68..46efe6fc 100644 --- a/t/unit/test_connection.py +++ b/t/unit/test_connection.py @@ -1,10 +1,10 @@ import re import socket import warnings -from array import array from unittest.mock import Mock, call, patch import pytest +import time from amqp import Connection, spec from amqp.connection import SSLError @@ -514,6 +514,38 @@ def test_heartbeat_tick(self): with pytest.raises(ConnectionError): self.conn.heartbeat_tick() + def _test_heartbeat_rate_tick(self, rate): + # Doing 22 calls, + # First one is setting the variables + # All nexts may send heartbeats, depending on rate + for i in range(1, 22): + self.conn.heartbeat_tick(rate) + time.sleep(0.1) + + def test_heartbeat_check_rate_default(self): + # Heartbeat set to 2 secs + self.conn.heartbeat = 2 + # Default rate is 2 --> should send frames every sec + self._test_heartbeat_rate_tick(2) + # Verify that we wrote 2 frames + assert self.conn.frame_writer.call_count == 2 + + def test_heartbeat_check_rate_four(self): + # Heartbeat set to 2 secs + self.conn.heartbeat = 2 + # Rate 4 --> should send frames every 0.5sec + self._test_heartbeat_rate_tick(4) + # Verify that we wrote 4 frames + assert self.conn.frame_writer.call_count == 4 + + def test_heartbeat_check_rate_wrong(self): + # Heartbeat set to 2 secs + self.conn.heartbeat = 2 + # Default rate is 2 --> should send frames every sec + self._test_heartbeat_rate_tick(-42) + # Verify that we wrote 2 frames + assert self.conn.frame_writer.call_count == 2 + def test_server_capabilities(self): self.conn.server_properties['capabilities'] = {'foo': 1} assert self.conn.server_capabilities == {'foo': 1}