Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: opentsdb api support gzip body #1261

Merged
merged 16 commits into from
Oct 16, 2023
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 7 additions & 7 deletions integration_tests/opentsdb/test-put.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,13 +153,13 @@ def test_put_single_point_with_int_value():
assert r.text.__contains__('`timestamp` timestamp NOT NULL')
assert r.text.__contains__('`dc` string TAG')
assert r.text.__contains__('`host` string TAG')
# value is a bigint column
assert r.text.__contains__('`value` bigint')
# value is a double column
assert r.text.__contains__('`value` double')

r = execute_sql_query("""
SELECT timestamp, dc, host, value FROM {metric}
""".replace('{metric}', table_name))
assert r.text == """{"rows":[{"timestamp":{ts},"dc":"lga","host":"web01","value":9527}]}""".strip().replace('{ts}', str(ts))
assert r.text == """{"rows":[{"timestamp":{ts},"dc":"lga","host":"web01","value":9527.0}]}""".strip().replace('{ts}', str(ts))


def test_put_single_point_with_float_value():
Expand Down Expand Up @@ -251,7 +251,7 @@ def test_put_multi_points_with_different_tags_in_one_table():
SELECT timestamp, dc, host, value FROM {metric} ORDER BY value desc
""".replace('{metric}', table_name))
assert r.text == """
{"rows":[{"timestamp":{ts},"dc":null,"host":"web01","value":18},{"timestamp":{ts},"dc":"lga","host":null,"value":9}]}
{"rows":[{"timestamp":{ts},"dc":null,"host":"web01","value":18.0},{"timestamp":{ts},"dc":"lga","host":null,"value":9.0}]}
""".strip().replace('{ts}', str(ts))


Expand All @@ -261,7 +261,7 @@ def test_put_multi_points_with_different_datatype_in_one_table():
table_name = table_prefix + str(ts)
drop_table(table_name)

execute_put_then_assert_fail("""
execute_put_then_assert_success("""
[
{
"metric": "{metric}",
Expand Down Expand Up @@ -319,14 +319,14 @@ def test_put_multi_points_in_multi_table():
SELECT timestamp, dc, host, value FROM {metric}
""".replace('{metric}', table_name))
assert r.text == """
{"rows":[{"timestamp":{ts},"dc":"lga","host":"web01","value":18}]}
{"rows":[{"timestamp":{ts},"dc":"lga","host":"web01","value":18.0}]}
""".strip().replace('{ts}', str(ts))

r = execute_sql_query("""
SELECT timestamp, dc, host, value FROM {metric}
""".replace('{metric}', table2_name))
assert r.text == """
{"rows":[{"timestamp":{ts},"dc":"lga","host":"web02","value":9}]}
{"rows":[{"timestamp":{ts},"dc":"lga","host":"web02","value":9.0}]}
""".strip().replace('{ts}', str(ts))


Expand Down
4 changes: 2 additions & 2 deletions proxy/src/opentsdb/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ pub struct Point {
#[derive(Debug, Deserialize)]
#[serde(untagged)]
pub enum Value {
IntegerValue(i64),
// TODO: telegraf parse 0.0 as 0, which will confuse int and double type
// IntegerValue(i64),
F64Value(f64),
}

Expand Down Expand Up @@ -163,7 +164,6 @@ pub(crate) fn convert_put_request(req: PutRequest) -> Result<Vec<WriteTableReque
}

