From 5dca0d450109af1be2d14fa6160205a1924a5584 Mon Sep 17 00:00:00 2001
From: "xikai.wxk"
Date: Tue, 29 Aug 2023 17:23:21 +0800
Subject: [PATCH] chore: upgrade nightly rust to 1.72

---
 analytic_engine/src/compaction/picker.rs      |  4 +--
 analytic_engine/src/compaction/scheduler.rs   |  2 +-
 analytic_engine/src/instance/engine.rs        |  6 -----
 analytic_engine/src/instance/wal_replayer.rs  |  4 +--
 analytic_engine/src/manifest/details.rs       |  4 +--
 analytic_engine/src/setup.rs                  |  2 +-
 analytic_engine/src/sst/file.rs               |  2 +-
 .../src/sst/parquet/async_reader.rs           |  3 +--
 analytic_engine/src/table/version.rs          | 16 +++++------
 analytic_engine/src/table_options.rs          |  2 +-
 analytic_engine/src/tests/util.rs             |  2 +-
 benchmarks/src/merge_memtable_bench.rs        |  2 +-
 benchmarks/src/merge_sst_bench.rs             |  4 +--
 benchmarks/src/sst_tools.rs                   |  2 +-
 cluster/src/shard_lock_manager.rs             |  2 +-
 cluster/src/topology.rs                       |  2 +-
 common_types/src/bitset.rs                    |  8 ++++--
 common_types/src/column_block.rs              |  2 +-
 common_types/src/column_schema.rs             |  4 +--
 components/codec/src/columnar/bool.rs         |  2 +-
 components/codec/src/columnar/bytes.rs        |  2 +-
 components/codec/src/columnar/mod.rs          |  4 +--
 components/future_ext/src/cancel.rs           |  2 +-
 components/object_store/src/disk_cache.rs     |  8 +++---
 components/object_store/src/obkv/mod.rs       |  4 +--
 components/object_store/src/prefix.rs         |  2 +-
 proxy/src/grpc/prom_query.rs                  | 27 +++++++++----------
 proxy/src/http/prom.rs                        |  4 +--
 proxy/src/influxdb/mod.rs                     |  6 ++---
 proxy/src/influxdb/types.rs                   |  2 +-
 proxy/src/limiter.rs                          | 12 +++------
 proxy/src/read.rs                             |  2 +-
 proxy/src/write.rs                            |  4 +--
 .../physical_plan_extension/prom_align.rs     |  2 +-
 query_frontend/src/container.rs               |  4 +--
 query_frontend/src/frontend.rs                | 20 +++++--------
 query_frontend/src/lib.rs                     |  2 +-
 remote_engine_client/src/client.rs            |  6 ++---
 rust-toolchain.toml                           |  2 +-
 server/src/mysql/worker.rs                    |  2 +-
 server/src/postgresql/handler.rs              |  2 +-
 table_engine/src/partition/rule/key.rs        |  8 +++---
 tools/src/bin/sst-metadata.rs                 |  2 +-
 wal/src/message_queue_impl/region_context.rs  |  5 +---
 wal/src/tests/read_write.rs                   |  2 +-
 45 files changed, 93 insertions(+), 118 deletions(-)

diff --git a/analytic_engine/src/compaction/picker.rs b/analytic_engine/src/compaction/picker.rs
index 0616b91477..2827e19087 100644
--- a/analytic_engine/src/compaction/picker.rs
+++ b/analytic_engine/src/compaction/picker.rs
@@ -543,7 +543,7 @@ impl TimeWindowPicker {
 
             let (left, _) = Self::get_window_bounds_in_millis(window, ts);
 
-            let bucket_files = buckets.entry(left).or_insert_with(Vec::new);
+            let bucket_files = buckets.entry(left).or_default();
 
             bucket_files.push(f.clone());
 
@@ -813,7 +813,7 @@ mod tests {
 
     #[test]
     fn test_time_window_picker() {
-        let picker_manager = PickerManager::default();
+        let picker_manager = PickerManager;
         let twp = picker_manager.get_picker(CompactionStrategy::Default);
         let mut ctx = PickerContext {
             segment_duration: Duration::from_millis(1000),
diff --git a/analytic_engine/src/compaction/scheduler.rs b/analytic_engine/src/compaction/scheduler.rs
index a0f02756dd..88aa53c962 100644
--- a/analytic_engine/src/compaction/scheduler.rs
+++ b/analytic_engine/src/compaction/scheduler.rs
@@ -311,7 +311,7 @@ impl SchedulerImpl {
             space_store,
             runtime: runtime.clone(),
             schedule_interval: config.schedule_interval.0,
-            picker_manager: PickerManager::default(),
+            picker_manager: PickerManager,
             max_ongoing_tasks: config.max_ongoing_tasks,
             max_unflushed_duration: config.max_unflushed_duration.0,
             write_sst_max_buffer_size,
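The two compaction hunks above set the pattern for much of this patch: the clippy shipped with the newer nightly prefers `Entry::or_default()` over `or_insert_with(Vec::new)`, and flags `T::default()` on unit structs such as `PickerManager` (the `default_constructed_unit_structs` lint, to the best of my reading of the clippy changelog). A minimal self-contained sketch of both rewrites; the names are illustrative, not the repo's:

```rust
use std::collections::HashMap;

// Unit struct stand-in for `PickerManager`.
#[derive(Default)]
struct PickerManager;

fn main() {
    let mut buckets: HashMap<i64, Vec<u32>> = HashMap::new();

    // Old style: spell out the default constructor for the entry.
    buckets.entry(1).or_insert_with(Vec::new).push(10);
    // New style: `or_default()` says the same thing more directly.
    buckets.entry(1).or_default().push(20);
    assert_eq!(buckets[&1], vec![10, 20]);

    // Old style: `PickerManager::default()`. For a unit struct the literal
    // value is identical, so clippy suggests dropping the call.
    let _manager = PickerManager;
}
```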
diff --git a/analytic_engine/src/instance/engine.rs b/analytic_engine/src/instance/engine.rs
index e1197684cb..c50a1201cf 100644
--- a/analytic_engine/src/instance/engine.rs
+++ b/analytic_engine/src/instance/engine.rs
@@ -407,12 +407,6 @@ impl Instance {
         let shard_id = request.shard_id;
         let mut table_ctxs = Vec::with_capacity(request.table_defs.len());
 
-        // Open tables.
-        struct TableInfo {
-            name: String,
-            id: TableId,
-        }
-
         let mut spaces_of_tables = Vec::with_capacity(request.table_defs.len());
         for table_def in request.table_defs {
             let context = SpaceContext {
diff --git a/analytic_engine/src/instance/wal_replayer.rs b/analytic_engine/src/instance/wal_replayer.rs
index 3dd91ed7fc..6dabd7a4c5 100644
--- a/analytic_engine/src/instance/wal_replayer.rs
+++ b/analytic_engine/src/instance/wal_replayer.rs
@@ -217,7 +217,7 @@ impl TableBasedReplay {
         loop {
             // fetch entries to log_entry_buf
             let _timer = PULL_LOGS_DURATION_HISTOGRAM.start_timer();
-            let decoder = WalDecoder::default();
+            let decoder = WalDecoder;
             log_entry_buf = log_iter
                 .next_log_entries(decoder, log_entry_buf)
                 .await
@@ -309,7 +309,7 @@ impl RegionBasedReplay {
         // Split and replay logs.
         loop {
             let _timer = PULL_LOGS_DURATION_HISTOGRAM.start_timer();
-            let decoder = WalDecoder::default();
+            let decoder = WalDecoder;
             log_entry_buf = log_iter
                 .next_log_entries(decoder, log_entry_buf)
                 .await
diff --git a/analytic_engine/src/manifest/details.rs b/analytic_engine/src/manifest/details.rs
index 672c4e1231..bc18f27015 100644
--- a/analytic_engine/src/manifest/details.rs
+++ b/analytic_engine/src/manifest/details.rs
@@ -595,8 +595,8 @@ struct ObjectStoreBasedSnapshotStore {
 }
 
 impl ObjectStoreBasedSnapshotStore {
-    const CURRENT_SNAPSHOT_NAME: &str = "current";
-    const SNAPSHOT_PATH_PREFIX: &str = "manifest/snapshot";
+    const CURRENT_SNAPSHOT_NAME: &'static str = "current";
+    const SNAPSHOT_PATH_PREFIX: &'static str = "manifest/snapshot";
 
     pub fn new(space_id: SpaceId, table_id: TableId, store: ObjectStoreRef) -> Self {
         let snapshot_path = Self::snapshot_path(space_id, table_id);
diff --git a/analytic_engine/src/setup.rs b/analytic_engine/src/setup.rs
index 580a509673..cab2975f56 100644
--- a/analytic_engine/src/setup.rs
+++ b/analytic_engine/src/setup.rs
@@ -429,7 +429,7 @@ async fn open_instance(
         manifest_storages,
         wal_manager,
         store_picker,
-        Arc::new(FactoryImpl::default()),
+        Arc::new(FactoryImpl),
     )
     .await
     .context(OpenInstance)?;
diff --git a/analytic_engine/src/sst/file.rs b/analytic_engine/src/sst/file.rs
index 52e7f7b969..9cb8194268 100644
--- a/analytic_engine/src/sst/file.rs
+++ b/analytic_engine/src/sst/file.rs
@@ -369,7 +369,7 @@ struct FileHandleSet {
 
 impl FileHandleSet {
     fn latest(&self) -> Option<FileHandle> {
-        if let Some(file) = self.file_map.values().rev().next() {
+        if let Some(file) = self.file_map.values().next_back() {
             return Some(file.clone());
         }
         None
diff --git a/analytic_engine/src/sst/parquet/async_reader.rs b/analytic_engine/src/sst/parquet/async_reader.rs
index b9f00de55b..aa968efd40 100644
--- a/analytic_engine/src/sst/parquet/async_reader.rs
+++ b/analytic_engine/src/sst/parquet/async_reader.rs
@@ -778,8 +778,7 @@ mod tests {
 
     impl MockRandomSenders {
         fn start_to_send(&mut self) {
-            while !self.tx_group.is_empty() {
-                let tx = self.tx_group.pop().unwrap();
+            while let Some(tx) = self.tx_group.pop() {
                 let test_data = self.test_datas.pop().unwrap();
                 tokio::spawn(async move {
                     for datum in test_data {
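Three more mechanical themes appear in the engine files above: associated `&str` consts now require an explicit `'static` lifetime, `.values().rev().next()` collapses to `.next_back()` on a double-ended iterator, and an `is_empty()` check followed by `pop().unwrap()` becomes a `while let` loop. A runnable sketch using only the standard library (the lint name in the comment is my assumption):

```rust
use std::collections::BTreeMap;

struct SnapshotStore;

impl SnapshotStore {
    // Elided lifetimes in associated consts are rejected by the newer
    // compiler; the lifetime has to be written out.
    const CURRENT_SNAPSHOT_NAME: &'static str = "current";
}

fn main() {
    let mut file_map = BTreeMap::new();
    file_map.insert(1, "sst-1");
    file_map.insert(2, "sst-2");

    // `rev().next()` builds a reversing adapter just to take one element;
    // `next_back()` asks for it directly (clippy: likely `manual_next_back`).
    assert_eq!(file_map.values().next_back(), Some(&"sst-2"));

    // Draining a Vec without the is_empty()/unwrap() pair; this form
    // cannot panic.
    let mut tx_group = vec![1, 2, 3];
    while let Some(tx) = tx_group.pop() {
        println!("sending on {tx}");
    }

    let _ = SnapshotStore::CURRENT_SNAPSHOT_NAME;
}
```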
diff --git a/analytic_engine/src/table/version.rs b/analytic_engine/src/table/version.rs
index 22c7963bef..f884b82726 100644
--- a/analytic_engine/src/table/version.rs
+++ b/analytic_engine/src/table/version.rs
@@ -931,9 +931,9 @@ mod tests {
 
     #[test]
     fn test_table_version_sampling() {
-        let memtable = MemTableMocker::default().build();
+        let memtable = MemTableMocker.build();
         test_table_version_sampling_with_memtable(memtable);
-        let memtable = MemTableMocker::default().build_columnar();
+        let memtable = MemTableMocker.build_columnar();
         test_table_version_sampling_with_memtable(memtable);
     }
 
@@ -975,9 +975,9 @@ mod tests {
 
     #[test]
     fn test_table_version_sampling_switch() {
-        let memtable = MemTableMocker::default().build();
+        let memtable = MemTableMocker.build();
         test_table_version_sampling_switch_with_memtable(memtable);
-        let memtable = MemTableMocker::default().build_columnar();
+        let memtable = MemTableMocker.build_columnar();
         test_table_version_sampling_switch_with_memtable(memtable);
     }
 
@@ -1027,7 +1027,7 @@ mod tests {
     fn test_table_version_sampling_freeze() {
         let version = new_table_version();
 
-        let memtable = MemTableMocker::default().build();
+        let memtable = MemTableMocker.build();
         let schema = memtable.schema().clone();
 
         let memtable_id1 = 1;
@@ -1063,7 +1063,7 @@ mod tests {
         assert_eq!(memtable_id1, read_view.sampling_mem.as_ref().unwrap().id);
         assert!(read_view.sampling_mem.unwrap().freezed);
 
-        let memtable = MemTableMocker::default().build();
+        let memtable = MemTableMocker.build();
         let memtable_id2 = 2;
         let mem_state = MemTableState {
             mem: memtable,
@@ -1110,7 +1110,7 @@ mod tests {
     fn test_table_version_sampling_apply_edit() {
         let version = new_table_version();
 
-        let memtable = MemTableMocker::default().build();
+        let memtable = MemTableMocker.build();
         let memtable_id1 = 1;
         let sampling_mem = SamplingMemTable::new(memtable, memtable_id1);
 
@@ -1124,7 +1124,7 @@ mod tests {
             TimeRange::bucket_of(now, table_options::DEFAULT_SEGMENT_DURATION).unwrap();
 
         // Prepare mutable memtable.
-        let memtable = MemTableMocker::default().build();
+        let memtable = MemTableMocker.build();
         let memtable_id2 = 2;
         let mem_state = MemTableState {
             mem: memtable,
diff --git a/analytic_engine/src/table_options.rs b/analytic_engine/src/table_options.rs
index 0c8d48a3e5..e9226b32f2 100644
--- a/analytic_engine/src/table_options.rs
+++ b/analytic_engine/src/table_options.rs
@@ -427,7 +427,7 @@ impl TableOptions {
                 SEGMENT_DURATION.to_string(),
                 self.segment_duration
                     .map(|v| v.to_string())
-                    .unwrap_or_else(String::new),
+                    .unwrap_or_default(),
             ),
             (UPDATE_MODE.to_string(), self.update_mode.to_string()),
             (ENABLE_TTL.to_string(), self.enable_ttl.to_string()),
diff --git a/analytic_engine/src/tests/util.rs b/analytic_engine/src/tests/util.rs
index 449effedf8..84eb553b28 100644
--- a/analytic_engine/src/tests/util.rs
+++ b/analytic_engine/src/tests/util.rs
@@ -617,7 +617,7 @@ impl EngineBuildContext for RocksDBEngineBuildContext {
     type WalsOpener = RocksDBWalsOpener;
 
     fn wals_opener(&self) -> Self::WalsOpener {
-        RocksDBWalsOpener::default()
+        RocksDBWalsOpener
     }
 
     fn config(&self) -> Config {
diff --git a/benchmarks/src/merge_memtable_bench.rs b/benchmarks/src/merge_memtable_bench.rs
index 4464505cde..d582f467e3 100644
--- a/benchmarks/src/merge_memtable_bench.rs
+++ b/benchmarks/src/merge_memtable_bench.rs
@@ -146,7 +146,7 @@ impl MergeMemTableBench {
         let table_id = self.table_id;
         let sequence = u64::MAX;
         let projected_schema = self.projected_schema.clone();
-        let sst_factory: SstFactoryRef = Arc::new(FactoryImpl::default());
+        let sst_factory: SstFactoryRef = Arc::new(FactoryImpl);
         let iter_options = IterOptions {
             batch_size: self.sst_read_options.num_rows_per_row_group,
         };
diff --git a/benchmarks/src/merge_sst_bench.rs b/benchmarks/src/merge_sst_bench.rs
index 563d12ad37..ed862447f3 100644
--- a/benchmarks/src/merge_sst_bench.rs
+++ b/benchmarks/src/merge_sst_bench.rs
@@ -133,7 +133,7 @@ impl MergeSstBench {
         let table_id = self.table_id;
         let sequence = u64::MAX;
         let projected_schema = self.sst_read_options.projected_schema.clone();
-        let sst_factory: SstFactoryRef = Arc::new(FactoryImpl::default());
+        let sst_factory: SstFactoryRef = Arc::new(FactoryImpl);
         let iter_options = IterOptions {
             batch_size: self.sst_read_options.num_rows_per_row_group,
         };
@@ -189,7 +189,7 @@ impl MergeSstBench {
         let space_id = self.space_id;
         let table_id = self.table_id;
         let projected_schema = self.sst_read_options.projected_schema.clone();
-        let sst_factory: SstFactoryRef = Arc::new(FactoryImpl::default());
+        let sst_factory: SstFactoryRef = Arc::new(FactoryImpl);
 
         let request_id = RequestId::next_id();
         let store_picker: ObjectStorePickerRef = Arc::new(self.store.clone());
diff --git a/benchmarks/src/sst_tools.rs b/benchmarks/src/sst_tools.rs
index 9bb382d190..e711aa1835 100644
--- a/benchmarks/src/sst_tools.rs
+++ b/benchmarks/src/sst_tools.rs
@@ -228,7 +228,7 @@ pub async fn merge_sst(config: MergeSstConfig, runtime: Arc<Runtime>) {
     };
 
     let request_id = RequestId::next_id();
-    let sst_factory: SstFactoryRef = Arc::new(FactoryImpl::default());
+    let sst_factory: SstFactoryRef = Arc::new(FactoryImpl);
     let store_picker: ObjectStorePickerRef = Arc::new(store);
     let projected_schema = ProjectedSchema::no_projection(schema.clone());
     let sst_read_options = SstReadOptions {
diff --git a/cluster/src/shard_lock_manager.rs b/cluster/src/shard_lock_manager.rs
index 8a7bb0c00b..7c661394f6 100644
--- a/cluster/src/shard_lock_manager.rs
+++ b/cluster/src/shard_lock_manager.rs
@@ -524,7 +524,7 @@ impl ShardLock {
         lease_id: i64,
         expired_at: Instant,
         on_lock_expired: OnExpired,
-        etcd_client: &mut Client,
+        etcd_client: &Client,
         runtime: &RuntimeRef,
     ) -> Result<()>
     where
diff --git a/cluster/src/topology.rs b/cluster/src/topology.rs
index 682ad8f65e..6e18d2d592 100644
--- a/cluster/src/topology.rs
+++ b/cluster/src/topology.rs
@@ -118,7 +118,7 @@ impl SchemaTopologies {
 
         self.topologies
             .entry(schema_name.to_string())
-            .or_insert_with(Default::default)
+            .or_default()
             .update_tables(tables);
 
         true
diff --git a/common_types/src/bitset.rs b/common_types/src/bitset.rs
index 0ba919bce9..96c6daa2bf 100644
--- a/common_types/src/bitset.rs
+++ b/common_types/src/bitset.rs
@@ -298,7 +298,11 @@ mod tests {
     }
 
     fn iter_set_bools(bools: &[bool]) -> impl Iterator<Item = usize> + '_ {
-        bools.iter().enumerate().filter_map(|(x, y)| y.then(|| x))
+        bools
+            .iter()
+            .enumerate()
+            .filter(|&(_, y)| *y)
+            .map(|(x, _)| x)
     }
 
     #[test]
@@ -347,7 +351,7 @@ mod tests {
     }
 
     fn make_rng() -> StdRng {
-        let seed = OsRng::default().next_u64();
+        let seed = OsRng.next_u64();
         println!("Seed: {seed}");
         StdRng::seed_from_u64(seed)
     }
diff --git a/common_types/src/column_block.rs b/common_types/src/column_block.rs
index c63ab71564..6098337fa2 100644
--- a/common_types/src/column_block.rs
+++ b/common_types/src/column_block.rs
@@ -1033,7 +1033,7 @@ macro_rules! define_column_block_builder {
 
             /// Append the [DatumView] into the builder, the datum view should have same the data
             /// type of builder
-            pub fn append_view<'a>(&mut self, datum: DatumView<'a>) -> Result<()> {
+            pub fn append_view(&mut self, datum: DatumView<'_>) -> Result<()> {
                 let given = datum.kind();
                 match self {
                     Self::Null { rows } => match datum {
diff --git a/common_types/src/column_schema.rs b/common_types/src/column_schema.rs
index 9ecac77d04..691df74916 100644
--- a/common_types/src/column_schema.rs
+++ b/common_types/src/column_schema.rs
@@ -602,7 +602,7 @@ mod tests {
 
     #[test]
     fn test_valid_tag_type() {
-        let invalid_tag_types = vec![DatumKind::Null, DatumKind::Float, DatumKind::Double];
+        let invalid_tag_types = [DatumKind::Null, DatumKind::Float, DatumKind::Double];
 
         for v in &DatumKind::VALUES {
             assert_eq!(
@@ -614,7 +614,7 @@ mod tests {
 
     #[test]
     fn test_valid_dictionary_type() {
-        let valid_dictionary_types = vec![DatumKind::String];
+        let valid_dictionary_types = [DatumKind::String];
 
         for v in &DatumKind::VALUES {
             assert_eq!(
diff --git a/components/codec/src/columnar/bool.rs b/components/codec/src/columnar/bool.rs
index 77aa93f368..74ba32f97b 100644
--- a/components/codec/src/columnar/bool.rs
+++ b/components/codec/src/columnar/bool.rs
@@ -99,7 +99,7 @@ impl Encoding {
         }
     }
 
-    fn decode<B, F>(&self, buf: &mut B, f: F) -> Result<()>
+    fn decode<B, F>(&self, buf: &B, f: F) -> Result<()>
     where
         B: Buf,
         F: FnMut(bool) -> Result<()>,
diff --git a/components/codec/src/columnar/bytes.rs b/components/codec/src/columnar/bytes.rs
index fe553c8123..8e4d69e7b7 100644
--- a/components/codec/src/columnar/bytes.rs
+++ b/components/codec/src/columnar/bytes.rs
@@ -125,7 +125,7 @@ impl Encoding {
     }
 
     /// The layout can be referred to the docs of [`Encoding`].
-    fn decode<B, F>(&self, ctx: DecodeContext<'_>, buf: &mut B, f: F) -> Result<()>
+    fn decode<B, F>(&self, ctx: DecodeContext<'_>, buf: &B, f: F) -> Result<()>
     where
         B: Buf,
         F: FnMut(Bytes) -> Result<()>,
diff --git a/components/codec/src/columnar/mod.rs b/components/codec/src/columnar/mod.rs
index 1cae4253ab..d15604f579 100644
--- a/components/codec/src/columnar/mod.rs
+++ b/components/codec/src/columnar/mod.rs
@@ -392,7 +392,7 @@ impl ColumnarDecoder {
 impl ColumnarDecoder {
     fn decode_with_nulls<B: Buf>(
         ctx: DecodeContext<'_>,
-        buf: &mut B,
+        buf: &B,
         num_datums: usize,
         datum_kind: DatumKind,
     ) -> Result<Vec<Datum>> {
@@ -570,7 +570,7 @@ mod tests {
 
     #[test]
     fn test_small_int() {
-        let datums = vec![10u32, 1u32, 2u32, 81u32, 82u32];
+        let datums = [10u32, 1u32, 2u32, 81u32, 82u32];
 
         check_encode_end_decode(
             10,
diff --git a/components/future_ext/src/cancel.rs b/components/future_ext/src/cancel.rs
index 1e6e6de69a..1eef43c6cf 100644
--- a/components/future_ext/src/cancel.rs
+++ b/components/future_ext/src/cancel.rs
@@ -62,7 +62,7 @@ where
     fn drop(&mut self) {
         if !self.done {
             let inner = self.inner.take().unwrap();
-            let handle = self.runtime.spawn(async move { inner.await });
+            let handle = self.runtime.spawn(inner);
             drop(handle);
         }
     }
diff --git a/components/object_store/src/disk_cache.rs b/components/object_store/src/disk_cache.rs
index 5efb3d03b2..525b79d39c 100644
--- a/components/object_store/src/disk_cache.rs
+++ b/components/object_store/src/disk_cache.rs
@@ -132,7 +132,7 @@ struct Manifest {
 
 impl Manifest {
     const CURRENT_VERSION: usize = 2;
-    const FILE_NAME: &str = "manifest.json";
+    const FILE_NAME: &'static str = "manifest.json";
 
     #[inline]
     fn is_valid(&self, version: usize, page_size: usize) -> bool {
@@ -876,7 +876,7 @@ mod test {
 
         // remove cached values, then get again
         {
-            for range in vec![0..16, 16..32, 32..48, 48..64, 64..80, 80..96, 96..112] {
+            for range in [0..16, 16..32, 32..48, 48..64, 64..80, 80..96, 96..112] {
                 let data_cache = store
                     .inner
                     .cache
@@ -887,7 +887,7 @@ mod test {
                 assert!(test_file_exists(&store.cache_dir, &location, &range));
             }
 
-            for range in vec![16..32, 48..64, 80..96] {
+            for range in [16..32, 48..64, 80..96] {
                 let mut data_cache = store
                     .inner
                     .cache
@@ -1105,7 +1105,7 @@ mod test {
                 .await
                 .unwrap()
         };
-        for range in vec![16..32, 32..48, 48..64, 64..80, 80..96, 96..112] {
+        for range in [16..32, 32..48, 48..64, 64..80, 80..96, 96..112] {
             let filename = DiskCacheStore::page_cache_name(&location, &range);
             let cache = store.cache.meta_cache.lock(&filename);
             assert!(cache.contains(&filename));
diff --git a/components/object_store/src/obkv/mod.rs b/components/object_store/src/obkv/mod.rs
index 95292505bc..81c47bf8b5 100644
--- a/components/object_store/src/obkv/mod.rs
+++ b/components/object_store/src/obkv/mod.rs
@@ -639,7 +639,7 @@ impl<T: TableKv> ObjectStore for ObkvObjectStore<T> {
             }));
         }
 
-        let iter = futures::stream::iter(meta_list.into_iter());
+        let iter = futures::stream::iter(meta_list);
         debug!(
             "ObkvObjectStore list operation, prefix:{path}, cost:{:?}",
             instant.elapsed()
@@ -682,7 +682,7 @@ impl<T: TableKv> ObjectStore for ObkvObjectStore<T> {
             }
         }
 
-        let common_prefixes = Vec::from_iter(common_prefixes.into_iter());
+        let common_prefixes = Vec::from_iter(common_prefixes);
         debug!(
             "ObkvObjectStore list_with_delimiter operation, prefix:{path}, cost:{:?}",
             instant.elapsed()
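The object-store and codec hunks above mostly feed one lint: `stream::iter`, `Vec::from_iter`, `extend`, and `zip` all accept any `IntoIterator`, so an explicit `.into_iter()` on the argument is a conversion to the type it already has (clippy's `useless_conversion`, which appears to have been extended in this cycle). A small `std`-only sketch:

```rust
use std::collections::HashSet;

fn main() {
    let common_prefixes: HashSet<String> =
        ["a/", "b/"].iter().map(|s| s.to_string()).collect();

    // Before: Vec::from_iter(common_prefixes.into_iter());
    // The set is already IntoIterator, so the extra call adds nothing.
    let mut as_vec = Vec::from_iter(common_prefixes);
    as_vec.sort();
    assert_eq!(as_vec, ["a/", "b/"]);

    // The same applies to extend(): before, as_vec.extend(extra.into_iter()).
    let extra = vec!["c/".to_string()];
    as_vec.extend(extra);
    assert_eq!(as_vec.len(), 3);
}
```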
diff --git a/components/object_store/src/prefix.rs b/components/object_store/src/prefix.rs
index 2155f95962..3b0e11d090 100644
--- a/components/object_store/src/prefix.rs
+++ b/components/object_store/src/prefix.rs
@@ -348,7 +348,7 @@ mod tests {
 
         let _ = prefix_store.get(&test_filepath).await;
         let _ = prefix_store.get_range(&test_filepath, 0..1).await;
-        let _ = prefix_store.get_ranges(&test_filepath, &[0..2]).await;
+        let _ = prefix_store.get_ranges(&test_filepath, &[0..2; 1]).await;
 
         let meta = prefix_store.head(&test_filepath).await.unwrap();
         assert!(!meta.location.as_ref().starts_with(test_prefix));
diff --git a/proxy/src/grpc/prom_query.rs b/proxy/src/grpc/prom_query.rs
index 1714257df7..9e69398ebc 100644
--- a/proxy/src/grpc/prom_query.rs
+++ b/proxy/src/grpc/prom_query.rs
@@ -102,21 +102,18 @@ impl Proxy {
                 msg: "Invalid request",
             })?;
 
-        let (plan, column_name) =
-            frontend
-                .promql_expr_to_plan(&mut sql_ctx, expr)
-                .map_err(|e| {
-                    let code = if is_table_not_found_error(&e) {
-                        StatusCode::NOT_FOUND
-                    } else {
-                        StatusCode::INTERNAL_SERVER_ERROR
-                    };
-                    Error::ErrWithCause {
-                        code,
-                        msg: "Failed to create plan".to_string(),
-                        source: Box::new(e),
-                    }
-                })?;
+        let (plan, column_name) = frontend.promql_expr_to_plan(&sql_ctx, expr).map_err(|e| {
+            let code = if is_table_not_found_error(&e) {
+                StatusCode::NOT_FOUND
+            } else {
+                StatusCode::INTERNAL_SERVER_ERROR
+            };
+            Error::ErrWithCause {
+                code,
+                msg: "Failed to create plan".to_string(),
+                source: Box::new(e),
+            }
+        })?;
 
         self.instance
             .limiter
diff --git a/proxy/src/http/prom.rs b/proxy/src/http/prom.rs
index 6f3c380d7b..50e0a1a4cb 100644
--- a/proxy/src/http/prom.rs
+++ b/proxy/src/http/prom.rs
@@ -133,14 +133,14 @@ impl Proxy {
             function_registry: &*self.instance.function_registry,
         };
         let frontend = Frontend::new(provider);
-        let mut plan_ctx = Context::new(request_id, deadline);
+        let plan_ctx = Context::new(request_id, deadline);
 
         let RemoteQueryPlan {
             plan,
             timestamp_col_name,
             field_col_name,
         } = frontend
-            .prom_remote_query_to_plan(&mut plan_ctx, query.clone())
+            .prom_remote_query_to_plan(&plan_ctx, query.clone())
             .box_err()
             .with_context(|| ErrWithCause {
                 code: StatusCode::BAD_REQUEST,
diff --git a/proxy/src/influxdb/mod.rs b/proxy/src/influxdb/mod.rs
index c5d102ef13..48d69822d8 100644
--- a/proxy/src/influxdb/mod.rs
+++ b/proxy/src/influxdb/mod.rs
@@ -137,10 +137,10 @@ impl Proxy {
             function_registry: &*self.instance.function_registry,
         };
         let frontend = Frontend::new(provider);
-        let mut sql_ctx = SqlContext::new(request_id, deadline);
+        let sql_ctx = SqlContext::new(request_id, deadline);
 
         let mut stmts = frontend
-            .parse_influxql(&mut sql_ctx, &req.query)
+            .parse_influxql(&sql_ctx, &req.query)
             .box_err()
             .with_context(|| ErrWithCause {
                 code: StatusCode::BAD_REQUEST,
@@ -164,7 +164,7 @@ impl Proxy {
         );
 
         let plan = frontend
-            .influxql_stmt_to_plan(&mut sql_ctx, stmts.remove(0))
+            .influxql_stmt_to_plan(&sql_ctx, stmts.remove(0))
             .box_err()
             .with_context(|| ErrWithCause {
                 code: StatusCode::BAD_REQUEST,
diff --git a/proxy/src/influxdb/types.rs b/proxy/src/influxdb/types.rs
index 88f3d7b63c..bad6af396c 100644
--- a/proxy/src/influxdb/types.rs
+++ b/proxy/src/influxdb/types.rs
@@ -383,7 +383,7 @@ impl InfluxqlResultBuilder {
 
         let series = ordered_group_keys
             .into_iter()
-            .zip(self.value_groups.into_iter())
+            .zip(self.value_groups)
             .map(|(group_key, value_group)| {
                 let name = group_key.measurement;
                 let tags = if group_key.group_by_tag_values.is_empty() {
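Two test-only cleanups recur here and in the earlier `column_schema.rs` and `disk_cache.rs` hunks: `vec![...]` literals that are only iterated become plain arrays (`useless_vec`), and the slice of one range gains an explicit repeat count because `&[0..2]` is easy to misread as a two-element slice (apparently the new `single_range_in_vec_init` lint; a repeat expression with count 1 moves the value, so it works for non-`Copy` ranges). Sketch:

```rust
fn main() {
    // Iterating a fixed set of values needs no heap allocation.
    for range in [0..16, 16..32, 32..48] {
        assert!(range.start < range.end);
    }

    // "A slice containing exactly one range" spelled unambiguously.
    let ranges = [0..2; 1];
    assert_eq!(ranges.len(), 1);
    assert_eq!(ranges[0], 0..2);
}
```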
diff --git a/proxy/src/limiter.rs b/proxy/src/limiter.rs
index e2c454ab04..45d4b05c0d 100644
--- a/proxy/src/limiter.rs
+++ b/proxy/src/limiter.rs
@@ -171,17 +171,11 @@ impl Limiter {
     }
 
     pub fn add_write_block_list(&self, block_list: Vec<String>) {
-        self.write_block_list
-            .write()
-            .unwrap()
-            .extend(block_list.into_iter())
+        self.write_block_list.write().unwrap().extend(block_list)
     }
 
     pub fn add_read_block_list(&self, block_list: Vec<String>) {
-        self.read_block_list
-            .write()
-            .unwrap()
-            .extend(block_list.into_iter())
+        self.read_block_list.write().unwrap().extend(block_list)
     }
 
     pub fn set_write_block_list(&self, block_list: Vec<String>) {
@@ -219,7 +213,7 @@ impl Limiter {
     }
 
     pub fn add_block_rules(&self, rules: Vec<BlockRule>) {
-        self.rules.write().unwrap().extend(rules.into_iter());
+        self.rules.write().unwrap().extend(rules);
     }
 
     pub fn remove_block_rules(&self, rules_to_remove: &[BlockRule]) {
diff --git a/proxy/src/read.rs b/proxy/src/read.rs
index a9b1750ea3..fcf61c880b 100644
--- a/proxy/src/read.rs
+++ b/proxy/src/read.rs
@@ -130,7 +130,7 @@ impl Proxy {
         let plan = frontend
             // TODO(yingwen): Check error, some error may indicate that the sql is invalid. Now we
             // return internal server error in those cases
-            .statement_to_plan(&mut sql_ctx, stmts.remove(0))
+            .statement_to_plan(&sql_ctx, stmts.remove(0))
             .box_err()
             .with_context(|| ErrWithCause {
                 code: StatusCode::INTERNAL_SERVER_ERROR,
diff --git a/proxy/src/write.rs b/proxy/src/write.rs
index ba119fd662..e7d81e17a8 100644
--- a/proxy/src/write.rs
+++ b/proxy/src/write.rs
@@ -268,9 +268,9 @@ impl Proxy {
             function_registry: &*self.instance.function_registry,
         };
         let frontend = Frontend::new(provider);
-        let mut ctx = FrontendContext::new(request_id, deadline);
+        let ctx = FrontendContext::new(request_id, deadline);
         let plan = frontend
-            .write_req_to_plan(&mut ctx, schema_config, write_table_req)
+            .write_req_to_plan(&ctx, schema_config, write_table_req)
             .box_err()
             .with_context(|| ErrWithCause {
                 code: StatusCode::INTERNAL_SERVER_ERROR,
diff --git a/query_engine/src/datafusion_impl/physical_plan_extension/prom_align.rs b/query_engine/src/datafusion_impl/physical_plan_extension/prom_align.rs
index 7e7462245b..a6d151ea3e 100644
--- a/query_engine/src/datafusion_impl/physical_plan_extension/prom_align.rs
+++ b/query_engine/src/datafusion_impl/physical_plan_extension/prom_align.rs
@@ -425,7 +425,7 @@ impl PromAlignReader {
                 })?;
             field_array
                 .into_iter()
-                .zip(timestamp_array.into_iter())
+                .zip(timestamp_array)
                 .map(|(field, timestamp)| {
                     Ok(Sample {
                         value: field.with_context(|| PhysicalPlanNoCause {
diff --git a/query_frontend/src/container.rs b/query_frontend/src/container.rs
index 38e49c410d..be1fe3d5ed 100644
--- a/query_frontend/src/container.rs
+++ b/query_frontend/src/container.rs
@@ -142,9 +142,9 @@ impl TableContainer {
     ) {
         self.other_tables
            .entry(catalog)
-            .or_insert_with(HashMap::new)
+            .or_default()
             .entry(schema)
-            .or_insert_with(HashMap::new)
+            .or_default()
             .insert(table, resolved_table);
     }
 
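The `&mut sql_ctx` and `&mut ctx` arguments dropped in the proxy files above, and in the `query_frontend` signatures just below, were never written through; a new nightly lint (`needless_pass_by_ref_mut`, as far as I can tell) flags the spurious `&mut`, and relaxing the signatures also lets callers drop their `let mut` bindings. A hypothetical minimal version of the pattern:

```rust
// Illustrative stand-in for the planner context; not the repo's type.
struct Context {
    request_id: u64,
    read_parallelism: usize,
}

// Before: fn statement_to_plan(ctx: &mut Context, stmt: &str) -> String
fn statement_to_plan(ctx: &Context, stmt: &str) -> String {
    // The context is only read, never mutated.
    format!(
        "plan({stmt}) id={} parallelism={}",
        ctx.request_id, ctx.read_parallelism
    )
}

fn main() {
    // The caller no longer needs a `mut` binding.
    let ctx = Context { request_id: 7, read_parallelism: 4 };
    println!("{}", statement_to_plan(&ctx, "SELECT 1"));
}
```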
diff --git a/query_frontend/src/frontend.rs b/query_frontend/src/frontend.rs
index 98994ed2a2..618381d108 100644
--- a/query_frontend/src/frontend.rs
+++ b/query_frontend/src/frontend.rs
@@ -123,11 +123,7 @@ impl<P: MetaProvider> Frontend<P> {
     }
 
     /// Parse the sql and returns the statements
-    pub fn parse_influxql(
-        &self,
-        _ctx: &mut Context,
-        influxql: &str,
-    ) -> Result<Vec<InfluxqlStatement>> {
+    pub fn parse_influxql(&self, _ctx: &Context, influxql: &str) -> Result<Vec<InfluxqlStatement>> {
         match influxql_parser::parse_statements(influxql) {
             Ok(stmts) => Ok(stmts),
             Err(e) => Err(Error::InvalidInfluxql {
@@ -140,7 +136,7 @@ impl<P: MetaProvider> Frontend<P> {
 
 impl<P: MetaProvider> Frontend<P> {
     /// Create logical plan for the statement
-    pub fn statement_to_plan(&self, ctx: &mut Context, stmt: Statement) -> Result<Plan> {
+    pub fn statement_to_plan(&self, ctx: &Context, stmt: Statement) -> Result<Plan> {
         let planner = Planner::new(&self.provider, ctx.request_id, ctx.read_parallelism);
 
         planner.statement_to_plan(stmt).context(CreatePlan)
@@ -149,7 +145,7 @@ impl<P: MetaProvider> Frontend<P> {
     /// Experimental native promql support, not used in production yet.
     pub fn promql_expr_to_plan(
         &self,
-        ctx: &mut Context,
+        ctx: &Context,
         expr: Expr,
     ) -> Result<(Plan, Arc<ColumnNames>)> {
         let planner = Planner::new(&self.provider, ctx.request_id, ctx.read_parallelism);
@@ -160,25 +156,21 @@ impl<P: MetaProvider> Frontend<P> {
     /// Prometheus remote query support
     pub fn prom_remote_query_to_plan(
         &self,
-        ctx: &mut Context,
+        ctx: &Context,
         query: PromRemoteQuery,
     ) -> Result<RemoteQueryPlan> {
         let planner = Planner::new(&self.provider, ctx.request_id, ctx.read_parallelism);
 
         planner.remote_prom_req_to_plan(query).context(CreatePlan)
     }
 
-    pub fn influxql_stmt_to_plan(
-        &self,
-        ctx: &mut Context,
-        stmt: InfluxqlStatement,
-    ) -> Result<Plan> {
+    pub fn influxql_stmt_to_plan(&self, ctx: &Context, stmt: InfluxqlStatement) -> Result<Plan> {
         let planner = Planner::new(&self.provider, ctx.request_id, ctx.read_parallelism);
 
         planner.influxql_stmt_to_plan(stmt).context(CreatePlan)
     }
 
     pub fn write_req_to_plan(
         &self,
-        ctx: &mut Context,
+        ctx: &Context,
         schema_config: &SchemaConfig,
         write_table: &WriteTableRequest,
     ) -> Result<Plan> {
diff --git a/query_frontend/src/lib.rs b/query_frontend/src/lib.rs
index 85c59ccb12..4ab836621a 100644
--- a/query_frontend/src/lib.rs
+++ b/query_frontend/src/lib.rs
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-#![feature(once_cell)]
+#![feature(once_cell_try)]
 
 //! SQL frontend
 //!
diff --git a/remote_engine_client/src/client.rs b/remote_engine_client/src/client.rs
index 1ae1fc9039..3ee3c7e52b 100644
--- a/remote_engine_client/src/client.rs
+++ b/remote_engine_client/src/client.rs
@@ -187,12 +187,10 @@ impl Client {
             })?;
             let handle = self.io_runtime.spawn(async move {
                 let mut rpc_client = RemoteEngineServiceClient::<Channel>::new(channel);
-                let rpc_result = rpc_client
+                rpc_client
                     .write_batch(Request::new(batch_request_pb))
                     .await
-                    .box_err();
-
-                rpc_result
+                    .box_err()
             });
 
             remote_writes.push(handle);
diff --git a/rust-toolchain.toml b/rust-toolchain.toml
index 3009106dba..c3953568f0 100644
--- a/rust-toolchain.toml
+++ b/rust-toolchain.toml
@@ -13,5 +13,5 @@
 # limitations under the License.
 
 [toolchain]
-channel = "nightly-2023-02-02"
+channel = "nightly-2023-08-28"
 components = [ "rustfmt", "clippy" ]
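The toolchain bump above is what forces the `query_frontend/src/lib.rs` change: `OnceCell` and `OnceLock` were stabilized in Rust 1.70, so the old `#![feature(once_cell)]` gate no longer exists on a 2023-08 nightly, while the still-unstable fallible initializers were re-gated as `once_cell_try`. A stable-compatible sketch, with the nightly-only call left as a comment:

```rust
use std::sync::OnceLock;

static CONFIG: OnceLock<String> = OnceLock::new();

fn main() {
    // Stable since 1.70: infallible lazy initialization.
    let v = CONFIG.get_or_init(|| "default".to_string());
    println!("{v}");

    // Nightly with #![feature(once_cell_try)] also offers the fallible form:
    // let v = CONFIG.get_or_try_init(|| Ok::<_, ()>("default".to_string()));
}
```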
diff --git a/server/src/mysql/worker.rs b/server/src/mysql/worker.rs
index 3f36fe776d..9fdcb3027b 100644
--- a/server/src/mysql/worker.rs
+++ b/server/src/mysql/worker.rs
@@ -38,7 +38,7 @@ where
 {
     pub fn new(proxy: Arc<Proxy>, timeout: Option<Duration>) -> Self {
         Self {
-            generic_hold: PhantomData::default(),
+            generic_hold: PhantomData,
             proxy,
             timeout,
         }
diff --git a/server/src/postgresql/handler.rs b/server/src/postgresql/handler.rs
index 8743c9e7e2..1c654751f1 100644
--- a/server/src/postgresql/handler.rs
+++ b/server/src/postgresql/handler.rs
@@ -130,7 +130,7 @@ fn into_pg_reponse<'a>(out: Output) -> PgWireResult<Response<'a>> {
 
     Ok(Response::Query(QueryResponse::new(
         schema,
-        stream::iter(data_rows.into_iter()),
+        stream::iter(data_rows),
     )))
 }
diff --git a/table_engine/src/partition/rule/key.rs b/table_engine/src/partition/rule/key.rs
index 554b0ddf71..a7171c3e5d 100644
--- a/table_engine/src/partition/rule/key.rs
+++ b/table_engine/src/partition/rule/key.rs
@@ -52,10 +52,10 @@ impl KeyRule {
     /// Above logics are preparing for implementing something like:
     /// fa1 && fa2 && fb = (fa1 && fb) && (fa2 && fb)
     /// Partitions about above expression will be calculated by following steps:
-    ///     + partitions about "(fa1 && fb)" will be calculated first,
-    ///        assume "partitions 1"
-    ///     + partitions about "(fa2 && fb)" will be calculated after,
-    ///        assume "partitions 2"
+    ///     + partitions about "(fa1 && fb)" will be calculated first, assume
+    ///       "partitions 1"
+    ///     + partitions about "(fa2 && fb)" will be calculated after, assume
+    ///       "partitions 2"
     ///     + "total partitions" = "partitions 1" intersection "partitions 2"
     fn get_candidate_partition_keys_groups(
         &self,
diff --git a/tools/src/bin/sst-metadata.rs b/tools/src/bin/sst-metadata.rs
index 06dfb90e7f..c71e3f5bf2 100644
--- a/tools/src/bin/sst-metadata.rs
+++ b/tools/src/bin/sst-metadata.rs
@@ -216,7 +216,7 @@ async fn run(args: Args) -> Result<()> {
         for i in 0..fields.len() {
             let column_meta = row_group.column(i);
             let field_name = fields.get(i).unwrap().get_basic_info().name().to_string();
-            let mut field_stats = field_stats_map
+            let field_stats = field_stats_map
                 .entry(field_name)
                 .or_insert(FieldStatistics::default());
             field_stats.compressed_size += column_meta.compressed_size();
diff --git a/wal/src/message_queue_impl/region_context.rs b/wal/src/message_queue_impl/region_context.rs
index 1de999036c..148511fa77 100644
--- a/wal/src/message_queue_impl/region_context.rs
+++ b/wal/src/message_queue_impl/region_context.rs
@@ -513,10 +513,7 @@ impl RegionContextBuilder {
         debug!("Apply region meta delta, delta:{:?}", delta);
 
         // It is likely that snapshot not exist(e.g. no table has ever flushed).
-        let mut table_meta = self
-            .table_metas
-            .entry(delta.table_id)
-            .or_insert_with(TableMetaInner::default);
+        let table_meta = self.table_metas.entry(delta.table_id).or_default();
         table_meta.next_sequence_num = delta.sequence_num + 1;
         table_meta.current_high_watermark = delta.offset + 1;
 
diff --git a/wal/src/tests/read_write.rs b/wal/src/tests/read_write.rs
index 58055f8ad3..bcb2d7f41f 100644
--- a/wal/src/tests/read_write.rs
+++ b/wal/src/tests/read_write.rs
@@ -34,7 +34,7 @@ static INIT_LOG: Once = Once::new();
 
 #[test]
 fn test_rocksdb_wal() {
-    let builder = RocksWalBuilder::default();
+    let builder = RocksWalBuilder;
 
     test_all(builder, false);
 }
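The final `server/` and `wal/` hunks repeat the unit-struct and `or_default()` themes, including `PhantomData::default()` shortening to the literal `PhantomData`. An illustrative generic-holder sketch (the `Worker` type and its fields are made up for the example):

```rust
use std::marker::PhantomData;

struct Worker<W> {
    // Holds the type parameter without storing a value of it.
    generic_hold: PhantomData<W>,
    timeout_ms: u64,
}

impl<W> Worker<W> {
    fn new(timeout_ms: u64) -> Self {
        Self {
            // Was: PhantomData::default(). The unit-like literal is the
            // same value, so clippy prefers it.
            generic_hold: PhantomData,
            timeout_ms,
        }
    }
}

fn main() {
    let w: Worker<Vec<u8>> = Worker::new(500);
    println!("timeout = {}ms", w.timeout_ms);
}
```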