Skip to content

Commit 3c2babf

Browse files
committed
Send TCP metrics for bytes sent and received to server
It would be good to send the `tcp.bytesReceived` and `tcp.bytesSend` metrics in the metrics blob to the server, so that someone can track how much traffic is passing between the server and the client. I have manually tested the stats exported to Prometheus and verified that these two new stats are displayed there correctly.
1 parent 6e7b244 commit 3c2babf

File tree

3 files changed

+66
-12
lines changed

3 files changed

+66
-12
lines changed

hazelcast/reactor.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -344,6 +344,8 @@ def __init__(self):
344344
loop.shutdown()
345345
loop = _BasicLoop(self.map)
346346
self._loop = loop
347+
self.bytes_sent = 0
348+
self.bytes_received = 0
347349

348350
def start(self):
349351
self._loop.start()
@@ -421,9 +423,11 @@ def handle_read(self):
421423
try:
422424
while True:
423425
data = self.recv(receive_buffer_size)
426+
bytes_received = len(data)
427+
self._reactor.bytes_received += bytes_received
424428
reader.read(data)
425429
self.last_read_time = time.time()
426-
if len(data) < receive_buffer_size:
430+
if bytes_received < receive_buffer_size:
427431
break
428432
except socket.error as err:
429433
if err.args[0] not in _RETRYABLE_ERROR_CODES:
@@ -461,7 +465,7 @@ def handle_write(self):
461465
buf.truncate(0)
462466

463467
try:
464-
sent = self.send(bytes_)
468+
bytes_sent = self.send(bytes_)
465469
except socket.error as err:
466470
if err.args[0] in _RETRYABLE_ERROR_CODES:
467471
# Couldn't write the bytes but we should
@@ -474,9 +478,9 @@ def handle_write(self):
474478
# No exception is thrown during the send
475479
self.last_write_time = time.time()
476480
self.sent_protocol_bytes = True
477-
478-
if sent < len(bytes_):
479-
write_queue.appendleft(bytes_[sent:])
481+
self._reactor.bytes_sent += bytes_sent
482+
if bytes_sent < len(bytes_):
483+
write_queue.appendleft(bytes_[bytes_sent:])
480484

481485
def handle_close(self):
482486
_logger.warning("Connection closed by server")

hazelcast/statistics.py

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
_NEAR_CACHE_DESCRIPTOR_PREFIX = "nearcache"
2626
_NEAR_CACHE_DESCRIPTOR_DISCRIMINATOR = "name"
2727

28+
_TCP_METRICS_PREFIX = "tcp"
29+
2830

