Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement opentsdb query #1284

Closed
wants to merge 15 commits into from
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

41 changes: 41 additions & 0 deletions common_types/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,47 @@ pub fn build_schema_for_cpu() -> Schema {
builder.primary_key_indexes(vec![0, 1]).build().unwrap()
}

/// Build a schema for testing:
/// (tsid(uint64), key2(timestamp), tag1(string), tag2(string), value(double),
pub fn build_schema_for_metric() -> Schema {
let builder = schema::Builder::new()
.auto_increment_column_id(true)
.add_key_column(
column_schema::Builder::new(TSID_COLUMN.to_string(), DatumKind::UInt64)
.build()
.unwrap(),
)
.unwrap()
.add_key_column(
column_schema::Builder::new("timestamp".to_string(), DatumKind::Timestamp)
.build()
.unwrap(),
)
.unwrap()
.add_normal_column(
column_schema::Builder::new("tag1".to_string(), DatumKind::String)
.is_tag(true)
.build()
.unwrap(),
)
.unwrap()
.add_normal_column(
column_schema::Builder::new("tag2".to_string(), DatumKind::String)
.is_tag(true)
.build()
.unwrap(),
)
.unwrap()
.add_normal_column(
column_schema::Builder::new("value".to_string(), DatumKind::Double)
.build()
.unwrap(),
)
.unwrap();

builder.primary_key_indexes(vec![0, 1]).build().unwrap()
}

#[allow(clippy::too_many_arguments)]
pub fn build_row_for_dictionary(
key1: &[u8],
Expand Down
90 changes: 85 additions & 5 deletions proxy/src/opentsdb/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,32 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//! This module implements [put][1] for OpenTSDB
//! This module implements [put][1], [query][2] for OpenTSDB
//! [1]: http://opentsdb.net/docs/build/html/api_http/put.html
//! [2]: http://opentsdb.net/docs/build/html/api_http/query/index.html

use std::time::Instant;

use ceresdbproto::storage::{
RequestContext as GrpcRequestContext, WriteRequest as GrpcWriteRequest,
};
use futures::{stream::FuturesOrdered, StreamExt};
use generic_error::BoxError;
use http::StatusCode;
use logger::debug;
use logger::{debug, info};
use query_frontend::{
frontend::{Context as SqlContext, Frontend},
opentsdb::types::QueryRequest,
provider::CatalogMetaProvider,
};
use snafu::ResultExt;

use self::types::QueryResponse;
use crate::{
context::RequestContext,
error::{ErrNoCause, Result},
error::{ErrNoCause, ErrWithCause, Result},
metrics::HTTP_HANDLER_COUNTER_VEC,
opentsdb::types::{convert_put_request, PutRequest, PutResponse},
opentsdb::types::{convert_output_to_response, convert_put_request, PutRequest, PutResponse},
Context, Proxy,
};

Expand All @@ -38,7 +50,6 @@ impl Proxy {
req: PutRequest,
) -> Result<PutResponse> {
let write_table_requests = convert_put_request(req)?;

let num_rows: usize = write_table_requests
.iter()
.map(|req| {
Expand Down Expand Up @@ -90,4 +101,73 @@ impl Proxy {
}
}
}

pub async fn handle_opentsdb_query(
&self,
ctx: RequestContext,
req: QueryRequest,
) -> Result<Vec<QueryResponse>> {
let request_id = ctx.request_id;
let begin_instant = Instant::now();
let deadline = ctx.timeout.map(|t| begin_instant + t);

info!(
"Opentsdb query handler try to process request, request_id:{}, request:{:?}",
request_id, req
);

let provider = CatalogMetaProvider {
manager: self.instance.catalog_manager.clone(),
default_catalog: &ctx.catalog,
default_schema: &ctx.schema,
function_registry: &*self.instance.function_registry,
};
let frontend = Frontend::new(provider, self.instance.dyn_config.fronted.clone());
let sql_ctx = SqlContext::new(request_id, deadline);

let opentsdb_plan = frontend
.opentsdb_query_to_plan(&sql_ctx, req)
.box_err()
.with_context(|| ErrWithCause {
code: StatusCode::BAD_REQUEST,
msg: "Failed to build plan",
})?;

for plan in &opentsdb_plan.plans {
self.instance
.limiter
.try_limit(&plan.plan)
.box_err()
.context(ErrWithCause {
code: StatusCode::INTERNAL_SERVER_ERROR,
msg: "Query is blocked",
})?;
}

let mut futures = FuturesOrdered::new();
for plan in opentsdb_plan.plans {
let one_resp = async {
let output = self
.execute_plan(request_id, &ctx.catalog, &ctx.schema, plan.plan, deadline)
.await?;

convert_output_to_response(
output,
plan.metric,
plan.field_col_name,
plan.timestamp_col_name,
plan.tags,
plan.aggregated_tags,
)
};

futures.push_back(one_resp);
}

let resp = futures.collect::<Vec<_>>().await;
let resp = resp.into_iter().collect::<Result<Vec<_>>>()?;
let resp = resp.into_iter().flatten().collect();

Ok(resp)
}
}
Loading
Loading