3 changes: 3 additions & 0 deletions changelog.d/24063_retry_docker_logs_client.feature.md
@@ -0,0 +1,3 @@
The `docker_logs` source now includes exponential backoff retry logic for Docker daemon communication failures. This improves reliability when working with slow or temporarily unresponsive Docker daemons by retrying with increasing delays instead of immediately stopping.

authors: titaneric
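For reference, here is a minimal, self-contained sketch of the delay progression the changelog describes. The `Backoff` struct below is a hypothetical stand-in, not Vector's `common::backoff::ExponentialBackoff`; it only illustrates delays that double on each failed attempt, cap at a maximum, and reset after a success.

```rust
use std::time::Duration;

/// Hypothetical stand-in for an exponential backoff iterator:
/// each attempt doubles the delay, capped at `max`.
struct Backoff {
    base: Duration,
    current: Duration,
    max: Duration,
}

impl Backoff {
    fn new(base: Duration, max: Duration) -> Self {
        Self { base, current: base, max }
    }

    /// Delay to wait before the next attempt; doubles for the attempt after.
    fn next_delay(&mut self) -> Duration {
        let delay = self.current;
        self.current = (self.current * 2).min(self.max);
        delay
    }

    /// Called after a successful attempt, mirroring `events_backoff.reset()`.
    fn reset(&mut self) {
        self.current = self.base;
    }
}

fn main() {
    let mut backoff = Backoff::new(Duration::from_secs(2), Duration::from_secs(300));
    for attempt in 1..=9 {
        // Prints 2s, 4s, 8s, 16s, 32s, 64s, 128s, 256s, then 300s (capped).
        println!("attempt {attempt}: wait {:?}", backoff.next_delay());
    }
    backoff.reset(); // after a success, the next failure starts again at 2s
}
```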
41 changes: 36 additions & 5 deletions src/sources/docker_logs/mod.rs
@@ -41,6 +41,7 @@ use vrl::{
use super::util::MultilineConfig;
use crate::{
SourceSender,
common::backoff::ExponentialBackoff,
config::{DataType, SourceConfig, SourceContext, SourceOutput, log_schema},
docker::{DockerTlsConfig, docker},
event::{self, EstimatedJsonEncodedSizeOf, LogEvent, Value, merge_state::LogEventMergeState},
@@ -468,6 +469,8 @@ struct DockerLogsSource {
/// It may contain shortened container id.
hostname: Option<String>,
backoff_duration: Duration,
/// Backoff strategy for events stream retries.
events_backoff: ExponentialBackoff,
}

impl DockerLogsSource {
@@ -521,6 +524,8 @@ impl DockerLogsSource {
main_recv,
hostname,
backoff_duration: backoff_secs,
events_backoff: ExponentialBackoff::from_millis(backoff_secs.as_millis() as u64)
.max_delay(Duration::from_secs(300)),
})
}

@@ -620,6 +625,9 @@ impl DockerLogsSource {
value = self.events.next() => {
match value {
Some(Ok(mut event)) => {
// Reset backoff on successful event
self.events_backoff.reset();

let action = event.action.unwrap();
let actor = event.actor.take().unwrap();
let id = actor.id.unwrap();
@@ -662,20 +670,43 @@ impl DockerLogsSource {
error,
container_id: None,
});
return;
// Retry events stream with exponential backoff
if !self.retry_events_stream_with_backoff("Docker events stream failed").await {
error!("Docker events stream failed and retry exhausted, shutting down");
return;
}
},
None => {
// TODO: this could be fixed, but should be tried with some timeout and exponential backoff
error!(message = "Docker log event stream has ended unexpectedly.", internal_log_rate_limit = false);
info!(message = "Shutting down docker_logs source.");
return;
// Retry events stream with exponential backoff
if !self.retry_events_stream_with_backoff("Docker events stream ended").await {
error!("Docker events stream ended and retry exhausted, shutting down");
return;
}
}
};
}
};
}
}

/// Retry the events stream with exponential backoff.
/// Returns `true` if a retry was attempted, `false` if the backoff is exhausted.
async fn retry_events_stream_with_backoff(&mut self, reason: &str) -> bool {
if let Some(delay) = self.events_backoff.next() {
warn!(
message = reason,
action = "retrying with backoff",
delay_ms = delay.as_millis()
);
tokio::time::sleep(delay).await;
self.events = Box::pin(self.esb.core.docker_logs_event_stream());
true
} else {
error!(message = "Events stream retry exhausted", reason = reason);
false
}
}

fn exclude_self(&self, id: &str) -> bool {
self.hostname
.as_ref()
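To summarize the control flow this diff introduces, here is a minimal, self-contained sketch of the same retry pattern: reset the backoff after a successful event, and on stream failure sleep, grow the delay, and recreate the stream. The `recreate_stream` helper, the event and error types, and the bounded `retries_left` counter are hypothetical simplifications, not the source's `docker_logs_event_stream` or Vector's actual backoff; it assumes the `tokio` (with default features) and `futures` crates.

```rust
use std::time::Duration;

use futures::stream::{self, Stream, StreamExt};

// Hypothetical event and error types standing in for the Docker client's.
type Event = u32;
type Error = String;

/// Hypothetical stand-in for re-opening the Docker events stream.
fn recreate_stream() -> impl Stream<Item = Result<Event, Error>> + Unpin {
    stream::iter(vec![Ok(1), Ok(2), Err("daemon hiccup".to_string())])
}

#[tokio::main]
async fn main() {
    let base = Duration::from_millis(100);
    let max_delay = Duration::from_secs(300);

    let mut events = recreate_stream();
    let mut delay = base;
    let mut retries_left = 3u32; // bounded here for the sketch

    loop {
        match events.next().await {
            Some(Ok(event)) => {
                // Success: reset the backoff, mirroring `events_backoff.reset()`.
                delay = base;
                println!("got event {event}");
            }
            failure => {
                // The stream errored (`Some(Err(..))`) or ended (`None`):
                // back off, then recreate the stream, as the diff does.
                if retries_left == 0 {
                    eprintln!("retry exhausted after {failure:?}, shutting down");
                    return;
                }
                retries_left -= 1;
                eprintln!("stream failed ({failure:?}), retrying in {delay:?}");
                tokio::time::sleep(delay).await;
                delay = (delay * 2).min(max_delay);
                events = recreate_stream();
            }
        }
    }
}
```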