diff --git a/Cargo.lock b/Cargo.lock
index 34e4aa6983..53836cc5d9 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4137,9 +4137,7 @@ dependencies = [
  "bytes",
  "common_util",
  "datafusion",
- "futures 0.3.28",
  "log",
- "object_store 1.2.2",
  "parquet",
  "tokio",
 ]
diff --git a/analytic_engine/src/sst/parquet/async_reader.rs b/analytic_engine/src/sst/parquet/async_reader.rs
index b2181f727b..47478eca6a 100644
--- a/analytic_engine/src/sst/parquet/async_reader.rs
+++ b/analytic_engine/src/sst/parquet/async_reader.rs
@@ -30,14 +30,17 @@ use datafusion::{
         metrics::ExecutionPlanMetricsSet,
     },
 };
-use futures::{Stream, StreamExt};
+use futures::{future::BoxFuture, FutureExt, Stream, StreamExt, TryFutureExt};
 use log::{debug, error};
 use object_store::{ObjectStoreRef, Path};
 use parquet::{
-    arrow::{arrow_reader::RowSelection, ParquetRecordBatchStreamBuilder, ProjectionMask},
+    arrow::{
+        arrow_reader::RowSelection, async_reader::AsyncFileReader, ParquetRecordBatchStreamBuilder,
+        ProjectionMask,
+    },
     file::metadata::RowGroupMetaData,
 };
-use parquet_ext::{meta_data::ChunkReader, reader::ObjectStoreReader};
+use parquet_ext::meta_data::ChunkReader;
 use snafu::ResultExt;
 use table_engine::predicate::PredicateRef;
 use tokio::sync::mpsc::{self, Receiver, Sender};
