Improve exception handling for metrics store errors #1703

Merged: 4 commits, May 11, 2023
26 changes: 24 additions & 2 deletions esrally/metrics.py
@@ -46,6 +46,7 @@ def __init__(self, client, cluster_version=None):
        self._client = client
        self.logger = logging.getLogger(__name__)
        self._cluster_version = cluster_version
        self.retryable_status_codes = [502, 503, 504, 429]

    # TODO #1335: Use version-specific support for metrics stores after 7.8.0.
    def probe_version(self):
@@ -171,8 +172,29 @@ def guarded(self, target, *args, **kwargs):
                )
                self.logger.exception(msg)
                raise exceptions.SystemSetupError(msg)
            except elasticsearch.helpers.BulkIndexError as e:
                for err in e.errors:
                    err_type = err.get("index", {}).get("error", {}).get("type", None)
                    if err.get("index", {}).get("status", None) not in self.retryable_status_codes:
                        msg = f"Unretryable error encountered when sending metrics to remote metrics store: [{err_type}]"
                        self.logger.exception("%s - Full error(s) [%s]", msg, str(e.errors))
                        raise exceptions.RallyError(msg)

                if execution_count <= max_execution_count:
                    self.logger.debug(
                        "Error in sending metrics to remote metrics store [%s] in attempt [%d/%d]. Sleeping for [%f] seconds.",
                        e,
                        execution_count,
                        max_execution_count,
                        time_to_sleep,
                    )
                    time.sleep(time_to_sleep)
                else:
                    msg = f"Failed to send metrics to remote metrics store: [{e.errors}]"
                    self.logger.exception("%s - Full error(s) [%s]", msg, str(e.errors))
                    raise exceptions.RallyError(msg)
            except ApiError as e:
-               if e.status_code in (502, 503, 504, 429) and execution_count <= max_execution_count:
+               if e.status_code in self.retryable_status_codes and execution_count <= max_execution_count:
                    self.logger.debug(
                        "%s (code: %d) in attempt [%d/%d]. Sleeping for [%f] seconds.",
                        e.error,
@@ -451,9 +473,9 @@ def close(self):
        metrics on close (in order to avoid additional latency during the benchmark).
        """
        self.logger.info("Closing metrics store.")
-       self.opened = False
        self.flush()
        self._clear_meta_info()
+       self.opened = False

    def add_meta_info(self, scope, scope_key, key, value):
        """
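A small standalone sketch of the classification the new except-branch performs (not part of the PR; is_retryable is a hypothetical helper name), operating on the list-of-dicts shape that elasticsearch.helpers.BulkIndexError exposes via its errors attribute:

# A minimal sketch, assuming the per-item payload shape shown in the diff above;
# "is_retryable" is a hypothetical helper, not an esrally function.
retryable_status_codes = [502, 503, 504, 429]

def is_retryable(bulk_item):
    # Each item of BulkIndexError.errors looks like {"index": {"status": ..., "error": {...}}}.
    return bulk_item.get("index", {}).get("status", None) in retryable_status_codes

sample_errors = [
    {"index": {"status": 429, "error": {"type": "circuit_breaking_exception"}}},
    {"index": {"status": 409, "error": {"type": "version_conflict_engine_exception"}}},
]

for err in sample_errors:
    err_type = err.get("index", {}).get("error", {}).get("type", None)
    print(f"{err_type}: {'retry' if is_retryable(err) else 'fail fast'}")
# prints:
#   circuit_breaking_exception: retry
#   version_conflict_engine_exception: fail fast

In the PR itself this check happens inline in EsClient.guarded(): any non-retryable item raises exceptions.RallyError immediately, otherwise the whole bulk request is retried with backoff.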
111 changes: 109 additions & 2 deletions tests/metrics_test.py
@@ -28,6 +28,7 @@
from unittest import mock

import elasticsearch.exceptions
import elasticsearch.helpers
import pytest

from esrally import config, exceptions, metrics, paths, track
@@ -231,7 +232,59 @@ def raise_error(self):
            err = elasticsearch.exceptions.ApiError("unit-test", meta=TestEsClient.ApiResponseMeta(status=self.status_code), body={})
            raise err

-   retriable_errors = [ApiError(429), ApiError(502), ApiError(503), ApiError(504), ConnectionError(), ConnectionTimeout()]
    class BulkIndexError:
        def __init__(self, errors):
            self.errors = errors
            self.error_message = f"{len(self.errors)} document(s) failed to index"

        def logging_statements(self, retries):
            logging_statements = []
            for i, v in enumerate(range(retries)):
                logging_statements.append(
                    "Error in sending metrics to remote metrics store [%s] in attempt [%d/%d]. Sleeping for [%f] seconds."
                    % (
                        self.error_message,
                        i + 1,
                        max_retry,
                        sleep_slots[v],
                    )
                )
            logging_statements.append(
                f"Failed to send metrics to remote metrics store: [{self.errors}] - Full error(s) [{self.errors}]"
            )
            return logging_statements

        def raise_error(self):
            raise elasticsearch.helpers.BulkIndexError(self.error_message, self.errors)

    bulk_index_errors = [
        {
            "index": {
                "_index": "rally-metrics-2023-04",
                "_id": "dffAc4cBOnIJ2Omtflwg",
                "status": 429,
                "error": {
                    "type": "circuit_breaking_exception",
                    "reason": "[parent] Data too large, data for [<http_request>] would be [123848638/118.1mb], "
                    "which is larger than the limit of [123273216/117.5mb], real usage: [120182112/114.6mb], "
                    "new bytes reserved: [3666526/3.4mb]",
                    "bytes_wanted": 123848638,
                    "bytes_limit": 123273216,
                    "durability": "TRANSIENT",
                },
            }
        },
    ]

    retryable_errors = [
        ApiError(429),
        ApiError(502),
        ApiError(503),
        ApiError(504),
        ConnectionError(),
        ConnectionTimeout(),
        BulkIndexError(bulk_index_errors),
    ]

    max_retry = 10

@@ -247,7 +300,7 @@ def raise_error(self):
    exepcted_logger_calls = []
    expected_sleep_calls = []

-   for e in retriable_errors:
+   for e in retryable_errors:
        exepcted_logger_calls += e.logging_statements(max_retry)
        expected_sleep_calls += [mock.call(int(sleep_slots[i])) for i in range(0, max_retry)]

@@ -302,6 +355,60 @@ def raise_unknown_error():
            "store on host [127.0.0.1] at port [9243]."
        )

    def test_raises_rally_error_on_unretryable_bulk_indexing_errors(self):
        bulk_index_errors = [
            {
                "index": {
                    "_index": "rally-metrics-2023-04",
                    "_id": "dffAc4cBOnIJ2Omtflwg",
                    "status": 429,
                    "error": {
                        "type": "circuit_breaking_exception",
                        "reason": "[parent] Data too large, data for [<http_request>] would be [123848638/118.1mb], "
                        "which is larger than the limit of [123273216/117.5mb], real usage: [120182112/114.6mb], "
                        "new bytes reserved: [3666526/3.4mb]",
                        "bytes_wanted": 123848638,
                        "bytes_limit": 123273216,
                        "durability": "TRANSIENT",
                    },
                }
            },
            {
                "index": {
                    "_id": "1",
                    "_index": "rally-metrics-2023-04",
                    "error": {"type": "version_conflict_engine_exception"},
                    "status": 409,
                }
            },
            {
                "index": {
                    "_index": "rally-metrics-2023-04",
                    "_id": "dffAc4cBOnIJ2Omtflwg",
                    "status": 400,
                    "error": {
                        "type": "mapper_parsing_exception",
                        "reason": "failed to parse field [meta.error-description] of type [keyword] in document with id "
                        "'dffAc4cBOnIJ2Omtflwg'. Preview of field's value: 'HTTP status: 400, message: failed to parse "
                        "field [@timestamp] of type [date] in document with id '-PXAc4cBOnIJ2OmtX33J'. Preview of "
                        "field's value: '1998-04-30T15:02:56-05:00'",
                    },
                }
            },
        ]

        def raise_bulk_index_error():
            err = elasticsearch.helpers.BulkIndexError(f"{len(bulk_index_errors)} document(s) failed to index", bulk_index_errors)
            raise err

        client = metrics.EsClient(self.ClientMock([{"host": "127.0.0.1", "port": "9243"}]))

        with pytest.raises(
            exceptions.RallyError,
            match=(r"Unretryable error encountered when sending metrics to remote metrics store: \[version_conflict_engine_exception\]"),
        ):
            client.guarded(raise_bulk_index_error)


class TestEsMetrics:
    RACE_TIMESTAMP = datetime.datetime(2016, 1, 31)
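A short usage sketch of the exception shape the new test relies on: the two-argument BulkIndexError constructor and its errors attribute, assuming the elasticsearch-py version the test suite targets (the sample values are illustrative, not from the PR):

import elasticsearch.helpers

bulk_errors = [
    {"index": {"_index": "rally-metrics-2023-04", "_id": "1", "status": 409,
               "error": {"type": "version_conflict_engine_exception"}}},
]

try:
    # Same construction pattern as raise_bulk_index_error() in the test above.
    raise elasticsearch.helpers.BulkIndexError(f"{len(bulk_errors)} document(s) failed to index", bulk_errors)
except elasticsearch.helpers.BulkIndexError as e:
    # e.errors carries the per-item responses that EsClient.guarded() inspects.
    for item in e.errors:
        print(item["index"]["status"], item["index"]["error"]["type"])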