56 changes: 49 additions & 7 deletions esrally/driver/runner.py
@@ -505,7 +505,6 @@ async def __call__(self, es, params):
"""
detailed_results = params.get("detailed-results", False)
api_kwargs = self._default_kw_params(params)

bulk_params = {}
if "timeout" in params:
bulk_params["timeout"] = params["timeout"]
@@ -533,8 +532,31 @@ async def __call__(self, es, params):
response = await es.bulk(params=bulk_params, **api_kwargs)
else:
response = await es.bulk(doc_type=params.get("type"), params=bulk_params, **api_kwargs)
retry_stats = []
total_success = total_error = total_time = 0
for _ in range(3):  # retry budget; this can be made configurable later
if detailed_results:
# detailed_stats returns a single dict and collects no retryable lines, so retries only apply to the simple path
stats, lines_to_retry = self.detailed_stats(params, response), []
else:
stats, lines_to_retry = self.simple_stats(bulk_size, unit, response, api_kwargs)
retry_stats.append(stats)
if len(lines_to_retry) == 0:
break
api_kwargs["body"] = lines_to_retry
bulk_size = len(lines_to_retry) // 2  # two NDJSON lines (action metadata + document source) per item
response = await es.bulk(params=bulk_params, **api_kwargs)
request_status = response.meta.status
if request_status == 400:
self.logger.warning("400 after retry. Payload: %s", lines_to_retry)
retry_count = len(retry_stats)
# aggregate counts across all attempts
for attempt_stats in retry_stats:
total_success += attempt_stats["success-count"]
total_error += attempt_stats["error-count"]
total_time += attempt_stats["took"]

stats = self.detailed_stats(params, response) if detailed_results else self.simple_stats(bulk_size, unit, response)
stats = retry_stats[0] if retry_count == 1 else {"success-count": total_success, "error-count": total_error, "retry-count": retry_count, "took": total_time, "success": retry_stats[-1]["success"], "retried": True}  # success reflects the final attempt

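The loop above is the heart of the change: it re-sends only the rejected items, rebuilding a smaller bulk body from the failed action/source line pairs, and stops once a round has no failures or the three-attempt budget is spent. A minimal standalone sketch of that control flow, assuming a hypothetical `send_bulk` coroutine that stands in for the `es.bulk` call plus stats parsing:

```python
async def bulk_with_retries(send_bulk, body_lines, max_attempts=3):
    # send_bulk is a hypothetical stand-in: it sends the given NDJSON lines
    # and returns (stats_dict, lines_to_retry), like simple_stats above
    per_attempt = []
    for _ in range(max_attempts):
        stats, lines_to_retry = await send_bulk(body_lines)
        per_attempt.append(stats)
        if not lines_to_retry:
            break  # nothing was throttled, stop early
        body_lines = lines_to_retry  # next round carries only the rejected items
    # aggregate counts across attempts, as the runner does above
    return {
        "success-count": sum(s["success-count"] for s in per_attempt),
        "error-count": sum(s["error-count"] for s in per_attempt),
        "took": sum(s["took"] for s in per_attempt),
        "retry-count": len(per_attempt),
        "success": per_attempt[-1]["error-count"] == 0,
    }
```

Note that an item throttled in round one and indexed in round two contributes to both an error count and a success count, so the aggregated totals double-count retried documents; the sketch mirrors the runner's behavior rather than correcting it.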
meta_data = {
"index": params.get("index"),
@@ -561,7 +583,7 @@ def _utf8len(line):
bulk_request_size_bytes = 0
total_document_size_bytes = 0
with_action_metadata = mandatory(params, "action-metadata-present", self)

request_status = response.meta.status
if isinstance(params["body"], bytes):
bulk_lines = params["body"].split(b"\n")
elif isinstance(params["body"], str):
Expand Down Expand Up @@ -598,6 +620,7 @@ def _utf8len(line):
shards_histogram[sk]["item-count"] += 1
if data["status"] > 299 or ("_shards" in data and data["_shards"]["failed"] > 0):
bulk_error_count += 1
request_status = max(request_status, data["status"])
self.extract_error_details(error_details, data)
else:
bulk_success_count += 1
@@ -610,6 +633,7 @@ def _utf8len(line):
"shards_histogram": list(shards_histogram.values()),
"bulk-request-size-bytes": bulk_request_size_bytes,
"total-document-size-bytes": total_document_size_bytes,
"http-status": request_status,
}
if bulk_error_count > 0:
stats["error-type"] = "bulk"
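With these additions, detailed_stats folds per-item outcomes into a single http-status: it starts from the transport-level response.meta.status and lets any worse per-item status win. A small sketch of that folding over an already-parsed bulk response body (the shape follows the Elasticsearch bulk API; the function name is ours):

```python
def worst_http_status(transport_status, parsed_bulk_response):
    # start from the status of the request itself
    status = transport_status
    for item in parsed_bulk_response["items"]:
        data = next(iter(item.values()))  # value under "index", "create", ...
        if data["status"] > 299 or ("_shards" in data and data["_shards"]["failed"] > 0):
            status = max(status, data["status"])
    return status

# e.g. worst_http_status(200, {"items": [{"index": {"status": 201}},
#                                        {"index": {"status": 429}}]}) == 429
```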
@@ -620,22 +644,36 @@ def simple_stats(self, bulk_size, unit, response):

return stats

def simple_stats(self, bulk_size, unit, response):
def simple_stats(self, bulk_size, unit, response, params):
bulk_success_count = bulk_size if unit == "docs" else None
bulk_error_count = 0
error_details = set()
request_status = response.meta.status
doc_status = -1
# parse lazily on the fast path
props = parse(response, ["errors", "took"])

if isinstance(params["body"], bytes):
bulk_lines = params["body"].split(b"\n")
elif isinstance(params["body"], str):
bulk_lines = params["body"].split("\n")
elif isinstance(params["body"], list):
bulk_lines = params["body"]
else:
raise exceptions.DataError("bulk body is not of type bytes, string, or list")
lines_to_retry = []
if props.get("errors", False):
# determine success count regardless of unit because we need to iterate through all items anyway
bulk_success_count = 0
# Reparse fully in case of errors - this will be slower
parsed_response = json.loads(response.getvalue())
for item in parsed_response["items"]:
for i, item in enumerate(parsed_response["items"]):
data = next(iter(item.values()))
if data["status"] > 299 or ("_shards" in data and data["_shards"]["failed"] > 0):
doc_status = max(doc_status, data["status"])
bulk_error_count += 1
if data["status"] == 429:  # only "429 Too Many Requests" is retried for now
lines_to_retry.append(bulk_lines[i * 2])  # action metadata line
lines_to_retry.append(bulk_lines[(i * 2) + 1])  # document source line
self.extract_error_details(error_details, data)
else:
bulk_success_count += 1
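The i * 2 / i * 2 + 1 indexing relies on the bulk body being NDJSON with action metadata present, so that item i of the response corresponds to two consecutive body lines: the action line and the document source. A self-contained illustration with a made-up two-document body (this assumes no embedded newlines inside documents):

```python
body = (
    '{"index": {"_id": "1"}}\n'
    '{"field": "doc one"}\n'
    '{"index": {"_id": "2"}}\n'
    '{"field": "doc two"}'
)
bulk_lines = body.split("\n")

# per-item statuses in response order, as a bulk response would report them
item_statuses = [201, 429]

lines_to_retry = []
for i, status in enumerate(item_statuses):
    if status == 429:  # only throttled items are collected for retry
        lines_to_retry.append(bulk_lines[i * 2])      # action metadata
        lines_to_retry.append(bulk_lines[i * 2 + 1])  # document source

assert lines_to_retry == ['{"index": {"_id": "2"}}', '{"field": "doc two"}']
```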
@@ -644,11 +682,15 @@ def simple_stats(self, bulk_size, unit, response):
"success": bulk_error_count == 0,
"success-count": bulk_success_count,
"error-count": bulk_error_count,
"request-status": request_status,
}
if doc_status > 0:
# doc-status is only set when errors occurred; without errors we never inspect the status of each doc
stats["doc-status"] = doc_status
# alternatively we could default it to request-status, though in reality successful docs tend to be 201 rather than 200
if bulk_error_count > 0:
stats["error-type"] = "bulk"
stats["error-description"] = self.error_description(error_details)
return stats
return stats, lines_to_retry

def extract_error_details(self, error_details, data):
error_data = data.get("error", {})
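Note the changed contract at the end of the hunk: simple_stats now returns a (stats, lines_to_retry) tuple instead of a bare dict, and the stats separate the transport-level request-status from the worst per-document doc-status (only present when at least one item failed). A hedged caller-side sketch, assuming it runs inside the runner where response, bulk_size, unit, and api_kwargs are in scope:

```python
# unpack the new tuple return; callers that expect a bare dict must be updated
stats, lines_to_retry = self.simple_stats(bulk_size, unit, response, api_kwargs)

request_status = stats["request-status"]  # HTTP status of the bulk request itself
doc_status = stats.get("doc-status")      # worst per-item status; None if no item failed

if lines_to_retry:
    # throttled items come back as ready-to-send action/source line pairs
    api_kwargs["body"] = lines_to_retry
```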