Skip to content

Commit b012a97

Browse files
authored
[BACKPORT] Send TCP metrics for bytes sent and received to server (#500)
* Send TCP metrics for bytes sent and received to server. It would be good to include the `tcp.bytesReceived` and `tcp.bytesSend` metrics in the metrics blob sent to the server, so that one can track how much traffic passes between the server and the client. I have manually tested the stats exported to Prometheus and verified that these two new stats are displayed there correctly.
* Add v5 to branch filter in test runner.
1 parent f1db480 commit b012a97

File tree

3 files changed

+65
-12
lines changed

3 files changed

+65
-12
lines changed

hazelcast/reactor.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -344,6 +344,8 @@ def __init__(self):
344344
loop.shutdown()
345345
loop = _BasicLoop(self.map)
346346
self._loop = loop
347+
self.bytes_sent = 0
348+
self.bytes_received = 0
347349

348350
def start(self):
349351
self._loop.start()
@@ -421,9 +423,11 @@ def handle_read(self):
421423
try:
422424
while True:
423425
data = self.recv(receive_buffer_size)
426+
bytes_received = len(data)
427+
self._reactor.bytes_received += bytes_received
424428
reader.read(data)
425429
self.last_read_time = time.time()
426-
if len(data) < receive_buffer_size:
430+
if bytes_received < receive_buffer_size:
427431
break
428432
except socket.error as err:
429433
if err.args[0] not in _RETRYABLE_ERROR_CODES:
@@ -461,7 +465,7 @@ def handle_write(self):
461465
buf.truncate(0)
462466

463467
try:
464-
sent = self.send(bytes_)
468+
bytes_sent = self.send(bytes_)
465469
except socket.error as err:
466470
if err.args[0] in _RETRYABLE_ERROR_CODES:
467471
# Couldn't write the bytes but we should
@@ -474,9 +478,9 @@ def handle_write(self):
474478
# No exception is thrown during the send
475479
self.last_write_time = time.time()
476480
self.sent_protocol_bytes = True
477-
478-
if sent < len(bytes_):
479-
write_queue.appendleft(bytes_[sent:])
481+
self._reactor.bytes_sent += bytes_sent
482+
if bytes_sent < len(bytes_):
483+
write_queue.appendleft(bytes_[bytes_sent:])
480484

481485
def handle_close(self):
482486
_logger.warning("Connection closed by server")

hazelcast/statistics.py

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
_NEAR_CACHE_DESCRIPTOR_PREFIX = "nearcache"
2626
_NEAR_CACHE_DESCRIPTOR_DISCRIMINATOR = "name"
2727

28+
_TCP_METRICS_PREFIX = "tcp"
29+
2830

2931
class Statistics(object):
3032
def __init__(
@@ -130,7 +132,7 @@ def _register_system_gauge(self, gauge_name, gauge_fn, value_type=ValueType.LONG
130132
self._registered_system_gauges[gauge_name] = (gauge_fn, value_type)
131133
except Exception as e:
132134
_logger.debug(
133-
"Unable to register the system related gauge %s. Error: %s" % (gauge_name, e)
135+
"Unable to register the system related gauge %s. Error: %s", gauge_name, e
134136
)
135137

136138
def _register_process_gauge(self, gauge_name, gauge_fn, value_type=ValueType.LONG):
@@ -141,7 +143,7 @@ def _register_process_gauge(self, gauge_name, gauge_fn, value_type=ValueType.LON
141143
self._registered_process_gauges[gauge_name] = (gauge_fn, value_type)
142144
except Exception as e:
143145
_logger.debug(
144-
"Unable to register the process related gauge %s. Error: %s" % (gauge_name, e)
146+
"Unable to register the process related gauge %s. Error: %s", gauge_name, e
145147
)
146148

147149
def _collect_and_send_stats(self):
@@ -157,6 +159,7 @@ def _collect_and_send_stats(self):
157159
self._add_client_attributes(attributes, connection)
158160
self._add_near_cache_metrics(attributes, compressor)
159161
self._add_system_and_process_metrics(attributes, compressor)
162+
self._add_tcp_metrics(compressor)
160163
self._send_stats(
161164
collection_timestamp, "".join(attributes), compressor.generate_blob(), connection
162165
)
@@ -180,7 +183,7 @@ def _add_system_and_process_metrics(self, attributes, compressor):
180183
attributes, compressor, gauge_name, value, value_type
181184
)
182185
except:
183-
_logger.exception("Error while collecting '%s'." % gauge_name)
186+
_logger.exception("Error while collecting '%s'.", gauge_name)
184187

185188
if not self._registered_process_gauges:
186189
# Do not create the process object if no process-related
@@ -195,7 +198,7 @@ def _add_system_and_process_metrics(self, attributes, compressor):
195198
attributes, compressor, gauge_name, value, value_type
196199
)
197200
except:
198-
_logger.exception("Error while collecting '%s'." % gauge_name)
201+
_logger.exception("Error while collecting '%s'.", gauge_name)
199202

200203
def _add_system_or_process_metric(self, attributes, compressor, gauge_name, value, value_type):
201204
# We don't have any metrics that do not have prefix.
@@ -342,9 +345,26 @@ def _add_near_cache_metric(
342345
self._add_attribute(attributes, metric, value, nc_name_with_prefix)
343346
except:
344347
_logger.exception(
345-
"Error while collecting %s metric for near cache '%s'" % (metric, nc_name)
348+
"Error while collecting %s metric for near cache '%s'.", metric, nc_name
346349
)
347350

351+
def _add_tcp_metrics(self, compressor):
352+
self._add_tcp_metric(compressor, "bytesSend", self._reactor.bytes_sent)
353+
self._add_tcp_metric(compressor, "bytesReceived", self._reactor.bytes_received)
354+
355+
def _add_tcp_metric(
    self, compressor, metric, value, value_type=ValueType.LONG, unit=ProbeUnit.BYTES
):
    """Add a single metric under the ``tcp`` prefix to the compressor.

    Errors raised while adding the metric are logged and not re-raised,
    so one failing metric does not abort the caller.

    Args:
        compressor: Metrics compressor handed to ``_add_metric``.
        metric: Name of the metric; used in the descriptor and in the
            error log message.
        value: Value to record for the metric.
        value_type: Value type passed to ``_add_metric``. Defaults to
            ``ValueType.LONG``.
        unit: Unit stored in the descriptor. Defaults to
            ``ProbeUnit.BYTES``.
    """
    descriptor = MetricDescriptor(
        metric=metric,
        prefix=_TCP_METRICS_PREFIX,
        unit=unit,
    )
    try:
        self._add_metric(compressor, descriptor, value, value_type)
    except Exception:
        # Not a bare ``except:`` so that KeyboardInterrupt and SystemExit
        # still propagate instead of being logged and swallowed here.
        _logger.exception("Error while collecting '%s.%s'.", _TCP_METRICS_PREFIX, metric)
367+
348368
def _add_metric(self, compressor, descriptor, value, value_type):
349369
if value_type == ValueType.LONG:
350370
compressor.add_long(descriptor, value)

tests/integration/backward_compatible/client_test.py

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
import time
22

3-
from tests.base import HazelcastTestCase
3+
from tests.base import HazelcastTestCase, SingleMemberTestCase
44
from hazelcast.client import HazelcastClient
55
from hazelcast.lifecycle import LifecycleState
66
from tests.hzrc.ttypes import Lang
7-
from tests.util import get_current_timestamp
7+
from tests.util import get_current_timestamp, random_string
88

99

1010
class ClientTest(HazelcastTestCase):
@@ -110,3 +110,32 @@ def get_labels_from_member(self, client_uuid):
110110
client_uuid
111111
)
112112
return self.rc.executeOnController(self.cluster.id, script, Lang.JAVASCRIPT).result
113+
114+
115+
class ClientTcpMetricsTest(SingleMemberTestCase):
    """Checks the byte counters exposed on the client's reactor."""

    @classmethod
    def configure_client(cls, config):
        """Set the client's cluster name to the id of the test cluster."""
        config["cluster_name"] = cls.cluster.id
        return config

    def test_bytes_received(self):
        """The received counter is positive and grows after a map read."""
        reactor = self.client._reactor

        # The counter is expected to be non-zero before any map operation.
        received_before = reactor.bytes_received
        self.assertGreater(received_before, 0)

        map_proxy = self.client.get_map(random_string()).blocking()
        map_proxy.get(random_string())

        self.assertGreater(reactor.bytes_received, received_before)

    def test_bytes_sent(self):
        """The sent counter is positive and grows after a map write."""
        reactor = self.client._reactor

        # The counter is expected to be non-zero before any map operation.
        sent_before = reactor.bytes_sent
        self.assertGreater(sent_before, 0)

        map_proxy = self.client.get_map(random_string()).blocking()
        map_proxy.set(random_string(), random_string())

        self.assertGreater(reactor.bytes_sent, sent_before)

0 commit comments

Comments
 (0)