2931
class Statistics(object):
3032
def __init__(
@@ -130,7 +132,7 @@ def _register_system_gauge(self, gauge_name, gauge_fn, value_type=ValueType.LONG
130132
self._registered_system_gauges[gauge_name] = (gauge_fn, value_type)
131133
except Exception as e:
132134
_logger.debug(
133-
"Unable to register the system related gauge %s. Error: %s" % (gauge_name, e)
135+
"Unable to register the system related gauge %s. Error: %s", gauge_name, e
134136
)
135137

136138
def _register_process_gauge(self, gauge_name, gauge_fn, value_type=ValueType.LONG):
@@ -141,7 +143,7 @@ def _register_process_gauge(self, gauge_name, gauge_fn, value_type=ValueType.LON
141143
self._registered_process_gauges[gauge_name] = (gauge_fn, value_type)
142144
except Exception as e:
143145
_logger.debug(
144-
"Unable to register the process related gauge %s. Error: %s" % (gauge_name, e)
146+
"Unable to register the process related gauge %s. Error: %s", gauge_name, e
145147
)
146148

147149
def _collect_and_send_stats(self):
@@ -157,6 +159,7 @@ def _collect_and_send_stats(self):
157159
self._add_client_attributes(attributes, connection)
158160
self._add_near_cache_metrics(attributes, compressor)
159161
self._add_system_and_process_metrics(attributes, compressor)
162+
self._add_tcp_metrics(compressor)
160163
self._send_stats(
161164
collection_timestamp, "".join(attributes), compressor.generate_blob(), connection
162165
)
@@ -180,7 +183,7 @@ def _add_system_and_process_metrics(self, attributes, compressor):
180183
attributes, compressor, gauge_name, value, value_type
181184
)
182185
except:
183-
_logger.exception("Error while collecting '%s'." % gauge_name)
186+
_logger.exception("Error while collecting '%s'.", gauge_name)
184187

185188
if not self._registered_process_gauges:
186189
# Do not create the process object if no process-related
@@ -195,7 +198,7 @@ def _add_system_and_process_metrics(self, attributes, compressor):
195198
attributes, compressor, gauge_name, value, value_type
196199
)
197200
except:
198-
_logger.exception("Error while collecting '%s'." % gauge_name)
201+
_logger.exception("Error while collecting '%s'.", gauge_name)
199202

200203
def _add_system_or_process_metric(self, attributes, compressor, gauge_name, value, value_type):
201204
# We don't have any metrics that do not have prefix.
@@ -342,9 +345,26 @@ def _add_near_cache_metric(
342345
self._add_attribute(attributes, metric, value, nc_name_with_prefix)
343346
except:
344347
_logger.exception(
345-
"Error while collecting %s metric for near cache '%s'" % (metric, nc_name)
348+
"Error while collecting %s metric for near cache '%s'.", metric, nc_name
346349
)
347350

351+
def _add_tcp_metrics(self, compressor):
352+
self._add_tcp_metric(compressor, "bytesSend", self._reactor.bytes_sent)
353+
self._add_tcp_metric(compressor, "bytesReceived", self._reactor.bytes_received)
354+
355+
def _add_tcp_metric(
356+
self, compressor, metric, value, value_type=ValueType.LONG, unit=ProbeUnit.BYTES
357+
):
358+
descriptor = MetricDescriptor(
359+
metric=metric,
360+
prefix=_TCP_METRICS_PREFIX,
361+
unit=unit,
362+
)
363+
try:
364+
self._add_metric(compressor, descriptor, value, value_type)
365+
except:
366+
_logger.exception("Error while collecting '%s.%s'.", _TCP_METRICS_PREFIX, metric)
367+
348368
def _add_metric(self, compressor, descriptor, value, value_type):
349369
if value_type == ValueType.LONG:
350370
compressor.add_long(descriptor, value)

tests/integration/backward_compatible/client_test.py

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
import time
2+
import unittest
23

3-
from tests.base import HazelcastTestCase
4+
from tests.base import HazelcastTestCase, SingleMemberTestCase
45
from hazelcast.client import HazelcastClient
56
from hazelcast.lifecycle import LifecycleState
67
from tests.hzrc.ttypes import Lang
7-
from tests.util import get_current_timestamp
8+
from tests.util import get_current_timestamp, random_string
89

910

1011
class ClientTest(HazelcastTestCase):
@@ -110,3 +111,32 @@ def get_labels_from_member(self, client_uuid):
110111
client_uuid
111112
)
112113
return self.rc.executeOnController(self.cluster.id, script, Lang.JAVASCRIPT).result
114+
115+
116+
class ClientTcpMetricsTest(SingleMemberTestCase):
117+
@classmethod
118+
def configure_client(cls, config):
119+
config["cluster_name"] = cls.cluster.id
120+
return config
121+
122+
def test_bytes_received(self):
123+
reactor = self.client._reactor
124+
125+
bytes_received = reactor.bytes_received
126+
self.assertGreater(bytes_received, 0)
127+
128+
m = self.client.get_map(random_string()).blocking()
129+
m.get(random_string())
130+
131+
self.assertGreater(reactor.bytes_received, bytes_received)
132+
133+
def test_bytes_sent(self):
134+
reactor = self.client._reactor
135+
136+
bytes_sent = reactor.bytes_sent
137+
self.assertGreater(bytes_sent, 0)
138+
139+
m = self.client.get_map(random_string()).blocking()
140+
m.set(random_string(), random_string())
141+
142+
self.assertGreater(reactor.bytes_sent, bytes_sent)

0 commit comments

Comments
 (0)