fix(otlp): replace otlp trace attr type from string to jsonb #4918

Open · wants to merge 8 commits into main

10 changes: 2 additions & 8 deletions src/frontend/src/instance/otlp.rs
@@ -24,7 +24,6 @@ use pipeline::PipelineWay;
use servers::error::{self, AuthSnafu, Result as ServerResult};
use servers::interceptor::{OpenTelemetryProtocolInterceptor, OpenTelemetryProtocolInterceptorRef};
use servers::otlp;
use servers::otlp::plugin::TraceParserRef;
use servers::query_handler::OpenTelemetryProtocolHandler;
use session::context::QueryContextRef;
use snafu::ResultExt;
@@ -64,6 +63,7 @@ impl OpenTelemetryProtocolHandler for Instance {
async fn traces(
&self,
request: ExportTraceServiceRequest,
table_name: String,
ctx: QueryContextRef,
) -> ServerResult<Output> {
self.plugins
@@ -77,13 +77,7 @@ impl OpenTelemetryProtocolHandler for Instance {
.get::<OpenTelemetryProtocolInterceptorRef<servers::error::Error>>();
interceptor_ref.pre_execute(ctx.clone())?;

let (table_name, spans) = match self.plugins.get::<TraceParserRef>() {
Some(parser) => (parser.table_name(), parser.parse(request)),
None => (
otlp::trace::TRACE_TABLE_NAME.to_string(),
otlp::trace::parse(request),
),
};
let spans = otlp::trace::parse(request);

let (requests, rows) = otlp::trace::to_grpc_insert_requests(table_name, spans)?;

11 changes: 10 additions & 1 deletion src/servers/src/error.rs
@@ -527,6 +527,14 @@ pub enum Error {
location: Location,
},

#[snafu(display("Invalid table name"))]
InvalidTableName {
#[snafu(source)]
error: tonic::metadata::errors::ToStrError,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Failed to initialize a watcher for file {}", path))]
FileWatch {
path: String,
@@ -620,7 +628,8 @@ impl ErrorExt for Error {
| UnsupportedContentType { .. }
| TimestampOverflow { .. }
| OpenTelemetryLog { .. }
| UnsupportedJsonDataTypeForTag { .. } => StatusCode::InvalidArguments,
| UnsupportedJsonDataTypeForTag { .. }
| InvalidTableName { .. } => StatusCode::InvalidArguments,

Catalog { source, .. } => source.status_code(),
RowWriter { source, .. } => source.status_code(),
16 changes: 13 additions & 3 deletions src/servers/src/grpc/otlp.rs
@@ -24,10 +24,12 @@ use opentelemetry_proto::tonic::collector::trace::v1::{
ExportTraceServiceRequest, ExportTraceServiceResponse,
};
use session::context::{Channel, QueryContext};
use snafu::OptionExt;
use snafu::{OptionExt, ResultExt};
use tonic::{Request, Response, Status};

use crate::error;
use crate::http::header::constants::GREPTIME_TRACE_TABLE_NAME_HEADER_NAME;
use crate::otlp::trace::TRACE_TABLE_NAME;
use crate::query_handler::OpenTelemetryProtocolHandlerRef;

pub struct OtlpService {
@@ -46,7 +48,15 @@ impl TraceService for OtlpService {
&self,
request: Request<ExportTraceServiceRequest>,
) -> StdResult<Response<ExportTraceServiceResponse>, Status> {
let (_headers, extensions, req) = request.into_parts();
let (headers, extensions, req) = request.into_parts();

let table_name = match headers.get(GREPTIME_TRACE_TABLE_NAME_HEADER_NAME) {
Some(table_name) => table_name
.to_str()
.context(error::InvalidTableNameSnafu)?
.to_string(),
None => TRACE_TABLE_NAME.to_string(),
};

let mut ctx = extensions
.get::<QueryContext>()
@@ -55,7 +65,7 @@ impl TraceService for OtlpService {
ctx.set_channel(Channel::Otlp);
let ctx = Arc::new(ctx);

let _ = self.handler.traces(req, ctx).await?;
let _ = self.handler.traces(req, table_name, ctx).await?;

Ok(Response::new(ExportTraceServiceResponse {
partial_success: None,
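For context, a client-side sketch (not part of this diff) of how the new header can be attached as gRPC metadata so the handler above routes spans into a custom trace table; the helper name and the "my_traces" table name are illustrative assumptions.

use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
use tonic::metadata::MetadataValue;
use tonic::Request;

// Wrap an OTLP trace export and set the x-greptime-trace-table-name metadata key
// (the constant GREPTIME_TRACE_TABLE_NAME_HEADER_NAME added in this PR).
fn with_trace_table(req: ExportTraceServiceRequest) -> Request<ExportTraceServiceRequest> {
    let mut request = Request::new(req);
    request.metadata_mut().insert(
        "x-greptime-trace-table-name",
        MetadataValue::from_static("my_traces"),
    );
    request
}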
1 change: 1 addition & 0 deletions src/servers/src/http/header.rs
@@ -48,6 +48,7 @@ pub mod constants {
pub const GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME: &str = "x-greptime-log-pipeline-version";
pub const GREPTIME_LOG_TABLE_NAME_HEADER_NAME: &str = "x-greptime-log-table-name";
pub const GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME: &str = "x-greptime-log-extract-keys";
pub const GREPTIME_TRACE_TABLE_NAME_HEADER_NAME: &str = "x-greptime-trace-table-name";
}

pub static GREPTIME_DB_HEADER_FORMAT: HeaderName =
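The new constant is also read on the HTTP path (see src/servers/src/http/otlp.rs below). A hypothetical OTLP/HTTP client call could look like the sketch that follows; the endpoint path and port are assumptions about a local GreptimeDB instance, not something this PR defines.

use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
use prost::Message;

// Post a protobuf-encoded trace export with the new table-name header set.
async fn post_traces(req: ExportTraceServiceRequest) -> Result<(), reqwest::Error> {
    let body = req.encode_to_vec();
    reqwest::Client::new()
        .post("http://127.0.0.1:4000/v1/otlp/v1/traces")
        .header("content-type", "application/x-protobuf")
        .header("x-greptime-trace-table-name", "my_traces")
        .body(body)
        .send()
        .await?
        .error_for_status()?;
    Ok(())
}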
66 changes: 47 additions & 19 deletions src/servers/src/http/otlp.rs
@@ -24,6 +24,7 @@ use axum::response::IntoResponse;
use axum::{async_trait, Extension};
use bytes::Bytes;
use common_telemetry::tracing;
use http::HeaderMap;
use opentelemetry_proto::tonic::collector::logs::v1::{
ExportLogsServiceRequest, ExportLogsServiceResponse,
};
@@ -41,11 +42,13 @@ use snafu::prelude::*;

use super::header::constants::GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME;
use super::header::{write_cost_header_map, CONTENT_TYPE_PROTOBUF};
use crate::error::{self, PipelineSnafu, Result};
use crate::error::{self, InvalidUtf8ValueSnafu, PipelineSnafu, Result};
use crate::http::header::constants::{
GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME, GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME,
GREPTIME_LOG_TABLE_NAME_HEADER_NAME,
GREPTIME_LOG_TABLE_NAME_HEADER_NAME, GREPTIME_TRACE_TABLE_NAME_HEADER_NAME,
};
use crate::otlp::logs::LOG_TABLE_NAME;
use crate::otlp::trace::TRACE_TABLE_NAME;
use crate::query_handler::OpenTelemetryProtocolHandlerRef;

#[axum_macros::debug_handler]
@@ -80,10 +83,16 @@ pub async fn metrics(
#[tracing::instrument(skip_all, fields(protocol = "otlp", request_type = "traces"))]
pub async fn traces(
State(handler): State<OpenTelemetryProtocolHandlerRef>,
header: HeaderMap,
Extension(mut query_ctx): Extension<QueryContext>,
bytes: Bytes,
) -> Result<OtlpResponse<ExportTraceServiceResponse>> {
let db = query_ctx.get_db_string();
let table_name = extract_table_name_from_header(
&header,
GREPTIME_TRACE_TABLE_NAME_HEADER_NAME,
TRACE_TABLE_NAME,
)?;
query_ctx.set_channel(Channel::Otlp);
let query_ctx = Arc::new(query_ctx);
let _timer = crate::metrics::METRIC_HTTP_OPENTELEMETRY_TRACES_ELAPSED
@@ -92,7 +101,7 @@ pub async fn traces(
let request =
ExportTraceServiceRequest::decode(bytes).context(error::DecodeOtlpRequestSnafu)?;
handler
.traces(request, query_ctx)
.traces(request, table_name, query_ctx)
.await
.map(|o| OtlpResponse {
resp_body: ExportTraceServiceResponse {
@@ -107,17 +116,31 @@ pub struct PipelineInfo {
pub pipeline_version: Option<String>,
}

fn pipeline_header_error(
fn parse_header_value_to_string(header: &HeaderValue) -> Result<String> {
String::from_utf8(header.as_bytes().to_vec()).context(InvalidUtf8ValueSnafu)
}

fn parse_pipeline_header_value_to_string(
header: &HeaderValue,
key: &str,
) -> StdResult<String, (http::StatusCode, String)> {
let header_utf8 = str::from_utf8(header.as_bytes());
match header_utf8 {
Ok(s) => Ok(s.to_string()),
Err(_) => Err((
header_name: &str,
) -> StdResult<String, (StatusCode, String)> {
parse_header_value_to_string(header).map_err(|_| {
(
StatusCode::BAD_REQUEST,
format!("`{}` header is not valid UTF-8 string type.", key),
)),
format!("`{}` header is not valid UTF-8 string type.", header_name),
)
})
}

fn extract_table_name_from_header(
headers: &HeaderMap,
header: &str,
default_table_name: &str,
) -> Result<String> {
let table_name = headers.get(header);
match table_name {
Some(name) => parse_header_value_to_string(name),
None => Ok(default_table_name.to_string()),
}
}
Comment on lines +119 to 145 (Contributor):

It looks like we can merge all three of these functions.
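One possible shape for that consolidation, sketched under the assumption that the module's existing HeaderMap, Result, InvalidUtf8ValueSnafu, and snafu imports are reused; the pipeline-header callers would still map the error to an HTTP status at the call site, and the function name here is hypothetical.

// Sketch only: read a header, validate it as UTF-8, fall back to a default when absent.
fn extract_string_from_header(
    headers: &HeaderMap,
    header_name: &str,
    default: &str,
) -> Result<String> {
    match headers.get(header_name) {
        Some(value) => String::from_utf8(value.as_bytes().to_vec())
            .context(InvalidUtf8ValueSnafu),
        None => Ok(default.to_string()),
    }
}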


@@ -133,11 +156,11 @@
let pipeline_version = parts.headers.get(GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME);
match (pipeline_name, pipeline_version) {
(Some(name), Some(version)) => Ok(PipelineInfo {
pipeline_name: Some(pipeline_header_error(
pipeline_name: Some(parse_pipeline_header_value_to_string(
name,
GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME,
)?),
pipeline_version: Some(pipeline_header_error(
pipeline_version: Some(parse_pipeline_header_value_to_string(
version,
GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME,
)?),
@@ -147,7 +170,7 @@
pipeline_version: None,
}),
(Some(name), None) => Ok(PipelineInfo {
pipeline_name: Some(pipeline_header_error(
pipeline_name: Some(parse_pipeline_header_value_to_string(
name,
GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME,
)?),
@@ -173,10 +196,13 @@

match table_name {
Some(name) => Ok(TableInfo {
table_name: pipeline_header_error(name, GREPTIME_LOG_TABLE_NAME_HEADER_NAME)?,
table_name: parse_pipeline_header_value_to_string(
name,
GREPTIME_LOG_TABLE_NAME_HEADER_NAME,
)?,
}),
None => Ok(TableInfo {
table_name: "opentelemetry_logs".to_string(),
table_name: LOG_TABLE_NAME.to_string(),
}),
}
}
@@ -196,8 +222,10 @@

match select {
Some(name) => {
let select_header =
pipeline_header_error(name, GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME)?;
let select_header = parse_pipeline_header_value_to_string(
name,
GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME,
)?;
if select_header.is_empty() {
Ok(SelectInfoWrapper(Default::default()))
} else {
2 changes: 1 addition & 1 deletion src/servers/src/otlp.rs
@@ -14,5 +14,5 @@

pub mod logs;
pub mod metrics;
pub mod plugin;
pub mod trace;
mod utils;
41 changes: 3 additions & 38 deletions src/servers/src/otlp/logs.rs
@@ -29,10 +29,12 @@ use pipeline::{Array, Map, PipelineWay, SchemaInfo, SelectInfo, Value as Pipelin
use snafu::{ensure, ResultExt};

use super::trace::attributes::OtlpAnyValue;
use super::utils::{bytes_to_hex_string, key_value_to_jsonb};
use crate::error::{
IncompatibleSchemaSnafu, OpenTelemetryLogSnafu, Result, UnsupportedJsonDataTypeForTagSnafu,
};
use crate::otlp::trace::span::bytes_to_hex_string;

pub const LOG_TABLE_NAME: &str = "opentelemetry_logs";

/// Convert OpenTelemetry metrics to GreptimeDB insert requests
///
@@ -772,43 +774,6 @@ fn key_value_to_map(key_values: Vec<KeyValue>) -> BTreeMap<String, PipelineValue
map
}

fn any_value_to_jsonb(value: any_value::Value) -> JsonbValue<'static> {
match value {
any_value::Value::StringValue(s) => JsonbValue::String(s.into()),
any_value::Value::IntValue(i) => JsonbValue::Number(JsonbNumber::Int64(i)),
any_value::Value::DoubleValue(d) => JsonbValue::Number(JsonbNumber::Float64(d)),
any_value::Value::BoolValue(b) => JsonbValue::Bool(b),
any_value::Value::ArrayValue(a) => {
let values = a
.values
.into_iter()
.map(|v| match v.value {
Some(value) => any_value_to_jsonb(value),
None => JsonbValue::Null,
})
.collect();
JsonbValue::Array(values)
}
any_value::Value::KvlistValue(kv) => key_value_to_jsonb(kv.values),
any_value::Value::BytesValue(b) => JsonbValue::String(bytes_to_hex_string(&b).into()),
}
}

fn key_value_to_jsonb(key_values: Vec<KeyValue>) -> JsonbValue<'static> {
let mut map = BTreeMap::new();
for kv in key_values {
let value = match kv.value {
Some(value) => match value.value {
Some(value) => any_value_to_jsonb(value),
None => JsonbValue::Null,
},
None => JsonbValue::Null,
};
map.insert(kv.key.clone(), value);
}
JsonbValue::Object(map)
}

fn log_body_to_string(body: &AnyValue) -> String {
let otlp_value = OtlpAnyValue::from(body);
otlp_value.to_string()
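The helpers removed here are re-imported from the new shared utils module (see the use super::utils line above), so the log and trace paths convert attributes the same way. For illustration only, and not code from this diff, the conversion turns OTLP attributes into a single JSONB object:

use opentelemetry_proto::tonic::common::v1::{any_value, AnyValue, KeyValue};

// A minimal attribute list as it arrives from an OTLP exporter.
fn sample_attributes() -> Vec<KeyValue> {
    vec![KeyValue {
        key: "http.method".to_string(),
        value: Some(AnyValue {
            value: Some(any_value::Value::StringValue("GET".to_string())),
        }),
    }]
}
// key_value_to_jsonb(sample_attributes()) yields the JSONB object {"http.method": "GET"},
// which is what the attribute columns store after this change instead of a stringified value.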
28 changes: 0 additions & 28 deletions src/servers/src/otlp/plugin.rs

This file was deleted.
