From 43a70fb4fa3462eeb760ba5b671c708e20a22aa4 Mon Sep 17 00:00:00 2001 From: Jiacai Liu Date: Mon, 30 Jan 2023 16:38:44 +0800 Subject: [PATCH] feat: implement prom remote storage api (#578) * feat: implement prom remote storage api * translate req to sql * remote read pass * remote write pass * add more docs * sort labels before write * add testcase for remote prom handler --- Cargo.lock | 117 +++-- server/Cargo.toml | 1 + server/src/grpc/mod.rs | 2 +- server/src/grpc/storage_service/mod.rs | 8 +- server/src/grpc/storage_service/write.rs | 248 +++++---- server/src/handlers/mod.rs | 1 + server/src/handlers/prom.rs | 618 +++++++++++++++++++++++ server/src/handlers/sql.rs | 34 +- server/src/http.rs | 51 +- server/src/mysql/worker.rs | 4 +- server/src/server.rs | 5 + 11 files changed, 911 insertions(+), 178 deletions(-) create mode 100644 server/src/handlers/prom.rs diff --git a/Cargo.lock b/Cargo.lock index f1baf1bc66..2eb89531b8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -95,7 +95,7 @@ dependencies = [ "datafusion", "env_logger", "ethbloom", - "futures 0.3.21", + "futures 0.3.25", "lazy_static", "log", "lru", @@ -249,7 +249,7 @@ dependencies = [ "chrono", "either", "fallible-streaming-iterator", - "futures 0.3.21", + "futures 0.3.25", "hash_hasher", "num-traits", "parquet2", @@ -298,9 +298,9 @@ dependencies = [ [[package]] name = "async-trait" -version = "0.1.57" +version = "0.1.61" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "76464446b8bc32758d7e88ee1a804d9914cd9b1cb264c029899680b0be29826f" +checksum = "705339e0e4a9690e2908d2b3d049d85682cf19fbd5782494498fbf7003a6a282" dependencies = [ "proc-macro2", "quote", @@ -445,7 +445,7 @@ dependencies = [ "common_util", "criterion", "env_logger", - "futures 0.3.21", + "futures 0.3.25", "log", "object_store 1.0.0-alpha02", "parquet", @@ -856,7 +856,7 @@ dependencies = [ "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=12024e7f5c18cca7e40461f51b275c824dd37851)", "common_types 0.4.0", "dashmap 5.4.0", - "futures 0.3.21", + "futures 0.3.25", "tokio 1.24.1", "tonic", ] @@ -1527,7 +1527,7 @@ dependencies = [ "datafusion-physical-expr", "datafusion-row", "datafusion-sql", - "futures 0.3.21", + "futures 0.3.25", "glob", "hashbrown", "itertools", @@ -2115,9 +2115,9 @@ checksum = "3a471a38ef8ed83cd6e40aa59c1ffe17db6855c18e3604d9c4ed8c08ebc28678" [[package]] name = "futures" -version = "0.3.21" +version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f73fe65f54d1e12b726f517d3e2135ca3125a437b6d998caf1962961f7172d9e" +checksum = "38390104763dc37a5145a53c29c63c1290b5d316d6086ec32c293f6736051bb0" dependencies = [ "futures-channel", "futures-core", @@ -2130,9 +2130,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.24" +version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30bdd20c28fadd505d0fd6712cdfcb0d4b5648baf45faef7f852afb2399bb050" +checksum = "52ba265a92256105f45b719605a571ffe2d1f0fea3807304b522c1d778f79eed" dependencies = [ "futures-core", "futures-sink", @@ -2140,9 +2140,9 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.24" +version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4e5aa3de05362c3fb88de6531e6296e85cde7739cccad4b9dfeeb7f6ebce56bf" +checksum = "04909a7a7e4633ae6c4a9ab280aeb86da1236243a77b694a49eacd659a4bd3ac" [[package]] name = "futures-cpupool" @@ -2156,9 +2156,9 @@ dependencies = [ [[package]] name = "futures-executor" 
-version = "0.3.21" +version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9420b90cfa29e327d0429f19be13e7ddb68fa1cccb09d65e5706b8c7a749b8a6" +checksum = "7acc85df6714c176ab5edf386123fafe217be88c0840ec11f199441134a074e2" dependencies = [ "futures-core", "futures-task", @@ -2167,15 +2167,15 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.24" +version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bbf4d2a7a308fd4578637c0b17c7e1c7ba127b8f6ba00b29f717e9655d85eb68" +checksum = "00f5fb52a06bdcadeb54e8d3671f8888a39697dcb0b81b23b55174030427f4eb" [[package]] name = "futures-macro" -version = "0.3.24" +version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "42cd15d1c7456c04dbdf7e88bcd69760d74f3a798d6444e16974b505b0e62f17" +checksum = "bdfb8ce053d86b91919aad980c220b1fb8401a9394410e1c289ed7e66b61835d" dependencies = [ "proc-macro2", "quote", @@ -2184,21 +2184,21 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.24" +version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "21b20ba5a92e727ba30e72834706623d94ac93a725410b6a6b6fbc1b07f7ba56" +checksum = "39c15cf1a4aa79df40f1bb462fb39676d0ad9e366c2a33b590d7c66f4f81fcf9" [[package]] name = "futures-task" -version = "0.3.24" +version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a6508c467c73851293f390476d4491cf4d227dbabcd4170f3bb6044959b294f1" +checksum = "2ffb393ac5d9a6eaa9d3fdf37ae2776656b706e200c8e16b1bdb227f5198e6ea" [[package]] name = "futures-util" -version = "0.3.24" +version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "44fb6cb1be61cc1d2e43b262516aafcf63b241cffdb1d3fa115f91d9c7b09c90" +checksum = "197676987abd2f9cadff84926f410af1c183608d36641465df73ae8211dc65d6" dependencies = [ "futures-channel", "futures-core", @@ -3180,7 +3180,7 @@ dependencies = [ "async-trait", "chrono", "common_util", - "futures 0.3.21", + "futures 0.3.25", "log", "rskafka", "serde", @@ -3198,7 +3198,7 @@ dependencies = [ "ceresdbproto 0.1.0 (git+https://github.com/CeresDB/ceresdbproto.git?rev=9430fe7d321955878ca06c431b115d73640f5d8f)", "common_types 1.0.0-alpha02", "common_util", - "futures 0.3.21", + "futures 0.3.25", "log", "prost", "rand 0.7.3", @@ -3673,7 +3673,7 @@ dependencies = [ "async-trait", "bytes 1.2.1", "chrono", - "futures 0.3.21", + "futures 0.3.25", "itertools", "parking_lot 0.12.1", "percent-encoding 2.1.0", @@ -3694,7 +3694,7 @@ dependencies = [ "clru", "common_util", "crc", - "futures 0.3.21", + "futures 0.3.25", "lazy_static", "log", "lru", @@ -3958,7 +3958,7 @@ dependencies = [ "bytes 1.2.1", "chrono", "flate2", - "futures 0.3.21", + "futures 0.3.25", "hashbrown", "lz4", "num", @@ -3987,7 +3987,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e1a672c84c3e5b5eb6530286b2d22cc1ea8e1e3560e4c314218d6ab749c6db99" dependencies = [ "async-trait", - "futures 0.3.21", + "futures 0.3.25", "integer-encoding 3.0.4", ] @@ -3999,7 +3999,7 @@ checksum = "73fd2690ad041f9296876daef1f2706f6347073bdbcc719090887f1691e4a09d" dependencies = [ "async-stream", "bitpacking", - "futures 0.3.21", + "futures 0.3.25", "parquet-format-async-temp", "streaming-decompression", ] @@ -4263,6 +4263,20 @@ dependencies = [ "tempfile", ] +[[package]] +name = "prom-remote-api" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"bbef7d8c450f6570e12606546fd941696561ed97a07a353e4131f4fc715da09f" +dependencies = [ + "async-trait", + "futures 0.3.25", + "prost", + "prost-build", + "snap", + "warp", +] + [[package]] name = "prometheus" version = "0.7.0" @@ -4320,9 +4334,9 @@ dependencies = [ [[package]] name = "prost" -version = "0.11.0" +version = "0.11.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "399c3c31cdec40583bb68f0b18403400d01ec4289c383aa047560439952c4dd7" +checksum = "21dc42e00223fc37204bd4aa177e69420c604ca4a183209a8f9de30c6d934698" dependencies = [ "bytes 1.2.1", "prost-derive", @@ -4330,9 +4344,9 @@ dependencies = [ [[package]] name = "prost-build" -version = "0.11.1" +version = "0.11.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f835c582e6bd972ba8347313300219fed5bfa52caf175298d860b61ff6069bb" +checksum = "a3f8ad728fb08fe212df3c05169e940fbb6d9d16a877ddde14644a983ba2012e" dependencies = [ "bytes 1.2.1", "heck 0.4.0", @@ -4341,18 +4355,20 @@ dependencies = [ "log", "multimap", "petgraph", + "prettyplease", "prost", "prost-types", "regex", + "syn", "tempfile", "which", ] [[package]] name = "prost-derive" -version = "0.11.0" +version = "0.11.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7345d5f0e08c0536d7ac7229952590239e77abf0a0100a1b1d890add6ea96364" +checksum = "8bda8c0881ea9f722eb9629376db3d0b903b462477c1aafcb0566610ac28ac5d" dependencies = [ "anyhow", "itertools", @@ -4363,9 +4379,9 @@ dependencies = [ [[package]] name = "prost-types" -version = "0.11.1" +version = "0.11.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4dfaa718ad76a44b3415e6c4d53b17c8f99160dcb3a99b10470fce8ad43f6e3e" +checksum = "a5e0526209433e96d83d750dd81a99118edbc55739e7e61a46764fd2ad537788" dependencies = [ "bytes 1.2.1", "prost", @@ -4468,7 +4484,7 @@ dependencies = [ "datafusion", "datafusion-expr", "df_operator", - "futures 0.3.21", + "futures 0.3.25", "log", "serde", "serde_derive", @@ -4834,7 +4850,7 @@ dependencies = [ "clru", "common_types 1.0.0-alpha02", "common_util", - "futures 0.3.21", + "futures 0.3.25", "log", "proto 1.0.0-alpha02", "router", @@ -4987,7 +5003,7 @@ dependencies = [ "chrono", "crc32c", "flate2", - "futures 0.3.21", + "futures 0.3.25", "integer-encoding 3.0.4", "lz4", "parking_lot 0.12.1", @@ -5300,7 +5316,7 @@ dependencies = [ "common_util", "datafusion", "df_operator", - "futures 0.3.21", + "futures 0.3.25", "http 0.2.8", "interpreters", "lazy_static", @@ -5310,6 +5326,7 @@ dependencies = [ "opensrv-mysql", "paste 1.0.8", "profile", + "prom-remote-api", "prometheus 0.12.0", "prometheus-static-metric", "prost", @@ -5591,9 +5608,9 @@ dependencies = [ [[package]] name = "snap" -version = "1.0.5" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "45456094d1983e2ee2a18fdfebce3189fa451699d0502cb8e3b49dba5ba41451" +checksum = "5e9f0ab6ef7eb7353d9119c170a436d1bf248eea575ac42d19d12f4e34130831" [[package]] name = "snappy-sys" @@ -5874,7 +5891,7 @@ dependencies = [ "catalog", "common_types 1.0.0-alpha02", "common_util", - "futures 0.3.21", + "futures 0.3.25", "log", "prost", "proto 1.0.0-alpha02", @@ -5897,7 +5914,7 @@ dependencies = [ "datafusion-proto", "df_operator", "env_logger", - "futures 0.3.21", + "futures 0.3.25", "itertools", "log", "parquet", @@ -6445,7 +6462,7 @@ dependencies = [ "common_types 1.0.0-alpha02", "common_util", "env_logger", - "futures 0.3.21", + "futures 0.3.25", "object_store 1.0.0-alpha02", "parquet", 
"parquet_ext", @@ -6851,7 +6868,7 @@ dependencies = [ "common_types 1.0.0-alpha02", "common_util", "env_logger", - "futures 0.3.21", + "futures 0.3.25", "log", "message_queue", "prost", diff --git a/server/Cargo.toml b/server/Cargo.toml index 9fad2b34c2..46f10689c0 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -33,6 +33,7 @@ meta_client = { workspace = true } opensrv-mysql = "0.1.0" paste = { workspace = true } profile = { workspace = true } +prom-remote-api = { version = "0.2.1", features = ["warp"] } prometheus = { workspace = true } prometheus-static-metric = { workspace = true } prost = { workspace = true } diff --git a/server/src/grpc/mod.rs b/server/src/grpc/mod.rs index 2b0e250001..f94835f38a 100644 --- a/server/src/grpc/mod.rs +++ b/server/src/grpc/mod.rs @@ -47,7 +47,7 @@ pub mod forward; mod meta_event_service; mod metrics; mod remote_engine_service; -mod storage_service; +pub(crate) mod storage_service; #[derive(Debug, Snafu)] pub enum Error { diff --git a/server/src/grpc/storage_service/mod.rs b/server/src/grpc/storage_service/mod.rs index 336beeaadd..7c2175ffea 100644 --- a/server/src/grpc/storage_service/mod.rs +++ b/server/src/grpc/storage_service/mod.rs @@ -52,7 +52,7 @@ pub(crate) mod error; mod prom_query; mod query; mod route; -mod write; +pub(crate) mod write; const STREAM_QUERY_CHANNEL_LEN: usize = 20; @@ -491,11 +491,11 @@ impl StorageService for StorageServiceImpl { /// Create CreateTablePlan from a write metric. // The caller must ENSURE that the HandlerContext's schema_config is not None. -pub fn write_metric_to_create_table_plan( - ctx: &HandlerContext, +pub fn write_metric_to_create_table_plan( + schema_config: Option<&SchemaConfig>, write_metric: &WriteMetric, ) -> Result { - let schema_config = ctx.schema_config.unwrap(); + let schema_config = schema_config.unwrap(); Ok(CreateTablePlan { engine: schema_config.default_engine_type.clone(), if_not_exists: true, diff --git a/server/src/grpc/storage_service/write.rs b/server/src/grpc/storage_service/write.rs index 976cfc30cc..fcc720a019 100644 --- a/server/src/grpc/storage_service/write.rs +++ b/server/src/grpc/storage_service/write.rs @@ -8,6 +8,7 @@ use std::{ }; use ceresdbproto::storage::{value, WriteEntry, WriteMetric, WriteRequest, WriteResponse}; +use cluster::config::SchemaConfig; use common_types::{ bytes::Bytes, datum::{Datum, DatumKind}, @@ -24,10 +25,13 @@ use snafu::{ensure, OptionExt, ResultExt}; use sql::plan::{InsertPlan, Plan}; use table_engine::table::TableRef; -use crate::grpc::storage_service::{ - self, - error::{self, ErrNoCause, ErrWithCause, Result}, - HandlerContext, +use crate::{ + grpc::storage_service::{ + self, + error::{self, ErrNoCause, ErrWithCause, Result}, + HandlerContext, + }, + instance::InstanceRef, }; pub(crate) async fn handle_write( @@ -37,11 +41,12 @@ pub(crate) async fn handle_write( let request_id = RequestId::next_id(); let begin_instant = Instant::now(); let deadline = ctx.timeout.map(|t| begin_instant + t); - + let catalog = ctx.catalog(); + let schema = ctx.schema(); debug!( "Grpc handle write begin, catalog:{}, schema:{}, request_id:{}, first_table:{:?}, num_tables:{}", - ctx.catalog(), - ctx.schema(), + catalog, + schema, request_id, req.metrics .first() @@ -49,58 +54,28 @@ pub(crate) async fn handle_write( req.metrics.len(), ); - let instance = &ctx.instance; - let plan_vec = write_request_to_insert_plan(ctx, req, request_id, deadline).await?; + let plan_vec = write_request_to_insert_plan( + request_id, + catalog, + schema, + ctx.instance.clone(), + req, 
+ ctx.schema_config, + deadline, + ) + .await?; let mut success = 0; for insert_plan in plan_vec { - debug!( - "Grpc handle write table begin, table:{}, row_num:{}", - insert_plan.table.name(), - insert_plan.rows.num_rows() - ); - let plan = Plan::Insert(insert_plan); - - ctx.instance - .limiter - .try_limit(&plan) - .map_err(|e| Box::new(e) as _) - .context(ErrWithCause { - code: StatusCode::FORBIDDEN, - msg: "Insert is blocked", - })?; - - let interpreter_ctx = InterpreterContext::builder(request_id, deadline) - // Use current ctx's catalog and schema as default catalog and schema - .default_catalog_and_schema(ctx.catalog().to_string(), ctx.schema().to_string()) - .build(); - let interpreter_factory = Factory::new( - instance.query_executor.clone(), - instance.catalog_manager.clone(), - instance.table_engine.clone(), - instance.table_manipulator.clone(), - ); - let interpreter = interpreter_factory - .create(interpreter_ctx, plan) - .map_err(|e| Box::new(e) as _) - .with_context(|| ErrWithCause { - code: StatusCode::INTERNAL_SERVER_ERROR, - msg: "Failed to create interpreter", - })?; - - let row_num = match interpreter - .execute() - .await - .map_err(|e| Box::new(e) as _) - .context(ErrWithCause { - code: StatusCode::INTERNAL_SERVER_ERROR, - msg: "failed to execute interpreter", - })? { - Output::AffectedRows(n) => n, - _ => unreachable!(), - }; - - success += row_num; + success += execute_plan( + request_id, + catalog, + schema, + ctx.instance.clone(), + insert_plan, + deadline, + ) + .await?; } let resp = WriteResponse { @@ -111,32 +86,102 @@ pub(crate) async fn handle_write( debug!( "Grpc handle write finished, catalog:{}, schema:{}, resp:{:?}", - ctx.catalog(), - ctx.schema(), - resp + catalog, schema, resp ); Ok(resp) } -async fn write_request_to_insert_plan( - ctx: &HandlerContext<'_, Q>, - write_request: WriteRequest, +pub async fn execute_plan( + request_id: RequestId, + catalog: &str, + schema: &str, + instance: InstanceRef, + insert_plan: InsertPlan, + deadline: Option, +) -> Result { + debug!( + "Grpc handle write table begin, table:{}, row_num:{}", + insert_plan.table.name(), + insert_plan.rows.num_rows() + ); + let plan = Plan::Insert(insert_plan); + + instance + .limiter + .try_limit(&plan) + .map_err(|e| Box::new(e) as _) + .context(ErrWithCause { + code: StatusCode::FORBIDDEN, + msg: "Insert is blocked", + })?; + + let interpreter_ctx = InterpreterContext::builder(request_id, deadline) + // Use current ctx's catalog and schema as default catalog and schema + .default_catalog_and_schema(catalog.to_string(), schema.to_string()) + .build(); + let interpreter_factory = Factory::new( + instance.query_executor.clone(), + instance.catalog_manager.clone(), + instance.table_engine.clone(), + instance.table_manipulator.clone(), + ); + let interpreter = interpreter_factory + .create(interpreter_ctx, plan) + .map_err(|e| Box::new(e) as _) + .with_context(|| ErrWithCause { + code: StatusCode::INTERNAL_SERVER_ERROR, + msg: "Failed to create interpreter", + })?; + + interpreter + .execute() + .await + .map_err(|e| Box::new(e) as _) + .context(ErrWithCause { + code: StatusCode::INTERNAL_SERVER_ERROR, + msg: "failed to execute interpreter", + }) + .and_then(|output| match output { + Output::AffectedRows(n) => Ok(n), + Output::Records(_) => ErrNoCause { + code: StatusCode::INTERNAL_SERVER_ERROR, + msg: "Invalid output type, expect AffectedRows, found Records", + } + .fail(), + }) +} + +pub async fn write_request_to_insert_plan( request_id: RequestId, + catalog: &str, + schema: &str, + 
instance: InstanceRef, + write_request: WriteRequest, + schema_config: Option<&SchemaConfig>, deadline: Option, ) -> Result> { let mut plan_vec = Vec::with_capacity(write_request.metrics.len()); for write_metric in write_request.metrics { let table_name = &write_metric.metric; - let mut table = try_get_table(ctx, table_name)?; + let mut table = try_get_table(catalog, schema, instance.clone(), table_name)?; if table.is_none() { - if let Some(config) = ctx.schema_config { + if let Some(config) = schema_config { if config.auto_create_tables { - create_table(ctx, &write_metric, request_id, deadline).await?; + create_table( + request_id, + catalog, + schema, + instance.clone(), + &write_metric, + schema_config, + deadline, + ) + .await?; // try to get table again - table = try_get_table(ctx, table_name)?; + table = try_get_table(catalog, schema, instance.clone(), table_name)?; } } } @@ -149,11 +194,7 @@ async fn write_request_to_insert_plan( None => { return ErrNoCause { code: StatusCode::BAD_REQUEST, - msg: format!( - "Table not found, schema:{}, table:{}", - ctx.schema(), - table_name - ), + msg: format!("Table not found, schema:{}, table:{}", schema, table_name), } .fail(); } @@ -164,30 +205,32 @@ async fn write_request_to_insert_plan( } fn try_get_table( - ctx: &HandlerContext<'_, Q>, + catalog: &str, + schema: &str, + instance: InstanceRef, table_name: &str, ) -> Result> { - ctx.instance + instance .catalog_manager - .catalog_by_name(ctx.catalog()) + .catalog_by_name(catalog) .map_err(|e| Box::new(e) as _) .with_context(|| ErrWithCause { code: StatusCode::INTERNAL_SERVER_ERROR, - msg: format!("Failed to find catalog, catalog_name:{}", ctx.catalog()), + msg: format!("Failed to find catalog, catalog_name:{}", catalog), })? .with_context(|| ErrNoCause { code: StatusCode::BAD_REQUEST, - msg: format!("Catalog not found, catalog_name:{}", ctx.catalog()), + msg: format!("Catalog not found, catalog_name:{}", catalog), })? - .schema_by_name(ctx.schema()) + .schema_by_name(schema) .map_err(|e| Box::new(e) as _) .with_context(|| ErrWithCause { code: StatusCode::INTERNAL_SERVER_ERROR, - msg: format!("Failed to find schema, schema_name:{}", ctx.schema()), + msg: format!("Failed to find schema, schema_name:{}", schema), })? .with_context(|| ErrNoCause { code: StatusCode::BAD_REQUEST, - msg: format!("Schema not found, schema_name:{}", ctx.schema()), + msg: format!("Schema not found, schema_name:{}", schema), })? 
.table_by_name(table_name) .map_err(|e| Box::new(e) as _) @@ -198,20 +241,24 @@ fn try_get_table( } async fn create_table( - ctx: &HandlerContext<'_, Q>, - write_metric: &WriteMetric, request_id: RequestId, + catalog: &str, + schema: &str, + instance: InstanceRef, + write_metric: &WriteMetric, + schema_config: Option<&SchemaConfig>, deadline: Option, ) -> Result<()> { - let create_table_plan = storage_service::write_metric_to_create_table_plan(ctx, write_metric) - .map_err(|e| Box::new(e) as _) - .with_context(|| ErrWithCause { - code: StatusCode::INTERNAL_SERVER_ERROR, - msg: format!( - "Failed to build creating table plan from metric, table:{}", - write_metric.metric - ), - })?; + let create_table_plan = + storage_service::write_metric_to_create_table_plan(schema_config, write_metric) + .map_err(|e| Box::new(e) as _) + .with_context(|| ErrWithCause { + code: StatusCode::INTERNAL_SERVER_ERROR, + msg: format!( + "Failed to build creating table plan from metric, table:{}", + write_metric.metric + ), + })?; debug!( "Grpc handle create table begin, table:{}, schema:{:?}", @@ -219,8 +266,6 @@ async fn create_table( ); let plan = Plan::Create(create_table_plan); - let instance = &ctx.instance; - instance .limiter .try_limit(&plan) @@ -232,7 +277,7 @@ async fn create_table( let interpreter_ctx = InterpreterContext::builder(request_id, deadline) // Use current ctx's catalog and schema as default catalog and schema - .default_catalog_and_schema(ctx.catalog().to_string(), ctx.schema().to_string()) + .default_catalog_and_schema(catalog.to_string(), schema.to_string()) .build(); let interpreter_factory = Factory::new( instance.query_executor.clone(), @@ -248,19 +293,22 @@ async fn create_table( msg: "Failed to create interpreter", })?; - let _ = match interpreter + interpreter .execute() .await .map_err(|e| Box::new(e) as _) .context(ErrWithCause { code: StatusCode::INTERNAL_SERVER_ERROR, msg: "failed to execute interpreter", - })? { - Output::AffectedRows(n) => n, - _ => unreachable!(), - }; - - Ok(()) + }) + .and_then(|output| match output { + Output::AffectedRows(_) => Ok(()), + Output::Records(_) => ErrNoCause { + code: StatusCode::INTERNAL_SERVER_ERROR, + msg: "Invalid output type, expect AffectedRows, found Records", + } + .fail(), + }) } fn write_metric_to_insert_plan(table: TableRef, write_metric: WriteMetric) -> Result { diff --git a/server/src/handlers/mod.rs b/server/src/handlers/mod.rs index e695b3b610..4735096514 100644 --- a/server/src/handlers/mod.rs +++ b/server/src/handlers/mod.rs @@ -4,6 +4,7 @@ pub mod admin; pub mod error; +pub mod prom; pub mod sql; mod prelude { diff --git a/server/src/handlers/prom.rs b/server/src/handlers/prom.rs new file mode 100644 index 0000000000..9d873c7c0a --- /dev/null +++ b/server/src/handlers/prom.rs @@ -0,0 +1,618 @@ +// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. + +//! This module implements prometheus remote storage API. +//! It converts write request to gRPC write request, and +//! translates query request to SQL for execution. 
+ +use std::{collections::HashMap, time::Instant}; + +use async_trait::async_trait; +use ceresdbproto::storage::{ + value, Field, FieldGroup, Tag, Value, WriteEntry, WriteMetric, WriteRequest as WriteRequestPb, +}; +use common_types::{ + datum::DatumKind, + request_id::RequestId, + schema::{RecordSchema, TIMESTAMP_COLUMN, TSID_COLUMN}, +}; +use interpreters::interpreter::Output; +use log::debug; +use prom_remote_api::types::{ + label_matcher, Label, LabelMatcher, Query, QueryResult, RemoteStorage, Sample, TimeSeries, + WriteRequest, +}; +use query_engine::executor::{Executor as QueryExecutor, RecordBatchVec}; +use snafu::{ensure, Backtrace, OptionExt, ResultExt, Snafu}; +use warp::reject; + +use crate::{ + context::RequestContext, handlers, instance::InstanceRef, + schema_config_provider::SchemaConfigProviderRef, +}; + +#[derive(Debug, Snafu)] +pub enum Error { + #[snafu(display("Metric name is not found.\nBacktrace:\n{}", backtrace))] + MissingName { backtrace: Backtrace }, + + #[snafu(display("Invalid matcher type, value:{}.\nBacktrace:\n{}", value, backtrace))] + InvalidMatcherType { value: i32, backtrace: Backtrace }, + + #[snafu(display("Read response must be Rows.\nBacktrace:\n{}", backtrace))] + ResponseMustRows { backtrace: Backtrace }, + + #[snafu(display("TSID column is missing in query response.\nBacktrace:\n{}", backtrace))] + MissingTSID { backtrace: Backtrace }, + + #[snafu(display( + "Timestamp column is missing in query response.\nBacktrace:\n{}", + backtrace + ))] + MissingTimestamp { backtrace: Backtrace }, + + #[snafu(display( + "Value column is missing in query response.\nBacktrace:\n{}", + backtrace + ))] + MissingValue { backtrace: Backtrace }, + + #[snafu(display("Handle sql failed, err:{}.", source))] + SqlHandle { + source: Box, + }, + + #[snafu(display("Tsid must be u64, current:{}.\nBacktrace:\n{}", kind, backtrace))] + TsidMustU64 { + kind: DatumKind, + backtrace: Backtrace, + }, + + #[snafu(display("Timestamp wrong type, current:{}.\nBacktrace:\n{}", kind, backtrace))] + MustTimestamp { + kind: DatumKind, + backtrace: Backtrace, + }, + + #[snafu(display( + "Value must be f64 compatible type, current:{}.\nBacktrace:\n{}", + kind, + backtrace + ))] + F64Castable { + kind: DatumKind, + backtrace: Backtrace, + }, + + #[snafu(display( + "Tag must be string type, current:{}.\nBacktrace:\n{}", + kind, + backtrace + ))] + TagMustString { + kind: DatumKind, + backtrace: Backtrace, + }, + + #[snafu(display("Failed to write via gRPC, source:{}.", source))] + GRPCWriteError { + source: crate::grpc::storage_service::error::Error, + }, + + #[snafu(display("Failed to get schema, source:{}.", source))] + SchemaError { + source: crate::schema_config_provider::Error, + }, +} + +define_result!(Error); + +impl reject::Reject for Error {} + +const NAME_LABEL: &str = "__name__"; +const VALUE_COLUMN: &str = "value"; + +pub struct CeresDBStorage { + instance: InstanceRef, + schema_config_provider: SchemaConfigProviderRef, +} + +impl CeresDBStorage { + pub fn new(instance: InstanceRef, schema_config_provider: SchemaConfigProviderRef) -> Self { + Self { + instance, + schema_config_provider, + } + } +} + +impl CeresDBStorage { + /// Separate metric from labels, and sort labels by name. + fn normalize_labels(mut labels: Vec
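
The normalize_labels helper above is documented as separating the metric name (the __name__ label) from the remaining labels and sorting those labels by name before the write is issued, so the derived tag columns arrive in a deterministic order. The following is a minimal sketch of that idea against the Label type from prom-remote-api; the function name, return type, and error handling are illustrative assumptions, not the exact code in this patch.

use prom_remote_api::types::Label;

// Mirrors the NAME_LABEL constant defined in the module above.
const NAME_LABEL: &str = "__name__";

/// Illustrative sketch only: split off the `__name__` label as the metric
/// name and return the remaining labels sorted by name.
fn split_metric_and_labels(mut labels: Vec<Label>) -> Option<(String, Vec<Label>)> {
    // The `__name__` label carries the metric name; remove it from the label set.
    let name_idx = labels.iter().position(|l| l.name == NAME_LABEL)?;
    let metric = labels.swap_remove(name_idx).value;

    // Sort the remaining labels so the resulting tag columns have a stable order.
    labels.sort_unstable_by(|a, b| a.name.cmp(&b.name));

    Some((metric, labels))
}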
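
The module documentation states that remote-read queries are translated to SQL before execution. As a rough, hedged illustration of that mapping, the sketch below lowers a remote-read Query with equality matchers and a time range into a SQL string, using the Query and LabelMatcher types from prom-remote-api. The SQL shape, the handling of regex matchers, and the use of SELECT * are assumptions for illustration; only the tsid, timestamp, and value column names correspond to constants referenced in this module.

use prom_remote_api::types::{label_matcher, Query};

/// Illustrative sketch only: build a SQL statement for a remote-read query
/// against the table named by `metric`. Only equality matchers are handled;
/// the handler in this patch may generate different SQL.
fn query_to_sql(metric: &str, query: &Query) -> String {
    let mut predicates = vec![format!(
        "timestamp BETWEEN {} AND {}",
        query.start_timestamp_ms, query.end_timestamp_ms
    )];

    for matcher in &query.matchers {
        // The __name__ matcher selects the table, not a tag column.
        if matcher.name == "__name__" {
            continue;
        }
        if matcher.r#type == label_matcher::Type::Eq as i32 {
            predicates.push(format!("{} = '{}'", matcher.name, matcher.value));
        }
    }

    format!(
        "SELECT * FROM {} WHERE {} ORDER BY tsid, timestamp",
        metric,
        predicates.join(" AND ")
    )
}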