feat: add API to write OpenTelemetry logs to GreptimeDB #4755
base: main
Conversation
Codecov Report: project coverage drops from 84.68% to 84.21% (-0.47%) with this patch. Additional details and impacted files:

@@ Coverage Diff @@
##             main    #4755      +/-   ##
==========================================
- Coverage   84.68%   84.21%   -0.47%
==========================================
  Files        1118     1119       +1
  Lines      202399   203488    +1089
==========================================
- Hits       171393   171361      -32
- Misses      31006    32127    +1121
            None,
        ),
        (
            "timestamp",
We can name this as greptime_timestamp for consistency.
This is the otlp field, and I think it's better to leave it as is. Refer to https://opentelemetry.io/docs/specs/otel/logs/data-model/#log-and-event-record-definition
// TODO(qtang): we should convert jsonb to a json string in the http sql API
let expected = r#"[["","",[64,0,0,0],"https://opentelemetry.io/schemas/1.0.0/scopeLogs","https://opentelemetry.io/schemas/1.0.0/resourceLogs",[64,0,0,1,16,0,0,13,16,0,0,19,114,101,115,111,117,114,99,101,95,97,116,116,114,114,101,115,111,117,114,99,101,45,97,116,116,114,45,118,97,108,45,49],[64,0,0,2,16,0,0,8,16,0,0,3,16,0,0,4,16,0,0,3,99,117,115,116,111,109,101,114,101,110,118,97,99,109,101,100,101,118],1581452773000000789,1581452773000000789,"30","30",1,"Info",9,"null"],["","",[64,0,0,0],"https://opentelemetry.io/schemas/1.0.0/scopeLogs","https://opentelemetry.io/schemas/1.0.0/resourceLogs",[64,0,0,1,16,0,0,13,16,0,0,19,114,101,115,111,117,114,99,101,95,97,116,116,114,114,101,115,111,117,114,99,101,45,97,116,116,114,45,118,97,108,45,49],[64,0,0,2,16,0,0,3,16,0,0,12,16,0,0,6,32,0,0,2,97,112,112,105,110,115,116,97,110,99,101,95,110,117,109,115,101,114,118,101,114,64,1],1581452773000009875,1581452773000009875,"3038303430323031303030303030303030303030303030303030303030303030","30313032303430383030303030303030",1,"Info",9,"null"]]"#;
validate_data(&client, "select * from logs;", expected).await;
Is there a jsonb-to-string SQL function? We could convert the data with it; otherwise this code will be hard to maintain.
The expected value seems to lack all the JSON field values, like those attribute maps. Also, the log body does not seem to appear in the result.
type Rejection = (StatusCode, &'static str);

async fn from_request_parts(parts: &mut Parts, _state: &S) -> StdResult<Self, Self::Rejection> {
    let table_name = parts.headers.get("X-Table-Name");
Consider adding a greptime prefix, just like our database name field.
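A prefixed naming scheme could look like the sketch below. The constant names are assumptions, not the PR's actual choices, and a plain `HashMap` stands in for axum's `HeaderMap`:

```rust
use std::collections::HashMap;

// Hypothetical header names; the PR may settle on different ones.
const GREPTIME_TABLE_NAME_HEADER: &str = "x-greptime-table-name";
const GREPTIME_PIPELINE_NAME_HEADER: &str = "x-greptime-pipeline-name";

// A HashMap stands in for axum's HeaderMap in this sketch.
fn get_header<'a>(headers: &'a HashMap<String, String>, name: &str) -> Option<&'a str> {
    headers.get(name).map(String::as_str)
}

fn main() {
    let mut headers = HashMap::new();
    headers.insert(GREPTIME_TABLE_NAME_HEADER.to_string(), "logs".to_string());
    assert_eq!(get_header(&headers, GREPTIME_TABLE_NAME_HEADER), Some("logs"));
    assert_eq!(get_header(&headers, GREPTIME_PIPELINE_NAME_HEADER), None);
}
```

The prefix keeps GreptimeDB-specific headers visually grouped and avoids collisions with generic `X-*` names other proxies might set.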
async fn from_request_parts(parts: &mut Parts, _state: &S) -> StdResult<Self, Self::Rejection> {
    let pipeline_name = parts.headers.get("X-Pipeline-Name");
    let pipeline_version = parts.headers.get("X-Pipeline-Version");
Consider adding a greptime prefix here as well.
@@ -205,6 +205,34 @@
        self.transformer.transform_mut(val)
    }

    pub fn prepare_pipeline_value(&self, val: Value, result: &mut [Value]) -> Result<(), String> {
Add a comment about its purpose and how it works.
            result[0] = val;
        }
        _ => {
            return Err("expect object".to_string());
Do not use a string as the error type. Also, will this error message be returned to the end user? We need to describe this situation more clearly.
In the next release, I will modify the error type. At present, the error types in the Pipeline crate are of the String type.
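The migration the reply describes could look like the sketch below. The enum, variant, and message are illustrative assumptions, not the planned design:

```rust
use std::fmt;

// Hypothetical replacement for the String errors in the pipeline crate.
#[derive(Debug)]
enum PipelineError {
    // Carries the actual type name so the message can describe the situation.
    ExpectObject { actual: &'static str },
}

impl fmt::Display for PipelineError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            PipelineError::ExpectObject { actual } => {
                write!(f, "pipeline input must be a JSON object, got type {actual}")
            }
        }
    }
}

impl std::error::Error for PipelineError {}

fn main() {
    let err = PipelineError::ExpectObject { actual: "array" };
    assert_eq!(
        err.to_string(),
        "pipeline input must be a JSON object, got type array"
    );
}
```

A typed error lets callers match on variants and decide what to surface to end users, instead of forwarding raw internal strings.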
    Ok(s) => Ok(s.to_string()),
    Err(_) => Err((
        StatusCode::BAD_REQUEST,
        "`X-Pipeline-Name` or `X-Pipeline-Version` header is not string type.",
A to_str failure doesn't mean the value is not a string; HTTP headers are always strings. We can return the nested error message instead.
Again, don't reinvent error handling in this module.
    PipelineValue::Map(Map { values: map })
}

fn build_identity_schema() -> Vec<ColumnSchema> {
By "identity pipeline", I would expect an auto-generated pipeline and schema based on the input data. Here we have already manually defined a schema for OTel logs, so we'd better not name this the identity pipeline or treat it as one.
I've considered ways to automatically generate a pipeline. But it would still have to convert the ExportLogsServiceRequest into a PipelineValue before it could be processed, so for performance reasons I skipped that step and converted the ExportLogsServiceRequest directly into Rows.
    OTLP_LOGS_ROWS.inc_by(rows as u64);

    self.handle_row_inserts(requests, ctx)
        .await
        .map_err(BoxedError::new)
        .context(error::ExecuteGrpcQuerySnafu)
Suggested change:

-    OTLP_LOGS_ROWS.inc_by(rows as u64);
-
-    self.handle_row_inserts(requests, ctx)
-        .await
-        .map_err(BoxedError::new)
-        .context(error::ExecuteGrpcQuerySnafu)
+    self.handle_row_inserts(requests, ctx)
+        .await
+        .inspect(|_| OTLP_LOGS_ROWS.inc_by(rows as u64))
+        .map_err(BoxedError::new)
+        .context(error::ExecuteGrpcQuerySnafu)
Let's increment metrics after the insertion is done. Same as metrics and traces.
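The suggestion relies on `Result::inspect` (stable since Rust 1.76), which runs its closure only on the `Ok` branch. A minimal stand-alone illustration, with `insert` as a stand-in for the real handler:

```rust
// `inspect` runs only when the result is Ok, so the counter reflects
// successful inserts only. `insert` is a hypothetical stand-in.
fn insert(rows: usize, fail: bool, counter: &mut u64) -> Result<usize, String> {
    let result = if fail {
        Err("insert failed".to_string())
    } else {
        Ok(rows)
    };
    result.inspect(|n| *counter += *n as u64)
}

fn main() {
    let mut counter = 0u64;
    let _ = insert(5, false, &mut counter);
    let _ = insert(7, true, &mut counter);
    assert_eq!(counter, 5); // the failed insert was not counted
}
```

Incrementing before the call would count rows even when the insertion fails, skewing the metric.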
fn coerce_nested_value(v: &Value, transform: &Transform) -> Result<Option<ValueData>, String> {
    match &transform.type_ {
        Value::Array(_) | Value::Map(_) => (),
        t => {
            return Err(format!(
                "nested value type not supported {}",
                t.to_str_type()
            ))
        }
    }
    match v {
        Value::Map(_) => {
            let data: jsonb::Value = v.into();
            Ok(Some(ValueData::BinaryValue(data.to_vec())))
        }
        Value::Array(_) => {
            let data: jsonb::Value = v.into();
            Ok(Some(ValueData::BinaryValue(data.to_vec())))
        }
        _ => Err(format!("nested type not support {}", v.to_str_type())),
    }
}
The type from transform is ignored if it's not a map or array. It feels strange that a type field is written but not used in the configuration. Should we add something like a binary type to be more specific?
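One option the comment raises, sketched with hypothetical names: make the transform type explicit with a `Binary` variant, instead of accepting any map/array annotation whose concrete type is then never used.

```rust
// Hypothetical transform types; the real configuration enum differs.
#[derive(Debug, PartialEq)]
enum TransformType {
    String,
    Binary, // value will be serialized as jsonb bytes
}

// Only an explicit binary type is accepted for nested values.
fn check_nested_type(t: &TransformType) -> Result<(), String> {
    match t {
        TransformType::Binary => Ok(()),
        other => Err(format!("nested value requires binary type, got {other:?}")),
    }
}

fn main() {
    assert!(check_nested_type(&TransformType::Binary).is_ok());
    assert!(check_nested_type(&TransformType::String).is_err());
}
```

This way the configuration says exactly what is stored, rather than carrying a type field that only gates a category check.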
    pipeline_info: PipelineInfo,
    table_info: TableInfo,
I'm not sure we should do this custom extractor. We can have headers: HeaderMap and a function to extract header information.
let table_name = parts.headers.get("X-Table-Name");
match table_name {
    Some(name) => Ok(TableInfo {
        table_name: pipeline_header_error(name)?,
pipeline_header_error doesn't handle a missing table_name.
Should we add a default table name when the header is not present, like we do in traces?
pub enum PipelineWay {
    Identity,
    Custom(Arc<Pipeline<GreptimeTransformer>>),
}
The enum might be better placed in the pipeline crate.
        )),
    },
    GreptimeValue {
        value_data: Some(ValueData::StringValue(bytes_to_hex_string(&log.trace_id))),
/// A unique identifier for a trace. All logs from the same trace share
/// the same `trace_id`. The ID is a 16-byte array. An ID with all zeroes OR
/// of length other than 16 bytes is considered invalid (empty string in OTLP/JSON
/// is zero-length and thus is also invalid).
///
/// This field is optional.
///
/// The receivers SHOULD assume that the log record is not associated with a
/// trace if any of the following is true:
/// - the field is not present,
/// - the field contains an invalid value.
#[prost(bytes = "vec", tag = "9")]
pub trace_id: ::prost::alloc::vec::Vec<u8>,
According to the comment, we might want to:
- validate the data
- convert it into a human-readable string if the data is valid

Same for span_id.
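A minimal sketch of both steps, following the validity rules in the spec excerpt above (the function name is hypothetical):

```rust
/// A trace id is valid only when it is exactly 16 bytes and not all
/// zeroes. Invalid ids yield `None`, so the log record can be treated
/// as not associated with a trace.
fn trace_id_to_hex(id: &[u8]) -> Option<String> {
    if id.len() != 16 || id.iter().all(|b| *b == 0) {
        return None;
    }
    // Hex-encode the valid id into a human-readable string.
    Some(id.iter().map(|b| format!("{b:02x}")).collect())
}

fn main() {
    assert_eq!(trace_id_to_hex(&[0u8; 16]), None); // all zeroes: invalid
    assert_eq!(trace_id_to_hex(&[1, 2]), None); // wrong length: invalid
    let id = [0xabu8; 16];
    assert_eq!(trace_id_to_hex(&id).unwrap(), "ab".repeat(16));
}
```

A span id works the same way, except the valid length is 8 bytes.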
(
    "resource_schema_url",
    ColumnDataType::String,
    SemanticType::Field,
    None,
    None,
),
I don't think we need to keep these schema_urls.
… type from hashmap to btreemap to keep key order
I hereby agree to the terms of the GreptimeDB CLA.