Skip to content

Commit

Permalink
Send heartbeat frames more often
Browse files Browse the repository at this point in the history
The AMQP protocol is saying that we should send "two"
"heartbeat frames" during the "heartbeat timeout" (see [1] and rabbit
implement this in [2]).

The "two" value is the "rate" parameter in the current implementation.

The current implementation was sending only one frame during the
"heartbeat timeout", which is wrong.

[1] https://www.amqp.org/specification/0-9-1/amqp-org-download
[2] https://www.rabbitmq.com/heartbeats.html#heartbeats-interval

Signed-off-by: Arnaud Morin <arnaud.morin@ovhcloud.com>
  • Loading branch information
arnaudmorin authored and auvipy committed Jun 14, 2023
1 parent ab1edf9 commit a92dd03
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 3 deletions.
9 changes: 7 additions & 2 deletions amqp/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -724,13 +724,18 @@ def heartbeat_tick(self, rate=2):
once per second.
Keyword Arguments:
rate (int): Previously used, but ignored now.
rate (int): Number of heartbeat frames to send during the heartbeat
timeout
"""
AMQP_HEARTBEAT_LOGGER.debug('heartbeat_tick : for connection %s',
self._connection_id)
if not self.heartbeat:
return

# If rate is wrong, let's use 2 as default
if rate <= 0:
rate = 2

# treat actual data exchange in either direction as a heartbeat
sent_now = self.bytes_sent
recv_now = self.bytes_recv
Expand All @@ -755,7 +760,7 @@ def heartbeat_tick(self, rate=2):
self.prev_sent, self.prev_recv = sent_now, recv_now

# send a heartbeat if it's time to do so
if now > self.last_heartbeat_sent + self.heartbeat:
if now > self.last_heartbeat_sent + self.heartbeat / rate:
AMQP_HEARTBEAT_LOGGER.debug(
'heartbeat_tick: sending heartbeat for connection %s',
self._connection_id)
Expand Down
34 changes: 33 additions & 1 deletion t/unit/test_connection.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import re
import socket
import warnings
from array import array
from unittest.mock import Mock, call, patch

import pytest
import time

from amqp import Connection, spec
from amqp.connection import SSLError
Expand Down Expand Up @@ -514,6 +514,38 @@ def test_heartbeat_tick(self):
with pytest.raises(ConnectionError):
self.conn.heartbeat_tick()

def _test_heartbeat_rate_tick(self, rate):
# Doing 22 calls,
# First one is setting the variables
# All nexts may send heartbeats, depending on rate
for i in range(1, 22):
self.conn.heartbeat_tick(rate)
time.sleep(0.1)

def test_heartbeat_check_rate_default(self):
# Heartbeat set to 2 secs
self.conn.heartbeat = 2
# Default rate is 2 --> should send frames every sec
self._test_heartbeat_rate_tick(2)
# Verify that we wrote 2 frames
assert self.conn.frame_writer.call_count == 2

def test_heartbeat_check_rate_four(self):
# Heartbeat set to 2 secs
self.conn.heartbeat = 2
# Rate 4 --> should send frames every 0.5sec
self._test_heartbeat_rate_tick(4)
# Verify that we wrote 4 frames
assert self.conn.frame_writer.call_count == 4

def test_heartbeat_check_rate_wrong(self):
# Heartbeat set to 2 secs
self.conn.heartbeat = 2
# Default rate is 2 --> should send frames every sec
self._test_heartbeat_rate_tick(-42)
# Verify that we wrote 2 frames
assert self.conn.frame_writer.call_count == 2

def test_server_capabilities(self):
self.conn.server_properties['capabilities'] = {'foo': 1}
assert self.conn.server_capabilities == {'foo': 1}
Expand Down

0 comments on commit a92dd03

Please sign in to comment.