diff --git a/Cargo.lock b/Cargo.lock index 81b0fcfa41..95b429a73a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -96,7 +96,7 @@ dependencies = [ "atomic_enum", "base64 0.13.1", "bytes_ext", - "ceresdbproto", + "ceresdbproto 1.0.23 (git+https://github.com/CeresDB/horaedbproto.git?rev=2c60e05)", "codec", "common_types", "datafusion", @@ -1303,7 +1303,7 @@ checksum = "8ef195bacb1ca0eb02d6a0562b09852941d01de2b962c7066c922115fab7dcb7" dependencies = [ "arrow 38.0.0", "async-trait", - "ceresdbproto", + "ceresdbproto 1.0.23 (registry+https://github.com/rust-lang/crates.io-index)", "dashmap 5.4.0", "futures 0.3.28", "paste 1.0.12", @@ -1341,6 +1341,18 @@ dependencies = [ "walkdir", ] +[[package]] +name = "ceresdbproto" +version = "1.0.23" +source = "git+https://github.com/CeresDB/horaedbproto.git?rev=2c60e05#2c60e0591b6066957c80e7d6ae97cf53ccd591e1" +dependencies = [ + "prost", + "protoc-bin-vendored", + "tonic 0.8.3", + "tonic-build", + "walkdir", +] + [[package]] name = "cexpr" version = "0.6.0" @@ -1515,7 +1527,7 @@ dependencies = [ "async-trait", "bytes_ext", "catalog", - "ceresdbproto", + "ceresdbproto 1.0.23 (git+https://github.com/CeresDB/horaedbproto.git?rev=2c60e05)", "common_types", "etcd-client", "future_ext", @@ -1593,7 +1605,7 @@ dependencies = [ "arrow 43.0.0", "arrow_ext", "bytes_ext", - "ceresdbproto", + "ceresdbproto 1.0.23 (git+https://github.com/CeresDB/horaedbproto.git?rev=2c60e05)", "chrono", "datafusion", "hash_ext", @@ -1606,6 +1618,7 @@ dependencies = [ "serde_json", "snafu 0.6.10", "sqlparser", + "uuid", ] [[package]] @@ -2348,7 +2361,7 @@ dependencies = [ "async-recursion", "async-trait", "catalog", - "ceresdbproto", + "ceresdbproto 1.0.23 (git+https://github.com/CeresDB/horaedbproto.git?rev=2c60e05)", "common_types", "datafusion", "datafusion-proto", @@ -3902,7 +3915,7 @@ name = "meta_client" version = "1.2.6-alpha" dependencies = [ "async-trait", - "ceresdbproto", + "ceresdbproto 1.0.23 (git+https://github.com/CeresDB/horaedbproto.git?rev=2c60e05)", "common_types", "futures 0.3.28", "generic_error", @@ -4427,7 +4440,7 @@ version = "1.2.6-alpha" dependencies = [ "async-trait", "bytes", - "ceresdbproto", + "ceresdbproto 1.0.23 (git+https://github.com/CeresDB/horaedbproto.git?rev=2c60e05)", "chrono", "clru", "crc", @@ -5304,7 +5317,7 @@ dependencies = [ "async-trait", "bytes", "catalog", - "ceresdbproto", + "ceresdbproto 1.0.23 (git+https://github.com/CeresDB/horaedbproto.git?rev=2c60e05)", "clru", "cluster", "common_types", @@ -5431,7 +5444,7 @@ dependencies = [ "arrow 43.0.0", "async-trait", "catalog", - "ceresdbproto", + "ceresdbproto 1.0.23 (git+https://github.com/CeresDB/horaedbproto.git?rev=2c60e05)", "cluster", "codec", "common_types", @@ -5742,7 +5755,7 @@ version = "1.2.6-alpha" dependencies = [ "arrow_ext", "async-trait", - "ceresdbproto", + "ceresdbproto 1.0.23 (git+https://github.com/CeresDB/horaedbproto.git?rev=2c60e05)", "common_types", "futures 0.3.28", "generic_error", @@ -5871,7 +5884,7 @@ name = "router" version = "1.2.6-alpha" dependencies = [ "async-trait", - "ceresdbproto", + "ceresdbproto 1.0.23 (git+https://github.com/CeresDB/horaedbproto.git?rev=2c60e05)", "cluster", "common_types", "generic_error", @@ -6246,7 +6259,7 @@ dependencies = [ "async-trait", "bytes_ext", "catalog", - "ceresdbproto", + "ceresdbproto 1.0.23 (git+https://github.com/CeresDB/horaedbproto.git?rev=2c60e05)", "clru", "cluster", "common_types", @@ -6772,7 +6785,7 @@ dependencies = [ "async-trait", "bytes_ext", "catalog", - "ceresdbproto", + "ceresdbproto 1.0.23 
(git+https://github.com/CeresDB/horaedbproto.git?rev=2c60e05)", "codec", "common_types", "futures 0.3.28", @@ -6794,7 +6807,7 @@ dependencies = [ "arrow_ext", "async-trait", "bytes_ext", - "ceresdbproto", + "ceresdbproto 1.0.23 (git+https://github.com/CeresDB/horaedbproto.git?rev=2c60e05)", "common_types", "datafusion", "datafusion-proto", @@ -6997,7 +7010,7 @@ dependencies = [ name = "time_ext" version = "1.2.6-alpha" dependencies = [ - "ceresdbproto", + "ceresdbproto 1.0.23 (git+https://github.com/CeresDB/horaedbproto.git?rev=2c60e05)", "chrono", "common_types", "macros", @@ -7509,8 +7522,8 @@ version = "1.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" dependencies = [ - "cfg-if 0.1.10", - "rand 0.3.23", + "cfg-if 1.0.0", + "rand 0.8.5", "static_assertions", ] @@ -7587,9 +7600,9 @@ checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" [[package]] name = "uuid" -version = "1.3.3" +version = "1.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "345444e32442451b267fc254ae85a209c64be56d2890e601a0c37ff0c3c5ecd2" +checksum = "5e395fcf16a7a3d8127ec99782007af141946b4795001f876d54fb0d55978560" dependencies = [ "getrandom 0.2.8", "rand 0.8.5", @@ -7598,9 +7611,9 @@ dependencies = [ [[package]] name = "uuid-macro-internal" -version = "1.3.3" +version = "1.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3f67b459f42af2e6e1ee213cb9da4dbd022d3320788c3fb3e1b893093f1e45da" +checksum = "f49e7f3f3db8040a100710a11932239fd30697115e2ba4107080d8252939845e" dependencies = [ "proc-macro2", "quote", @@ -7649,7 +7662,7 @@ version = "1.2.6-alpha" dependencies = [ "async-trait", "bytes_ext", - "ceresdbproto", + "ceresdbproto 1.0.23 (git+https://github.com/CeresDB/horaedbproto.git?rev=2c60e05)", "chrono", "codec", "common_types", diff --git a/Cargo.toml b/Cargo.toml index 01af922e94..50875c3cff 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -94,7 +94,7 @@ bytes = "1" bytes_ext = { path = "components/bytes_ext" } catalog = { path = "catalog" } catalog_impls = { path = "catalog_impls" } -ceresdbproto = "1.0.23" +ceresdbproto = { git = "https://github.com/CeresDB/horaedbproto.git", rev = "2c60e05" } codec = { path = "components/codec" } chrono = "0.4" clap = "3.0" @@ -183,6 +183,7 @@ tokio = { version = "1.29", features = ["full"] } wal = { path = "src/wal" } xorfilter-rs = { git = "https://github.com/CeresDB/xorfilter", rev = "ac8ef01" } zstd = { version = "0.12", default-features = false } +uuid = "1.6.1" regex = "1" # This profile optimizes for good runtime performance. diff --git a/analytic_engine/src/compaction/scheduler.rs b/analytic_engine/src/compaction/scheduler.rs index dea77022d2..1496ef514c 100644 --- a/analytic_engine/src/compaction/scheduler.rs +++ b/analytic_engine/src/compaction/scheduler.rs @@ -523,7 +523,7 @@ impl ScheduleWorker { } let res = space_store .compact_table( - request_id, + request_id.clone(), &table_data, &compaction_task, scan_options, diff --git a/analytic_engine/src/instance/flush_compaction.rs b/analytic_engine/src/instance/flush_compaction.rs index 35dcca8caf..81ac53d7a8 100644 --- a/analytic_engine/src/instance/flush_compaction.rs +++ b/analytic_engine/src/instance/flush_compaction.rs @@ -291,7 +291,7 @@ impl FlushTask { // Start flush duration timer. 
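Review note on the `request_id.clone()` churn in this and the following hunks: it falls out of the type change itself. The old `RequestId(u64)` was `Copy`, so passing it by value cost one machine word and left the original usable; a `String`-backed id is `Clone` but not `Copy`, so every reuse after a by-value call needs an explicit clone, and read-only callees are better off borrowing (as `delete_expired_files` now does with `&RequestId`). A minimal sketch of the pattern, with hypothetical helper names:

```rust
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct RequestId(String);

fn schedule(id: RequestId) {
    submit_compaction(id.clone()); // the job keeps an owned copy
    log_scheduled(&id);            // later uses can borrow instead
}

// Hypothetical callees: one stores the id, the other only reads it.
fn submit_compaction(_id: RequestId) {}
fn log_scheduled(_id: &RequestId) {}
```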
let local_metrics = self.table_data.metrics.local_flush_metrics(); let _timer = local_metrics.start_flush_timer(); - self.dump_memtables(request_id, &mems_to_flush, flush_req.need_reorder) + self.dump_memtables(request_id.clone(), &mems_to_flush, flush_req.need_reorder) .await .box_err() .context(FlushJobWithCause { @@ -421,7 +421,7 @@ impl FlushTask { if let Some(sampling_mem) = &mems_to_flush.sampling_mem { if let Some(seq) = self .dump_sampling_memtable( - request_id, + request_id.clone(), sampling_mem, &mut files_to_level0, need_reorder, @@ -436,7 +436,7 @@ impl FlushTask { } } for mem in &mems_to_flush.memtables { - let file = self.dump_normal_memtable(request_id, mem).await?; + let file = self.dump_normal_memtable(request_id.clone(), mem).await?; if let Some(file) = file { let sst_size = file.size; files_to_level0.push(AddFile { @@ -565,6 +565,7 @@ impl FlushTask { let store = self.space_store.clone(); let storage_format_hint = self.table_data.table_options().storage_format_hint; let sst_write_options = sst_write_options.clone(); + let request_id = request_id.clone(); // spawn build sst let handler = self.runtime.spawn(async move { @@ -785,7 +786,7 @@ impl SpaceStore { } for files in task.expired() { - self.delete_expired_files(table_data, request_id, files, &mut edit_meta); + self.delete_expired_files(table_data, &request_id, files, &mut edit_meta); } info!( @@ -798,7 +799,7 @@ impl SpaceStore { for input in inputs { self.compact_input_files( - request_id, + request_id.clone(), table_data, input, scan_options.clone(), @@ -874,7 +875,7 @@ impl SpaceStore { info!( "Instance try to compact table, table:{}, table_id:{}, request_id:{}, input_files:{:?}", - table_data.name, table_data.id, request_id, input.files, + table_data.name, table_data.id, &request_id, input.files, ); // The schema may be modified during compaction, so we acquire it first and use @@ -905,6 +906,7 @@ impl SpaceStore { let space_id = table_data.space_id; let table_id = table_data.id; let sequence = table_data.last_sequence(); + let request_id = request_id.clone(); let mut builder = MergeBuilder::new(MergeConfig { request_id, metrics_collector: None, @@ -933,7 +935,7 @@ impl SpaceStore { let record_batch_stream = if table_options.need_dedup() { row_iter::record_batch_with_key_iter_to_stream(DedupIterator::new( - request_id, + request_id.clone(), merge_iter, iter_options, )) @@ -978,7 +980,7 @@ impl SpaceStore { })?; let sst_info = sst_writer - .write(request_id, &sst_meta, record_batch_stream) + .write(request_id.clone(), &sst_meta, record_batch_stream) .await .box_err() .with_context(|| WriteSst { @@ -1038,7 +1040,7 @@ impl SpaceStore { pub(crate) fn delete_expired_files( &self, table_data: &TableData, - request_id: RequestId, + request_id: &RequestId, expired: &ExpiredFiles, edit_meta: &mut VersionEditMeta, ) { diff --git a/analytic_engine/src/instance/read.rs b/analytic_engine/src/instance/read.rs index 251d8168d3..09f261fd18 100644 --- a/analytic_engine/src/instance/read.rs +++ b/analytic_engine/src/instance/read.rs @@ -204,7 +204,7 @@ impl Instance { .metrics_collector .span(format!("{MERGE_ITER_METRICS_COLLECTOR_NAME_PREFIX}_{idx}")); let merge_config = MergeConfig { - request_id: request.request_id, + request_id: request.request_id.clone(), metrics_collector: Some(metrics_collector), deadline: request.opts.deadline, space_id: table_data.space_id, @@ -230,7 +230,7 @@ impl Instance { table: &table_data.name, })?; let dedup_iter = - DedupIterator::new(request.request_id, merge_iter, iter_options.clone()); + 
DedupIterator::new(request.request_id.clone(), merge_iter, iter_options.clone()); iters.push(dedup_iter); } @@ -263,7 +263,7 @@ impl Instance { .metrics_collector .span(format!("{CHAIN_ITER_METRICS_COLLECTOR_NAME_PREFIX}_{idx}")); let chain_config = ChainConfig { - request_id: request.request_id, + request_id: request.request_id.clone(), metrics_collector: Some(metrics_collector), deadline: request.opts.deadline, num_streams_to_prefetch: self.scan_options.num_streams_to_prefetch, diff --git a/benchmarks/src/merge_memtable_bench.rs b/benchmarks/src/merge_memtable_bench.rs index 3167673485..1ca655e50b 100644 --- a/benchmarks/src/merge_memtable_bench.rs +++ b/benchmarks/src/merge_memtable_bench.rs @@ -155,7 +155,7 @@ impl MergeMemTableBench { let request_id = RequestId::next_id(); let store_picker: ObjectStorePickerRef = Arc::new(self.store.clone()); let mut builder = MergeBuilder::new(MergeConfig { - request_id, + request_id: request_id.clone(), metrics_collector: None, deadline: None, space_id, @@ -181,7 +181,8 @@ impl MergeMemTableBench { let mut batch_num = 0; if self.dedup { - let mut dedup_iter = DedupIterator::new(request_id, merge_iter, iter_options); + let mut dedup_iter = + DedupIterator::new(request_id.clone(), merge_iter, iter_options); while let Some(batch) = dedup_iter.next_batch().await.unwrap() { let num_rows = batch.num_rows(); total_rows += num_rows; diff --git a/benchmarks/src/merge_sst_bench.rs b/benchmarks/src/merge_sst_bench.rs index 47d80ac83b..434f452b70 100644 --- a/benchmarks/src/merge_sst_bench.rs +++ b/benchmarks/src/merge_sst_bench.rs @@ -143,7 +143,7 @@ impl MergeSstBench { let request_id = RequestId::next_id(); let store_picker: ObjectStorePickerRef = Arc::new(self.store.clone()); let mut builder = MergeBuilder::new(MergeConfig { - request_id, + request_id: request_id.clone(), metrics_collector: None, deadline: None, space_id, diff --git a/benchmarks/src/sst_tools.rs b/benchmarks/src/sst_tools.rs index 1d4ca8044b..b9d41cbe83 100644 --- a/benchmarks/src/sst_tools.rs +++ b/benchmarks/src/sst_tools.rs @@ -247,6 +247,7 @@ pub async fn merge_sst(config: MergeSstConfig, runtime: Arc<Runtime>) { let space_id = config.space_id; let table_id = config.table_id; let sequence = max_sequence + 1; + let request_id = request_id.clone(); let mut builder = MergeBuilder::new(MergeConfig { request_id, @@ -272,7 +273,7 @@ pub async fn merge_sst(config: MergeSstConfig, runtime: Arc<Runtime>) { }; let record_batch_stream = if config.dedup { - let iter = DedupIterator::new(request_id, iter, iter_options); + let iter = DedupIterator::new(request_id.clone(), iter, iter_options); row_iter::record_batch_with_key_iter_to_stream(iter) } else { row_iter::record_batch_with_key_iter_to_stream(iter) } diff --git a/common_types/Cargo.toml b/common_types/Cargo.toml index d0727a082b..c676ce9a14 100644 --- a/common_types/Cargo.toml +++ b/common_types/Cargo.toml @@ -48,3 +48,4 @@ serde = { workspace = true } serde_json = { workspace = true } snafu = { workspace = true } sqlparser = { workspace = true } +uuid = { workspace = true, features = ["fast-rng"] } diff --git a/common_types/src/request_id.rs b/common_types/src/request_id.rs index 9b9b6be98e..04fc10d35f 100644 --- a/common_types/src/request_id.rs +++ b/common_types/src/request_id.rs @@ -14,27 +14,21 @@ //! Request id. 
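Context for the hunk below: id generation moves from a process-local `AtomicU64` counter to a random v4 UUID, so ids no longer collide across nodes or restarts (the cost is a 36-character heap string instead of a `u64`). A sketch of what `next_id` now amounts to, assuming the `uuid` crate's `v4` feature is enabled somewhere in the workspace (`fast-rng`, added in the Cargo.toml above, only swaps in a faster random source; generation itself requires `v4`):

```rust
use uuid::Uuid; // e.g. uuid = { version = "1.6", features = ["v4", "fast-rng"] }

fn next_id() -> String {
    // Hyphenated lowercase form, e.g. "67e55044-10b1-426f-9247-bb680e5fe0c8".
    Uuid::new_v4().to_string()
}
```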
-use std::{ - fmt, - sync::atomic::{AtomicU64, Ordering}, -}; +use std::fmt; -#[derive(Debug, Clone, Copy)] -pub struct RequestId(u64); +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct RequestId(String); impl RequestId { /// Acquire next request id. pub fn next_id() -> Self { - static NEXT_ID: AtomicU64 = AtomicU64::new(1); - - let id = NEXT_ID.fetch_add(1, Ordering::Relaxed); - + let id = uuid::Uuid::new_v4().to_string(); Self(id) } #[inline] - pub fn as_u64(&self) -> u64 { - self.0 + pub fn as_str(&self) -> &str { + &self.0 } } @@ -44,12 +38,18 @@ impl fmt::Display for RequestId { } } -impl From<u64> for RequestId { - fn from(id: u64) -> Self { +impl From<String> for RequestId { + fn from(id: String) -> Self { Self(id) } } +impl From<&str> for RequestId { + fn from(id: &str) -> Self { + Self(id.to_string()) + } +} + #[cfg(test)] mod tests { use super::*; @@ -57,10 +57,8 @@ mod tests { #[test] fn test_request_id() { let id = RequestId::next_id(); - assert_eq!(1, id.0); - let id = RequestId::next_id(); - assert_eq!(2, id.0); + let id2 = RequestId::next_id(); - assert_eq!("2", id.to_string()); + assert_ne!(id, id2); } } diff --git a/components/logger/src/lib.rs b/components/logger/src/lib.rs index d3780021f1..1dc78b0cb8 100644 --- a/components/logger/src/lib.rs +++ b/components/logger/src/lib.rs @@ -467,7 +467,7 @@ pub fn init_test_logger() { /// Timer for collecting slow query #[derive(Debug)] pub struct SlowTimer<'a> { - request_id: u64, + request_id: &'a str, sql: &'a str, slow_threshold: Duration, start_time: Instant, @@ -488,7 +488,7 @@ impl<'a> Drop for SlowTimer<'a> { } impl<'a> SlowTimer<'a> { - pub fn new(request_id: u64, sql: &'a str, threshold: Duration) -> SlowTimer { + pub fn new(request_id: &'a str, sql: &'a str, threshold: Duration) -> SlowTimer<'a> { SlowTimer { request_id, sql, diff --git a/df_engine_extensions/src/dist_sql_query/test_util.rs b/df_engine_extensions/src/dist_sql_query/test_util.rs index 9a1e6d3740..77584a9fca 100644 --- a/df_engine_extensions/src/dist_sql_query/test_util.rs +++ b/df_engine_extensions/src/dist_sql_query/test_util.rs @@ -199,7 +199,7 @@ impl TestContext { .extract_time_range(&test_schema, &logical_filters) .build(); let read_request = ReadRequest { - request_id: 42.into(), + request_id: "42".into(), opts: ReadOptions::default(), projected_schema, predicate, @@ -424,7 +424,7 @@ impl ExecutableScanBuilder for MockScanBuilder { ctx: TableScanContext, ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> { let request = ReadRequest { - request_id: RequestId::from(42), + request_id: RequestId::from("test"), opts: ReadOptions { batch_size: ctx.batch_size, read_parallelism: ctx.read_parallelism, diff --git a/interpreters/src/context.rs b/interpreters/src/context.rs index cbc64612db..34e76d44c9 100644 --- a/interpreters/src/context.rs +++ b/interpreters/src/context.rs @@ -52,7 +52,7 @@ impl Context { /// Create a new context of query executor pub fn new_query_context(&self) -> Result<QueryContext> { let ctx = QueryContext { - request_id: self.request_id, + request_id: self.request_id.clone(), deadline: self.deadline, default_catalog: self.default_catalog.clone(), default_schema: self.default_schema.clone(), @@ -72,7 +72,7 @@ impl Context { #[inline] pub fn request_id(&self) -> RequestId { - self.request_id + self.request_id.clone() } #[inline] diff --git a/proxy/src/grpc/prom_query.rs b/proxy/src/grpc/prom_query.rs index 70094ba943..a600c62cc4 100644 --- a/proxy/src/grpc/prom_query.rs +++ b/proxy/src/grpc/prom_query.rs @@ -93,7 +93,7 @@ impl Proxy { }; let frontend = Frontend::new(provider, 
self.instance.dyn_config.fronted.clone()); - let mut sql_ctx = SqlContext::new(request_id, deadline); + let mut sql_ctx = SqlContext::new(request_id.clone(), deadline); let expr = frontend .parse_promql(&mut sql_ctx, req.expr) .box_err() @@ -125,7 +125,7 @@ impl Proxy { })?; let output = self - .execute_plan(request_id, catalog, &schema, plan, deadline) + .execute_plan(request_id.clone(), catalog, &schema, plan, deadline) .await .box_err() .with_context(|| ErrWithCause { diff --git a/proxy/src/http/prom.rs b/proxy/src/http/prom.rs index 68190174a3..2fc520f43d 100644 --- a/proxy/src/http/prom.rs +++ b/proxy/src/http/prom.rs @@ -117,7 +117,7 @@ impl Proxy { metric: String, query: Query, ) -> Result { - let request_id = ctx.request_id; + let request_id = &ctx.request_id; let begin_instant = Instant::now(); let deadline = ctx.timeout.map(|t| begin_instant + t); info!("Handle prom remote query begin, ctx:{ctx:?}, metric:{metric}, request:{query:?}"); @@ -133,7 +133,7 @@ impl Proxy { function_registry: &*self.instance.function_registry, }; let frontend = Frontend::new(provider, self.instance.dyn_config.fronted.clone()); - let plan_ctx = Context::new(request_id, deadline); + let plan_ctx = Context::new(request_id.clone(), deadline); let RemoteQueryPlan { plan, @@ -156,7 +156,13 @@ impl Proxy { msg: "Query is blocked", })?; let output = self - .execute_plan(request_id, &ctx.catalog, &ctx.schema, plan, deadline) + .execute_plan( + request_id.clone(), + &ctx.catalog, + &ctx.schema, + plan, + deadline, + ) .await?; let cost = begin_instant.saturating_elapsed().as_millis(); diff --git a/proxy/src/influxdb/mod.rs b/proxy/src/influxdb/mod.rs index ec61cbb743..292bf1a93f 100644 --- a/proxy/src/influxdb/mod.rs +++ b/proxy/src/influxdb/mod.rs @@ -137,7 +137,7 @@ impl Proxy { function_registry: &*self.instance.function_registry, }; let frontend = Frontend::new(provider, self.instance.dyn_config.fronted.clone()); - let sql_ctx = SqlContext::new(request_id, deadline); + let sql_ctx = SqlContext::new(request_id.clone(), deadline); let mut stmts = frontend .parse_influxql(&sql_ctx, &req.query) @@ -180,7 +180,13 @@ impl Proxy { msg: "Query is blocked", })?; let output = self - .execute_plan(request_id, &ctx.catalog, &ctx.schema, plan, deadline) + .execute_plan( + request_id.clone(), + &ctx.catalog, + &ctx.schema, + plan, + deadline, + ) .await?; info!( diff --git a/proxy/src/read.rs b/proxy/src/read.rs index 9102be916b..ba9b33c7d6 100644 --- a/proxy/src/read.rs +++ b/proxy/src/read.rs @@ -161,14 +161,14 @@ impl Proxy { sql: &str, enable_partition_table_access: bool, ) -> Result<Output> { - let request_id = ctx.request_id; + let request_id = &ctx.request_id; let slow_threshold_secs = self .instance() .dyn_config .slow_threshold .load(std::sync::atomic::Ordering::Relaxed); let slow_threshold = Duration::from_secs(slow_threshold_secs); - let slow_timer = SlowTimer::new(ctx.request_id.as_u64(), sql, slow_threshold); + let slow_timer = SlowTimer::new(request_id.as_str(), sql, slow_threshold); let deadline = ctx.timeout.map(|t| slow_timer.start_time() + t); let catalog = self.instance.catalog_manager.default_catalog_name(); @@ -185,7 +185,7 @@ impl Proxy { }; let frontend = Frontend::new(provider, instance.dyn_config.fronted.clone()); - let mut sql_ctx = SqlContext::new(request_id.clone(), deadline); // Parse sql, frontend error of invalid sql already contains sql // TODO(yingwen): Maybe move sql from frontend error to outer error let mut stmts = frontend @@ -236,10 
+236,16 @@ impl Proxy { } let output = if enable_partition_table_access { - self.execute_plan_involving_partition_table(request_id, catalog, schema, plan, deadline) - .await + self.execute_plan_involving_partition_table( + request_id.clone(), + catalog, + schema, + plan, + deadline, + ) + .await } else { - self.execute_plan(request_id, catalog, schema, plan, deadline) + self.execute_plan(request_id.clone(), catalog, schema, plan, deadline) .await }; let output = output.box_err().with_context(|| ErrWithCause { diff --git a/proxy/src/write.rs b/proxy/src/write.rs index c455078ddf..991173205e 100644 --- a/proxy/src/write.rs +++ b/proxy/src/write.rs @@ -102,13 +102,13 @@ impl Proxy { ctx: Context, req: WriteRequest, ) -> Result<WriteResponse> { - let request_id = ctx.request_id; + let request_id = &ctx.request_id; let write_context = req.context.clone().context(ErrNoCause { msg: "Missing context", code: StatusCode::BAD_REQUEST, })?; - self.handle_auto_create_table_with_meta(request_id, &write_context.database, &req) + self.handle_auto_create_table_with_meta(request_id.clone(), &write_context.database, &req) .await?; let (write_request_to_local, write_requests_to_forward) = @@ -121,7 +121,7 @@ impl Proxy { .await; // Write to local. - self.collect_write_to_local_future(&mut futures, ctx, request_id, write_request_to_local) + self.collect_write_to_local_future(&mut futures, ctx, write_request_to_local) .await; self.collect_write_response(futures).await @@ -136,7 +136,7 @@ impl Proxy { ctx: Context, req: WriteRequest, ) -> Result<WriteResponse> { - let request_id = ctx.request_id; + let request_id = &ctx.request_id; let write_context = req.context.clone().context(ErrNoCause { msg: "Missing context", code: StatusCode::BAD_REQUEST, })?; @@ -152,14 +152,14 @@ impl Proxy { // Create table. self.handle_auto_create_table_without_meta( - request_id, + request_id.clone(), &write_request_to_local, &write_context.database, ) .await?; // Write to local. 
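Note on the signature change just below: `collect_write_to_local_future` used to receive the request id both inside `ctx` and as a separate argument; with the id now owned by `Context`, `write_to_local` pulls it out of `ctx` itself and a single source of truth remains. A reduced sketch of the resulting shape (types pared down to the fields that matter here):

```rust
#[derive(Clone, Debug)]
struct RequestId(String);

struct Context {
    request_id: RequestId,
    // deadline, timeout, ... elided
}

async fn write_to_local(ctx: Context, tables: Vec<String>) {
    let request_id = ctx.request_id; // taken from the context, not a parameter
    for table in tables {
        // Each per-table insert still needs its own owned copy.
        execute_insert(request_id.clone(), &table);
    }
}

fn execute_insert(_id: RequestId, _table: &str) {}
```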
- self.collect_write_to_local_future(&mut futures, ctx, request_id, write_request_to_local) + self.collect_write_to_local_future(&mut futures, ctx, write_request_to_local) .await; self.collect_write_response(futures).await @@ -197,7 +197,7 @@ impl Proxy { continue; } self.create_table( - request_id, + request_id.clone(), self.instance.catalog_manager.default_catalog_name(), schema, write_table_req, @@ -238,7 +238,7 @@ impl Proxy { let table = self.try_get_table(catalog, schema, table_name)?; if table.is_none() { self.create_table( - request_id, + request_id.clone(), catalog, schema, &write_request.table_requests[idx], @@ -268,7 +268,7 @@ impl Proxy { function_registry: &*self.instance.function_registry, }; let frontend = Frontend::new(provider, self.instance.dyn_config.fronted.clone()); - let ctx = FrontendContext::new(request_id, deadline); + let ctx = FrontendContext::new(request_id.clone(), deadline); let plan = frontend .write_req_to_plan(&ctx, schema_config, write_table_req) .box_err() @@ -385,15 +385,13 @@ impl Proxy { &'a self, futures: &mut WriteResponseFutures<'a>, ctx: Context, - request_id: RequestId, write_request: WriteRequest, ) { if write_request.table_requests.is_empty() { return; } - let local_handle = - async move { Ok(self.write_to_local(ctx, request_id, write_request).await) }; + let local_handle = async move { Ok(self.write_to_local(ctx, write_request).await) }; futures.push(local_handle.boxed()); } @@ -473,12 +471,8 @@ impl Proxy { } } - async fn write_to_local( - &self, - ctx: Context, - request_id: RequestId, - req: WriteRequest, - ) -> Result<WriteResponse> { + async fn write_to_local(&self, ctx: Context, req: WriteRequest) -> Result<WriteResponse> { + let request_id = ctx.request_id; let begin_instant = Instant::now(); let deadline = ctx.timeout.map(|t| begin_instant + t); let catalog_name = self.instance.catalog_manager.default_catalog_name(); @@ -497,7 +491,7 @@ impl Proxy { ); let write_context = WriteContext { - request_id, + request_id: request_id.clone(), deadline, catalog: catalog_name.to_string(), schema: schema_name.clone(), @@ -513,7 +507,7 @@ impl Proxy { let table = insert_plan.table.clone(); match self .execute_insert_plan( - request_id, + request_id.clone(), catalog_name, &schema_name, insert_plan, @@ -580,7 +574,7 @@ impl Proxy { let columns = find_new_columns(&table_schema, &write_table_req)?; if !columns.is_empty() { self.execute_add_columns_plan( - request_id, + request_id.clone(), &catalog, &schema, table.clone(), @@ -693,7 +687,7 @@ impl Proxy { operations: AlterTableOperation::AddColumn(columns), }); let _ = self - .execute_plan(request_id, catalog, schema, plan, deadline) + .execute_plan(request_id.clone(), catalog, schema, plan, deadline) .await?; info!("Add columns success, request_id:{request_id}, table:{table_name}"); diff --git a/query_engine/src/datafusion_impl/mod.rs b/query_engine/src/datafusion_impl/mod.rs index d4109d3219..ed8d963ffe 100644 --- a/query_engine/src/datafusion_impl/mod.rs +++ b/query_engine/src/datafusion_impl/mod.rs @@ -140,7 +140,7 @@ impl DfContextBuilder { .deadline .map(|deadline| deadline.duration_since(Instant::now()).as_millis() as u64); let ceresdb_options = CeresdbOptions { - request_id: ctx.request_id.as_u64(), + request_id: ctx.request_id.clone().to_string(), request_timeout: timeout, default_catalog: ctx.default_catalog.clone(), default_schema: ctx.default_schema.clone(), diff --git a/query_engine/src/datafusion_impl/task_context.rs b/query_engine/src/datafusion_impl/task_context.rs index b5eb7856cd..5946f3d22b 100644 --- 
a/query_engine/src/datafusion_impl/task_context.rs +++ b/query_engine/src/datafusion_impl/task_context.rs @@ -184,7 +184,7 @@ impl RemotePhysicalPlanExecutor for RemotePhysicalPlanExecutorImpl { .get::<CeresdbOptions>(); assert!(ceresdb_options.is_some()); let ceresdb_options = ceresdb_options.unwrap(); - let request_id = RequestId::from(ceresdb_options.request_id); + let request_id = RequestId::from(ceresdb_options.request_id.clone()); let deadline = ceresdb_options .request_timeout .map(|n| Instant::now() + Duration::from_millis(n)); @@ -251,7 +251,7 @@ struct DistQueryResolverBuilder { impl DistQueryResolverBuilder { fn build(&self, ctx: &Context) -> Resolver { let scan_builder = Box::new(ExecutableScanBuilderImpl { - request_id: ctx.request_id, + request_id: ctx.request_id.clone(), deadline: ctx.deadline, }); @@ -284,7 +284,7 @@ impl ExecutableScanBuilder for ExecutableScanBuilderImpl { }; let read_request = ReadRequest { - request_id: self.request_id, + request_id: self.request_id.clone(), opts: read_opts, projected_schema: ctx.projected_schema, predicate: ctx.predicate, diff --git a/query_frontend/src/frontend.rs b/query_frontend/src/frontend.rs index 016df62eb7..7208cfbedf 100644 --- a/query_frontend/src/frontend.rs +++ b/query_frontend/src/frontend.rs @@ -144,7 +144,7 @@ impl<P: MetaProvider> Frontend<P> { pub fn statement_to_plan(&self, ctx: &Context, stmt: Statement) -> Result<Plan> { let planner = Planner::new( &self.provider, - ctx.request_id, + ctx.request_id.clone(), ctx.read_parallelism, self.dyn_config.as_ref(), ); @@ -160,7 +160,7 @@ impl<P: MetaProvider> Frontend<P> { ) -> Result<(Plan, Arc)> { let planner = Planner::new( &self.provider, - ctx.request_id, + ctx.request_id.clone(), ctx.read_parallelism, self.dyn_config.as_ref(), ); @@ -176,7 +176,7 @@ impl<P: MetaProvider> Frontend<P> { ) -> Result { let planner = Planner::new( &self.provider, - ctx.request_id, + ctx.request_id.clone(), ctx.read_parallelism, self.dyn_config.as_ref(), ); @@ -186,7 +186,7 @@ impl<P: MetaProvider> Frontend<P> { pub fn influxql_stmt_to_plan(&self, ctx: &Context, stmt: InfluxqlStatement) -> Result<Plan> { let planner = Planner::new( &self.provider, - ctx.request_id, + ctx.request_id.clone(), ctx.read_parallelism, self.dyn_config.as_ref(), ); @@ -201,7 +201,7 @@ impl<P: MetaProvider> Frontend<P> { ) -> Result { let planner = Planner::new( &self.provider, - ctx.request_id, + ctx.request_id.clone(), ctx.read_parallelism, self.dyn_config.as_ref(), ); diff --git a/server/src/grpc/remote_engine_service/mod.rs b/server/src/grpc/remote_engine_service/mod.rs index 7fb3e48aec..b0abb62822 100644 --- a/server/src/grpc/remote_engine_service/mod.rs +++ b/server/src/grpc/remote_engine_service/mod.rs @@ -133,7 +133,7 @@ struct ExecutePlanMetricCollector { } impl ExecutePlanMetricCollector { - fn new(request_id: u64, query: String, slow_threshold_secs: u64) -> Self { + fn new(request_id: String, query: String, slow_threshold_secs: u64) -> Self { Self { start: Instant::now(), query, @@ -616,12 +616,12 @@ impl RemoteEngineServiceImpl { .load(std::sync::atomic::Ordering::Relaxed); let metric = ExecutePlanMetricCollector::new( - ctx.request_id, + ctx.request_id_str.clone(), ctx.displayable_query, slow_threshold_secs, ); let query_ctx = create_query_ctx( - ctx.request_id, + ctx.request_id_str, ctx.default_catalog, ctx.default_schema, ctx.timeout_ms, @@ -661,12 +661,12 @@ impl RemoteEngineServiceImpl { .slow_threshold .load(std::sync::atomic::Ordering::Relaxed); let metric = ExecutePlanMetricCollector::new( - ctx.request_id, + ctx.request_id_str.clone(), ctx.displayable_query, slow_threshold_secs, ); let query_ctx = create_query_ctx( - ctx.request_id, + ctx.request_id_str, ctx.default_catalog, ctx.default_schema, ctx.timeout_ms, @@ -890,7 +890,7 @@ async fn handle_stream_read( msg: "fail to convert read request", })?; - let request_id = read_request.request_id; + let request_id = &read_request.request_id; info!( "Handle stream read, request_id:{request_id}, table:{table_ident:?}, read_options:{:?}, predicate:{:?} ", read_request.opts, @@ -1097,7 +1097,7 @@ fn extract_plan_from_req(request: ExecutePlanRequest) -> Result<(ExecContext, Ve } fn create_query_ctx( - request_id: u64, + request_id: String, default_catalog: String, default_schema: String, timeout_ms: i64, diff --git a/table_engine/src/provider.rs b/table_engine/src/provider.rs index a8b3e7310f..75cd58b9bf 100644 --- a/table_engine/src/provider.rs +++ b/table_engine/src/provider.rs @@ -51,7 +51,7 @@ const SCAN_TABLE_METRICS_COLLECTOR_NAME: &str = "scan_table"; #[derive(Clone, Debug)] pub struct CeresdbOptions { - pub request_id: u64, + pub request_id: String, pub request_timeout: Option<u64>, pub default_schema: String, pub default_catalog: String, @@ -76,13 +76,7 @@ impl ExtensionOptions for CeresdbOptions { fn set(&mut self, key: &str, value: &str) -> Result<()> { match key { - "request_id" => { - self.request_id = value.parse::<u64>().map_err(|e| { - DataFusionError::External( - format!("could not parse request_id, input:{value}, err:{e:?}").into(), - ) - })? 
- } + "request_id" => self.request_id = value.to_string(), "request_timeout" => { self.request_timeout = Some(value.parse::().map_err(|e| { DataFusionError::External( @@ -182,7 +176,7 @@ impl TableProviderAdapter { let ceresdb_options = state.config_options().extensions.get::(); assert!(ceresdb_options.is_some()); let ceresdb_options = ceresdb_options.unwrap(); - let request_id = RequestId::from(ceresdb_options.request_id); + let request_id = RequestId::from(ceresdb_options.request_id.clone()); let deadline = ceresdb_options .request_timeout .map(|n| Instant::now() + Duration::from_millis(n)); diff --git a/table_engine/src/remote/model.rs b/table_engine/src/remote/model.rs index 842829d3c4..2fc6a297eb 100644 --- a/table_engine/src/remote/model.rs +++ b/table_engine/src/remote/model.rs @@ -463,7 +463,8 @@ impl From for ceresdbproto::remote_engine::ExecutePlanRequ }; let pb_context = ceresdbproto::remote_engine::ExecContext { - request_id: value.context.request_id.as_u64(), + request_id: 0, // not used any more + request_id_str: value.context.request_id.to_string(), default_catalog: value.context.default_catalog, default_schema: value.context.default_schema, timeout_ms: rest_duration_ms, @@ -504,7 +505,7 @@ impl TryFrom for RemoteExecuteR msg: "missing exec ctx", })?; let ceresdbproto::remote_engine::ExecContext { - request_id, + request_id_str, default_catalog, default_schema, timeout_ms, @@ -512,7 +513,7 @@ impl TryFrom for RemoteExecuteR .. } = pb_exec_ctx; - let request_id = RequestId::from(request_id); + let request_id = RequestId::from(request_id_str); let deadline = if timeout_ms >= 0 { Some(Instant::now() + Duration::from_millis(timeout_ms as u64)) } else { diff --git a/table_engine/src/table.rs b/table_engine/src/table.rs index 62f7c3437a..4f3e854f49 100644 --- a/table_engine/src/table.rs +++ b/table_engine/src/table.rs @@ -437,7 +437,7 @@ impl TryFrom for ceresdbproto::remote_engine::TableReadRequest { })?; Ok(Self { - request_id: request.request_id.as_u64(), + request_id: 0, // this field not used any more opts: Some(request.opts.into()), projected_schema: Some(request.projected_schema.into()), predicate: Some(predicate_pb), @@ -465,7 +465,7 @@ impl TryFrom for ReadRequest { .context(ConvertPredicate)?, ); Ok(Self { - request_id: pb.request_id.into(), + request_id: pb.request_id.to_string().into(), opts, projected_schema, predicate,