let value = match point.value {
Value::IntegerValue(v) => value::Value::Int64Value(v),
Value::F64Value(v) => value::Value::Float64Value(v),
};
let fields = vec![Field {
Expand Down
1 change: 1 addition & 0 deletions server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ cluster = { workspace = true }
common_types = { workspace = true }
datafusion = { workspace = true }
df_operator = { workspace = true }
flate2 = "1.0"
future_ext = { workspace = true }
futures = { workspace = true }
generic_error = { workspace = true }
Expand Down
4 changes: 4 additions & 0 deletions server/src/consts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,7 @@ pub const CATALOG_HEADER: &str = "x-ceresdb-catalog";
pub const SCHEMA_HEADER: &str = "x-ceresdb-schema";
/// Header of tenant name
pub const TENANT_HEADER: &str = "x-ceresdb-access-tenant";
/// Header of content encoding type
pub const CONTENT_ENCODING_HEADER: &str = "content-encoding";

pub const GZIP_ENCODING: &str = "gzip";
51 changes: 48 additions & 3 deletions server/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::{
collections::HashMap,
convert::Infallible,
error::Error as StdError,
io::Read,
net::IpAddr,
sync::{atomic::Ordering, Arc},
time::Duration,
Expand All @@ -26,6 +27,8 @@ use std::{
use analytic_engine::setup::OpenedWals;
use bytes_ext::Bytes;
use cluster::ClusterRef;
use datafusion::parquet::data_type::AsBytes;
use flate2::read::GzDecoder;
use generic_error::{BoxError, GenericError};
use logger::{error, info, RuntimeLevel};
use macros::define_result;
Expand Down Expand Up @@ -55,7 +58,8 @@ use warp::{
};

use crate::{
consts, error_util,
consts::{self, CONTENT_ENCODING_HEADER, GZIP_ENCODING},
error_util,
metrics::{self, HTTP_HANDLER_DURATION_HISTOGRAM_VEC},
};

Expand Down Expand Up @@ -121,6 +125,12 @@ pub enum Error {
source: Box<dyn StdError + Send + Sync>,
},

#[snafu(display("Fail to decompress gzip body, err:{}.", source))]
UnGzip { source: std::io::Error },

#[snafu(display("Unsupported content encoding type, value:{}.", encoding_type))]
UnspportedContentEncodingType { encoding_type: String },

#[snafu(display("Server already started.\nBacktrace:\n{}", backtrace))]
AlreadyStarted { backtrace: Backtrace },

Expand All @@ -141,6 +151,23 @@ define_result!(Error);

impl reject::Reject for Error {}

enum ContentEncodingType {
Gzip,
}

impl TryFrom<&str> for ContentEncodingType {
type Error = Error;

fn try_from(value: &str) -> Result<Self> {
match value {
GZIP_ENCODING => Ok(ContentEncodingType::Gzip),
_ => Err(Error::UnspportedContentEncodingType {
encoding_type: value.to_string(),
}),
}
}
}

/// Http service
///
/// Endpoints beginning with /debug are for internal use, and may subject to
Expand Down Expand Up @@ -397,7 +424,23 @@ impl Service {
.and(warp::query::<PutParams>())
.and(warp::body::bytes())
.and(self.with_proxy())
.and_then(|ctx, params, points, proxy: Arc<Proxy>| async move {
.and(header::optional::<String>(CONTENT_ENCODING_HEADER))
.and_then(|ctx, params, points: Bytes, proxy: Arc<Proxy>, encoding: Option<String>| async move {
let points = match encoding {
Some(encoding) => {
let encode_type = ContentEncodingType::try_from(encoding.as_str())?;
match encode_type {
ContentEncodingType::Gzip => {
let bytes = points.as_bytes();
let mut decoder = GzDecoder::new(bytes);
let mut decompressed_data = Vec::with_capacity(bytes.len()* 2);
decoder.read_to_end(&mut decompressed_data).context(UnGzip)?;
decompressed_data.into()
},
}
},
None => points,
};
let request = PutRequest::new(points, params);
let result = proxy.handle_opentsdb_put(ctx, request).await;
match result {
Expand Down Expand Up @@ -834,7 +877,9 @@ struct ErrorResponse {

fn error_to_status_code(err: &Error) -> StatusCode {
match err {
Error::CreateContext { .. } => StatusCode::BAD_REQUEST,
Error::UnGzip { .. }
| Error::UnspportedContentEncodingType { .. }
| Error::CreateContext { .. } => StatusCode::BAD_REQUEST,
// TODO(yingwen): Map handle request error to more accurate status code
Error::HandleRequest { .. }
| Error::MissingEngineRuntimes { .. }
Expand Down
Loading