diff --git a/Cargo.lock b/Cargo.lock index 1b3daaa1bd..d4183bacca 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4202,7 +4202,9 @@ dependencies = [ "bytes", "common_util", "datafusion", + "futures 0.3.28", "log", + "object_store 1.2.2", "parquet", "tokio", ] diff --git a/analytic_engine/src/compaction/mod.rs b/analytic_engine/src/compaction/mod.rs index e0485522b1..bcbead4af9 100644 --- a/analytic_engine/src/compaction/mod.rs +++ b/analytic_engine/src/compaction/mod.rs @@ -318,26 +318,13 @@ pub struct ExpiredFiles { #[derive(Default, Clone)] pub struct CompactionTask { - inputs: Vec, - expired: Vec, -} - -impl Drop for CompactionTask { - fn drop(&mut self) { - // When a CompactionTask is dropped, it means - // 1. the task finished successfully, or - // 2. the task is cancelled for some reason, like memory limit - // - // In case 2, we need to mark files as not compacted in order for them to be - // scheduled again. In case 1, the files will be moved out of level controller, - // so it doesn't care what the flag is, so it's safe to set false here. - self.mark_files_being_compacted(false); - } + pub compaction_inputs: Vec, + pub expired: Vec, } impl CompactionTask { - fn mark_files_being_compacted(&self, being_compacted: bool) { - for input in &self.inputs { + pub fn mark_files_being_compacted(&self, being_compacted: bool) { + for input in &self.compaction_inputs { for file in &input.files { file.set_being_compacted(being_compacted); } @@ -350,10 +337,9 @@ impl CompactionTask { } // Estimate the size of the total input files. - #[inline] pub fn estimated_total_input_file_size(&self) -> usize { let total_input_size: u64 = self - .inputs + .compaction_inputs .iter() .map(|v| v.files.iter().map(|f| f.size()).sum::()) .sum(); @@ -361,65 +347,19 @@ impl CompactionTask { total_input_size as usize } - #[inline] pub fn num_compact_files(&self) -> usize { - self.inputs.iter().map(|v| v.files.len()).sum() + self.compaction_inputs.iter().map(|v| v.files.len()).sum() } - #[inline] - pub fn is_empty(&self) -> bool { - self.is_input_empty() && self.expired.is_empty() - } - - #[inline] - pub fn is_input_empty(&self) -> bool { - self.inputs.is_empty() - } - - #[inline] - pub fn expired(&self) -> &[ExpiredFiles] { - &self.expired - } - - #[inline] - pub fn inputs(&self) -> &[CompactionInputFiles] { - &self.inputs - } -} - -pub struct CompactionTaskBuilder { - expired: Vec, - inputs: Vec, -} - -impl CompactionTaskBuilder { - pub fn with_expired(expired: Vec) -> Self { - Self { - expired, - inputs: Vec::new(), - } - } - - pub fn add_inputs(&mut self, files: CompactionInputFiles) { - self.inputs.push(files); - } - - pub fn build(self) -> CompactionTask { - let task = CompactionTask { - expired: self.expired, - inputs: self.inputs, - }; - - task.mark_files_being_compacted(true); - - task + pub fn num_expired_files(&self) -> usize { + self.expired.iter().map(|v| v.files.len()).sum() } } impl fmt::Debug for CompactionTask { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("CompactionTask") - .field("inputs", &self.inputs) + .field("inputs", &self.compaction_inputs) .field( "expired", &self @@ -440,12 +380,36 @@ impl fmt::Debug for CompactionTask { } } -#[derive(Default)] -pub struct PickerManager; +pub struct PickerManager { + default_picker: CompactionPickerRef, + time_window_picker: CompactionPickerRef, + size_tiered_picker: CompactionPickerRef, +} + +impl Default for PickerManager { + fn default() -> Self { + let size_tiered_picker = Arc::new(CommonCompactionPicker::new( + CompactionStrategy::SizeTiered(SizeTieredCompactionOptions::default()), + )); + let time_window_picker = Arc::new(CommonCompactionPicker::new( + CompactionStrategy::TimeWindow(TimeWindowCompactionOptions::default()), + )); + + Self { + default_picker: time_window_picker.clone(), + size_tiered_picker, + time_window_picker, + } + } +} impl PickerManager { pub fn get_picker(&self, strategy: CompactionStrategy) -> CompactionPickerRef { - Arc::new(CommonCompactionPicker::new(strategy)) + match strategy { + CompactionStrategy::Default => self.default_picker.clone(), + CompactionStrategy::SizeTiered(_) => self.size_tiered_picker.clone(), + CompactionStrategy::TimeWindow(_) => self.time_window_picker.clone(), + } } } diff --git a/analytic_engine/src/compaction/picker.rs b/analytic_engine/src/compaction/picker.rs index 784f0ba894..96600199f0 100644 --- a/analytic_engine/src/compaction/picker.rs +++ b/analytic_engine/src/compaction/picker.rs @@ -15,8 +15,8 @@ use snafu::Snafu; use crate::{ compaction::{ - CompactionInputFiles, CompactionStrategy, CompactionTask, CompactionTaskBuilder, - SizeTieredCompactionOptions, TimeWindowCompactionOptions, + CompactionInputFiles, CompactionStrategy, CompactionTask, SizeTieredCompactionOptions, + TimeWindowCompactionOptions, }, sst::{ file::{FileHandle, Level}, @@ -60,7 +60,7 @@ pub trait CompactionPicker { fn pick_compaction( &self, ctx: PickerContext, - levels_controller: &mut LevelsController, + levels_controller: &LevelsController, ) -> Result; } @@ -86,10 +86,10 @@ pub struct CommonCompactionPicker { impl CommonCompactionPicker { pub fn new(strategy: CompactionStrategy) -> Self { let level_picker: LevelPickerRef = match strategy { - CompactionStrategy::SizeTiered(_) => Arc::new(SizeTieredPicker::default()), - CompactionStrategy::TimeWindow(_) | CompactionStrategy::Default => { - Arc::new(TimeWindowPicker::default()) + CompactionStrategy::SizeTiered(_) | CompactionStrategy::Default => { + Arc::new(SizeTieredPicker::default()) } + CompactionStrategy::TimeWindow(_) => Arc::new(TimeWindowPicker::default()), }; Self { level_picker } } @@ -123,11 +123,13 @@ impl CompactionPicker for CommonCompactionPicker { fn pick_compaction( &self, ctx: PickerContext, - levels_controller: &mut LevelsController, + levels_controller: &LevelsController, ) -> Result { let expire_time = ctx.ttl.map(Timestamp::expire_time); - let mut builder = - CompactionTaskBuilder::with_expired(levels_controller.expired_ssts(expire_time)); + let mut compaction_task = CompactionTask { + expired: levels_controller.expired_ssts(expire_time), + ..Default::default() + }; if let Some(input_files) = self.pick_compact_candidates(&ctx, levels_controller, expire_time) @@ -137,10 +139,10 @@ impl CompactionPicker for CommonCompactionPicker { ctx.strategy, input_files ); - builder.add_inputs(input_files); + compaction_task.compaction_inputs = vec![input_files]; } - Ok(builder.build()) + Ok(compaction_task) } } @@ -732,39 +734,39 @@ mod tests { }; let now = Timestamp::now(); { - let mut lc = build_old_bucket_case(now.as_i64()); - let task = twp.pick_compaction(ctx.clone(), &mut lc).unwrap(); - assert_eq!(task.inputs[0].files.len(), 2); - assert_eq!(task.inputs[0].files[0].id(), 0); - assert_eq!(task.inputs[0].files[1].id(), 1); + let lc = build_old_bucket_case(now.as_i64()); + let task = twp.pick_compaction(ctx.clone(), &lc).unwrap(); + assert_eq!(task.compaction_inputs[0].files.len(), 2); + assert_eq!(task.compaction_inputs[0].files[0].id(), 0); + assert_eq!(task.compaction_inputs[0].files[1].id(), 1); assert_eq!(task.expired[0].files.len(), 1); assert_eq!(task.expired[0].files[0].id(), 3); } { - let mut lc = build_newest_bucket_case(now.as_i64()); - let task = twp.pick_compaction(ctx.clone(), &mut lc).unwrap(); - assert_eq!(task.inputs[0].files.len(), 4); - assert_eq!(task.inputs[0].files[0].id(), 2); - assert_eq!(task.inputs[0].files[1].id(), 3); - assert_eq!(task.inputs[0].files[2].id(), 4); - assert_eq!(task.inputs[0].files[3].id(), 5); + let lc = build_newest_bucket_case(now.as_i64()); + let task = twp.pick_compaction(ctx.clone(), &lc).unwrap(); + assert_eq!(task.compaction_inputs[0].files.len(), 4); + assert_eq!(task.compaction_inputs[0].files[0].id(), 2); + assert_eq!(task.compaction_inputs[0].files[1].id(), 3); + assert_eq!(task.compaction_inputs[0].files[2].id(), 4); + assert_eq!(task.compaction_inputs[0].files[3].id(), 5); } { - let mut lc = build_newest_bucket_no_match_case(now.as_i64()); - let task = twp.pick_compaction(ctx.clone(), &mut lc).unwrap(); - assert_eq!(task.inputs.len(), 0); + let lc = build_newest_bucket_no_match_case(now.as_i64()); + let task = twp.pick_compaction(ctx.clone(), &lc).unwrap(); + assert_eq!(task.compaction_inputs.len(), 0); } // If ttl is None, then no file is expired. ctx.ttl = None; { - let mut lc = build_old_bucket_case(now.as_i64()); - let task = twp.pick_compaction(ctx, &mut lc).unwrap(); - assert_eq!(task.inputs[0].files.len(), 2); - assert_eq!(task.inputs[0].files[0].id(), 0); - assert_eq!(task.inputs[0].files[1].id(), 1); + let lc = build_old_bucket_case(now.as_i64()); + let task = twp.pick_compaction(ctx, &lc).unwrap(); + assert_eq!(task.compaction_inputs[0].files.len(), 2); + assert_eq!(task.compaction_inputs[0].files[0].id(), 0); + assert_eq!(task.compaction_inputs[0].files[1].id(), 1); assert!(task.expired[0].files.is_empty()); } } diff --git a/analytic_engine/src/compaction/scheduler.rs b/analytic_engine/src/compaction/scheduler.rs index 866629678d..30cf277521 100644 --- a/analytic_engine/src/compaction/scheduler.rs +++ b/analytic_engine/src/compaction/scheduler.rs @@ -237,7 +237,7 @@ impl OngoingTaskLimit { if dropped > 0 { warn!( - "Too many compaction pending tasks, limit:{}, dropped:{}.", + "Too many compaction pending tasks, limit: {}, dropped {} older tasks.", self.max_pending_compaction_tasks, dropped, ); } @@ -462,7 +462,10 @@ impl ScheduleWorker { waiter_notifier: WaiterNotifier, token: MemoryUsageToken, ) { - let keep_scheduling_compaction = !compaction_task.is_input_empty(); + // Mark files being in compaction. + compaction_task.mark_files_being_compacted(true); + + let keep_scheduling_compaction = !compaction_task.compaction_inputs.is_empty(); let runtime = self.runtime.clone(); let space_store = self.space_store.clone(); @@ -500,6 +503,9 @@ impl ScheduleWorker { .await; if let Err(e) = &res { + // Compaction is failed, we need to unset the compaction mark. + compaction_task.mark_files_being_compacted(false); + error!( "Failed to compact table, table_name:{}, table_id:{}, request_id:{}, err:{}", table_data.name, table_data.id, request_id, e @@ -650,16 +656,7 @@ impl ScheduleWorker { self.max_unflushed_duration, ); - let mut serial_exec = if let Some(v) = table_data.acquire_serial_exec_ctx().await { - v - } else { - warn!( - "Table is closed, ignore this periodical flush, table:{}", - table_data.name - ); - continue; - }; - + let mut serial_exec = table_data.serial_exec.lock().await; let flush_scheduler = serial_exec.flush_scheduler(); // Instance flush the table asynchronously. if let Err(e) = flusher diff --git a/analytic_engine/src/instance/close.rs b/analytic_engine/src/instance/close.rs index d933b01214..f45199c164 100644 --- a/analytic_engine/src/instance/close.rs +++ b/analytic_engine/src/instance/close.rs @@ -3,12 +3,12 @@ //! Close table logic of instance use log::{info, warn}; -use snafu::{OptionExt, ResultExt}; +use snafu::ResultExt; use table_engine::engine::CloseTableRequest; use crate::{ instance::{ - engine::{DoManifestSnapshot, FlushTable, OperateClosedTable, Result}, + engine::{DoManifestSnapshot, FlushTable, Result}, flush_compaction::{Flusher, TableFlushOptions}, }, manifest::{ManifestRef, SnapshotRequest}, @@ -37,11 +37,8 @@ impl Closer { // Flush table. let opts = TableFlushOptions::default(); - let mut serial_exec_ctx = table_data - .acquire_serial_exec_ctx() - .await - .context(OperateClosedTable)?; - let flush_scheduler = serial_exec_ctx.flush_scheduler(); + let mut serial_exec = table_data.serial_exec.lock().await; + let flush_scheduler = serial_exec.flush_scheduler(); self.flusher .do_flush(flush_scheduler, &table_data, opts) @@ -70,10 +67,9 @@ impl Closer { let removed_table = self.space.remove_table(&request.table_name); assert!(removed_table.is_some()); - serial_exec_ctx.invalidate(); info!( - "table:{} has been removed from the space_id:{}, table_id:{}", - table_data.name, self.space.id, table_data.id, + "table:{}-{} has been removed from the space_id:{}", + table_data.name, table_data.id, self.space.id ); Ok(()) } diff --git a/analytic_engine/src/instance/drop.rs b/analytic_engine/src/instance/drop.rs index 08d673f820..ac6d1653fd 100644 --- a/analytic_engine/src/instance/drop.rs +++ b/analytic_engine/src/instance/drop.rs @@ -3,12 +3,12 @@ //! Drop table logic of instance use log::{info, warn}; -use snafu::{OptionExt, ResultExt}; +use snafu::ResultExt; use table_engine::engine::DropTableRequest; use crate::{ instance::{ - engine::{FlushTable, OperateClosedTable, Result, WriteManifest}, + engine::{FlushTable, Result, WriteManifest}, flush_compaction::{Flusher, TableFlushOptions}, SpaceStoreRef, }, @@ -36,10 +36,7 @@ impl Dropper { } }; - let mut serial_exec_ctx = table_data - .acquire_serial_exec_ctx() - .await - .context(OperateClosedTable)?; + let mut serial_exec = table_data.serial_exec.lock().await; if table_data.is_dropped() { warn!( @@ -54,7 +51,7 @@ impl Dropper { // be avoided. let opts = TableFlushOptions::default(); - let flush_scheduler = serial_exec_ctx.flush_scheduler(); + let flush_scheduler = serial_exec.flush_scheduler(); self.flusher .do_flush(flush_scheduler, &table_data, opts) .await diff --git a/analytic_engine/src/instance/engine.rs b/analytic_engine/src/instance/engine.rs index 562c97ff2e..e32fa064d5 100644 --- a/analytic_engine/src/instance/engine.rs +++ b/analytic_engine/src/instance/engine.rs @@ -23,21 +23,29 @@ use crate::{ #[derive(Debug, Snafu)] #[snafu(visibility(pub(crate)))] pub enum Error { - #[snafu(display("The space of the table does not exist, space_id:{space_id}, table:{table}.\nBacktrace:\n{backtrace}"))] + #[snafu(display( + "The space of the table does not exist, space_id:{}, table:{}.\nBacktrace:\n{}", + space_id, + table, + backtrace, + ))] SpaceNotExist { space_id: SpaceId, table: String, backtrace: Backtrace, }, - #[snafu(display("Failed to read meta update, table_id:{table_id}, err:{source}"))] + #[snafu(display("Failed to read meta update, table_id:{}, err:{}", table_id, source))] ReadMetaUpdate { table_id: TableId, source: GenericError, }, #[snafu(display( - "Failed to recover table data, space_id:{space_id}, table:{table}, err:{source}" + "Failed to recover table data, space_id:{}, table:{}, err:{}", + space_id, + table, + source ))] RecoverTableData { space_id: SpaceId, @@ -45,11 +53,14 @@ pub enum Error { source: crate::table::data::Error, }, - #[snafu(display("Failed to read wal, err:{source}"))] + #[snafu(display("Failed to read wal, err:{}", source))] ReadWal { source: wal::manager::Error }, #[snafu(display( - "Failed to apply log entry to memtable, table:{table}, table_id:{table_id}, err:{source}", + "Failed to apply log entry to memtable, table:{}, table_id:{}, err:{}", + table, + table_id, + source ))] ApplyMemTable { space_id: SpaceId, @@ -59,7 +70,11 @@ pub enum Error { }, #[snafu(display( - "Flush failed, space_id:{space_id}, table:{table}, table_id:{table_id}, err:{source}", + "Flush failed, space_id:{}, table:{}, table_id:{}, err:{}", + space_id, + table, + table_id, + source ))] FlushTable { space_id: SpaceId, @@ -69,7 +84,11 @@ pub enum Error { }, #[snafu(display( - "Failed to persist meta update to manifest, space_id:{space_id}, table:{table}, table_id:{table_id}, err:{source}", + "Failed to persist meta update to manifest, space_id:{}, table:{}, table_id:{}, err:{}", + space_id, + table, + table_id, + source ))] WriteManifest { space_id: SpaceId, @@ -79,7 +98,11 @@ pub enum Error { }, #[snafu(display( - "Failed to persist meta update to WAL, space_id:{space_id}, table:{table}, table_id:{table_id}, err:{source}", + "Failed to persist meta update to WAL, space_id:{}, table:{}, table_id:{}, err:{}", + space_id, + table, + table_id, + source ))] WriteWal { space_id: SpaceId, @@ -89,7 +112,11 @@ pub enum Error { }, #[snafu(display( - "Invalid options, space_id:{space_id}, table:{table}, table_id:{table_id}, err:{source}", + "Invalid options, space_id:{}, table:{}, table_id:{}, err:{}", + space_id, + table, + table_id, + source ))] InvalidOptions { space_id: SpaceId, @@ -99,7 +126,11 @@ pub enum Error { }, #[snafu(display( - "Failed to create table data, space_id:{space_id}, table:{table}, table_id:{table_id}, err:{source}", + "Failed to create table data, space_id:{}, table:{}, table_id:{}, err:{}", + space_id, + table, + table_id, + source ))] CreateTableData { space_id: SpaceId, @@ -109,8 +140,11 @@ pub enum Error { }, #[snafu(display( - "Try to update schema to elder version, table:{table}, current_version:{current_version}, \ - given_version:{given_version}.\nBacktrace:\n{backtrace}", + "Try to update schema to elder version, table:{}, current_version:{}, given_version:{}.\nBacktrace:\n{}", + table, + current_version, + given_version, + backtrace, ))] InvalidSchemaVersion { table: String, @@ -120,8 +154,11 @@ pub enum Error { }, #[snafu(display( - "Invalid previous schema version, table:{table}, current_version:{current_version}, \ - pre_version:{pre_version}.\nBacktrace:\n{backtrace}", + "Invalid previous schema version, table:{}, current_version:{}, pre_version:{}.\nBacktrace:\n{}", + table, + current_version, + pre_version, + backtrace, ))] InvalidPreVersion { table: String, @@ -130,14 +167,21 @@ pub enum Error { backtrace: Backtrace, }, - #[snafu(display("Alter schema of a dropped table:{table}.\nBacktrace:\n{backtrace}"))] + #[snafu(display( + "Alter schema of a dropped table:{}.\nBacktrace:\n{}", + table, + backtrace + ))] AlterDroppedTable { table: String, backtrace: Backtrace }, - #[snafu(display("Failed to store version edit, err:{source}"))] + #[snafu(display("Failed to store version edit, err:{}", source))] StoreVersionEdit { source: GenericError }, #[snafu(display( - "Failed to encode payloads, table:{table}, wal_location:{wal_location:?}, err:{source}" + "Failed to encode payloads, table:{}, wal_location:{:?}, err:{}", + table, + wal_location, + source ))] EncodePayloads { table: String, @@ -146,7 +190,10 @@ pub enum Error { }, #[snafu(display( - "Failed to do manifest snapshot for table, space_id:{space_id}, table:{table}, err:{source}", + "Failed to do manifest snapshot for table, space_id:{}, table:{}, err:{}", + space_id, + table, + source ))] DoManifestSnapshot { space_id: SpaceId, @@ -155,31 +202,30 @@ pub enum Error { }, #[snafu(display( - "Table open failed and can not be created again, table:{table}.\nBacktrace:\n{backtrace}", + "Table open failed and can not be created again, table:{}.\nBacktrace:\n{}", + table, + backtrace, ))] CreateOpenFailedTable { table: String, backtrace: Backtrace }, - #[snafu(display("Failed to open manifest, err:{source}"))] + #[snafu(display("Failed to open manifest, err:{}", source))] OpenManifest { source: crate::manifest::details::Error, }, - #[snafu(display("Failed to find table, msg:{msg}.\nBacktrace:\n{backtrace}"))] + #[snafu(display("Failed to find table, msg:{}.\nBacktrace:\n{}", msg, backtrace))] TableNotExist { msg: String, backtrace: Backtrace }, - #[snafu(display("Failed to open shard, msg:{msg}.\nBacktrace:\n{backtrace}"))] + #[snafu(display("Failed to open shard, msg:{}.\nBacktrace:\n{}", msg, backtrace))] OpenTablesOfShard { msg: String, backtrace: Backtrace }, - #[snafu(display("Try to operate a closed table.\nBacktrace:\n{backtrace}"))] - OperateClosedTable { backtrace: Backtrace }, - - #[snafu(display("Failed to replay wal, msg:{msg:?}, err:{source}"))] + #[snafu(display("Failed to replay wal, msg:{:?}, err:{}", msg, source))] ReplayWalWithCause { msg: Option, source: GenericError, }, - #[snafu(display("Failed to replay wal, msg:{msg:?}.\nBacktrace:\n{backtrace}"))] + #[snafu(display("Failed to replay wal, msg:{:?}.\nBacktrace:\n{}", msg, backtrace))] ReplayWalNoCause { msg: Option, backtrace: Backtrace, @@ -218,7 +264,6 @@ impl From for table_engine::engine::Error { | Error::TableNotExist { .. } | Error::OpenTablesOfShard { .. } | Error::ReplayWalNoCause { .. } - | Error::OperateClosedTable { .. } | Error::ReplayWalWithCause { .. } => Self::Unexpected { source: Box::new(err), }, diff --git a/analytic_engine/src/instance/flush_compaction.rs b/analytic_engine/src/instance/flush_compaction.rs index 03b2f30337..4e860def74 100644 --- a/analytic_engine/src/instance/flush_compaction.rs +++ b/analytic_engine/src/instance/flush_compaction.rs @@ -669,23 +669,22 @@ impl SpaceStore { "Begin compact table, table_name:{}, id:{}, task:{:?}", table_data.name, table_data.id, task ); - let inputs = task.inputs(); let mut edit_meta = VersionEditMeta { space_id: table_data.space_id, table_id: table_data.id, flushed_sequence: 0, // Use the number of compaction inputs as the estimated number of files to add. - files_to_add: Vec::with_capacity(inputs.len()), + files_to_add: Vec::with_capacity(task.compaction_inputs.len()), files_to_delete: vec![], mems_to_remove: vec![], }; - if task.is_empty() { + if task.num_expired_files() == 0 && task.num_compact_files() == 0 { // Nothing to compact. return Ok(()); } - for files in task.expired() { + for files in &task.expired { self.delete_expired_files(table_data, request_id, files, &mut edit_meta); } @@ -697,7 +696,7 @@ impl SpaceStore { task.num_compact_files(), ); - for input in inputs { + for input in &task.compaction_inputs { self.compact_input_files( request_id, table_data, diff --git a/analytic_engine/src/instance/mod.rs b/analytic_engine/src/instance/mod.rs index 492178b41c..1faf254f08 100644 --- a/analytic_engine/src/instance/mod.rs +++ b/analytic_engine/src/instance/mod.rs @@ -28,14 +28,14 @@ use common_util::{ }; use log::{error, info}; use mem_collector::MemUsageCollector; -use snafu::{Backtrace, OptionExt, ResultExt, Snafu}; +use snafu::{ResultExt, Snafu}; use table_engine::{engine::EngineRuntimes, table::FlushRequest}; use tokio::sync::oneshot::{self, error::RecvError}; use wal::manager::{WalLocation, WalManagerRef}; +use self::flush_compaction::{Flusher, TableFlushOptions}; use crate::{ compaction::{scheduler::CompactionSchedulerRef, TableCompactionRequest}, - instance::flush_compaction::{Flusher, TableFlushOptions}, manifest::ManifestRef, row_iter::IterOptions, space::{SpaceId, SpaceRef, SpacesRef}, @@ -66,9 +66,6 @@ pub enum Error { source: GenericError, }, - #[snafu(display("Try to operate a closed table, table:{table}.\nBacktrace:\n{backtrace}"))] - OperateClosedTable { table: String, backtrace: Backtrace }, - #[snafu(display("Failed to receive {} result, table:{}, err:{}", op, table, source))] RecvManualOpResult { op: String, @@ -198,13 +195,7 @@ impl Instance { }; let flusher = self.make_flusher(); - let mut serial_exec = - table_data - .acquire_serial_exec_ctx() - .await - .context(OperateClosedTable { - table: &table_data.name, - })?; + let mut serial_exec = table_data.serial_exec.lock().await; let flush_scheduler = serial_exec.flush_scheduler(); flusher .schedule_flush(flush_scheduler, table_data, flush_opts) diff --git a/analytic_engine/src/instance/wal_replayer.rs b/analytic_engine/src/instance/wal_replayer.rs index 0144833ec6..5d494450e4 100644 --- a/analytic_engine/src/instance/wal_replayer.rs +++ b/analytic_engine/src/instance/wal_replayer.rs @@ -12,7 +12,7 @@ use async_trait::async_trait; use common_types::{schema::IndexInWriterSchema, table::ShardId}; use common_util::error::BoxError; use log::{debug, error, info, trace}; -use snafu::{OptionExt, ResultExt}; +use snafu::ResultExt; use table_engine::table::TableId; use tokio::sync::MutexGuard; use wal::{ @@ -25,13 +25,13 @@ use wal::{ use crate::{ instance::{ self, - engine::{Error, OperateClosedTable, ReplayWalWithCause, Result}, + engine::{Error, ReplayWalWithCause, Result}, flush_compaction::{Flusher, TableFlushOptions}, serial_executor::TableOpSerialExecutor, write::MemTableWriter, }, payload::{ReadPayload, WalDecoder}, - table::data::{SerialExecContext, TableDataRef}, + table::data::TableDataRef, }; /// Wal replayer supporting both table based and region based @@ -182,10 +182,7 @@ impl TableBasedReplay { .box_err() .context(ReplayWalWithCause { msg: None })?; - let mut serial_exec = table_data - .acquire_serial_exec_ctx() - .await - .context(OperateClosedTable)?; + let mut serial_exec = table_data.serial_exec.lock().await; let mut log_entry_buf = VecDeque::with_capacity(context.wal_replay_batch_size); loop { // fetch entries to log_entry_buf @@ -267,17 +264,14 @@ impl RegionBasedReplay { let mut log_entry_buf = VecDeque::with_capacity(context.wal_replay_batch_size); // Lock all related tables. - let mut replay_table_ctxs = HashMap::with_capacity(table_datas.len()); + let mut serial_exec_ctxs = HashMap::with_capacity(table_datas.len()); for table_data in table_datas { - let serial_exec_ctx = table_data - .acquire_serial_exec_ctx() - .await - .context(OperateClosedTable)?; - let replay_table_ctx = TableReplayContext { + let serial_exec = table_data.serial_exec.lock().await; + let serial_exec_ctx = SerialExecContext { table_data: table_data.clone(), - serial_exec_ctx, + serial_exec, }; - replay_table_ctxs.insert(table_data.id, replay_table_ctx); + serial_exec_ctxs.insert(table_data.id, serial_exec_ctx); } // Split and replay logs. @@ -293,7 +287,7 @@ impl RegionBasedReplay { break; } - Self::replay_single_batch(context, &log_entry_buf, &mut replay_table_ctxs, faileds) + Self::replay_single_batch(context, &log_entry_buf, &mut serial_exec_ctxs, faileds) .await?; } @@ -303,7 +297,7 @@ impl RegionBasedReplay { async fn replay_single_batch( context: &ReplayContext, log_batch: &VecDeque>, - serial_exec_ctxs: &mut HashMap>, + serial_exec_ctxs: &mut HashMap>, faileds: &mut FailedTables, ) -> Result<()> { let mut table_batches = Vec::new(); @@ -323,7 +317,7 @@ impl RegionBasedReplay { let result = replay_table_log_entries( &context.flusher, context.max_retry_flush_limit, - &mut ctx.serial_exec_ctx, + &mut ctx.serial_exec, &ctx.table_data, log_batch.range(table_batch.range), ) @@ -397,9 +391,9 @@ struct TableBatch { range: Range, } -struct TableReplayContext<'a> { +struct SerialExecContext<'a> { table_data: TableDataRef, - serial_exec_ctx: MutexGuard<'a, SerialExecContext>, + serial_exec: MutexGuard<'a, TableOpSerialExecutor>, } /// Replay all log entries into memtable and flush if necessary diff --git a/analytic_engine/src/instance/write.rs b/analytic_engine/src/instance/write.rs index 6017686b9a..219e86102c 100644 --- a/analytic_engine/src/instance/write.rs +++ b/analytic_engine/src/instance/write.rs @@ -608,9 +608,9 @@ impl<'a> Writer<'a> { "Try to trigger flush of other table:{} from the write procedure of table:{}", table_data.name, self.table_data.name ); - match table_data.try_acquire_serial_exec_ctx() { - Some(mut serial_exec_ctx) => { - let flush_scheduler = serial_exec_ctx.flush_scheduler(); + match table_data.serial_exec.try_lock() { + Ok(mut serial_exec) => { + let flush_scheduler = serial_exec.flush_scheduler(); // Set `block_on_write_thread` to false and let flush do in background. flusher .schedule_flush(flush_scheduler, table_data, opts) @@ -619,7 +619,7 @@ impl<'a> Writer<'a> { table: &table_data.name, }) } - None => { + Err(_) => { warn!( "Failed to acquire write lock for flush table:{}", table_data.name, diff --git a/analytic_engine/src/sst/meta_data/cache.rs b/analytic_engine/src/sst/meta_data/cache.rs index 5e2bacdcbd..296c4e2476 100644 --- a/analytic_engine/src/sst/meta_data/cache.rs +++ b/analytic_engine/src/sst/meta_data/cache.rs @@ -1,4 +1,4 @@ -// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0. +// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. use std::{ fmt::Debug, @@ -7,7 +7,7 @@ use std::{ use lru::LruCache; use parquet::file::metadata::FileMetaData; -use snafu::{ensure, OptionExt, ResultExt}; +use snafu::{OptionExt, ResultExt}; use crate::sst::{ meta_data::{DecodeCustomMetaData, KvMetaDataNotFound, ParquetMetaDataRef, Result}, @@ -39,24 +39,14 @@ impl MetaData { let kv_metas = file_meta_data .key_value_metadata() .context(KvMetaDataNotFound)?; - - ensure!(!kv_metas.is_empty(), KvMetaDataNotFound); - let mut other_kv_metas = Vec::with_capacity(kv_metas.len() - 1); - let mut custom_kv_meta = None; - for kv_meta in kv_metas { - // Remove our extended custom meta data from the parquet metadata for small - // memory consumption in the cache. - if kv_meta.key == encoding::META_KEY { - custom_kv_meta = Some(kv_meta); - } else { - other_kv_metas.push(kv_meta.clone()); - } - } + let kv_meta = kv_metas + .iter() + .find(|kv| kv.key == encoding::META_KEY) + .context(KvMetaDataNotFound)?; let custom = { - let custom_kv_meta = custom_kv_meta.context(KvMetaDataNotFound)?; let mut sst_meta = - encoding::decode_sst_meta_data(custom_kv_meta).context(DecodeCustomMetaData)?; + encoding::decode_sst_meta_data(kv_meta).context(DecodeCustomMetaData)?; if ignore_sst_filter { sst_meta.parquet_filter = None; } @@ -66,17 +56,13 @@ impl MetaData { // let's build a new parquet metadata without the extended key value // metadata. - let other_kv_metas = if other_kv_metas.is_empty() { - None - } else { - Some(other_kv_metas) - }; let parquet = { let thin_file_meta_data = FileMetaData::new( file_meta_data.version(), file_meta_data.num_rows(), file_meta_data.created_by().map(|v| v.to_string()), - other_kv_metas, + // Remove the key value metadata. + None, file_meta_data.schema_descr_ptr(), file_meta_data.column_orders().cloned(), ); @@ -125,153 +111,3 @@ impl MetaCache { self.cache.write().unwrap().put(key, value); } } - -#[cfg(test)] -mod tests { - use std::{fs::File, path::Path, sync::Arc}; - - use arrow::{ - array::UInt64Builder, - datatypes::{DataType, Field, Schema}, - record_batch::RecordBatch, - }; - use bytes::Bytes; - use common_types::{ - column_schema::Builder as ColumnSchemaBuilder, - schema::Builder as CustomSchemaBuilder, - time::{TimeRange, Timestamp}, - }; - use parquet::{arrow::ArrowWriter, file::footer}; - use parquet_ext::ParquetMetaData; - - use super::MetaData; - use crate::sst::parquet::{encoding, meta_data::ParquetMetaData as CustomParquetMetaData}; - - fn check_parquet_meta_data(original: &ParquetMetaData, processed: &ParquetMetaData) { - assert_eq!(original.page_indexes(), processed.page_indexes()); - assert_eq!(original.offset_indexes(), processed.offset_indexes()); - assert_eq!(original.num_row_groups(), processed.num_row_groups()); - assert_eq!(original.row_groups(), processed.row_groups()); - - let original_file_md = original.file_metadata(); - let processed_file_md = processed.file_metadata(); - assert_eq!(original_file_md.num_rows(), processed_file_md.num_rows()); - assert_eq!(original_file_md.version(), processed_file_md.version()); - assert_eq!( - original_file_md.created_by(), - processed_file_md.created_by() - ); - assert_eq!(original_file_md.schema(), processed_file_md.schema()); - assert_eq!( - original_file_md.schema_descr(), - processed_file_md.schema_descr() - ); - assert_eq!( - original_file_md.schema_descr_ptr(), - processed_file_md.schema_descr_ptr() - ); - assert_eq!( - original_file_md.column_orders(), - processed_file_md.column_orders() - ); - - if let Some(kv_metas) = original_file_md.key_value_metadata() { - let processed_kv_metas = processed_file_md.key_value_metadata().unwrap(); - assert_eq!(kv_metas.len(), processed_kv_metas.len() + 1); - let mut idx_for_processed = 0; - for kv in kv_metas { - if kv.key == encoding::META_KEY { - continue; - } - assert_eq!(kv, &processed_kv_metas[idx_for_processed]); - idx_for_processed += 1; - } - } else { - assert!(processed_file_md.key_value_metadata().is_none()); - } - } - - fn write_parquet_file_with_metadata( - parquet_file_path: &Path, - custom_meta_data: &CustomParquetMetaData, - ) { - let tsid_array = { - let mut builder = UInt64Builder::new(); - builder.append_value(10); - builder.append_null(); - builder.append_value(11); - builder.finish() - }; - let timestamp_array = { - let mut builder = UInt64Builder::new(); - builder.append_value(1000); - builder.append_null(); - builder.append_value(1001); - builder.finish() - }; - let file = File::create(parquet_file_path).unwrap(); - let schema = Schema::new(vec![ - Field::new("tsid", DataType::UInt64, true), - Field::new("timestamp", DataType::UInt64, true), - ]); - - let batch = RecordBatch::try_new( - Arc::new(schema), - vec![Arc::new(tsid_array), Arc::new(timestamp_array)], - ) - .unwrap(); - let mut writer = ArrowWriter::try_new(file, batch.schema(), None).unwrap(); - - let encoded_meta_data = encoding::encode_sst_meta_data(custom_meta_data.clone()).unwrap(); - writer.append_key_value_metadata(encoded_meta_data); - - writer.write(&batch).unwrap(); - writer.close().unwrap(); - } - - #[test] - fn test_arrow_meta_data() { - let temp_dir = tempfile::tempdir().unwrap(); - let parquet_file_path = temp_dir.path().join("test_arrow_meta_data.par"); - let schema = { - let tsid_column_schema = ColumnSchemaBuilder::new( - "tsid".to_string(), - common_types::datum::DatumKind::UInt64, - ) - .build() - .unwrap(); - let timestamp_column_schema = ColumnSchemaBuilder::new( - "timestamp".to_string(), - common_types::datum::DatumKind::Timestamp, - ) - .build() - .unwrap(); - CustomSchemaBuilder::new() - .auto_increment_column_id(true) - .add_key_column(tsid_column_schema) - .unwrap() - .add_key_column(timestamp_column_schema) - .unwrap() - .build() - .unwrap() - }; - let custom_meta_data = CustomParquetMetaData { - min_key: Bytes::from_static(&[0, 1]), - max_key: Bytes::from_static(&[2, 2]), - time_range: TimeRange::new_unchecked(Timestamp::new(0), Timestamp::new(10)), - max_sequence: 1001, - schema, - parquet_filter: None, - collapsible_cols_idx: vec![], - }; - write_parquet_file_with_metadata(parquet_file_path.as_path(), &custom_meta_data); - - let parquet_file = File::open(parquet_file_path.as_path()).unwrap(); - let parquet_meta_data = footer::parse_metadata(&parquet_file).unwrap(); - - let meta_data = MetaData::try_new(&parquet_meta_data, false).unwrap(); - - assert_eq!(**meta_data.custom(), custom_meta_data); - check_parquet_meta_data(&parquet_meta_data, meta_data.parquet()); - } -} diff --git a/analytic_engine/src/sst/parquet/async_reader.rs b/analytic_engine/src/sst/parquet/async_reader.rs index 47478eca6a..735e0cd9d2 100644 --- a/analytic_engine/src/sst/parquet/async_reader.rs +++ b/analytic_engine/src/sst/parquet/async_reader.rs @@ -1,4 +1,4 @@ -// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0. +// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. //! Sst reader implementation based on parquet. @@ -30,17 +30,14 @@ use datafusion::{ metrics::ExecutionPlanMetricsSet, }, }; -use futures::{future::BoxFuture, FutureExt, Stream, StreamExt, TryFutureExt}; +use futures::{Stream, StreamExt}; use log::{debug, error}; use object_store::{ObjectStoreRef, Path}; use parquet::{ - arrow::{ - arrow_reader::RowSelection, async_reader::AsyncFileReader, ParquetRecordBatchStreamBuilder, - ProjectionMask, - }, + arrow::{arrow_reader::RowSelection, ParquetRecordBatchStreamBuilder, ProjectionMask}, file::metadata::RowGroupMetaData, }; -use parquet_ext::meta_data::ChunkReader; +use parquet_ext::{meta_data::ChunkReader, reader::ObjectStoreReader}; use snafu::ResultExt; use table_engine::predicate::PredicateRef; use tokio::sync::mpsc::{self, Receiver, Sender}; @@ -281,13 +278,23 @@ impl<'a> Reader<'a> { let mut streams = Vec::with_capacity(target_row_group_chunks.len()); for chunk in target_row_group_chunks { - let object_store_reader = - ObjectStoreReader::new(self.store.clone(), self.path.clone(), meta_data.clone()); + let object_store_reader = ObjectStoreReader::new( + self.store.clone(), + self.path.clone(), + parquet_metadata.clone(), + ); let mut builder = ParquetRecordBatchStreamBuilder::new(object_store_reader) .await .with_context(|| ParquetError)?; + let row_selection = self.build_row_selection(arrow_schema.clone(), &chunk, parquet_metadata)?; + + debug!( + "Build row selection for file path:{}, result:{row_selection:?}, page indexes:{}", + self.path, + parquet_metadata.page_indexes().is_some() + ); if let Some(selection) = row_selection { builder = builder.with_row_selection(selection); }; @@ -340,18 +347,32 @@ impl<'a> Reader<'a> { Ok(file_size) } - async fn load_meta_data_from_storage(&self) -> Result { + async fn load_meta_data_from_storage(&self, ignore_sst_filter: bool) -> Result { let file_size = self.load_file_size().await?; let chunk_reader_adapter = ChunkReaderAdapter::new(self.path, self.store); - let (meta_data, _) = + let (parquet_meta_data, _) = parquet_ext::meta_data::fetch_parquet_metadata(file_size, &chunk_reader_adapter) .await .with_context(|| FetchAndDecodeSstMeta { file_path: self.path.to_string(), })?; - Ok(meta_data) + let object_store_reader = parquet_ext::reader::ObjectStoreReader::new( + self.store.clone(), + self.path.clone(), + Arc::new(parquet_meta_data), + ); + + let parquet_meta_data = parquet_ext::meta_data::meta_with_page_indexes(object_store_reader) + .await + .with_context(|| DecodePageIndexes { + file_path: self.path.to_string(), + })?; + + MetaData::try_new(&parquet_meta_data, ignore_sst_filter) + .box_err() + .context(DecodeSstMeta) } fn need_update_cache(&self) -> bool { @@ -375,12 +396,8 @@ impl<'a> Reader<'a> { let empty_predicate = self.predicate.exprs().is_empty(); let meta_data = { - let parquet_meta_data = self.load_meta_data_from_storage().await?; - let ignore_sst_filter = avoid_update_cache && empty_predicate; - MetaData::try_new(&parquet_meta_data, ignore_sst_filter) - .box_err() - .context(DecodeSstMeta)? + self.load_meta_data_from_storage(ignore_sst_filter).await? }; if avoid_update_cache || self.meta_cache.is_none() { @@ -413,71 +430,6 @@ impl<'a> Drop for Reader<'a> { } } -#[derive(Clone)] -struct ObjectStoreReader { - storage: ObjectStoreRef, - path: Path, - meta_data: MetaData, - begin: Instant, -} - -impl ObjectStoreReader { - fn new(storage: ObjectStoreRef, path: Path, meta_data: MetaData) -> Self { - Self { - storage, - path, - meta_data, - begin: Instant::now(), - } - } -} - -impl Drop for ObjectStoreReader { - fn drop(&mut self) { - debug!( - "ObjectStoreReader dropped, path:{}, elapsed:{:?}", - &self.path, - self.begin.elapsed() - ); - } -} - -impl AsyncFileReader for ObjectStoreReader { - fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, parquet::errors::Result> { - self.storage - .get_range(&self.path, range) - .map_err(|e| { - parquet::errors::ParquetError::General(format!( - "Failed to fetch range from object store, err:{e}" - )) - }) - .boxed() - } - - fn get_byte_ranges( - &mut self, - ranges: Vec>, - ) -> BoxFuture<'_, parquet::errors::Result>> { - async move { - self.storage - .get_ranges(&self.path, &ranges) - .map_err(|e| { - parquet::errors::ParquetError::General(format!( - "Failed to fetch ranges from object store, err:{e}" - )) - }) - .await - } - .boxed() - } - - fn get_metadata( - &mut self, - ) -> BoxFuture<'_, parquet::errors::Result>> { - Box::pin(async move { Ok(self.meta_data.parquet().clone()) }) - } -} - pub struct ChunkReaderAdapter<'a> { path: &'a Path, store: &'a ObjectStoreRef, diff --git a/analytic_engine/src/sst/parquet/encoding.rs b/analytic_engine/src/sst/parquet/encoding.rs index d65f75dcd1..2effc6bc36 100644 --- a/analytic_engine/src/sst/parquet/encoding.rs +++ b/analytic_engine/src/sst/parquet/encoding.rs @@ -1030,11 +1030,11 @@ mod tests { ArrowRecordBatch::try_new(schema.to_arrow_schema_ref(), columns).unwrap(); let input_record_batch2 = ArrowRecordBatch::try_new(schema.to_arrow_schema_ref(), columns2).unwrap(); - let num_rows = encoder + let row_nums = encoder .encode(vec![input_record_batch, input_record_batch2]) .await .unwrap(); - assert_eq!(2, num_rows); + assert_eq!(2, row_nums); // read encoded records back, and then compare with input records encoder.close().await.unwrap(); diff --git a/analytic_engine/src/sst/reader.rs b/analytic_engine/src/sst/reader.rs index 99872d448a..915190a935 100644 --- a/analytic_engine/src/sst/reader.rs +++ b/analytic_engine/src/sst/reader.rs @@ -15,20 +15,17 @@ pub mod error { #[derive(Debug, Snafu)] #[snafu(visibility(pub))] pub enum Error { - #[snafu(display("Try to read again, path:{}.\nBacktrace:\n{}", path, backtrace))] + #[snafu(display("Try to read again, path:{path}.\nBacktrace:\n{backtrace}"))] ReadAgain { backtrace: Backtrace, path: String }, - #[snafu(display("Fail to read persisted file, path:{}, err:{}", path, source))] + #[snafu(display("Fail to read persisted file, path:{path}, err:{source}"))] ReadPersist { path: String, source: GenericError }, - #[snafu(display("Failed to decode record batch, err:{}", source))] + #[snafu(display("Failed to decode record batch, err:{source}"))] DecodeRecordBatch { source: GenericError }, #[snafu(display( - "Failed to decode sst meta data, file_path:{}, err:{}.\nBacktrace:\n{:?}", - file_path, - source, - backtrace + "Failed to decode sst meta data, file_path:{file_path}, err:{source}.\nBacktrace:\n{backtrace:?}", ))] FetchAndDecodeSstMeta { file_path: String, @@ -36,43 +33,52 @@ pub mod error { backtrace: Backtrace, }, - #[snafu(display("Failed to decode sst meta data, err:{}", source))] + #[snafu(display( + "Failed to decode page indexes for meta data, file_path:{file_path}, err:{source}.\nBacktrace:\n{backtrace:?}", + ))] + DecodePageIndexes { + file_path: String, + source: parquet::errors::ParquetError, + backtrace: Backtrace, + }, + + #[snafu(display("Failed to decode sst meta data, err:{source}"))] DecodeSstMeta { source: GenericError }, - #[snafu(display("Sst meta data is not found.\nBacktrace:\n{}", backtrace))] + #[snafu(display("Sst meta data is not found.\nBacktrace:\n{backtrace}"))] SstMetaNotFound { backtrace: Backtrace }, - #[snafu(display("Fail to projection, err:{}", source))] + #[snafu(display("Fail to projection, err:{source}"))] Projection { source: GenericError }, - #[snafu(display("Sst meta data is empty.\nBacktrace:\n{}", backtrace))] + #[snafu(display("Sst meta data is empty.\nBacktrace:\n{backtrace}"))] EmptySstMeta { backtrace: Backtrace }, - #[snafu(display("Invalid schema, err:{}", source))] + #[snafu(display("Invalid schema, err:{source}"))] InvalidSchema { source: common_types::schema::Error }, - #[snafu(display("Meet a datafusion error, err:{}\nBacktrace:\n{}", source, backtrace))] + #[snafu(display("Meet a datafusion error, err:{source}\nBacktrace:\n{backtrace}"))] DataFusionError { source: datafusion::error::DataFusionError, backtrace: Backtrace, }, - #[snafu(display("Meet a object store error, err:{}\nBacktrace:\n{}", source, backtrace))] + #[snafu(display("Meet a object store error, err:{source}\nBacktrace:\n{backtrace}"))] ObjectStoreError { source: object_store::ObjectStoreError, backtrace: Backtrace, }, - #[snafu(display("Meet a parquet error, err:{}\nBacktrace:\n{}", source, backtrace))] + #[snafu(display("Meet a parquet error, err:{source}\nBacktrace:\n{backtrace}"))] ParquetError { source: parquet::errors::ParquetError, backtrace: Backtrace, }, - #[snafu(display("Other kind of error:{}", source))] + #[snafu(display("Other kind of error:{source}"))] Other { source: GenericError }, - #[snafu(display("Other kind of error, msg:{}.\nBacktrace:\n{}", msg, backtrace))] + #[snafu(display("Other kind of error, msg:{msg}.\nBacktrace:\n{backtrace}"))] OtherNoCause { msg: String, backtrace: Backtrace }, } diff --git a/analytic_engine/src/table/data.rs b/analytic_engine/src/table/data.rs index 01ba153fd8..49d9d6cae8 100644 --- a/analytic_engine/src/table/data.rs +++ b/analytic_engine/src/table/data.rs @@ -7,7 +7,6 @@ use std::{ convert::TryInto, fmt, fmt::Formatter, - ops::{Deref, DerefMut}, sync::{ atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering}, Arc, Mutex, @@ -29,7 +28,6 @@ use log::{debug, info}; use object_store::Path; use snafu::{Backtrace, OptionExt, ResultExt, Snafu}; use table_engine::table::TableId; -use tokio::sync::MutexGuard; use crate::{ instance::serial_executor::TableOpSerialExecutor, @@ -88,36 +86,6 @@ impl TableShardInfo { } } -/// The context for execution of serial operation on the table. -pub struct SerialExecContext { - /// Denotes whether `serial_exec` is valid. - /// - /// The `serial_exec` will be invalidated if the table is closed. - invalid: bool, - serial_exec: TableOpSerialExecutor, -} - -impl SerialExecContext { - #[inline] - pub fn invalidate(&mut self) { - self.invalid = true; - } -} - -impl Deref for SerialExecContext { - type Target = TableOpSerialExecutor; - - fn deref(&self) -> &Self::Target { - &self.serial_exec - } -} - -impl DerefMut for SerialExecContext { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.serial_exec - } -} - /// Data of a table pub struct TableData { /// Id of this table @@ -175,13 +143,14 @@ pub struct TableData { /// No write/alter is allowed if the table is dropped. dropped: AtomicBool, - serial_exec_ctx: tokio::sync::Mutex, - /// Metrics of this table pub metrics: Metrics, /// Shard info of the table pub shard_info: TableShardInfo, + + /// The table operation serial_exec + pub serial_exec: tokio::sync::Mutex, } impl fmt::Debug for TableData { @@ -248,10 +217,6 @@ impl TableData { preflush_write_buffer_size_ratio, )); - let serial_exec_ctx = tokio::sync::Mutex::new(SerialExecContext { - invalid: false, - serial_exec: TableOpSerialExecutor::new(table_id), - }); Ok(Self { id: table_id, name: table_name, @@ -270,7 +235,7 @@ impl TableData { dropped: AtomicBool::new(false), metrics, shard_info: TableShardInfo::new(shard_id), - serial_exec_ctx, + serial_exec: tokio::sync::Mutex::new(TableOpSerialExecutor::new(table_id)), }) } @@ -284,17 +249,35 @@ impl TableData { preflush_write_buffer_size_ratio: f32, mem_usage_collector: CollectorRef, ) -> Result { - Self::new( - add_meta.space_id, - add_meta.table_id, - add_meta.table_name, - add_meta.schema, - shard_id, - add_meta.opts, - purger, + let memtable_factory = Arc::new(SkiplistMemTableFactory); + let purge_queue = purger.create_purge_queue(add_meta.space_id, add_meta.table_id); + let current_version = TableVersion::new(purge_queue); + let metrics = Metrics::default(); + let mutable_limit = AtomicU32::new(compute_mutable_limit( + add_meta.opts.write_buffer_size, preflush_write_buffer_size_ratio, + )); + + Ok(Self { + id: add_meta.table_id, + name: add_meta.table_name, + schema: Mutex::new(add_meta.schema), + space_id: add_meta.space_id, + mutable_limit, + mutable_limit_write_buffer_ratio: preflush_write_buffer_size_ratio, + opts: ArcSwap::new(Arc::new(add_meta.opts)), + memtable_factory, mem_usage_collector, - ) + current_version, + last_sequence: AtomicU64::new(0), + last_memtable_id: AtomicU64::new(0), + last_file_id: AtomicU64::new(0), + last_flush_time_ms: AtomicU64::new(0), + dropped: AtomicBool::new(false), + metrics, + shard_info: TableShardInfo::new(shard_id), + serial_exec: tokio::sync::Mutex::new(TableOpSerialExecutor::new(add_meta.table_id)), + }) } /// Get current schema of the table. @@ -369,34 +352,6 @@ impl TableData { self.dropped.store(true, Ordering::SeqCst); } - /// Acquire the [`SerialExecContext`] if the table is not closed. - pub async fn acquire_serial_exec_ctx(&self) -> Option> { - let v = self.serial_exec_ctx.lock().await; - if v.invalid { - None - } else { - Some(v) - } - } - - /// Try to acquire the [SerialExecContext]. - /// - /// [None] will be returned if the serial_exec_ctx has been acquired - /// already, or the table is closed. - pub fn try_acquire_serial_exec_ctx(&self) -> Option> { - let v = self.serial_exec_ctx.try_lock(); - match v { - Ok(ctx) => { - if ctx.invalid { - None - } else { - Some(ctx) - } - } - Err(_) => None, - } - } - /// Returns total memtable memory usage in bytes. #[inline] pub fn memtable_memory_usage(&self) -> usize { diff --git a/analytic_engine/src/table/mod.rs b/analytic_engine/src/table/mod.rs index 780b9cafcc..82b2b54b9c 100644 --- a/analytic_engine/src/table/mod.rs +++ b/analytic_engine/src/table/mod.rs @@ -21,9 +21,9 @@ use table_engine::{ stream::{PartitionedStreams, SendableRecordBatchStream}, table::{ AlterOptions, AlterSchema, AlterSchemaRequest, Compact, Flush, FlushRequest, Get, - GetInvalidPrimaryKey, GetNullPrimaryKey, GetRequest, MergeWrite, OperateClosedTable, - ReadOptions, ReadOrder, ReadRequest, Result, Scan, Table, TableId, TableStats, - TooManyPendingWrites, WaitForPendingWrites, Write, WriteRequest, + GetInvalidPrimaryKey, GetNullPrimaryKey, GetRequest, MergeWrite, ReadOptions, ReadOrder, + ReadRequest, Result, Scan, Table, TableId, TableStats, TooManyPendingWrites, + WaitForPendingWrites, Write, WriteRequest, }, ANALYTIC_ENGINE_TYPE, }; @@ -255,7 +255,7 @@ impl TableImpl { // This is the first request in the queue, and we should // take responsibilities for merging and writing the // requests in the queue. - let serial_exec_ctx = self.table_data.acquire_serial_exec_ctx().await; + let serial_exec = self.table_data.serial_exec.lock().await; // The `serial_exec` is acquired, let's merge the pending requests and write // them all. let pending_writes = { @@ -266,22 +266,9 @@ impl TableImpl { !pending_writes.is_empty(), "The pending writes should contain at least the one just pushed." ); - match serial_exec_ctx { - Some(v) => { - let merged_write_request = merge_pending_write_requests( - pending_writes.writes, - pending_writes.num_rows, - ); - (merged_write_request, v, pending_writes.notifiers) - } - None => { - // The table has been closed, notify all the waiters in - // the queue. - let write_err = OperateClosedTable.fail(); - self.notify_waiters(pending_writes.notifiers, &write_err); - return write_err; - } - } + let merged_write_request = + merge_pending_write_requests(pending_writes.writes, pending_writes.num_rows); + (merged_write_request, serial_exec, pending_writes.notifiers) } QueueResult::Waiter(rx) => { // The request is successfully pushed into the queue, and just wait for the @@ -316,18 +303,12 @@ impl TableImpl { .box_err() .context(Write { table: self.name() }); - // Notify the waiters for the pending writes. - self.notify_waiters(notifiers, &write_res); - - write_res.map(|_| num_rows) - } - - #[inline] - fn should_queue_write_request(&self, request: &WriteRequest) -> bool { - request.row_group.num_rows() < self.instance.max_rows_in_write_queue - } + // There is no waiter for pending writes, return the write result. + if notifiers.is_empty() { + return write_res; + } - fn notify_waiters(&self, notifiers: Vec>>, write_res: &Result) { + // Notify the waiters for the pending writes. match write_res { Ok(_) => { for notifier in notifiers { @@ -338,6 +319,7 @@ impl TableImpl { ); } } + Ok(num_rows) } Err(e) => { let err_msg = format!("Failed to do merge write, err:{e}"); @@ -350,9 +332,15 @@ impl TableImpl { ); } } + Err(e) } } } + + #[inline] + fn should_queue_write_request(&self, request: &WriteRequest) -> bool { + request.row_group.num_rows() < self.instance.max_rows_in_write_queue + } } #[async_trait] @@ -396,15 +384,11 @@ impl Table for TableImpl { return self.write_with_pending_queue(request).await; } - let mut serial_exec_ctx = self - .table_data - .acquire_serial_exec_ctx() - .await - .context(OperateClosedTable)?; + let mut serial_exec = self.table_data.serial_exec.lock().await; let mut writer = Writer::new( self.instance.clone(), self.space_table.clone(), - &mut serial_exec_ctx, + &mut serial_exec, ); writer .write(request) @@ -517,14 +501,10 @@ impl Table for TableImpl { } async fn alter_schema(&self, request: AlterSchemaRequest) -> Result { - let mut serial_exec_ctx = self - .table_data - .acquire_serial_exec_ctx() - .await - .context(OperateClosedTable)?; + let mut serial_exec = self.table_data.serial_exec.lock().await; let mut alterer = Alterer::new( self.table_data.clone(), - &mut serial_exec_ctx, + &mut serial_exec, self.instance.clone(), ) .await; @@ -538,14 +518,10 @@ impl Table for TableImpl { } async fn alter_options(&self, options: HashMap) -> Result { - let mut serial_exec_ctx = self - .table_data - .acquire_serial_exec_ctx() - .await - .context(OperateClosedTable)?; + let mut serial_exec = self.table_data.serial_exec.lock().await; let alterer = Alterer::new( self.table_data.clone(), - &mut serial_exec_ctx, + &mut serial_exec, self.instance.clone(), ) .await; diff --git a/analytic_engine/src/table/version.rs b/analytic_engine/src/table/version.rs index ddb7ffd21d..329c09677a 100644 --- a/analytic_engine/src/table/version.rs +++ b/analytic_engine/src/table/version.rs @@ -727,9 +727,9 @@ impl TableVersion { picker_ctx: PickerContext, picker: &CompactionPickerRef, ) -> picker::Result { - let mut inner = self.inner.write().unwrap(); + let inner = self.inner.read().unwrap(); - picker.pick_compaction(picker_ctx, &mut inner.levels_controller) + picker.pick_compaction(picker_ctx, &inner.levels_controller) } pub fn has_expired_sst(&self, expire_time: Option) -> bool { diff --git a/common_types/src/schema.rs b/common_types/src/schema.rs index d94f10e52a..c5db4d0636 100644 --- a/common_types/src/schema.rs +++ b/common_types/src/schema.rs @@ -690,7 +690,7 @@ impl Schema { self.column_schemas.num_columns() } - /// Returns true if idx is primary key index + /// Returns true if idx is primary key idnex pub fn is_primary_key_index(&self, idx: &usize) -> bool { self.primary_key_indexes.contains(idx) } diff --git a/components/parquet_ext/Cargo.toml b/components/parquet_ext/Cargo.toml index 1b4b4b23c6..ba31703d18 100644 --- a/components/parquet_ext/Cargo.toml +++ b/components/parquet_ext/Cargo.toml @@ -17,6 +17,8 @@ async-trait = { workspace = true } bytes = { workspace = true } common_util = { workspace = true } datafusion = { workspace = true } +futures = { workspace = true } log = { workspace = true } +object_store = { workspace = true } parquet = { workspace = true } tokio = { workspace = true } diff --git a/components/parquet_ext/src/lib.rs b/components/parquet_ext/src/lib.rs index cd413c0afc..7264b38dd6 100644 --- a/components/parquet_ext/src/lib.rs +++ b/components/parquet_ext/src/lib.rs @@ -2,6 +2,7 @@ pub mod meta_data; pub mod prune; +pub mod reader; pub mod reverse_reader; #[cfg(test)] pub mod tests; diff --git a/components/parquet_ext/src/meta_data.rs b/components/parquet_ext/src/meta_data.rs index e796244c16..1a95bb4f7f 100644 --- a/components/parquet_ext/src/meta_data.rs +++ b/components/parquet_ext/src/meta_data.rs @@ -1,15 +1,18 @@ // Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. -use std::ops::Range; +use std::{ops::Range, sync::Arc}; use async_trait::async_trait; use bytes::Bytes; use common_util::error::GenericResult; use parquet::{ + arrow::{arrow_reader::ArrowReaderOptions, ParquetRecordBatchStreamBuilder}, errors::{ParquetError, Result}, file::{footer, metadata::ParquetMetaData}, }; +use crate::reader::ObjectStoreReader; + #[async_trait] pub trait ChunkReader: Sync + Send { async fn get_bytes(&self, range: Range) -> GenericResult; @@ -65,3 +68,21 @@ pub async fn fetch_parquet_metadata( footer::decode_metadata(&metadata_bytes).map(|v| (v, metadata_len)) } + +/// Build page indexes for meta data +/// +/// TODO: Currently there is no method to build page indexes for meta data in +/// `parquet`, maybe we can write a issue in `arrow-rs` . +pub async fn meta_with_page_indexes( + object_store_reader: ObjectStoreReader, +) -> Result> { + let read_options = ArrowReaderOptions::new().with_page_index(true); + let builder = + ParquetRecordBatchStreamBuilder::new_with_options(object_store_reader, read_options) + .await + .map_err(|e| { + let err_msg = format!("failed to build page indexes in metadata, err:{e}"); + ParquetError::General(err_msg) + })?; + Ok(builder.metadata().clone()) +} diff --git a/components/parquet_ext/src/reader.rs b/components/parquet_ext/src/reader.rs new file mode 100644 index 0000000000..3a5cd5f170 --- /dev/null +++ b/components/parquet_ext/src/reader.rs @@ -0,0 +1,81 @@ +// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0. + +use std::{ops::Range, sync::Arc, time::Instant}; + +use bytes::Bytes; +use futures::{ + future::{BoxFuture, FutureExt}, + TryFutureExt, +}; +use log::debug; +use object_store::{ObjectStoreRef, Path}; +use parquet::{arrow::async_reader::AsyncFileReader, file::metadata::ParquetMetaData}; + +/// Implemention AsyncFileReader based on `ObjectStore` +/// +/// TODO: Perhaps we should avoid importing `object_store` in `parquet_ext` to +/// keep the crate `parquet_ext` more pure. +#[derive(Clone)] +pub struct ObjectStoreReader { + storage: ObjectStoreRef, + path: Path, + meta_data: Arc, + begin: Instant, +} + +impl ObjectStoreReader { + pub fn new(storage: ObjectStoreRef, path: Path, meta_data: Arc) -> Self { + Self { + storage, + path, + meta_data, + begin: Instant::now(), + } + } +} + +impl Drop for ObjectStoreReader { + fn drop(&mut self) { + debug!( + "ObjectStoreReader dropped, path:{}, elapsed:{:?}", + &self.path, + self.begin.elapsed() + ); + } +} + +impl AsyncFileReader for ObjectStoreReader { + fn get_bytes(&mut self, range: Range) -> BoxFuture<'_, parquet::errors::Result> { + self.storage + .get_range(&self.path, range) + .map_err(|e| { + parquet::errors::ParquetError::General(format!( + "Failed to fetch range from object store, err:{e}" + )) + }) + .boxed() + } + + fn get_byte_ranges( + &mut self, + ranges: Vec>, + ) -> BoxFuture<'_, parquet::errors::Result>> { + async move { + self.storage + .get_ranges(&self.path, &ranges) + .map_err(|e| { + parquet::errors::ParquetError::General(format!( + "Failed to fetch ranges from object store, err:{e}" + )) + }) + .await + } + .boxed() + } + + fn get_metadata( + &mut self, + ) -> BoxFuture<'_, parquet::errors::Result>> { + Box::pin(async move { Ok(self.meta_data.clone()) }) + } +} diff --git a/table_engine/src/table.rs b/table_engine/src/table.rs index aea9f64cf5..821bdb6195 100644 --- a/table_engine/src/table.rs +++ b/table_engine/src/table.rs @@ -135,9 +135,6 @@ pub enum Error { source: GenericError, }, - #[snafu(display("Try to operate a closed table.\nBacktrace:\n{backtrace}"))] - OperateClosedTable { backtrace: Backtrace }, - #[snafu(display( "Failed to wait for pending writes, table:{table}.\nBacktrace:\n{backtrace}" ))] diff --git a/tools/src/bin/sst-metadata.rs b/tools/src/bin/sst-metadata.rs index 9eb81422bb..16ae26ab94 100644 --- a/tools/src/bin/sst-metadata.rs +++ b/tools/src/bin/sst-metadata.rs @@ -13,7 +13,7 @@ use common_util::{ }; use futures::StreamExt; use object_store::{LocalFileSystem, ObjectMeta, ObjectStoreRef, Path}; -use parquet_ext::meta_data::fetch_parquet_metadata; +use parquet_ext::{meta_data::fetch_parquet_metadata, reader::ObjectStoreReader}; use tokio::{runtime::Handle, task::JoinSet}; #[derive(Parser, Debug)] @@ -30,6 +30,10 @@ struct Args { /// Thread num, 0 means cpu num #[clap(short, long, default_value_t = 0)] threads: usize, + + /// Print page indexes + #[clap(short, long, required(false))] + page_indexes: bool, } fn new_runtime(thread_num: usize) -> Runtime { @@ -64,6 +68,7 @@ async fn run(args: Args) -> Result<()> { let mut join_set = JoinSet::new(); let mut ssts = storage.list(None).await?; let verbose = args.verbose; + let page_indexes = args.page_indexes; while let Some(object_meta) = ssts.next().await { let object_meta = object_meta?; let storage = storage.clone(); @@ -71,7 +76,8 @@ async fn run(args: Args) -> Result<()> { join_set.spawn_on( async move { let (metadata, metadata_size, kv_size) = - parse_metadata(storage, location, object_meta.size, verbose).await?; + parse_metadata(storage, location, object_meta.size, verbose, page_indexes) + .await?; Ok::<_, anyhow::Error>((object_meta, metadata, metadata_size, kv_size)) }, &handle, @@ -133,9 +139,11 @@ async fn parse_metadata( path: Path, size: usize, verbose: bool, + page_indexes: bool, ) -> Result<(MetaData, usize, usize)> { let reader = ChunkReaderAdapter::new(&path, &storage); let (parquet_metadata, metadata_size) = fetch_parquet_metadata(size, &reader).await?; + let kv_metadata = parquet_metadata.file_metadata().key_value_metadata(); let kv_size = kv_metadata .map(|kvs| { @@ -155,6 +163,15 @@ async fn parse_metadata( }) .unwrap_or(0); - let md = MetaData::try_new(&parquet_metadata, false)?; + let md = if page_indexes { + let object_store_reader = + ObjectStoreReader::new(storage, path.clone(), Arc::new(parquet_metadata)); + let parquet_metadata = + parquet_ext::meta_data::meta_with_page_indexes(object_store_reader).await?; + MetaData::try_new(&parquet_metadata, false)? + } else { + MetaData::try_new(&parquet_metadata, false)? + }; + Ok((md, metadata_size, kv_size)) }