Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/elasticsearch_retry_forbidden.enhancement.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
The `elasticsearch` sink now retries on HTTP 403 Forbidden responses, which commonly occur when Elasticsearch enters read-only mode due to disk watermark thresholds. This applies both to full batch responses and to individual items within partial bulk failures when `request_retry_partial` is enabled.
51 changes: 50 additions & 1 deletion src/sinks/elasticsearch/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,13 @@ impl RetryLogic for ElasticsearchRetryLogic {

match status {
StatusCode::TOO_MANY_REQUESTS => RetryAction::Retry("too many requests".into()),
StatusCode::FORBIDDEN => RetryAction::Retry(
format!(
"forbidden (possible read-only mode): {}",
String::from_utf8_lossy(response.http_response.body())
)
.into(),
),
StatusCode::NOT_IMPLEMENTED => {
RetryAction::DontRetry("endpoint not implemented".into())
}
Expand All @@ -136,17 +143,19 @@ impl RetryLogic for ElasticsearchRetryLogic {
if self.retry_partial {
// We will retry if there exists at least one item that
// failed with a retriable error.
// Those are backpressure and server errors.
// Those are backpressure, server errors, and forbidden (read-only mode).
let status_codes: Vec<bool> = resp
.iter_status()
.map(|(status, _)| {
status == StatusCode::TOO_MANY_REQUESTS
|| status == StatusCode::FORBIDDEN
|| status.is_server_error()
})
.collect();
if let Some((_status, _error)) =
resp.iter_status().find(|(status, _)| {
*status == StatusCode::TOO_MANY_REQUESTS
|| *status == StatusCode::FORBIDDEN
|| status.is_server_error()
})
{
Expand Down Expand Up @@ -279,4 +288,44 @@ mod tests {
"error type: mapper_parsing_exception, reason: object mapping for [host] tried to parse field [host] as object, but found a concrete value"
);
}

#[test]
fn handles_forbidden_response_as_retriable() {
let json = r#"{"error":{"root_cause":[{"type":"cluster_block_exception","reason":"blocked by: [FORBIDDEN/12/index read-only / allow delete (api)];"}],"type":"cluster_block_exception","reason":"blocked by: [FORBIDDEN/12/index read-only / allow delete (api)];"},"status":403}"#;
let response = Response::builder()
.status(StatusCode::FORBIDDEN)
.body(Bytes::from(json))
.unwrap();
let logic = ElasticsearchRetryLogic {
retry_partial: false,
};
assert!(matches!(
logic.should_retry_response(&ElasticsearchResponse {
http_response: response,
event_status: EventStatus::Errored,
events_byte_size: CountByteSize(1, JsonSize::new(1)).into(),
}),
RetryAction::Retry(_)
));
}

#[test]
fn handles_partial_forbidden_response() {
let json = "{\"took\":34,\"errors\":true,\"items\":[{\"index\":{\"_index\":\"test-asjkf1234\",\"_type\":\"log_lines\",\"_id\":\"4Z3QLYEBT52RtoOEKz2H\",\"status\":403,\"error\":{\"type\":\"cluster_block_exception\",\"reason\":\"blocked by: [FORBIDDEN/12/index read-only / allow delete (api)];\"}}}]}";
let response = Response::builder()
.status(StatusCode::OK)
.body(Bytes::from(json))
.unwrap();
let logic = ElasticsearchRetryLogic {
retry_partial: true,
};
assert!(matches!(
logic.should_retry_response(&ElasticsearchResponse {
http_response: response,
event_status: EventStatus::Errored,
events_byte_size: CountByteSize(1, JsonSize::new(1)).into(),
}),
RetryAction::RetryPartial(_)
));
}
}
6 changes: 4 additions & 2 deletions website/cue/reference/components/sinks/elasticsearch.cue
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,10 @@ components: sinks: elasticsearch: {
typed. To change this behavior, refer to the Elasticsearch [`ignore_malformed`
setting](\(urls.elasticsearch_ignore_malformed)).

By default, partial failures are not retried. To enable retries, set `request_retry_partial`. Once enabled it will
retry whole partially failed requests. As such it is advised to use `id_key` to avoid duplicates.
By default, partial failures are not retried. To enable retries, set `request_retry_partial`. Once enabled,
only the failed items with retriable status codes are retried (not the entire bulk request).
The retriable status codes are: 429 (Too Many Requests), 403 (Forbidden, e.g. read-only mode),
and 5xx (server errors). It is advised to use `id_key` to avoid duplicates.
"""
}

Expand Down