@@ -278,23 +281,13 @@ impl<'a> Reader<'a> {
 
         let mut streams = Vec::with_capacity(target_row_group_chunks.len());
         for chunk in target_row_group_chunks {
-            let object_store_reader = ObjectStoreReader::new(
-                self.store.clone(),
-                self.path.clone(),
-                parquet_metadata.clone(),
-            );
+            let object_store_reader =
+                ObjectStoreReader::new(self.store.clone(), self.path.clone(), meta_data.clone());
             let mut builder = ParquetRecordBatchStreamBuilder::new(object_store_reader)
                 .await
                 .with_context(|| ParquetError)?;
-
             let row_selection =
                 self.build_row_selection(arrow_schema.clone(), &chunk, parquet_metadata)?;
-
-            debug!(
-                "Build row selection for file path:{}, result:{row_selection:?}, page indexes:{}",
-                self.path,
-                parquet_metadata.page_indexes().is_some()
-            );
             if let Some(selection) = row_selection {
                 builder = builder.with_row_selection(selection);
             };
@@ -347,32 +340,18 @@ impl<'a> Reader<'a> {
         Ok(file_size)
     }
 
-    async fn load_meta_data_from_storage(&self, ignore_sst_filter: bool) -> Result<MetaData> {
+    async fn load_meta_data_from_storage(&self) -> Result<parquet_ext::ParquetMetaData> {
         let file_size = self.load_file_size().await?;
         let chunk_reader_adapter = ChunkReaderAdapter::new(self.path, self.store);
 
-        let (parquet_meta_data, _) =
+        let (meta_data, _) =
             parquet_ext::meta_data::fetch_parquet_metadata(file_size, &chunk_reader_adapter)
                 .await
                 .with_context(|| FetchAndDecodeSstMeta {
                     file_path: self.path.to_string(),
                 })?;
 
-        let object_store_reader = parquet_ext::reader::ObjectStoreReader::new(
-            self.store.clone(),
-            self.path.clone(),
-            Arc::new(parquet_meta_data),
-        );
-
-        let parquet_meta_data = parquet_ext::meta_data::meta_with_page_indexes(object_store_reader)
-            .await
-            .with_context(|| DecodePageIndexes {
-                file_path: self.path.to_string(),
-            })?;
-
-        MetaData::try_new(&parquet_meta_data, ignore_sst_filter)
-            .box_err()
-            .context(DecodeSstMeta)
+        Ok(meta_data)
     }
 
     fn need_update_cache(&self) -> bool {
@@ -396,8 +375,12 @@ impl<'a> Reader<'a> {
         let empty_predicate = self.predicate.exprs().is_empty();
 
         let meta_data = {
+            let parquet_meta_data = self.load_meta_data_from_storage().await?;
+
             let ignore_sst_filter = avoid_update_cache && empty_predicate;
-            self.load_meta_data_from_storage(ignore_sst_filter).await?
+            MetaData::try_new(&parquet_meta_data, ignore_sst_filter)
+                .box_err()
+                .context(DecodeSstMeta)?
         };
 
         if avoid_update_cache || self.meta_cache.is_none() {
@@ -430,6 +413,71 @@ impl<'a> Drop for Reader<'a> {
     }
 }
 
+#[derive(Clone)]
+struct ObjectStoreReader {
+    storage: ObjectStoreRef,
+    path: Path,
+    meta_data: MetaData,
+    begin: Instant,
+}
+
+impl ObjectStoreReader {
+    fn new(storage: ObjectStoreRef, path: Path, meta_data: MetaData) -> Self {
+        Self {
+            storage,
+            path,
+            meta_data,
+            begin: Instant::now(),
+        }
+    }
+}
+
+impl Drop for ObjectStoreReader {
+    fn drop(&mut self) {
+        debug!(
+            "ObjectStoreReader dropped, path:{}, elapsed:{:?}",
+            &self.path,
+            self.begin.elapsed()
+        );
+    }
+}
+
+impl AsyncFileReader for ObjectStoreReader {
+    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
+        self.storage
+            .get_range(&self.path, range)
+            .map_err(|e| {
+                parquet::errors::ParquetError::General(format!(
+                    "Failed to fetch range from object store, err:{e}"
+                ))
+            })
+            .boxed()
+    }
+
+    fn get_byte_ranges(
+        &mut self,
+        ranges: Vec<Range<usize>>,
+    ) -> BoxFuture<'_, parquet::errors::Result<Vec<Bytes>>> {
+        async move {
+            self.storage
+                .get_ranges(&self.path, &ranges)
+                .map_err(|e| {
+                    parquet::errors::ParquetError::General(format!(
+                        "Failed to fetch ranges from object store, err:{e}"
+                    ))
+                })
+                .await
+        }
+        .boxed()
+    }
+
+    fn get_metadata(
+        &mut self,
+    ) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
+        Box::pin(async move { Ok(self.meta_data.parquet().clone()) })
+    }
+}
+
 pub struct ChunkReaderAdapter<'a> {
     path: &'a Path,
     store: &'a ObjectStoreRef,
diff --git a/analytic_engine/src/sst/reader.rs b/analytic_engine/src/sst/reader.rs
index 029b0aa34a..599f1691ba 100644
--- a/analytic_engine/src/sst/reader.rs
+++ b/analytic_engine/src/sst/reader.rs
@@ -15,17 +15,20 @@ pub mod error {
     #[derive(Debug, Snafu)]
     #[snafu(visibility(pub))]
     pub enum Error {
-        #[snafu(display("Try to read again, path:{path}.\nBacktrace:\n{backtrace}"))]
+        #[snafu(display("Try to read again, path:{}.\nBacktrace:\n{}", path, backtrace))]
         ReadAgain { backtrace: Backtrace, path: String },
 
-        #[snafu(display("Fail to read persisted file, path:{path}, err:{source}"))]
+        #[snafu(display("Fail to read persisted file, path:{}, err:{}", path, source))]
         ReadPersist { path: String, source: GenericError },
 
-        #[snafu(display("Failed to decode record batch, err:{source}"))]
+        #[snafu(display("Failed to decode record batch, err:{}", source))]
         DecodeRecordBatch { source: GenericError },
 
         #[snafu(display(
-            "Failed to decode sst meta data, file_path:{file_path}, err:{source}.\nBacktrace:\n{backtrace:?}",
+            "Failed to decode sst meta data, file_path:{}, err:{}.\nBacktrace:\n{:?}",
+            file_path,
+            source,
+            backtrace
         ))]
         FetchAndDecodeSstMeta {
             file_path: String,
@@ -33,52 +36,43 @@ pub mod error {
             backtrace: Backtrace,
         },
 
-        #[snafu(display(
-            "Failed to decode page indexes for meta data, file_path:{file_path}, err:{source}.\nBacktrace:\n{backtrace:?}",
-        ))]
-        DecodePageIndexes {
-            file_path: String,
-            source: parquet::errors::ParquetError,
-            backtrace: Backtrace,
-        },
-
-        #[snafu(display("Failed to decode sst meta data, err:{source}"))]
+        #[snafu(display("Failed to decode sst meta data, err:{}", source))]
         DecodeSstMeta { source: GenericError },
 
