Skip to content

Commit 2d8647f

Browse files
committed
fix(sink): OTLP sink panics when event is invalid or cannot be detected
The current implementation of the OTLP sink contains some critical flaws which allows algorithm to panic. This could lead to a vector partition to be crashed. This PR change the OTLP sink algorithm to handle all the error correctly, no panic, no expect, no unwrap and logging all internal errors to make debugging process and customers life easier. Ref: LOG-19721
1 parent 647ef23 commit 2d8647f

File tree

11 files changed

+503
-433
lines changed

11 files changed

+503
-433
lines changed

src/sinks/opentelemetry/config.rs

+80-33
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,49 @@ impl SinkBatchSettings for OpentelemetryDefaultBatchSettings {
5353
const TIMEOUT_SECS: f64 = 1.0;
5454
}
5555

56+
#[derive(Debug)]
57+
pub struct OpentelemetrySinkEndpointError {
58+
message: String,
59+
}
60+
61+
impl OpentelemetrySinkEndpointError {
62+
pub fn new(msg: &str) -> Self {
63+
OpentelemetrySinkEndpointError {
64+
message: String::from(msg),
65+
}
66+
}
67+
}
68+
69+
impl From<InvalidUri> for OpentelemetrySinkEndpointError {
70+
fn from(error: InvalidUri) -> Self {
71+
Self::new(&error.to_string())
72+
}
73+
}
74+
75+
impl From<&str> for OpentelemetrySinkEndpointError {
76+
fn from(error: &str) -> Self {
77+
Self::new(error)
78+
}
79+
}
80+
81+
impl From<http::Error> for OpentelemetrySinkEndpointError {
82+
fn from(error: http::Error) -> Self {
83+
Self::new(&error.to_string())
84+
}
85+
}
86+
87+
impl std::fmt::Display for OpentelemetrySinkEndpointError {
88+
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
89+
write!(f, "{}", self.message)
90+
}
91+
}
92+
93+
impl std::error::Error for OpentelemetrySinkEndpointError {
94+
fn description(&self) -> &str {
95+
&self.message
96+
}
97+
}
98+
5699
#[derive(Clone, Debug, Default)]
57100
pub struct OpentelemetryEndpoint {
58101
healthcheck_uri: Uri,
@@ -62,66 +105,69 @@ pub struct OpentelemetryEndpoint {
62105
}
63106

64107
impl OpentelemetryEndpoint {
65-
pub fn new(endpoint: Uri) -> Self {
66-
let scheme = endpoint.scheme_str().unwrap();
67-
let authority = endpoint.authority().map(|a| a.as_str()).unwrap();
68-
let host = endpoint.host().unwrap().to_owned();
108+
pub fn healthcheck(&self) -> Uri {
109+
self.healthcheck_uri.clone()
110+
}
111+
112+
pub fn endpoint(&self, model_type: OpentelemetryModelType) -> Option<Uri> {
113+
match model_type {
114+
OpentelemetryModelType::Logs => Some(self.logs_uri.clone()),
115+
OpentelemetryModelType::Metrics => Some(self.metrics_uri.clone()),
116+
OpentelemetryModelType::Traces => Some(self.traces_uri.clone()),
117+
OpentelemetryModelType::Unknown => None,
118+
}
119+
}
120+
}
121+
122+
impl TryFrom<String> for OpentelemetryEndpoint {
123+
type Error = OpentelemetrySinkEndpointError;
124+
125+
fn try_from(endpoint: String) -> Result<Self, Self::Error> {
126+
let uri = endpoint
127+
.parse::<Uri>()
128+
.map_err(OpentelemetrySinkEndpointError::from)?;
129+
130+
let scheme = uri.scheme_str().ok_or("Endpoint scheme is invalid")?;
131+
let authority = uri
132+
.authority()
133+
.map(|a| a.as_str())
134+
.ok_or("Endpoint authority is invalid")?;
135+
let host = uri.host().ok_or("Endpoint host is invalid")?;
69136

70137
let healthcheck_uri = Uri::builder()
71138
.scheme(scheme)
72-
.authority(host + ":" + OPENTELEMETRY_HEALTHCHECK_PORT)
139+
.authority(host.to_owned() + ":" + OPENTELEMETRY_HEALTHCHECK_PORT)
73140
.path_and_query("/")
74141
.build()
75-
.unwrap();
142+
.map_err(OpentelemetrySinkEndpointError::from)?;
76143

77144
let logs_uri = Uri::builder()
78145
.scheme(scheme)
79146
.authority(authority)
80147
.path_and_query("/v1/logs")
81148
.build()
82-
.unwrap();
149+
.map_err(OpentelemetrySinkEndpointError::from)?;
83150

84151
let metrics_uri = Uri::builder()
85152
.scheme(scheme)
86153
.authority(authority)
87154
.path_and_query("/v1/metrics")
88155
.build()
89-
.unwrap();
156+
.map_err(OpentelemetrySinkEndpointError::from)?;
90157

91158
let traces_uri = Uri::builder()
92159
.scheme(scheme)
93160
.authority(authority)
94161
.path_and_query("/v1/traces")
95162
.build()
96-
.unwrap();
163+
.map_err(OpentelemetrySinkEndpointError::from)?;
97164

98-
Self {
165+
Ok(Self {
99166
healthcheck_uri,
100167
logs_uri,
101168
metrics_uri,
102169
traces_uri,
103-
}
104-
}
105-
106-
pub fn healthcheck(&self) -> Uri {
107-
self.healthcheck_uri.clone()
108-
}
109-
110-
pub fn endpoint(&self, model_type: OpentelemetryModelType) -> Uri {
111-
match model_type {
112-
OpentelemetryModelType::Logs => self.logs_uri.clone(),
113-
OpentelemetryModelType::Metrics => self.metrics_uri.clone(),
114-
OpentelemetryModelType::Traces => self.traces_uri.clone(),
115-
}
116-
}
117-
}
118-
119-
impl TryFrom<String> for OpentelemetryEndpoint {
120-
type Error = InvalidUri;
121-
122-
fn try_from(endpoint: String) -> Result<Self, Self::Error> {
123-
let uri = endpoint.parse::<Uri>()?;
124-
Ok(Self::new(uri))
170+
})
125171
}
126172
}
127173

@@ -224,7 +270,7 @@ impl SinkConfig for OpentelemetrySinkConfig {
224270
client,
225271
auth,
226272
},
227-
ctx.mezmo_ctx,
273+
ctx.mezmo_ctx.clone(),
228274
));
229275

