From 4126a747a919d09929e5f08af214f84d1cf815c6 Mon Sep 17 00:00:00 2001 From: WEI Xikai Date: Mon, 26 Jun 2023 10:11:11 +0800 Subject: [PATCH] revert: "fix: add page index for metadata (#1000)" (#1026) This reverts commit 41fe63abf1d0e9224a3c3bd72755cd9125bebc81. ## Rationale #1000 leads to some commits missing. ## Detailed Changes Revert #1000 ## Test Plan --- Cargo.lock | 2 - analytic_engine/src/compaction/mod.rs | 108 +++++++---- analytic_engine/src/compaction/picker.rs | 66 +++---- analytic_engine/src/compaction/scheduler.rs | 21 +- analytic_engine/src/instance/close.rs | 16 +- analytic_engine/src/instance/drop.rs | 11 +- analytic_engine/src/instance/engine.rs | 101 +++------- .../src/instance/flush_compaction.rs | 9 +- analytic_engine/src/instance/mod.rs | 15 +- analytic_engine/src/instance/wal_replayer.rs | 34 ++-- analytic_engine/src/instance/write.rs | 8 +- analytic_engine/src/sst/meta_data/cache.rs | 182 +++++++++++++++++- .../src/sst/parquet/async_reader.rs | 116 +++++++---- analytic_engine/src/sst/parquet/encoding.rs | 4 +- analytic_engine/src/sst/reader.rs | 42 ++-- analytic_engine/src/table/data.rs | 107 +++++++--- analytic_engine/src/table/mod.rs | 74 ++++--- analytic_engine/src/table/version.rs | 4 +- common_types/src/schema.rs | 2 +- components/parquet_ext/Cargo.toml | 2 - components/parquet_ext/src/lib.rs | 1 - components/parquet_ext/src/meta_data.rs | 25 +-- components/parquet_ext/src/reader.rs | 81 -------- table_engine/src/table.rs | 3 + tools/src/bin/sst-metadata.rs | 23 +-- 25 files changed, 613 insertions(+), 444 deletions(-) delete mode 100644 components/parquet_ext/src/reader.rs diff --git a/Cargo.lock b/Cargo.lock index 1c12143f3a..a68c821e58 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4202,9 +4202,7 @@ dependencies = [ "bytes", "common_util", "datafusion", - "futures 0.3.28", "log", - "object_store 1.2.2", "parquet", "tokio", ] diff --git a/analytic_engine/src/compaction/mod.rs b/analytic_engine/src/compaction/mod.rs index bcbead4af9..e0485522b1 100644 --- a/analytic_engine/src/compaction/mod.rs +++ b/analytic_engine/src/compaction/mod.rs @@ -318,13 +318,26 @@ pub struct ExpiredFiles { #[derive(Default, Clone)] pub struct CompactionTask { - pub compaction_inputs: Vec, - pub expired: Vec, + inputs: Vec, + expired: Vec, +} + +impl Drop for CompactionTask { + fn drop(&mut self) { + // When a CompactionTask is dropped, it means + // 1. the task finished successfully, or + // 2. the task is cancelled for some reason, like memory limit + // + // In case 2, we need to mark files as not compacted in order for them to be + // scheduled again. In case 1, the files will be moved out of level controller, + // so it doesn't care what the flag is, so it's safe to set false here. + self.mark_files_being_compacted(false); + } } impl CompactionTask { - pub fn mark_files_being_compacted(&self, being_compacted: bool) { - for input in &self.compaction_inputs { + fn mark_files_being_compacted(&self, being_compacted: bool) { + for input in &self.inputs { for file in &input.files { file.set_being_compacted(being_compacted); } @@ -337,9 +350,10 @@ impl CompactionTask { } // Estimate the size of the total input files. + #[inline] pub fn estimated_total_input_file_size(&self) -> usize { let total_input_size: u64 = self - .compaction_inputs + .inputs .iter() .map(|v| v.files.iter().map(|f| f.size()).sum::()) .sum(); @@ -347,19 +361,65 @@ impl CompactionTask { total_input_size as usize } + #[inline] pub fn num_compact_files(&self) -> usize { - self.compaction_inputs.iter().map(|v| v.files.len()).sum() + self.inputs.iter().map(|v| v.files.len()).sum() } - pub fn num_expired_files(&self) -> usize { - self.expired.iter().map(|v| v.files.len()).sum() + #[inline] + pub fn is_empty(&self) -> bool { + self.is_input_empty() && self.expired.is_empty() + } + + #[inline] + pub fn is_input_empty(&self) -> bool { + self.inputs.is_empty() + } + + #[inline] + pub fn expired(&self) -> &[ExpiredFiles] { + &self.expired + } + + #[inline] + pub fn inputs(&self) -> &[CompactionInputFiles] { + &self.inputs + } +} + +pub struct CompactionTaskBuilder { + expired: Vec, + inputs: Vec, +} + +impl CompactionTaskBuilder { + pub fn with_expired(expired: Vec) -> Self { + Self { + expired, + inputs: Vec::new(), + } + } + + pub fn add_inputs(&mut self, files: CompactionInputFiles) { + self.inputs.push(files); + } + + pub fn build(self) -> CompactionTask { + let task = CompactionTask { + expired: self.expired, + inputs: self.inputs, + }; + + task.mark_files_being_compacted(true); + + task } } impl fmt::Debug for CompactionTask { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("CompactionTask") - .field("inputs", &self.compaction_inputs) + .field("inputs", &self.inputs) .field( "expired", &self @@ -380,36 +440,12 @@ impl fmt::Debug for CompactionTask { } } -pub struct PickerManager { - default_picker: CompactionPickerRef, - time_window_picker: CompactionPickerRef, - size_tiered_picker: CompactionPickerRef, -} - -impl Default for PickerManager { - fn default() -> Self { - let size_tiered_picker = Arc::new(CommonCompactionPicker::new( - CompactionStrategy::SizeTiered(SizeTieredCompactionOptions::default()), - )); - let time_window_picker = Arc::new(CommonCompactionPicker::new( - CompactionStrategy::TimeWindow(TimeWindowCompactionOptions::default()), - )); - - Self { - default_picker: time_window_picker.clone(), - size_tiered_picker, - time_window_picker, - } - } -} +#[derive(Default)] +pub struct PickerManager; impl PickerManager { pub fn get_picker(&self, strategy: CompactionStrategy) -> CompactionPickerRef { - match strategy { - CompactionStrategy::Default => self.default_picker.clone(), - CompactionStrategy::SizeTiered(_) => self.size_tiered_picker.clone(), - CompactionStrategy::TimeWindow(_) => self.time_window_picker.clone(), - } + Arc::new(CommonCompactionPicker::new(strategy)) } } diff --git a/analytic_engine/src/compaction/picker.rs b/analytic_engine/src/compaction/picker.rs index 96600199f0..e104aca7d2 100644 --- a/analytic_engine/src/compaction/picker.rs +++ b/analytic_engine/src/compaction/picker.rs @@ -1,4 +1,4 @@ -// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. +// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0. //! Compaction picker. @@ -15,8 +15,8 @@ use snafu::Snafu; use crate::{ compaction::{ - CompactionInputFiles, CompactionStrategy, CompactionTask, SizeTieredCompactionOptions, - TimeWindowCompactionOptions, + CompactionInputFiles, CompactionStrategy, CompactionTask, CompactionTaskBuilder, + SizeTieredCompactionOptions, TimeWindowCompactionOptions, }, sst::{ file::{FileHandle, Level}, @@ -60,7 +60,7 @@ pub trait CompactionPicker { fn pick_compaction( &self, ctx: PickerContext, - levels_controller: &LevelsController, + levels_controller: &mut LevelsController, ) -> Result; } @@ -86,10 +86,10 @@ pub struct CommonCompactionPicker { impl CommonCompactionPicker { pub fn new(strategy: CompactionStrategy) -> Self { let level_picker: LevelPickerRef = match strategy { - CompactionStrategy::SizeTiered(_) | CompactionStrategy::Default => { - Arc::new(SizeTieredPicker::default()) + CompactionStrategy::SizeTiered(_) => Arc::new(SizeTieredPicker::default()), + CompactionStrategy::TimeWindow(_) | CompactionStrategy::Default => { + Arc::new(TimeWindowPicker::default()) } - CompactionStrategy::TimeWindow(_) => Arc::new(TimeWindowPicker::default()), }; Self { level_picker } } @@ -123,13 +123,11 @@ impl CompactionPicker for CommonCompactionPicker { fn pick_compaction( &self, ctx: PickerContext, - levels_controller: &LevelsController, + levels_controller: &mut LevelsController, ) -> Result { let expire_time = ctx.ttl.map(Timestamp::expire_time); - let mut compaction_task = CompactionTask { - expired: levels_controller.expired_ssts(expire_time), - ..Default::default() - }; + let mut builder = + CompactionTaskBuilder::with_expired(levels_controller.expired_ssts(expire_time)); if let Some(input_files) = self.pick_compact_candidates(&ctx, levels_controller, expire_time) @@ -139,10 +137,10 @@ impl CompactionPicker for CommonCompactionPicker { ctx.strategy, input_files ); - compaction_task.compaction_inputs = vec![input_files]; + builder.add_inputs(input_files); } - Ok(compaction_task) + Ok(builder.build()) } } @@ -734,39 +732,39 @@ mod tests { }; let now = Timestamp::now(); { - let lc = build_old_bucket_case(now.as_i64()); - let task = twp.pick_compaction(ctx.clone(), &lc).unwrap(); - assert_eq!(task.compaction_inputs[0].files.len(), 2); - assert_eq!(task.compaction_inputs[0].files[0].id(), 0); - assert_eq!(task.compaction_inputs[0].files[1].id(), 1); + let mut lc = build_old_bucket_case(now.as_i64()); + let task = twp.pick_compaction(ctx.clone(), &mut lc).unwrap(); + assert_eq!(task.inputs[0].files.len(), 2); + assert_eq!(task.inputs[0].files[0].id(), 0); + assert_eq!(task.inputs[0].files[1].id(), 1); assert_eq!(task.expired[0].files.len(), 1); assert_eq!(task.expired[0].files[0].id(), 3); } { - let lc = build_newest_bucket_case(now.as_i64()); - let task = twp.pick_compaction(ctx.clone(), &lc).unwrap(); - assert_eq!(task.compaction_inputs[0].files.len(), 4); - assert_eq!(task.compaction_inputs[0].files[0].id(), 2); - assert_eq!(task.compaction_inputs[0].files[1].id(), 3); - assert_eq!(task.compaction_inputs[0].files[2].id(), 4); - assert_eq!(task.compaction_inputs[0].files[3].id(), 5); + let mut lc = build_newest_bucket_case(now.as_i64()); + let task = twp.pick_compaction(ctx.clone(), &mut lc).unwrap(); + assert_eq!(task.inputs[0].files.len(), 4); + assert_eq!(task.inputs[0].files[0].id(), 2); + assert_eq!(task.inputs[0].files[1].id(), 3); + assert_eq!(task.inputs[0].files[2].id(), 4); + assert_eq!(task.inputs[0].files[3].id(), 5); } { - let lc = build_newest_bucket_no_match_case(now.as_i64()); - let task = twp.pick_compaction(ctx.clone(), &lc).unwrap(); - assert_eq!(task.compaction_inputs.len(), 0); + let mut lc = build_newest_bucket_no_match_case(now.as_i64()); + let task = twp.pick_compaction(ctx.clone(), &mut lc).unwrap(); + assert_eq!(task.inputs.len(), 0); } // If ttl is None, then no file is expired. ctx.ttl = None; { - let lc = build_old_bucket_case(now.as_i64()); - let task = twp.pick_compaction(ctx, &lc).unwrap(); - assert_eq!(task.compaction_inputs[0].files.len(), 2); - assert_eq!(task.compaction_inputs[0].files[0].id(), 0); - assert_eq!(task.compaction_inputs[0].files[1].id(), 1); + let mut lc = build_old_bucket_case(now.as_i64()); + let task = twp.pick_compaction(ctx, &mut lc).unwrap(); + assert_eq!(task.inputs[0].files.len(), 2); + assert_eq!(task.inputs[0].files[0].id(), 0); + assert_eq!(task.inputs[0].files[1].id(), 1); assert!(task.expired[0].files.is_empty()); } } diff --git a/analytic_engine/src/compaction/scheduler.rs b/analytic_engine/src/compaction/scheduler.rs index 30cf277521..866629678d 100644 --- a/analytic_engine/src/compaction/scheduler.rs +++ b/analytic_engine/src/compaction/scheduler.rs @@ -237,7 +237,7 @@ impl OngoingTaskLimit { if dropped > 0 { warn!( - "Too many compaction pending tasks, limit: {}, dropped {} older tasks.", + "Too many compaction pending tasks, limit:{}, dropped:{}.", self.max_pending_compaction_tasks, dropped, ); } @@ -462,10 +462,7 @@ impl ScheduleWorker { waiter_notifier: WaiterNotifier, token: MemoryUsageToken, ) { - // Mark files being in compaction. - compaction_task.mark_files_being_compacted(true); - - let keep_scheduling_compaction = !compaction_task.compaction_inputs.is_empty(); + let keep_scheduling_compaction = !compaction_task.is_input_empty(); let runtime = self.runtime.clone(); let space_store = self.space_store.clone(); @@ -503,9 +500,6 @@ impl ScheduleWorker { .await; if let Err(e) = &res { - // Compaction is failed, we need to unset the compaction mark. - compaction_task.mark_files_being_compacted(false); - error!( "Failed to compact table, table_name:{}, table_id:{}, request_id:{}, err:{}", table_data.name, table_data.id, request_id, e @@ -656,7 +650,16 @@ impl ScheduleWorker { self.max_unflushed_duration, ); - let mut serial_exec = table_data.serial_exec.lock().await; + let mut serial_exec = if let Some(v) = table_data.acquire_serial_exec_ctx().await { + v + } else { + warn!( + "Table is closed, ignore this periodical flush, table:{}", + table_data.name + ); + continue; + }; + let flush_scheduler = serial_exec.flush_scheduler(); // Instance flush the table asynchronously. if let Err(e) = flusher diff --git a/analytic_engine/src/instance/close.rs b/analytic_engine/src/instance/close.rs index f45199c164..d933b01214 100644 --- a/analytic_engine/src/instance/close.rs +++ b/analytic_engine/src/instance/close.rs @@ -3,12 +3,12 @@ //! Close table logic of instance use log::{info, warn}; -use snafu::ResultExt; +use snafu::{OptionExt, ResultExt}; use table_engine::engine::CloseTableRequest; use crate::{ instance::{ - engine::{DoManifestSnapshot, FlushTable, Result}, + engine::{DoManifestSnapshot, FlushTable, OperateClosedTable, Result}, flush_compaction::{Flusher, TableFlushOptions}, }, manifest::{ManifestRef, SnapshotRequest}, @@ -37,8 +37,11 @@ impl Closer { // Flush table. let opts = TableFlushOptions::default(); - let mut serial_exec = table_data.serial_exec.lock().await; - let flush_scheduler = serial_exec.flush_scheduler(); + let mut serial_exec_ctx = table_data + .acquire_serial_exec_ctx() + .await + .context(OperateClosedTable)?; + let flush_scheduler = serial_exec_ctx.flush_scheduler(); self.flusher .do_flush(flush_scheduler, &table_data, opts) @@ -67,9 +70,10 @@ impl Closer { let removed_table = self.space.remove_table(&request.table_name); assert!(removed_table.is_some()); + serial_exec_ctx.invalidate(); info!( - "table:{}-{} has been removed from the space_id:{}", - table_data.name, table_data.id, self.space.id + "table:{} has been removed from the space_id:{}, table_id:{}", + table_data.name, self.space.id, table_data.id, ); Ok(()) } diff --git a/analytic_engine/src/instance/drop.rs b/analytic_engine/src/instance/drop.rs index ac6d1653fd..08d673f820 100644 --- a/analytic_engine/src/instance/drop.rs +++ b/analytic_engine/src/instance/drop.rs @@ -3,12 +3,12 @@ //! Drop table logic of instance use log::{info, warn}; -use snafu::ResultExt; +use snafu::{OptionExt, ResultExt}; use table_engine::engine::DropTableRequest; use crate::{ instance::{ - engine::{FlushTable, Result, WriteManifest}, + engine::{FlushTable, OperateClosedTable, Result, WriteManifest}, flush_compaction::{Flusher, TableFlushOptions}, SpaceStoreRef, }, @@ -36,7 +36,10 @@ impl Dropper { } }; - let mut serial_exec = table_data.serial_exec.lock().await; + let mut serial_exec_ctx = table_data + .acquire_serial_exec_ctx() + .await + .context(OperateClosedTable)?; if table_data.is_dropped() { warn!( @@ -51,7 +54,7 @@ impl Dropper { // be avoided. let opts = TableFlushOptions::default(); - let flush_scheduler = serial_exec.flush_scheduler(); + let flush_scheduler = serial_exec_ctx.flush_scheduler(); self.flusher .do_flush(flush_scheduler, &table_data, opts) .await diff --git a/analytic_engine/src/instance/engine.rs b/analytic_engine/src/instance/engine.rs index e32fa064d5..562c97ff2e 100644 --- a/analytic_engine/src/instance/engine.rs +++ b/analytic_engine/src/instance/engine.rs @@ -23,29 +23,21 @@ use crate::{ #[derive(Debug, Snafu)] #[snafu(visibility(pub(crate)))] pub enum Error { - #[snafu(display( - "The space of the table does not exist, space_id:{}, table:{}.\nBacktrace:\n{}", - space_id, - table, - backtrace, - ))] + #[snafu(display("The space of the table does not exist, space_id:{space_id}, table:{table}.\nBacktrace:\n{backtrace}"))] SpaceNotExist { space_id: SpaceId, table: String, backtrace: Backtrace, }, - #[snafu(display("Failed to read meta update, table_id:{}, err:{}", table_id, source))] + #[snafu(display("Failed to read meta update, table_id:{table_id}, err:{source}"))] ReadMetaUpdate { table_id: TableId, source: GenericError, }, #[snafu(display( - "Failed to recover table data, space_id:{}, table:{}, err:{}", - space_id, - table, - source + "Failed to recover table data, space_id:{space_id}, table:{table}, err:{source}" ))] RecoverTableData { space_id: SpaceId, @@ -53,14 +45,11 @@ pub enum Error { source: crate::table::data::Error, }, - #[snafu(display("Failed to read wal, err:{}", source))] + #[snafu(display("Failed to read wal, err:{source}"))] ReadWal { source: wal::manager::Error }, #[snafu(display( - "Failed to apply log entry to memtable, table:{}, table_id:{}, err:{}", - table, - table_id, - source + "Failed to apply log entry to memtable, table:{table}, table_id:{table_id}, err:{source}", ))] ApplyMemTable { space_id: SpaceId, @@ -70,11 +59,7 @@ pub enum Error { }, #[snafu(display( - "Flush failed, space_id:{}, table:{}, table_id:{}, err:{}", - space_id, - table, - table_id, - source + "Flush failed, space_id:{space_id}, table:{table}, table_id:{table_id}, err:{source}", ))] FlushTable { space_id: SpaceId, @@ -84,11 +69,7 @@ pub enum Error { }, #[snafu(display( - "Failed to persist meta update to manifest, space_id:{}, table:{}, table_id:{}, err:{}", - space_id, - table, - table_id, - source + "Failed to persist meta update to manifest, space_id:{space_id}, table:{table}, table_id:{table_id}, err:{source}", ))] WriteManifest { space_id: SpaceId, @@ -98,11 +79,7 @@ pub enum Error { }, #[snafu(display( - "Failed to persist meta update to WAL, space_id:{}, table:{}, table_id:{}, err:{}", - space_id, - table, - table_id, - source + "Failed to persist meta update to WAL, space_id:{space_id}, table:{table}, table_id:{table_id}, err:{source}", ))] WriteWal { space_id: SpaceId, @@ -112,11 +89,7 @@ pub enum Error { }, #[snafu(display( - "Invalid options, space_id:{}, table:{}, table_id:{}, err:{}", - space_id, - table, - table_id, - source + "Invalid options, space_id:{space_id}, table:{table}, table_id:{table_id}, err:{source}", ))] InvalidOptions { space_id: SpaceId, @@ -126,11 +99,7 @@ pub enum Error { }, #[snafu(display( - "Failed to create table data, space_id:{}, table:{}, table_id:{}, err:{}", - space_id, - table, - table_id, - source + "Failed to create table data, space_id:{space_id}, table:{table}, table_id:{table_id}, err:{source}", ))] CreateTableData { space_id: SpaceId, @@ -140,11 +109,8 @@ pub enum Error { }, #[snafu(display( - "Try to update schema to elder version, table:{}, current_version:{}, given_version:{}.\nBacktrace:\n{}", - table, - current_version, - given_version, - backtrace, + "Try to update schema to elder version, table:{table}, current_version:{current_version}, \ + given_version:{given_version}.\nBacktrace:\n{backtrace}", ))] InvalidSchemaVersion { table: String, @@ -154,11 +120,8 @@ pub enum Error { }, #[snafu(display( - "Invalid previous schema version, table:{}, current_version:{}, pre_version:{}.\nBacktrace:\n{}", - table, - current_version, - pre_version, - backtrace, + "Invalid previous schema version, table:{table}, current_version:{current_version}, \ + pre_version:{pre_version}.\nBacktrace:\n{backtrace}", ))] InvalidPreVersion { table: String, @@ -167,21 +130,14 @@ pub enum Error { backtrace: Backtrace, }, - #[snafu(display( - "Alter schema of a dropped table:{}.\nBacktrace:\n{}", - table, - backtrace - ))] + #[snafu(display("Alter schema of a dropped table:{table}.\nBacktrace:\n{backtrace}"))] AlterDroppedTable { table: String, backtrace: Backtrace }, - #[snafu(display("Failed to store version edit, err:{}", source))] + #[snafu(display("Failed to store version edit, err:{source}"))] StoreVersionEdit { source: GenericError }, #[snafu(display( - "Failed to encode payloads, table:{}, wal_location:{:?}, err:{}", - table, - wal_location, - source + "Failed to encode payloads, table:{table}, wal_location:{wal_location:?}, err:{source}" ))] EncodePayloads { table: String, @@ -190,10 +146,7 @@ pub enum Error { }, #[snafu(display( - "Failed to do manifest snapshot for table, space_id:{}, table:{}, err:{}", - space_id, - table, - source + "Failed to do manifest snapshot for table, space_id:{space_id}, table:{table}, err:{source}", ))] DoManifestSnapshot { space_id: SpaceId, @@ -202,30 +155,31 @@ pub enum Error { }, #[snafu(display( - "Table open failed and can not be created again, table:{}.\nBacktrace:\n{}", - table, - backtrace, + "Table open failed and can not be created again, table:{table}.\nBacktrace:\n{backtrace}", ))] CreateOpenFailedTable { table: String, backtrace: Backtrace }, - #[snafu(display("Failed to open manifest, err:{}", source))] + #[snafu(display("Failed to open manifest, err:{source}"))] OpenManifest { source: crate::manifest::details::Error, }, - #[snafu(display("Failed to find table, msg:{}.\nBacktrace:\n{}", msg, backtrace))] + #[snafu(display("Failed to find table, msg:{msg}.\nBacktrace:\n{backtrace}"))] TableNotExist { msg: String, backtrace: Backtrace }, - #[snafu(display("Failed to open shard, msg:{}.\nBacktrace:\n{}", msg, backtrace))] + #[snafu(display("Failed to open shard, msg:{msg}.\nBacktrace:\n{backtrace}"))] OpenTablesOfShard { msg: String, backtrace: Backtrace }, - #[snafu(display("Failed to replay wal, msg:{:?}, err:{}", msg, source))] + #[snafu(display("Try to operate a closed table.\nBacktrace:\n{backtrace}"))] + OperateClosedTable { backtrace: Backtrace }, + + #[snafu(display("Failed to replay wal, msg:{msg:?}, err:{source}"))] ReplayWalWithCause { msg: Option, source: GenericError, }, - #[snafu(display("Failed to replay wal, msg:{:?}.\nBacktrace:\n{}", msg, backtrace))] + #[snafu(display("Failed to replay wal, msg:{msg:?}.\nBacktrace:\n{backtrace}"))] ReplayWalNoCause { msg: Option, backtrace: Backtrace, @@ -264,6 +218,7 @@ impl From for table_engine::engine::Error { | Error::TableNotExist { .. } | Error::OpenTablesOfShard { .. } | Error::ReplayWalNoCause { .. } + | Error::OperateClosedTable { .. } | Error::ReplayWalWithCause { .. } => Self::Unexpected { source: Box::new(err), }, diff --git a/analytic_engine/src/instance/flush_compaction.rs b/analytic_engine/src/instance/flush_compaction.rs index 4e860def74..03b2f30337 100644 --- a/analytic_engine/src/instance/flush_compaction.rs +++ b/analytic_engine/src/instance/flush_compaction.rs @@ -669,22 +669,23 @@ impl SpaceStore { "Begin compact table, table_name:{}, id:{}, task:{:?}", table_data.name, table_data.id, task ); + let inputs = task.inputs(); let mut edit_meta = VersionEditMeta { space_id: table_data.space_id, table_id: table_data.id, flushed_sequence: 0, // Use the number of compaction inputs as the estimated number of files to add. - files_to_add: Vec::with_capacity(task.compaction_inputs.len()), + files_to_add: Vec::with_capacity(inputs.len()), files_to_delete: vec![], mems_to_remove: vec![], }; - if task.num_expired_files() == 0 && task.num_compact_files() == 0 { + if task.is_empty() { // Nothing to compact. return Ok(()); } - for files in &task.expired { + for files in task.expired() { self.delete_expired_files(table_data, request_id, files, &mut edit_meta); } @@ -696,7 +697,7 @@ impl SpaceStore { task.num_compact_files(), ); - for input in &task.compaction_inputs { + for input in inputs { self.compact_input_files( request_id, table_data, diff --git a/analytic_engine/src/instance/mod.rs b/analytic_engine/src/instance/mod.rs index 1faf254f08..492178b41c 100644 --- a/analytic_engine/src/instance/mod.rs +++ b/analytic_engine/src/instance/mod.rs @@ -28,14 +28,14 @@ use common_util::{ }; use log::{error, info}; use mem_collector::MemUsageCollector; -use snafu::{ResultExt, Snafu}; +use snafu::{Backtrace, OptionExt, ResultExt, Snafu}; use table_engine::{engine::EngineRuntimes, table::FlushRequest}; use tokio::sync::oneshot::{self, error::RecvError}; use wal::manager::{WalLocation, WalManagerRef}; -use self::flush_compaction::{Flusher, TableFlushOptions}; use crate::{ compaction::{scheduler::CompactionSchedulerRef, TableCompactionRequest}, + instance::flush_compaction::{Flusher, TableFlushOptions}, manifest::ManifestRef, row_iter::IterOptions, space::{SpaceId, SpaceRef, SpacesRef}, @@ -66,6 +66,9 @@ pub enum Error { source: GenericError, }, + #[snafu(display("Try to operate a closed table, table:{table}.\nBacktrace:\n{backtrace}"))] + OperateClosedTable { table: String, backtrace: Backtrace }, + #[snafu(display("Failed to receive {} result, table:{}, err:{}", op, table, source))] RecvManualOpResult { op: String, @@ -195,7 +198,13 @@ impl Instance { }; let flusher = self.make_flusher(); - let mut serial_exec = table_data.serial_exec.lock().await; + let mut serial_exec = + table_data + .acquire_serial_exec_ctx() + .await + .context(OperateClosedTable { + table: &table_data.name, + })?; let flush_scheduler = serial_exec.flush_scheduler(); flusher .schedule_flush(flush_scheduler, table_data, flush_opts) diff --git a/analytic_engine/src/instance/wal_replayer.rs b/analytic_engine/src/instance/wal_replayer.rs index c1e6fd96b9..d5076c8936 100644 --- a/analytic_engine/src/instance/wal_replayer.rs +++ b/analytic_engine/src/instance/wal_replayer.rs @@ -14,7 +14,7 @@ use common_util::error::BoxError; use lazy_static::lazy_static; use log::{debug, error, info, trace}; use prometheus::{exponential_buckets, register_histogram, Histogram}; -use snafu::ResultExt; +use snafu::{OptionExt, ResultExt}; use table_engine::table::TableId; use tokio::sync::MutexGuard; use wal::{ @@ -27,13 +27,13 @@ use wal::{ use crate::{ instance::{ self, - engine::{Error, ReplayWalWithCause, Result}, + engine::{Error, OperateClosedTable, ReplayWalWithCause, Result}, flush_compaction::{Flusher, TableFlushOptions}, serial_executor::TableOpSerialExecutor, write::MemTableWriter, }, payload::{ReadPayload, WalDecoder}, - table::data::TableDataRef, + table::data::{SerialExecContext, TableDataRef}, }; // Metrics of wal replayer @@ -200,7 +200,10 @@ impl TableBasedReplay { .box_err() .context(ReplayWalWithCause { msg: None })?; - let mut serial_exec = table_data.serial_exec.lock().await; + let mut serial_exec = table_data + .acquire_serial_exec_ctx() + .await + .context(OperateClosedTable)?; let mut log_entry_buf = VecDeque::with_capacity(context.wal_replay_batch_size); loop { // fetch entries to log_entry_buf @@ -284,14 +287,17 @@ impl RegionBasedReplay { let mut log_entry_buf = VecDeque::with_capacity(context.wal_replay_batch_size); // Lock all related tables. - let mut serial_exec_ctxs = HashMap::with_capacity(table_datas.len()); + let mut replay_table_ctxs = HashMap::with_capacity(table_datas.len()); for table_data in table_datas { - let serial_exec = table_data.serial_exec.lock().await; - let serial_exec_ctx = SerialExecContext { + let serial_exec_ctx = table_data + .acquire_serial_exec_ctx() + .await + .context(OperateClosedTable)?; + let replay_table_ctx = TableReplayContext { table_data: table_data.clone(), - serial_exec, + serial_exec_ctx, }; - serial_exec_ctxs.insert(table_data.id, serial_exec_ctx); + replay_table_ctxs.insert(table_data.id, replay_table_ctx); } // Split and replay logs. @@ -309,7 +315,7 @@ impl RegionBasedReplay { } let _timer = APPLY_LOGS_DURATION_HISTOGRAM.start_timer(); - Self::replay_single_batch(context, &log_entry_buf, &mut serial_exec_ctxs, faileds) + Self::replay_single_batch(context, &log_entry_buf, &mut replay_table_ctxs, faileds) .await?; } @@ -319,7 +325,7 @@ impl RegionBasedReplay { async fn replay_single_batch( context: &ReplayContext, log_batch: &VecDeque>, - serial_exec_ctxs: &mut HashMap>, + serial_exec_ctxs: &mut HashMap>, faileds: &mut FailedTables, ) -> Result<()> { let mut table_batches = Vec::new(); @@ -339,7 +345,7 @@ impl RegionBasedReplay { let result = replay_table_log_entries( &context.flusher, context.max_retry_flush_limit, - &mut ctx.serial_exec, + &mut ctx.serial_exec_ctx, &ctx.table_data, log_batch.range(table_batch.range), ) @@ -413,9 +419,9 @@ struct TableBatch { range: Range, } -struct SerialExecContext<'a> { +struct TableReplayContext<'a> { table_data: TableDataRef, - serial_exec: MutexGuard<'a, TableOpSerialExecutor>, + serial_exec_ctx: MutexGuard<'a, SerialExecContext>, } /// Replay all log entries into memtable and flush if necessary diff --git a/analytic_engine/src/instance/write.rs b/analytic_engine/src/instance/write.rs index 219e86102c..6017686b9a 100644 --- a/analytic_engine/src/instance/write.rs +++ b/analytic_engine/src/instance/write.rs @@ -608,9 +608,9 @@ impl<'a> Writer<'a> { "Try to trigger flush of other table:{} from the write procedure of table:{}", table_data.name, self.table_data.name ); - match table_data.serial_exec.try_lock() { - Ok(mut serial_exec) => { - let flush_scheduler = serial_exec.flush_scheduler(); + match table_data.try_acquire_serial_exec_ctx() { + Some(mut serial_exec_ctx) => { + let flush_scheduler = serial_exec_ctx.flush_scheduler(); // Set `block_on_write_thread` to false and let flush do in background. flusher .schedule_flush(flush_scheduler, table_data, opts) @@ -619,7 +619,7 @@ impl<'a> Writer<'a> { table: &table_data.name, }) } - Err(_) => { + None => { warn!( "Failed to acquire write lock for flush table:{}", table_data.name, diff --git a/analytic_engine/src/sst/meta_data/cache.rs b/analytic_engine/src/sst/meta_data/cache.rs index 296c4e2476..5e2bacdcbd 100644 --- a/analytic_engine/src/sst/meta_data/cache.rs +++ b/analytic_engine/src/sst/meta_data/cache.rs @@ -1,4 +1,4 @@ -// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. +// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0. use std::{ fmt::Debug, @@ -7,7 +7,7 @@ use std::{ use lru::LruCache; use parquet::file::metadata::FileMetaData; -use snafu::{OptionExt, ResultExt}; +use snafu::{ensure, OptionExt, ResultExt}; use crate::sst::{ meta_data::{DecodeCustomMetaData, KvMetaDataNotFound, ParquetMetaDataRef, Result}, @@ -39,14 +39,24 @@ impl MetaData { let kv_metas = file_meta_data .key_value_metadata() .context(KvMetaDataNotFound)?; - let kv_meta = kv_metas - .iter() - .find(|kv| kv.key == encoding::META_KEY) - .context(KvMetaDataNotFound)?; + + ensure!(!kv_metas.is_empty(), KvMetaDataNotFound); + let mut other_kv_metas = Vec::with_capacity(kv_metas.len() - 1); + let mut custom_kv_meta = None; + for kv_meta in kv_metas { + // Remove our extended custom meta data from the parquet metadata for small + // memory consumption in the cache. + if kv_meta.key == encoding::META_KEY { + custom_kv_meta = Some(kv_meta); + } else { + other_kv_metas.push(kv_meta.clone()); + } + } let custom = { + let custom_kv_meta = custom_kv_meta.context(KvMetaDataNotFound)?; let mut sst_meta = - encoding::decode_sst_meta_data(kv_meta).context(DecodeCustomMetaData)?; + encoding::decode_sst_meta_data(custom_kv_meta).context(DecodeCustomMetaData)?; if ignore_sst_filter { sst_meta.parquet_filter = None; } @@ -56,13 +66,17 @@ impl MetaData { // let's build a new parquet metadata without the extended key value // metadata. + let other_kv_metas = if other_kv_metas.is_empty() { + None + } else { + Some(other_kv_metas) + }; let parquet = { let thin_file_meta_data = FileMetaData::new( file_meta_data.version(), file_meta_data.num_rows(), file_meta_data.created_by().map(|v| v.to_string()), - // Remove the key value metadata. - None, + other_kv_metas, file_meta_data.schema_descr_ptr(), file_meta_data.column_orders().cloned(), ); @@ -111,3 +125,153 @@ impl MetaCache { self.cache.write().unwrap().put(key, value); } } + +#[cfg(test)] +mod tests { + use std::{fs::File, path::Path, sync::Arc}; + + use arrow::{ + array::UInt64Builder, + datatypes::{DataType, Field, Schema}, + record_batch::RecordBatch, + }; + use bytes::Bytes; + use common_types::{ + column_schema::Builder as ColumnSchemaBuilder, + schema::Builder as CustomSchemaBuilder, + time::{TimeRange, Timestamp}, + }; + use parquet::{arrow::ArrowWriter, file::footer}; + use parquet_ext::ParquetMetaData; + + use super::MetaData; + use crate::sst::parquet::{encoding, meta_data::ParquetMetaData as CustomParquetMetaData}; + + fn check_parquet_meta_data(original: &ParquetMetaData, processed: &ParquetMetaData) { + assert_eq!(original.page_indexes(), processed.page_indexes()); + assert_eq!(original.offset_indexes(), processed.offset_indexes()); + assert_eq!(original.num_row_groups(), processed.num_row_groups()); + assert_eq!(original.row_groups(), processed.row_groups()); + + let original_file_md = original.file_metadata(); + let processed_file_md = processed.file_metadata(); + assert_eq!(original_file_md.num_rows(), processed_file_md.num_rows()); + assert_eq!(original_file_md.version(), processed_file_md.version()); + assert_eq!( + original_file_md.created_by(), + processed_file_md.created_by() + ); + assert_eq!(original_file_md.schema(), processed_file_md.schema()); + assert_eq!( + original_file_md.schema_descr(), + processed_file_md.schema_descr() + ); + assert_eq!( + original_file_md.schema_descr_ptr(), + processed_file_md.schema_descr_ptr() + ); + assert_eq!( + original_file_md.column_orders(), + processed_file_md.column_orders() + ); + + if let Some(kv_metas) = original_file_md.key_value_metadata() { + let processed_kv_metas = processed_file_md.key_value_metadata().unwrap(); + assert_eq!(kv_metas.len(), processed_kv_metas.len() + 1); + let mut idx_for_processed = 0; + for kv in kv_metas { + if kv.key == encoding::META_KEY { + continue; + } + assert_eq!(kv, &processed_kv_metas[idx_for_processed]); + idx_for_processed += 1; + } + } else { + assert!(processed_file_md.key_value_metadata().is_none()); + } + } + + fn write_parquet_file_with_metadata( + parquet_file_path: &Path, + custom_meta_data: &CustomParquetMetaData, + ) { + let tsid_array = { + let mut builder = UInt64Builder::new(); + builder.append_value(10); + builder.append_null(); + builder.append_value(11); + builder.finish() + }; + let timestamp_array = { + let mut builder = UInt64Builder::new(); + builder.append_value(1000); + builder.append_null(); + builder.append_value(1001); + builder.finish() + }; + let file = File::create(parquet_file_path).unwrap(); + let schema = Schema::new(vec![ + Field::new("tsid", DataType::UInt64, true), + Field::new("timestamp", DataType::UInt64, true), + ]); + + let batch = RecordBatch::try_new( + Arc::new(schema), + vec![Arc::new(tsid_array), Arc::new(timestamp_array)], + ) + .unwrap(); + let mut writer = ArrowWriter::try_new(file, batch.schema(), None).unwrap(); + + let encoded_meta_data = encoding::encode_sst_meta_data(custom_meta_data.clone()).unwrap(); + writer.append_key_value_metadata(encoded_meta_data); + + writer.write(&batch).unwrap(); + writer.close().unwrap(); + } + + #[test] + fn test_arrow_meta_data() { + let temp_dir = tempfile::tempdir().unwrap(); + let parquet_file_path = temp_dir.path().join("test_arrow_meta_data.par"); + let schema = { + let tsid_column_schema = ColumnSchemaBuilder::new( + "tsid".to_string(), + common_types::datum::DatumKind::UInt64, + ) + .build() + .unwrap(); + let timestamp_column_schema = ColumnSchemaBuilder::new( + "timestamp".to_string(), + common_types::datum::DatumKind::Timestamp, + ) + .build() + .unwrap(); + CustomSchemaBuilder::new() + .auto_increment_column_id(true) + .add_key_column(tsid_column_schema) + .unwrap() + .add_key_column(timestamp_column_schema) + .unwrap() + .build() + .unwrap() + }; + let custom_meta_data = CustomParquetMetaData { + min_key: Bytes::from_static(&[0, 1]), + max_key: Bytes::from_static(&[2, 2]), + time_range: TimeRange::new_unchecked(Timestamp::new(0), Timestamp::new(10)), + max_sequence: 1001, + schema, + parquet_filter: None, + collapsible_cols_idx: vec![], + }; + write_parquet_file_with_metadata(parquet_file_path.as_path(), &custom_meta_data); + + let parquet_file = File::open(parquet_file_path.as_path()).unwrap(); + let parquet_meta_data = footer::parse_metadata(&parquet_file).unwrap(); + + let meta_data = MetaData::try_new(&parquet_meta_data, false).unwrap(); + + assert_eq!(**meta_data.custom(), custom_meta_data); + check_parquet_meta_data(&parquet_meta_data, meta_data.parquet()); + } +} diff --git a/analytic_engine/src/sst/parquet/async_reader.rs b/analytic_engine/src/sst/parquet/async_reader.rs index 735e0cd9d2..47478eca6a 100644 --- a/analytic_engine/src/sst/parquet/async_reader.rs +++ b/analytic_engine/src/sst/parquet/async_reader.rs @@ -1,4 +1,4 @@ -// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. +// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0. //! Sst reader implementation based on parquet. @@ -30,14 +30,17 @@ use datafusion::{ metrics::ExecutionPlanMetricsSet, }, }; -use futures::{Stream, StreamExt}; +use futures::{future::BoxFuture, FutureExt, Stream, StreamExt, TryFutureExt}; use log::{debug, error}; use object_store::{ObjectStoreRef, Path}; use parquet::{ - arrow::{arrow_reader::RowSelection, ParquetRecordBatchStreamBuilder, ProjectionMask}, + arrow::{ + arrow_reader::RowSelection, async_reader::AsyncFileReader, ParquetRecordBatchStreamBuilder, + ProjectionMask, + }, file::metadata::RowGroupMetaData, }; -use parquet_ext::{meta_data::ChunkReader, reader::ObjectStoreReader}; +use parquet_ext::meta_data::ChunkReader; use snafu::ResultExt; use table_engine::predicate::PredicateRef; use tokio::sync::mpsc::{self, Receiver, Sender}; @@ -278,23 +281,13 @@ impl<'a> Reader<'a> { let mut streams = Vec::with_capacity(target_row_group_chunks.len()); for chunk in target_row_group_chunks { - let object_store_reader = ObjectStoreReader::new( - self.store.clone(), - self.path.clone(), - parquet_metadata.clone(), - ); + let object_store_reader = + ObjectStoreReader::new(self.store.clone(), self.path.clone(), meta_data.clone()); let mut builder = ParquetRecordBatchStreamBuilder::new(object_store_reader) .await .with_context(|| ParquetError)?; - let row_selection = self.build_row_selection(arrow_schema.clone(), &chunk, parquet_metadata)?; - - debug!( - "Build row selection for file path:{}, result:{row_selection:?}, page indexes:{}", - self.path, - parquet_metadata.page_indexes().is_some() - ); if let Some(selection) = row_selection { builder = builder.with_row_selection(selection); }; @@ -347,32 +340,18 @@ impl<'a> Reader<'a> { Ok(file_size) } - async fn load_meta_data_from_storage(&self, ignore_sst_filter: bool) -> Result { + async fn load_meta_data_from_storage(&self) -> Result { let file_size = self.load_file_size().await?; let chunk_reader_adapter = ChunkReaderAdapter::new(self.path, self.store); - let (parquet_meta_data, _) = + let (meta_data, _) = parquet_ext::meta_data::fetch_parquet_metadata(file_size, &chunk_reader_adapter) .await .with_context(|| FetchAndDecodeSstMeta { file_path: self.path.to_string(), })?; - let object_store_reader = parquet_ext::reader::ObjectStoreReader::new( - self.store.clone(), - self.path.clone(), - Arc::new(parquet_meta_data), - ); - - let parquet_meta_data = parquet_ext::meta_data::meta_with_page_indexes(object_store_reader) - .await - .with_context(|| DecodePageIndexes { - file_path: self.path.to_string(), - })?; - - MetaData::try_new(&parquet_meta_data, ignore_sst_filter) - .box_err() - .context(DecodeSstMeta) + Ok(meta_data) } fn need_update_cache(&self) -> bool { @@ -396,8 +375,12 @@ impl<'a> Reader<'a> { let empty_predicate = self.predicate.exprs().is_empty(); let meta_data = { + let parquet_meta_data = self.load_meta_data_from_storage().await?; + let ignore_sst_filter = avoid_update_cache && empty_predicate; - self.load_meta_data_from_storage(ignore_sst_filter).await? + MetaData::try_new(&parquet_meta_data, ignore_sst_filter) + .box_err() + .context(DecodeSstMeta)? }; if avoid_update_cache || self.meta_cache.is_none() { @@ -430,6 +413,71 @@ impl<'a> Drop for Reader<'a> { } } +#[derive(Clone)] +struct ObjectStoreReader { + storage: ObjectStoreRef, + path: Path, + meta_data: MetaData, + begin: Instant, +} + +impl ObjectStoreReader { + fn new(storage: ObjectStoreRef, path: Path, meta_data: MetaData) -> Self { + Self { + storage, + path, + meta_data, + begin: Instant::now(), + } + } +} + +impl Drop for ObjectStoreReader { + fn drop(&mut self) { + debug!( + "ObjectStoreReader dropped, path:{}, elapsed:{:?}", + &self.path, + self.begin.elapsed() + ); + } +} + +impl AsyncFileReader for ObjectStoreReader { + fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, parquet::errors::Result> { + self.storage + .get_range(&self.path, range) + .map_err(|e| { + parquet::errors::ParquetError::General(format!( + "Failed to fetch range from object store, err:{e}" + )) + }) + .boxed() + } + + fn get_byte_ranges( + &mut self, + ranges: Vec>, + ) -> BoxFuture<'_, parquet::errors::Result>> { + async move { + self.storage + .get_ranges(&self.path, &ranges) + .map_err(|e| { + parquet::errors::ParquetError::General(format!( + "Failed to fetch ranges from object store, err:{e}" + )) + }) + .await + } + .boxed() + } + + fn get_metadata( + &mut self, + ) -> BoxFuture<'_, parquet::errors::Result>> { + Box::pin(async move { Ok(self.meta_data.parquet().clone()) }) + } +} + pub struct ChunkReaderAdapter<'a> { path: &'a Path, store: &'a ObjectStoreRef, diff --git a/analytic_engine/src/sst/parquet/encoding.rs b/analytic_engine/src/sst/parquet/encoding.rs index 2effc6bc36..d65f75dcd1 100644 --- a/analytic_engine/src/sst/parquet/encoding.rs +++ b/analytic_engine/src/sst/parquet/encoding.rs @@ -1030,11 +1030,11 @@ mod tests { ArrowRecordBatch::try_new(schema.to_arrow_schema_ref(), columns).unwrap(); let input_record_batch2 = ArrowRecordBatch::try_new(schema.to_arrow_schema_ref(), columns2).unwrap(); - let row_nums = encoder + let num_rows = encoder .encode(vec![input_record_batch, input_record_batch2]) .await .unwrap(); - assert_eq!(2, row_nums); + assert_eq!(2, num_rows); // read encoded records back, and then compare with input records encoder.close().await.unwrap(); diff --git a/analytic_engine/src/sst/reader.rs b/analytic_engine/src/sst/reader.rs index 915190a935..599f1691ba 100644 --- a/analytic_engine/src/sst/reader.rs +++ b/analytic_engine/src/sst/reader.rs @@ -1,4 +1,4 @@ -// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. +// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0. //! Sst reader trait definition. @@ -15,17 +15,20 @@ pub mod error { #[derive(Debug, Snafu)] #[snafu(visibility(pub))] pub enum Error { - #[snafu(display("Try to read again, path:{path}.\nBacktrace:\n{backtrace}"))] + #[snafu(display("Try to read again, path:{}.\nBacktrace:\n{}", path, backtrace))] ReadAgain { backtrace: Backtrace, path: String }, - #[snafu(display("Fail to read persisted file, path:{path}, err:{source}"))] + #[snafu(display("Fail to read persisted file, path:{}, err:{}", path, source))] ReadPersist { path: String, source: GenericError }, - #[snafu(display("Failed to decode record batch, err:{source}"))] + #[snafu(display("Failed to decode record batch, err:{}", source))] DecodeRecordBatch { source: GenericError }, #[snafu(display( - "Failed to decode sst meta data, file_path:{file_path}, err:{source}.\nBacktrace:\n{backtrace:?}", + "Failed to decode sst meta data, file_path:{}, err:{}.\nBacktrace:\n{:?}", + file_path, + source, + backtrace ))] FetchAndDecodeSstMeta { file_path: String, @@ -33,52 +36,43 @@ pub mod error { backtrace: Backtrace, }, - #[snafu(display( - "Failed to decode page indexes for meta data, file_path:{file_path}, err:{source}.\nBacktrace:\n{backtrace:?}", - ))] - DecodePageIndexes { - file_path: String, - source: parquet::errors::ParquetError, - backtrace: Backtrace, - }, - - #[snafu(display("Failed to decode sst meta data, err:{source}"))] + #[snafu(display("Failed to decode sst meta data, err:{}", source))] DecodeSstMeta { source: GenericError }, - #[snafu(display("Sst meta data is not found.\nBacktrace:\n{backtrace}"))] + #[snafu(display("Sst meta data is not found.\nBacktrace:\n{}", backtrace))] SstMetaNotFound { backtrace: Backtrace }, - #[snafu(display("Fail to projection, err:{source}"))] + #[snafu(display("Fail to projection, err:{}", source))] Projection { source: GenericError }, - #[snafu(display("Sst meta data is empty.\nBacktrace:\n{backtrace}"))] + #[snafu(display("Sst meta data is empty.\nBacktrace:\n{}", backtrace))] EmptySstMeta { backtrace: Backtrace }, - #[snafu(display("Invalid schema, err:{source}"))] + #[snafu(display("Invalid schema, err:{}", source))] InvalidSchema { source: common_types::schema::Error }, - #[snafu(display("Meet a datafusion error, err:{source}\nBacktrace:\n{backtrace}"))] + #[snafu(display("Meet a datafusion error, err:{}\nBacktrace:\n{}", source, backtrace))] DataFusionError { source: datafusion::error::DataFusionError, backtrace: Backtrace, }, - #[snafu(display("Meet a object store error, err:{source}\nBacktrace:\n{backtrace}"))] + #[snafu(display("Meet a object store error, err:{}\nBacktrace:\n{}", source, backtrace))] ObjectStoreError { source: object_store::ObjectStoreError, backtrace: Backtrace, }, - #[snafu(display("Meet a parquet error, err:{source}\nBacktrace:\n{backtrace}"))] + #[snafu(display("Meet a parquet error, err:{}\nBacktrace:\n{}", source, backtrace))] ParquetError { source: parquet::errors::ParquetError, backtrace: Backtrace, }, - #[snafu(display("Other kind of error:{source}"))] + #[snafu(display("Other kind of error:{}", source))] Other { source: GenericError }, - #[snafu(display("Other kind of error, msg:{msg}.\nBacktrace:\n{backtrace}"))] + #[snafu(display("Other kind of error, msg:{}.\nBacktrace:\n{}", msg, backtrace))] OtherNoCause { msg: String, backtrace: Backtrace }, } diff --git a/analytic_engine/src/table/data.rs b/analytic_engine/src/table/data.rs index 49d9d6cae8..01ba153fd8 100644 --- a/analytic_engine/src/table/data.rs +++ b/analytic_engine/src/table/data.rs @@ -7,6 +7,7 @@ use std::{ convert::TryInto, fmt, fmt::Formatter, + ops::{Deref, DerefMut}, sync::{ atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering}, Arc, Mutex, @@ -28,6 +29,7 @@ use log::{debug, info}; use object_store::Path; use snafu::{Backtrace, OptionExt, ResultExt, Snafu}; use table_engine::table::TableId; +use tokio::sync::MutexGuard; use crate::{ instance::serial_executor::TableOpSerialExecutor, @@ -86,6 +88,36 @@ impl TableShardInfo { } } +/// The context for execution of serial operation on the table. +pub struct SerialExecContext { + /// Denotes whether `serial_exec` is valid. + /// + /// The `serial_exec` will be invalidated if the table is closed. + invalid: bool, + serial_exec: TableOpSerialExecutor, +} + +impl SerialExecContext { + #[inline] + pub fn invalidate(&mut self) { + self.invalid = true; + } +} + +impl Deref for SerialExecContext { + type Target = TableOpSerialExecutor; + + fn deref(&self) -> &Self::Target { + &self.serial_exec + } +} + +impl DerefMut for SerialExecContext { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.serial_exec + } +} + /// Data of a table pub struct TableData { /// Id of this table @@ -143,14 +175,13 @@ pub struct TableData { /// No write/alter is allowed if the table is dropped. dropped: AtomicBool, + serial_exec_ctx: tokio::sync::Mutex, + /// Metrics of this table pub metrics: Metrics, /// Shard info of the table pub shard_info: TableShardInfo, - - /// The table operation serial_exec - pub serial_exec: tokio::sync::Mutex, } impl fmt::Debug for TableData { @@ -217,6 +248,10 @@ impl TableData { preflush_write_buffer_size_ratio, )); + let serial_exec_ctx = tokio::sync::Mutex::new(SerialExecContext { + invalid: false, + serial_exec: TableOpSerialExecutor::new(table_id), + }); Ok(Self { id: table_id, name: table_name, @@ -235,7 +270,7 @@ impl TableData { dropped: AtomicBool::new(false), metrics, shard_info: TableShardInfo::new(shard_id), - serial_exec: tokio::sync::Mutex::new(TableOpSerialExecutor::new(table_id)), + serial_exec_ctx, }) } @@ -249,35 +284,17 @@ impl TableData { preflush_write_buffer_size_ratio: f32, mem_usage_collector: CollectorRef, ) -> Result { - let memtable_factory = Arc::new(SkiplistMemTableFactory); - let purge_queue = purger.create_purge_queue(add_meta.space_id, add_meta.table_id); - let current_version = TableVersion::new(purge_queue); - let metrics = Metrics::default(); - let mutable_limit = AtomicU32::new(compute_mutable_limit( - add_meta.opts.write_buffer_size, + Self::new( + add_meta.space_id, + add_meta.table_id, + add_meta.table_name, + add_meta.schema, + shard_id, + add_meta.opts, + purger, preflush_write_buffer_size_ratio, - )); - - Ok(Self { - id: add_meta.table_id, - name: add_meta.table_name, - schema: Mutex::new(add_meta.schema), - space_id: add_meta.space_id, - mutable_limit, - mutable_limit_write_buffer_ratio: preflush_write_buffer_size_ratio, - opts: ArcSwap::new(Arc::new(add_meta.opts)), - memtable_factory, mem_usage_collector, - current_version, - last_sequence: AtomicU64::new(0), - last_memtable_id: AtomicU64::new(0), - last_file_id: AtomicU64::new(0), - last_flush_time_ms: AtomicU64::new(0), - dropped: AtomicBool::new(false), - metrics, - shard_info: TableShardInfo::new(shard_id), - serial_exec: tokio::sync::Mutex::new(TableOpSerialExecutor::new(add_meta.table_id)), - }) + ) } /// Get current schema of the table. @@ -352,6 +369,34 @@ impl TableData { self.dropped.store(true, Ordering::SeqCst); } + /// Acquire the [`SerialExecContext`] if the table is not closed. + pub async fn acquire_serial_exec_ctx(&self) -> Option> { + let v = self.serial_exec_ctx.lock().await; + if v.invalid { + None + } else { + Some(v) + } + } + + /// Try to acquire the [SerialExecContext]. + /// + /// [None] will be returned if the serial_exec_ctx has been acquired + /// already, or the table is closed. + pub fn try_acquire_serial_exec_ctx(&self) -> Option> { + let v = self.serial_exec_ctx.try_lock(); + match v { + Ok(ctx) => { + if ctx.invalid { + None + } else { + Some(ctx) + } + } + Err(_) => None, + } + } + /// Returns total memtable memory usage in bytes. #[inline] pub fn memtable_memory_usage(&self) -> usize { diff --git a/analytic_engine/src/table/mod.rs b/analytic_engine/src/table/mod.rs index 82b2b54b9c..780b9cafcc 100644 --- a/analytic_engine/src/table/mod.rs +++ b/analytic_engine/src/table/mod.rs @@ -21,9 +21,9 @@ use table_engine::{ stream::{PartitionedStreams, SendableRecordBatchStream}, table::{ AlterOptions, AlterSchema, AlterSchemaRequest, Compact, Flush, FlushRequest, Get, - GetInvalidPrimaryKey, GetNullPrimaryKey, GetRequest, MergeWrite, ReadOptions, ReadOrder, - ReadRequest, Result, Scan, Table, TableId, TableStats, TooManyPendingWrites, - WaitForPendingWrites, Write, WriteRequest, + GetInvalidPrimaryKey, GetNullPrimaryKey, GetRequest, MergeWrite, OperateClosedTable, + ReadOptions, ReadOrder, ReadRequest, Result, Scan, Table, TableId, TableStats, + TooManyPendingWrites, WaitForPendingWrites, Write, WriteRequest, }, ANALYTIC_ENGINE_TYPE, }; @@ -255,7 +255,7 @@ impl TableImpl { // This is the first request in the queue, and we should // take responsibilities for merging and writing the // requests in the queue. - let serial_exec = self.table_data.serial_exec.lock().await; + let serial_exec_ctx = self.table_data.acquire_serial_exec_ctx().await; // The `serial_exec` is acquired, let's merge the pending requests and write // them all. let pending_writes = { @@ -266,9 +266,22 @@ impl TableImpl { !pending_writes.is_empty(), "The pending writes should contain at least the one just pushed." ); - let merged_write_request = - merge_pending_write_requests(pending_writes.writes, pending_writes.num_rows); - (merged_write_request, serial_exec, pending_writes.notifiers) + match serial_exec_ctx { + Some(v) => { + let merged_write_request = merge_pending_write_requests( + pending_writes.writes, + pending_writes.num_rows, + ); + (merged_write_request, v, pending_writes.notifiers) + } + None => { + // The table has been closed, notify all the waiters in + // the queue. + let write_err = OperateClosedTable.fail(); + self.notify_waiters(pending_writes.notifiers, &write_err); + return write_err; + } + } } QueueResult::Waiter(rx) => { // The request is successfully pushed into the queue, and just wait for the @@ -303,12 +316,18 @@ impl TableImpl { .box_err() .context(Write { table: self.name() }); - // There is no waiter for pending writes, return the write result. - if notifiers.is_empty() { - return write_res; - } - // Notify the waiters for the pending writes. + self.notify_waiters(notifiers, &write_res); + + write_res.map(|_| num_rows) + } + + #[inline] + fn should_queue_write_request(&self, request: &WriteRequest) -> bool { + request.row_group.num_rows() < self.instance.max_rows_in_write_queue + } + + fn notify_waiters(&self, notifiers: Vec>>, write_res: &Result) { match write_res { Ok(_) => { for notifier in notifiers { @@ -319,7 +338,6 @@ impl TableImpl { ); } } - Ok(num_rows) } Err(e) => { let err_msg = format!("Failed to do merge write, err:{e}"); @@ -332,15 +350,9 @@ impl TableImpl { ); } } - Err(e) } } } - - #[inline] - fn should_queue_write_request(&self, request: &WriteRequest) -> bool { - request.row_group.num_rows() < self.instance.max_rows_in_write_queue - } } #[async_trait] @@ -384,11 +396,15 @@ impl Table for TableImpl { return self.write_with_pending_queue(request).await; } - let mut serial_exec = self.table_data.serial_exec.lock().await; + let mut serial_exec_ctx = self + .table_data + .acquire_serial_exec_ctx() + .await + .context(OperateClosedTable)?; let mut writer = Writer::new( self.instance.clone(), self.space_table.clone(), - &mut serial_exec, + &mut serial_exec_ctx, ); writer .write(request) @@ -501,10 +517,14 @@ impl Table for TableImpl { } async fn alter_schema(&self, request: AlterSchemaRequest) -> Result { - let mut serial_exec = self.table_data.serial_exec.lock().await; + let mut serial_exec_ctx = self + .table_data + .acquire_serial_exec_ctx() + .await + .context(OperateClosedTable)?; let mut alterer = Alterer::new( self.table_data.clone(), - &mut serial_exec, + &mut serial_exec_ctx, self.instance.clone(), ) .await; @@ -518,10 +538,14 @@ impl Table for TableImpl { } async fn alter_options(&self, options: HashMap) -> Result { - let mut serial_exec = self.table_data.serial_exec.lock().await; + let mut serial_exec_ctx = self + .table_data + .acquire_serial_exec_ctx() + .await + .context(OperateClosedTable)?; let alterer = Alterer::new( self.table_data.clone(), - &mut serial_exec, + &mut serial_exec_ctx, self.instance.clone(), ) .await; diff --git a/analytic_engine/src/table/version.rs b/analytic_engine/src/table/version.rs index 329c09677a..ddb7ffd21d 100644 --- a/analytic_engine/src/table/version.rs +++ b/analytic_engine/src/table/version.rs @@ -727,9 +727,9 @@ impl TableVersion { picker_ctx: PickerContext, picker: &CompactionPickerRef, ) -> picker::Result { - let inner = self.inner.read().unwrap(); + let mut inner = self.inner.write().unwrap(); - picker.pick_compaction(picker_ctx, &inner.levels_controller) + picker.pick_compaction(picker_ctx, &mut inner.levels_controller) } pub fn has_expired_sst(&self, expire_time: Option) -> bool { diff --git a/common_types/src/schema.rs b/common_types/src/schema.rs index c5db4d0636..d94f10e52a 100644 --- a/common_types/src/schema.rs +++ b/common_types/src/schema.rs @@ -690,7 +690,7 @@ impl Schema { self.column_schemas.num_columns() } - /// Returns true if idx is primary key idnex + /// Returns true if idx is primary key index pub fn is_primary_key_index(&self, idx: &usize) -> bool { self.primary_key_indexes.contains(idx) } diff --git a/components/parquet_ext/Cargo.toml b/components/parquet_ext/Cargo.toml index ba31703d18..1b4b4b23c6 100644 --- a/components/parquet_ext/Cargo.toml +++ b/components/parquet_ext/Cargo.toml @@ -17,8 +17,6 @@ async-trait = { workspace = true } bytes = { workspace = true } common_util = { workspace = true } datafusion = { workspace = true } -futures = { workspace = true } log = { workspace = true } -object_store = { workspace = true } parquet = { workspace = true } tokio = { workspace = true } diff --git a/components/parquet_ext/src/lib.rs b/components/parquet_ext/src/lib.rs index 7264b38dd6..cd413c0afc 100644 --- a/components/parquet_ext/src/lib.rs +++ b/components/parquet_ext/src/lib.rs @@ -2,7 +2,6 @@ pub mod meta_data; pub mod prune; -pub mod reader; pub mod reverse_reader; #[cfg(test)] pub mod tests; diff --git a/components/parquet_ext/src/meta_data.rs b/components/parquet_ext/src/meta_data.rs index 1a95bb4f7f..5670bc93f0 100644 --- a/components/parquet_ext/src/meta_data.rs +++ b/components/parquet_ext/src/meta_data.rs @@ -1,18 +1,15 @@ -// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. +// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0. -use std::{ops::Range, sync::Arc}; +use std::ops::Range; use async_trait::async_trait; use bytes::Bytes; use common_util::error::GenericResult; use parquet::{ - arrow::{arrow_reader::ArrowReaderOptions, ParquetRecordBatchStreamBuilder}, errors::{ParquetError, Result}, file::{footer, metadata::ParquetMetaData}, }; -use crate::reader::ObjectStoreReader; - #[async_trait] pub trait ChunkReader: Sync + Send { async fn get_bytes(&self, range: Range) -> GenericResult; @@ -68,21 +65,3 @@ pub async fn fetch_parquet_metadata( footer::decode_metadata(&metadata_bytes).map(|v| (v, metadata_len)) } - -/// Build page indexes for meta data -/// -/// TODO: Currently there is no method to build page indexes for meta data in -/// `parquet`, maybe we can write a issue in `arrow-rs` . -pub async fn meta_with_page_indexes( - object_store_reader: ObjectStoreReader, -) -> Result> { - let read_options = ArrowReaderOptions::new().with_page_index(true); - let builder = - ParquetRecordBatchStreamBuilder::new_with_options(object_store_reader, read_options) - .await - .map_err(|e| { - let err_msg = format!("failed to build page indexes in metadata, err:{e}"); - ParquetError::General(err_msg) - })?; - Ok(builder.metadata().clone()) -} diff --git a/components/parquet_ext/src/reader.rs b/components/parquet_ext/src/reader.rs deleted file mode 100644 index 3a5cd5f170..0000000000 --- a/components/parquet_ext/src/reader.rs +++ /dev/null @@ -1,81 +0,0 @@ -// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0. - -use std::{ops::Range, sync::Arc, time::Instant}; - -use bytes::Bytes; -use futures::{ - future::{BoxFuture, FutureExt}, - TryFutureExt, -}; -use log::debug; -use object_store::{ObjectStoreRef, Path}; -use parquet::{arrow::async_reader::AsyncFileReader, file::metadata::ParquetMetaData}; - -/// Implemention AsyncFileReader based on `ObjectStore` -/// -/// TODO: Perhaps we should avoid importing `object_store` in `parquet_ext` to -/// keep the crate `parquet_ext` more pure. -#[derive(Clone)] -pub struct ObjectStoreReader { - storage: ObjectStoreRef, - path: Path, - meta_data: Arc, - begin: Instant, -} - -impl ObjectStoreReader { - pub fn new(storage: ObjectStoreRef, path: Path, meta_data: Arc) -> Self { - Self { - storage, - path, - meta_data, - begin: Instant::now(), - } - } -} - -impl Drop for ObjectStoreReader { - fn drop(&mut self) { - debug!( - "ObjectStoreReader dropped, path:{}, elapsed:{:?}", - &self.path, - self.begin.elapsed() - ); - } -} - -impl AsyncFileReader for ObjectStoreReader { - fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, parquet::errors::Result> { - self.storage - .get_range(&self.path, range) - .map_err(|e| { - parquet::errors::ParquetError::General(format!( - "Failed to fetch range from object store, err:{e}" - )) - }) - .boxed() - } - - fn get_byte_ranges( - &mut self, - ranges: Vec>, - ) -> BoxFuture<'_, parquet::errors::Result>> { - async move { - self.storage - .get_ranges(&self.path, &ranges) - .map_err(|e| { - parquet::errors::ParquetError::General(format!( - "Failed to fetch ranges from object store, err:{e}" - )) - }) - .await - } - .boxed() - } - - fn get_metadata( - &mut self, - ) -> BoxFuture<'_, parquet::errors::Result>> { - Box::pin(async move { Ok(self.meta_data.clone()) }) - } -} diff --git a/table_engine/src/table.rs b/table_engine/src/table.rs index 821bdb6195..aea9f64cf5 100644 --- a/table_engine/src/table.rs +++ b/table_engine/src/table.rs @@ -135,6 +135,9 @@ pub enum Error { source: GenericError, }, + #[snafu(display("Try to operate a closed table.\nBacktrace:\n{backtrace}"))] + OperateClosedTable { backtrace: Backtrace }, + #[snafu(display( "Failed to wait for pending writes, table:{table}.\nBacktrace:\n{backtrace}" ))] diff --git a/tools/src/bin/sst-metadata.rs b/tools/src/bin/sst-metadata.rs index a089ad2da5..95765f9b8d 100644 --- a/tools/src/bin/sst-metadata.rs +++ b/tools/src/bin/sst-metadata.rs @@ -13,7 +13,7 @@ use common_util::{ }; use futures::StreamExt; use object_store::{LocalFileSystem, ObjectMeta, ObjectStoreRef, Path}; -use parquet_ext::{meta_data::fetch_parquet_metadata, reader::ObjectStoreReader}; +use parquet_ext::meta_data::fetch_parquet_metadata; use tokio::{runtime::Handle, task::JoinSet}; #[derive(Parser, Debug)] @@ -30,10 +30,6 @@ struct Args { /// Thread num, 0 means cpu num #[clap(short, long, default_value_t = 0)] threads: usize, - - /// Print page indexes - #[clap(short, long, required(false))] - page_indexes: bool, } #[derive(Default, Debug)] @@ -96,7 +92,6 @@ async fn run(args: Args) -> Result<()> { let mut join_set = JoinSet::new(); let mut ssts = storage.list(None).await?; let verbose = args.verbose; - let page_indexes = args.page_indexes; while let Some(object_meta) = ssts.next().await { let object_meta = object_meta?; let storage = storage.clone(); @@ -104,8 +99,7 @@ async fn run(args: Args) -> Result<()> { join_set.spawn_on( async move { let (metadata, metadata_size, kv_size) = - parse_metadata(storage, location, object_meta.size, verbose, page_indexes) - .await?; + parse_metadata(storage, location, object_meta.size, verbose).await?; Ok::<_, anyhow::Error>((object_meta, metadata, metadata_size, kv_size)) }, &handle, @@ -201,11 +195,9 @@ async fn parse_metadata( path: Path, size: usize, verbose: bool, - page_indexes: bool, ) -> Result<(MetaData, usize, usize)> { let reader = ChunkReaderAdapter::new(&path, &storage); let (parquet_metadata, metadata_size) = fetch_parquet_metadata(size, &reader).await?; - let kv_metadata = parquet_metadata.file_metadata().key_value_metadata(); let kv_size = kv_metadata .map(|kvs| { @@ -225,15 +217,6 @@ async fn parse_metadata( }) .unwrap_or(0); - let md = if page_indexes { - let object_store_reader = - ObjectStoreReader::new(storage, path.clone(), Arc::new(parquet_metadata)); - let parquet_metadata = - parquet_ext::meta_data::meta_with_page_indexes(object_store_reader).await?; - MetaData::try_new(&parquet_metadata, false)? - } else { - MetaData::try_new(&parquet_metadata, false)? - }; - + let md = MetaData::try_new(&parquet_metadata, false)?; Ok((md, metadata_size, kv_size)) }