-        #[snafu(display("Sst meta data is not found.\nBacktrace:\n{backtrace}"))]
+        #[snafu(display("Sst meta data is not found.\nBacktrace:\n{}", backtrace))]
         SstMetaNotFound { backtrace: Backtrace },
 
-        #[snafu(display("Fail to projection, err:{source}"))]
+        #[snafu(display("Fail to projection, err:{}", source))]
         Projection { source: GenericError },
 
-        #[snafu(display("Sst meta data is empty.\nBacktrace:\n{backtrace}"))]
+        #[snafu(display("Sst meta data is empty.\nBacktrace:\n{}", backtrace))]
         EmptySstMeta { backtrace: Backtrace },
 
-        #[snafu(display("Invalid schema, err:{source}"))]
+        #[snafu(display("Invalid schema, err:{}", source))]
         InvalidSchema { source: common_types::schema::Error },
 
-        #[snafu(display("Meet a datafusion error, err:{source}\nBacktrace:\n{backtrace}"))]
+        #[snafu(display("Meet a datafusion error, err:{}\nBacktrace:\n{}", source, backtrace))]
         DataFusionError {
             source: datafusion::error::DataFusionError,
             backtrace: Backtrace,
         },
 
-        #[snafu(display("Meet a object store error, err:{source}\nBacktrace:\n{backtrace}"))]
+        #[snafu(display("Meet a object store error, err:{}\nBacktrace:\n{}", source, backtrace))]
         ObjectStoreError {
             source: object_store::ObjectStoreError,
             backtrace: Backtrace,
         },
 
-        #[snafu(display("Meet a parquet error, err:{source}\nBacktrace:\n{backtrace}"))]
+        #[snafu(display("Meet a parquet error, err:{}\nBacktrace:\n{}", source, backtrace))]
         ParquetError {
             source: parquet::errors::ParquetError,
             backtrace: Backtrace,
         },
 
-        #[snafu(display("Other kind of error:{source}"))]
+        #[snafu(display("Other kind of error:{}", source))]
         Other { source: GenericError },
 
-        #[snafu(display("Other kind of error, msg:{msg}.\nBacktrace:\n{backtrace}"))]
+        #[snafu(display("Other kind of error, msg:{}.\nBacktrace:\n{}", msg, backtrace))]
         OtherNoCause { msg: String, backtrace: Backtrace },
     }
 
diff --git a/components/parquet_ext/Cargo.toml b/components/parquet_ext/Cargo.toml
index ba31703d18..1b4b4b23c6 100644
--- a/components/parquet_ext/Cargo.toml
+++ b/components/parquet_ext/Cargo.toml
@@ -17,8 +17,6 @@ async-trait = { workspace = true }
 bytes = { workspace = true }
 common_util = { workspace = true }
 datafusion = { workspace = true }
-futures = { workspace = true }
 log = { workspace = true }
-object_store = { workspace = true }
 parquet = { workspace = true }
 tokio = { workspace = true }
