Add blob-store-stats telemetry device and tests (#1755)
This commit adds a new blob-store-stats telemetry device that, based on
the configured sample rate, collects per-cluster and per-node blob store
statistics as reported by the respective API.
b-deam authored Aug 8, 2023
1 parent facf127 commit 7a8ae06
Showing 4 changed files with 345 additions and 1 deletion.
66 changes: 66 additions & 0 deletions docs/telemetry.rst
@@ -328,3 +328,69 @@ It also works with ``esrally compare``::
.. note::

This telemetry device has no runtime overhead. It does all of its work after the race is complete.

blob-store-stats
----------------

The blob-store-stats telemetry device regularly calls the blob store stats API and records one metrics document for cluster-level stats (``_all``) and one metrics document per node.

Supported telemetry parameters:

* ``blob-store-stats-sample-interval`` (default 1): A number greater than zero denoting the sampling interval in seconds (see the example below).
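
For example, to attach the device with a 10 second sampling interval, Rally's ``--telemetry`` and ``--telemetry-params`` flags can be used (the track name here is only a placeholder)::

    esrally race --track=geonames --telemetry="blob-store-stats" --telemetry-params="blob-store-stats-sample-interval:10"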

Example of recorded documents given two nodes in the target cluster::


    {
        "name": "blob-store-stats",
        "node": "_all",
        "meta": {
            "cluster": "es",
            "_nodes": {
                "total": 2,
                "successful": 2,
                "failed": 0
            }
        },
        "ListObjects": 30,
        "PutMultipartObject": 0,
        "PutObject": 0,
        "GetObject": 2334
    },
    {
        "name": "blob-store-stats",
        "node": "OkuSgfZWSq2fprKXD6CNOw",
        "meta": {
            "cluster": "es",
            "_nodes": {
                "total": 2,
                "successful": 2,
                "failed": 0
            }
        },
        "ListObjects": 30,
        "PutMultipartObject": 0,
        "PutObject": 0,
        "GetObject": 1167
    },
    {
        "name": "blob-store-stats",
        "node": "ufg1tLOiTIiHkmgGiztW9Q",
        "meta": {
            "cluster": "es",
            "_nodes": {
                "total": 2,
                "successful": 2,
                "failed": 0
            }
        },
        "ListObjects": 0,
        "PutMultipartObject": 0,
        "PutObject": 0,
        "GetObject": 1167
    }


.. note::

This telemetry device is only applicable to `Stateless Elasticsearch <https://www.elastic.co/blog/stateless-your-new-state-of-find-with-elasticsearch>`_ and requires elevated privileges only available to Elastic developers.
1 change: 1 addition & 0 deletions esrally/driver/driver.py
@@ -640,6 +640,7 @@ def prepare_telemetry(self, es, enable, index_names, data_stream_names, build_ha
telemetry.DataStreamStats(telemetry_params, es, self.metrics_store),
telemetry.IngestPipelineStats(es, self.metrics_store),
telemetry.DiskUsageStats(telemetry_params, es_default, self.metrics_store, index_names, data_stream_names),
telemetry.BlobStoreStats(telemetry_params, es, self.metrics_store),
]
else:
devices = []
140 changes: 140 additions & 0 deletions esrally/telemetry.py
@@ -2363,3 +2363,143 @@ def handle_telemetry_usage(self, response):
knn_vectors = field_info.get("knn_vectors_in_bytes", 0)
if knn_vectors > 0:
self.metrics_store.put_value_cluster_level("disk_usage_knn_vectors", knn_vectors, meta_data=meta, unit="byte")


class BlobStoreStats(TelemetryDevice):
internal = False
command = "blob-store-stats"
human_name = "Blob Store Stats"
help = "Regularly samples blob store stats, only applicable to serverless Elasticsearch"

"""
    Gathers blob store stats at both the cluster and node level
"""

def __init__(self, telemetry_params, clients, metrics_store):
"""
:param telemetry_params: The configuration object for telemetry_params.
``blob-store-stats-sample-interval``: positive integer controlling the sampling interval.
Default: 1 second.
:param clients: A dict of clients to all clusters.
:param metrics_store: The configured metrics store we write to.
"""
super().__init__()

self.telemetry_params = telemetry_params
self.clients = clients
self.sample_interval = telemetry_params.get("blob-store-stats-sample-interval", 1)
if self.sample_interval <= 0:
raise exceptions.SystemSetupError(
f"The telemetry parameter 'blob-store-stats-sample-interval' must be greater than zero but was {self.sample_interval}."
)
self.specified_cluster_names = self.clients.keys()
self.metrics_store = metrics_store
self.samplers = []

def __str__(self):
return "blob-store-stats"

def on_benchmark_start(self):
for cluster_name in self.specified_cluster_names:
if not self.clients[cluster_name].is_serverless:
self.logger.warning(
"Cannot attach telemetry device [%s] to cluster [%s], [%s] is only supported with serverless Elasticsearch",
self,
cluster_name,
self,
)
continue

self.logger.debug("Gathering [%s] for [%s]", self, cluster_name)
recorder = BlobStoreStatsRecorder(
cluster_name,
self.clients[cluster_name],
self.metrics_store,
self.sample_interval,
)
sampler = SamplerThread(recorder)
self.samplers.append(sampler)
sampler.daemon = True
# we don't require starting recorders precisely at the same time
sampler.start()

def on_benchmark_stop(self):
if self.samplers:
for sampler in self.samplers:
sampler.finish()


class BlobStoreStatsRecorder:
"""
Collects and pushes blob store stats for the specified cluster to the metric store.
"""

def __init__(self, cluster_name, client, metrics_store, sample_interval):
"""
:param cluster_name: The cluster_name that the client connects to, as specified in target.hosts. This may differ
from the actual cluster name deployed.
:param client: The Elasticsearch client for this cluster.
:param metrics_store: The configured metrics store we write to.
:param sample_interval: integer controlling the interval, in seconds, between collecting samples.
"""

self.rally_cluster_name = cluster_name
self.client = client
self.metrics_store = metrics_store
self.sample_interval = sample_interval
self.logger = logging.getLogger(__name__)

def __str__(self):
return "blob-store-stats"

def record(self):
"""
Collect blob store stats at a per cluster and node level and push to metrics store.
"""
# pylint: disable=import-outside-toplevel
import elasticsearch

try:
stats_api_endpoint = "/_internal/blob_store/stats"
stats = self.client.perform_request(method="GET", path=stats_api_endpoint, params={})
except elasticsearch.ApiError as e:
msg = f"An API error [{e}] occurred while collecting [{self}] on cluster [{self.rally_cluster_name}]"
self.logger.error(msg)
return
except elasticsearch.TransportError as e:
msg = f"A transport error [{e}] occurred while collecting [{self}] on cluster [{self.rally_cluster_name}]"
self.logger.error(msg)
return

self._push_stats(stats)

def _push_stats(self, stats):
stats_meta_data = {key: value for key, value in stats.items() if key == "_nodes"}
meta_data = {"cluster": stats.get("cluster_name", self.rally_cluster_name), **stats_meta_data}

if cluster_stats := self._extract_blob_store_stats(stats.get("_all")):
self.metrics_store.put_doc(
{
"name": "blob-store-stats",
"node": "_all",
**cluster_stats,
},
level=MetaInfoScope.cluster,
meta_data=meta_data,
)

for node_id, node_stats in stats.get("nodes", {}).items():
if ns := self._extract_blob_store_stats(node_stats):
self.metrics_store.put_doc(
{
"name": "blob-store-stats",
"node": node_id,
**ns,
},
level=MetaInfoScope.node,
node_name=node_id,
meta_data=meta_data,
)

def _extract_blob_store_stats(self, stats):
return stats.get("object_store_stats", {})
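
For illustration, below is a minimal standalone sketch of the cluster/node split that ``_push_stats`` performs. The payload is a made-up example shaped like the blob store stats API response shown in the docs above, not real API output:

# Standalone sketch of the split performed by _push_stats.
# The payload below is a hypothetical example shaped like the
# /_internal/blob_store/stats response, not real API output.
stats = {
    "_nodes": {"total": 1, "successful": 1, "failed": 0},
    "cluster_name": "es",
    "_all": {"object_store_stats": {"ListObjects": 3, "GetObject": 10}},
    "nodes": {
        "node-a": {"object_store_stats": {"ListObjects": 3, "GetObject": 10}},
    },
}

meta_data = {"cluster": stats.get("cluster_name", "default"), "_nodes": stats["_nodes"]}

docs = []
# One cluster-level document from the "_all" section ...
if cluster_stats := stats.get("_all", {}).get("object_store_stats", {}):
    docs.append({"name": "blob-store-stats", "node": "_all", **cluster_stats})
# ... and one document per node.
for node_id, node_stats in stats.get("nodes", {}).items():
    if ns := node_stats.get("object_store_stats", {}):
        docs.append({"name": "blob-store-stats", "node": node_id, **ns})

for doc in docs:
    print(doc, meta_data)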
139 changes: 138 additions & 1 deletion tests/telemetry_test.py
@@ -20,6 +20,7 @@
import logging
import random
from collections import namedtuple
from dataclasses import dataclass
from unittest import mock
from unittest.mock import call

@@ -150,14 +151,15 @@ def test_store_calculated_metrics(self, metrics_store_put_value, stop_watch):


class Client:
def __init__(self, *, nodes=None, info=None, indices=None, transform=None, cluster=None, transport_client=None):
def __init__(self, *, nodes=None, info=None, indices=None, transform=None, cluster=None, transport_client=None, is_serverless=None):
self.nodes = nodes
self._info = wrap(info)
self.indices = indices
self.transform = transform
self.cluster = cluster
if transport_client:
self.transport = transport_client
self.is_serverless = is_serverless

def info(self):
return self._info()
@@ -206,6 +208,22 @@ def __call__(self, *args, **kwargs):
return self.response


class ApiErrorSupplier:
@dataclass
class ApiResponseMeta:
status: int

def __call__(self, status=None, body=None, message=None):
return elasticsearch.ApiError(
meta=self.ApiResponseMeta(status=status),
body=body,
message=message,
)


raiseApiError = ApiErrorSupplier()


class TransportErrorSupplier:
def __call__(self, *args, **kwargs):
raise elasticsearch.TransportError
@@ -4937,3 +4955,122 @@ def test_indexed_vector(self, es, metrics_store_cluster_level):

def _mock_store(self, name, size, field):
return mock.call(name, size, meta_data={"index": "foo", "field": field}, unit="byte")


class TestBlobStoreStats:
def test_negative_sample_interval_forbidden(self):
cfg = create_config()
metrics_store = metrics.EsMetricsStore(cfg)
telemetry_params = {"blob-store-stats-sample-interval": -1}
with pytest.raises(
exceptions.SystemSetupError,
match=r"The telemetry parameter 'blob-store-stats-sample-interval' must be greater than zero but was .*\.",
):
telemetry.BlobStoreStats(telemetry_params, Client(), metrics_store)

def test_non_serverless_is_skipped_and_serverless_is_not_skipped(self, caplog):
caplog.set_level(logging.DEBUG)
cfg = create_config()
metrics_store = metrics.EsMetricsStore(cfg)
telemetry_params = {"blob-store-stats-sample-interval": 1}

clients = {
"default": Client(is_serverless=False),
"cluster_b": Client(is_serverless=True),
}
t = telemetry.BlobStoreStats(telemetry_params, clients, metrics_store)
t.on_benchmark_start()

assert (
"Cannot attach telemetry device [blob-store-stats] to cluster [default],"
" [blob-store-stats] is only supported with serverless Elasticsearch" in caplog.text
)

assert "Gathering [blob-store-stats] for [cluster_b]" in caplog.text


class TestBlobStoreStatsRecorder:
blob_store_stats_response = {
"_nodes": {"total": 2, "successful": 2, "failed": 0},
"cluster_name": "es",
"_all": {"object_store_stats": {"ListObjects": 5, "PutMultipartObject": 3, "PutObject": 161, "GetObject": 54}},
"nodes": {
"xwc71ug5QtOYWrEkNiVgYw": {"object_store_stats": {"ListObjects": 1, "PutMultipartObject": 0, "PutObject": 0, "GetObject": 5}},
"qRu2kq0_RnyVn-xmLIN5ZA": {
"object_store_stats": {"ListObjects": 4, "PutMultipartObject": 3, "PutObject": 161, "GetObject": 49}
},
},
}

@mock.patch("esrally.metrics.EsMetricsStore.put_doc")
def test_store_blob_store_stats(self, metrics_store_put_doc):
client = Client(transport_client=TransportClient(response=self.blob_store_stats_response))
cfg = create_config()
metrics_store = metrics.EsMetricsStore(cfg)
recorder = telemetry.BlobStoreStatsRecorder(cluster_name="default", client=client, metrics_store=metrics_store, sample_interval=1)
recorder.record()

metrics_store_put_doc.assert_has_calls(
[
mock.call(
{
"name": "blob-store-stats",
"node": "_all",
"ListObjects": 5,
"PutMultipartObject": 3,
"PutObject": 161,
"GetObject": 54,
},
level=MetaInfoScope.cluster,
meta_data={"cluster": "es", "_nodes": {"total": 2, "successful": 2, "failed": 0}},
),
mock.call(
{
"name": "blob-store-stats",
"node": "xwc71ug5QtOYWrEkNiVgYw",
"ListObjects": 1,
"PutMultipartObject": 0,
"PutObject": 0,
"GetObject": 5,
},
level=MetaInfoScope.node,
node_name="xwc71ug5QtOYWrEkNiVgYw",
meta_data={"cluster": "es", "_nodes": {"total": 2, "successful": 2, "failed": 0}},
),
mock.call(
{
"name": "blob-store-stats",
"node": "qRu2kq0_RnyVn-xmLIN5ZA",
"ListObjects": 4,
"PutMultipartObject": 3,
"PutObject": 161,
"GetObject": 49,
},
level=MetaInfoScope.node,
node_name="qRu2kq0_RnyVn-xmLIN5ZA",
meta_data={"cluster": "es", "_nodes": {"total": 2, "successful": 2, "failed": 0}},
),
],
)

@mock.patch("esrally.metrics.EsMetricsStore.put_doc")
def test_raises_exception_on_api_error(self, metrics_store_put_doc, caplog):
client = Client(transport_client=TransportClient(error=raiseApiError(status=429, body={}, message="api error"), force_error=True))
cfg = create_config()
metrics_store = metrics.EsMetricsStore(cfg)

recorder = telemetry.BlobStoreStatsRecorder(cluster_name="default", client=client, metrics_store=metrics_store, sample_interval=1)
recorder.record()
assert "An API error [ApiError(429, 'api error')] occurred while collecting [blob-store-stats] on cluster [default]" in caplog.text
assert metrics_store_put_doc.call_count == 0

@mock.patch("esrally.metrics.EsMetricsStore.put_doc")
def test_raises_exception_on_transport_error(self, metrics_store_put_doc, caplog):
client = Client(transport_client=TransportClient(response={}, force_error=True))
cfg = create_config()
metrics_store = metrics.EsMetricsStore(cfg)

recorder = telemetry.BlobStoreStatsRecorder(cluster_name="default", client=client, metrics_store=metrics_store, sample_interval=1)
recorder.record()
assert "A transport error [transport error] occurred while collecting [blob-store-stats] on cluster [default]" in caplog.text
assert metrics_store_put_doc.call_count == 0
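
To run just these new tests locally, pytest's -k filter can be used (assuming Rally's usual pytest setup):

pytest tests/telemetry_test.py -k "BlobStoreStats"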