230276
let compression = self.compression;
@@ -233,6 +279,7 @@ impl SinkConfig for OpentelemetrySinkConfig {
233279
encoder: OpentelemetryEncoder,
234280
compression,
235281
batcher_settings,
282+
mezmo_ctx: ctx.mezmo_ctx,
236283
};
237284
Ok((VectorSink::from_event_streamsink(sink), healthcheck))
238285
}

src/sinks/opentelemetry/encoding.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,9 @@ impl Encoder<Result<OpentelemetryModel, OpentelemetrySinkError>> for Opentelemet
1919
writer: &mut dyn io::Write,
2020
) -> io::Result<(usize, GroupedCountByteSize)> {
2121
let output = match input? {
22-
OpentelemetryModel::Logs(log_model) => encode_log(&log_model)?,
23-
OpentelemetryModel::Metrics(metric_model) => encode_metrics(&metric_model)?,
24-
OpentelemetryModel::Traces(trace_model) => encode_traces(trace_model)?,
22+
OpentelemetryModel::Logs(log_models) => encode_log(log_models)?,
23+
OpentelemetryModel::Metrics(metric_models) => encode_metrics(metric_models)?,
24+
OpentelemetryModel::Traces(trace_models) => encode_traces(trace_models)?,
2525
};
2626
let size = as_tracked_write::<_, _, io::Error>(writer, &output, |writer, output| {
2727
writer.write_all(output)?;

src/sinks/opentelemetry/logs/encoding.rs

+2-4
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,9 @@ use crate::sinks::opentelemetry::{
44
use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
55
use prost::Message;
66

7-
pub fn encode(model: &OpentelemetryLogsModel) -> Result<Vec<u8>, OpentelemetrySinkError> {
8-
let logs = model.0[0].get("logs").unwrap().clone();
9-
7+
pub fn encode(models: Vec<OpentelemetryLogsModel>) -> Result<Vec<u8>, OpentelemetrySinkError> {
108
let req = ExportLogsServiceRequest {
11-
resource_logs: logs.into_iter().map(Into::into).collect(),
9+
resource_logs: models.into_iter().map(|model| model.0.into()).collect(),
1210
};
1311

1412
let mut buf = vec![];

0 commit comments

Comments
 (0)