diff --git a/components/parquet_ext/src/lib.rs b/components/parquet_ext/src/lib.rs
index 7264b38dd6..cd413c0afc 100644
--- a/components/parquet_ext/src/lib.rs
+++ b/components/parquet_ext/src/lib.rs
@@ -2,7 +2,6 @@
 
 pub mod meta_data;
 pub mod prune;
-pub mod reader;
 pub mod reverse_reader;
 #[cfg(test)]
 pub mod tests;
diff --git a/components/parquet_ext/src/meta_data.rs b/components/parquet_ext/src/meta_data.rs
index 56bf777ff8..5670bc93f0 100644
--- a/components/parquet_ext/src/meta_data.rs
+++ b/components/parquet_ext/src/meta_data.rs
@@ -1,18 +1,15 @@
 // Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.
 
-use std::{ops::Range, sync::Arc};
+use std::ops::Range;
 
 use async_trait::async_trait;
 use bytes::Bytes;
 use common_util::error::GenericResult;
 use parquet::{
-    arrow::{arrow_reader::ArrowReaderOptions, ParquetRecordBatchStreamBuilder},
     errors::{ParquetError, Result},
     file::{footer, metadata::ParquetMetaData},
 };
 
-use crate::reader::ObjectStoreReader;
-
 #[async_trait]
 pub trait ChunkReader: Sync + Send {
     async fn get_bytes(&self, range: Range<usize>) -> GenericResult<Bytes>;
@@ -68,21 +65,3 @@ pub async fn fetch_parquet_metadata(
 
     footer::decode_metadata(&metadata_bytes).map(|v| (v, metadata_len))
 }
-
-/// Build page indexes for meta data
-///
-/// TODO: Currently there is no method to build page indexes for meta data in
-/// `parquet`, maybe we can write a issue in `arrow-rs` .
-pub async fn meta_with_page_indexes(
-    object_store_reader: ObjectStoreReader,
-) -> Result<Arc<ParquetMetaData>> {
-    let read_options = ArrowReaderOptions::new().with_page_index(true);
-    let builder =
-        ParquetRecordBatchStreamBuilder::new_with_options(object_store_reader, read_options)
-            .await
-            .map_err(|e| {
-                let err_msg = format!("failed to build page indexes in metadata, err:{e}");
-                ParquetError::General(err_msg)
-            })?;
-    Ok(builder.metadata().clone())
-}
diff --git a/components/parquet_ext/src/reader.rs b/components/parquet_ext/src/reader.rs
deleted file mode 100644
index 3a5cd5f170..0000000000
--- a/components/parquet_ext/src/reader.rs
+++ /dev/null
@@ -1,81 +0,0 @@
-// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0.
-
-use std::{ops::Range, sync::Arc, time::Instant};
-
-use bytes::Bytes;
-use futures::{
-    future::{BoxFuture, FutureExt},
-    TryFutureExt,
-};
-use log::debug;
-use object_store::{ObjectStoreRef, Path};
-use parquet::{arrow::async_reader::AsyncFileReader, file::metadata::ParquetMetaData};
-
-/// Implemention AsyncFileReader based on `ObjectStore`
-///
-/// TODO: Perhaps we should avoid importing `object_store` in `parquet_ext` to
-/// keep the crate `parquet_ext` more pure.
-#[derive(Clone)]
-pub struct ObjectStoreReader {
-    storage: ObjectStoreRef,
-    path: Path,
-    meta_data: Arc<ParquetMetaData>,
-    begin: Instant,
-}
-
-impl ObjectStoreReader {
-    pub fn new(storage: ObjectStoreRef, path: Path, meta_data: Arc<ParquetMetaData>) -> Self {
-        Self {
-            storage,
-            path,
-            meta_data,
-            begin: Instant::now(),
-        }
-    }
-}
-
-impl Drop for ObjectStoreReader {
-    fn drop(&mut self) {
-        debug!(
-            "ObjectStoreReader dropped, path:{}, elapsed:{:?}",
-            &self.path,
-            self.begin.elapsed()
-        );
-    }
-}
-
-impl AsyncFileReader for ObjectStoreReader {
-    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
-        self.storage
-            .get_range(&self.path, range)
-            .map_err(|e| {
-                parquet::errors::ParquetError::General(format!(
-                    "Failed to fetch range from object store, err:{e}"
-                ))
-            })
-            .boxed()
-    }
-
-    fn get_byte_ranges(
-        &mut self,
-        ranges: Vec<Range<usize>>,
-    ) -> BoxFuture<'_, parquet::errors::Result<Vec<Bytes>>> {
-        async move {
-            self.storage
-                .get_ranges(&self.path, &ranges)
-                .map_err(|e| {
-                    parquet::errors::ParquetError::General(format!(
-                        "Failed to fetch ranges from object store, err:{e}"
-                    ))
-                })
-                .await
-        }
-        .boxed()
-    }
-
-    fn get_metadata(
-        &mut self,
-    ) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
-        Box::pin(async move { Ok(self.meta_data.clone()) })
-    }
-}
diff --git a/tools/src/bin/sst-metadata.rs b/tools/src/bin/sst-metadata.rs
index a089ad2da5..95765f9b8d 100644
--- a/tools/src/bin/sst-metadata.rs
+++ b/tools/src/bin/sst-metadata.rs
@@ -13,7 +13,7 @@ use common_util::{
 };
 use futures::StreamExt;
 use object_store::{LocalFileSystem, ObjectMeta, ObjectStoreRef, Path};
-use parquet_ext::{meta_data::fetch_parquet_metadata, reader::ObjectStoreReader};
+use parquet_ext::meta_data::fetch_parquet_metadata;
 use tokio::{runtime::Handle, task::JoinSet};
 
 #[derive(Parser, Debug)]
@@ -30,10 +30,6 @@ struct Args {
     /// Thread num, 0 means cpu num
     #[clap(short, long, default_value_t = 0)]
     threads: usize,
-
-    /// Print page indexes
-    #[clap(short, long, required(false))]
-    page_indexes: bool,
 }
 
 #[derive(Default, Debug)]
@@ -96,7 +92,6 @@ async fn run(args: Args) -> Result<()> {
     let mut join_set = JoinSet::new();
     let mut ssts = storage.list(None).await?;
    let verbose = args.verbose;
-    let page_indexes = args.page_indexes;
     while let Some(object_meta) = ssts.next().await {
         let object_meta = object_meta?;
         let storage = storage.clone();
@@ -104,8 +99,7 @@
         join_set.spawn_on(
             async move {
                 let (metadata, metadata_size, kv_size) =
-                    parse_metadata(storage, location, object_meta.size, verbose, page_indexes)
-                        .await?;
+                    parse_metadata(storage, location, object_meta.size, verbose).await?;
                 Ok::<_, anyhow::Error>((object_meta, metadata, metadata_size, kv_size))
             },
             &handle,
@@ -201,11 +195,9 @@ async fn parse_metadata(
     path: Path,
     size: usize,
     verbose: bool,
-    page_indexes: bool,
 ) -> Result<(MetaData, usize, usize)> {
     let reader = ChunkReaderAdapter::new(&path, &storage);
     let (parquet_metadata, metadata_size) = fetch_parquet_metadata(size, &reader).await?;
-
     let kv_metadata = parquet_metadata.file_metadata().key_value_metadata();
     let kv_size = kv_metadata
         .map(|kvs| {
@@ -225,15 +217,6 @@
         })
         .unwrap_or(0);
 
-    let md = if page_indexes {
-        let object_store_reader =
-            ObjectStoreReader::new(storage, path.clone(), Arc::new(parquet_metadata));
-        let parquet_metadata =
-            parquet_ext::meta_data::meta_with_page_indexes(object_store_reader).await?;
-        MetaData::try_new(&parquet_metadata, false)?
-    } else {
-        MetaData::try_new(&parquet_metadata, false)?
-    };
-
+    let md = MetaData::try_new(&parquet_metadata, false)?;
     Ok((md, metadata_size, kv_size))
 }