From bd5163876b7d63a6b49319d79c5a9decd3ca9e58 Mon Sep 17 00:00:00 2001
From: lee <690585471@qq.com>
Date: Mon, 26 Jun 2023 15:08:32 +0800
Subject: [PATCH] feat: add page indexes for metadata (#1027)

## Rationale
#1000 was reverted in #1026; this PR re-submits it. See #1000 for details.

## Detailed Changes
Add page indexes to the parquet metadata loaded for an SST:

- Add `parquet_ext::meta_data::meta_with_page_indexes`, which rebuilds
  `ParquetMetaData` with the page indexes populated by re-reading the footer
  through `ParquetRecordBatchStreamBuilder` with
  `ArrowReaderOptions::new().with_page_index(true)`.
- Move `ObjectStoreReader` out of `analytic_engine` into the new
  `parquet_ext::reader` module so the engine and the tools can share it.
- Make `Reader::load_meta_data_from_storage` return `MetaData` built from the
  page-index-aware `ParquetMetaData`, and report failures through the new
  `DecodePageIndexes` error variant.
- Add a `--page-indexes` flag to the `sst-metadata` tool to fetch page indexes
  before printing metadata.

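For reviewers, a condensed sketch of the new metadata flow. This is not code
from the diff itself, and the helper name `attach_page_indexes` is
hypothetical:

```rust
use std::sync::Arc;

use object_store::{ObjectStoreRef, Path};
use parquet::{errors::Result, file::metadata::ParquetMetaData};
use parquet_ext::{meta_data::meta_with_page_indexes, reader::ObjectStoreReader};

/// Hypothetical helper: take the plain metadata decoded from the footer by
/// `fetch_parquet_metadata` and re-read it with the page indexes populated.
async fn attach_page_indexes(
    store: ObjectStoreRef,
    path: Path,
    plain_meta: ParquetMetaData,
) -> Result<Arc<ParquetMetaData>> {
    // `ObjectStoreReader` implements parquet's `AsyncFileReader`;
    // `meta_with_page_indexes` re-opens the footer with
    // `ArrowReaderOptions::new().with_page_index(true)`.
    let reader = ObjectStoreReader::new(store, path, Arc::new(plain_meta));
    meta_with_page_indexes(reader).await
}
```

The cost is one extra footer read per SST open; the payoff is that row
selections derived from the predicate can actually prune pages.
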
On the read path, `fetch_record_batch_streams` builds one record-batch stream
per row-group chunk and, when the predicate yields a row selection, applies it
to the builder, as sketched below.

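A minimal illustration of that step, mirroring the loop in
`fetch_record_batch_streams`; the function name `build_stream` is hypothetical:

```rust
use parquet::arrow::{arrow_reader::RowSelection, ParquetRecordBatchStreamBuilder};
use parquet_ext::reader::ObjectStoreReader;

/// Hypothetical sketch: build one stream, applying a row selection when the
/// predicate produced one.
async fn build_stream(
    object_store_reader: ObjectStoreReader,
    row_selection: Option<RowSelection>,
) -> parquet::errors::Result<()> {
    let mut builder = ParquetRecordBatchStreamBuilder::new(object_store_reader).await?;
    if let Some(selection) = row_selection {
        // With page indexes present, the selection can skip whole pages
        // instead of decoding every page of the selected row groups.
        builder = builder.with_row_selection(selection);
    }
    let _stream = builder.build()?;
    Ok(())
}
```
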
---
 Cargo.lock                                    |   2 +
 .../src/sst/parquet/async_reader.rs           | 114 +++++-------------
 analytic_engine/src/sst/reader.rs             |  40 +++---
 components/parquet_ext/Cargo.toml             |   2 +
 components/parquet_ext/src/lib.rs             |   1 +
 components/parquet_ext/src/meta_data.rs       |  23 +++-
 components/parquet_ext/src/reader.rs          |  81 +++++++++++++
 tools/src/bin/sst-metadata.rs                 |  23 +++-
 8 files changed, 184 insertions(+), 102 deletions(-)
 create mode 100644 components/parquet_ext/src/reader.rs

diff --git a/Cargo.lock b/Cargo.lock
index e9d3fb4efa..dd397a2d26 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4136,7 +4136,9 @@ dependencies = [
  "bytes",
  "common_util",
  "datafusion",
+ "futures 0.3.28",
  "log",
+ "object_store 1.2.2",
  "parquet",
  "tokio",
 ]
diff --git a/analytic_engine/src/sst/parquet/async_reader.rs b/analytic_engine/src/sst/parquet/async_reader.rs
index 47478eca6a..b2181f727b 100644
--- a/analytic_engine/src/sst/parquet/async_reader.rs
+++ b/analytic_engine/src/sst/parquet/async_reader.rs
@@ -30,17 +30,14 @@ use datafusion::{
         metrics::ExecutionPlanMetricsSet,
     },
 };
-use futures::{future::BoxFuture, FutureExt, Stream, StreamExt, TryFutureExt};
+use futures::{Stream, StreamExt};
 use log::{debug, error};
 use object_store::{ObjectStoreRef, Path};
 use parquet::{
-    arrow::{
-        arrow_reader::RowSelection, async_reader::AsyncFileReader, ParquetRecordBatchStreamBuilder,
-        ProjectionMask,
-    },
+    arrow::{arrow_reader::RowSelection, ParquetRecordBatchStreamBuilder, ProjectionMask},
     file::metadata::RowGroupMetaData,
 };
-use parquet_ext::meta_data::ChunkReader;
+use parquet_ext::{meta_data::ChunkReader, reader::ObjectStoreReader};
 use snafu::ResultExt;
 use table_engine::predicate::PredicateRef;
 use tokio::sync::mpsc::{self, Receiver, Sender};
@@ -281,13 +278,23 @@ impl<'a> Reader<'a> {
 
         let mut streams = Vec::with_capacity(target_row_group_chunks.len());
         for chunk in target_row_group_chunks {
-            let object_store_reader =
-                ObjectStoreReader::new(self.store.clone(), self.path.clone(), meta_data.clone());
+            let object_store_reader = ObjectStoreReader::new(
+                self.store.clone(),
+                self.path.clone(),
+                parquet_metadata.clone(),
+            );
             let mut builder = ParquetRecordBatchStreamBuilder::new(object_store_reader)
                 .await
                 .with_context(|| ParquetError)?;
+
             let row_selection =
                 self.build_row_selection(arrow_schema.clone(), &chunk, parquet_metadata)?;
+
+            debug!(
+                "Build row selection for file path:{}, result:{row_selection:?}, page indexes:{}",
+                self.path,
+                parquet_metadata.page_indexes().is_some()
+            );
             if let Some(selection) = row_selection {
                 builder = builder.with_row_selection(selection);
             };
@@ -340,18 +347,32 @@
         Ok(file_size)
     }
 
-    async fn load_meta_data_from_storage(&self) -> Result<parquet_ext::ParquetMetaData> {
+    async fn load_meta_data_from_storage(&self, ignore_sst_filter: bool) -> Result<MetaData> {
         let file_size = self.load_file_size().await?;
         let chunk_reader_adapter = ChunkReaderAdapter::new(self.path, self.store);
 
-        let (meta_data, _) =
+        let (parquet_meta_data, _) =
             parquet_ext::meta_data::fetch_parquet_metadata(file_size, &chunk_reader_adapter)
                 .await
                 .with_context(|| FetchAndDecodeSstMeta {
                     file_path: self.path.to_string(),
                 })?;
 
-        Ok(meta_data)
+        let object_store_reader = parquet_ext::reader::ObjectStoreReader::new(
+            self.store.clone(),
+            self.path.clone(),
+            Arc::new(parquet_meta_data),
+        );
+
+        let parquet_meta_data = parquet_ext::meta_data::meta_with_page_indexes(object_store_reader)
+            .await
+            .with_context(|| DecodePageIndexes {
+                file_path: self.path.to_string(),
+            })?;
+
+        MetaData::try_new(&parquet_meta_data, ignore_sst_filter)
+            .box_err()
+            .context(DecodeSstMeta)
     }
 
     fn need_update_cache(&self) -> bool {
@@ -375,12 +396,8 @@ impl<'a> Reader<'a> {
         let empty_predicate = self.predicate.exprs().is_empty();
 
         let meta_data = {
-            let parquet_meta_data = self.load_meta_data_from_storage().await?;
-
             let ignore_sst_filter = avoid_update_cache && empty_predicate;
-            MetaData::try_new(&parquet_meta_data, ignore_sst_filter)
-                .box_err()
-                .context(DecodeSstMeta)?
+            self.load_meta_data_from_storage(ignore_sst_filter).await?
         };
 
         if avoid_update_cache || self.meta_cache.is_none() {
@@ -413,71 +430,6 @@ impl<'a> Drop for Reader<'a> {
     }
 }
 
-#[derive(Clone)]
-struct ObjectStoreReader {
-    storage: ObjectStoreRef,
-    path: Path,
-    meta_data: MetaData,
-    begin: Instant,
-}
-
-impl ObjectStoreReader {
-    fn new(storage: ObjectStoreRef, path: Path, meta_data: MetaData) -> Self {
-        Self {
-            storage,
-            path,
-            meta_data,
-            begin: Instant::now(),
-        }
-    }
-}
-
-impl Drop for ObjectStoreReader {
-    fn drop(&mut self) {
-        debug!(
-            "ObjectStoreReader dropped, path:{}, elapsed:{:?}",
-            &self.path,
-            self.begin.elapsed()
-        );
-    }
-}
-
-impl AsyncFileReader for ObjectStoreReader {
-    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
-        self.storage
-            .get_range(&self.path, range)
-            .map_err(|e| {
-                parquet::errors::ParquetError::General(format!(
-                    "Failed to fetch range from object store, err:{e}"
-                ))
-            })
-            .boxed()
-    }
-
-    fn get_byte_ranges(
-        &mut self,
-        ranges: Vec<Range<usize>>,
-    ) -> BoxFuture<'_, parquet::errors::Result<Vec<Bytes>>> {
-        async move {
-            self.storage
-                .get_ranges(&self.path, &ranges)
-                .map_err(|e| {
-                    parquet::errors::ParquetError::General(format!(
-                        "Failed to fetch ranges from object store, err:{e}"
-                    ))
-                })
-                .await
-        }
-        .boxed()
-    }
-
-    fn get_metadata(
-        &mut self,
-    ) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
-        Box::pin(async move { Ok(self.meta_data.parquet().clone()) })
-    }
-}
-
 pub struct ChunkReaderAdapter<'a> {
     path: &'a Path,
     store: &'a ObjectStoreRef,
diff --git a/analytic_engine/src/sst/reader.rs b/analytic_engine/src/sst/reader.rs
index 599f1691ba..029b0aa34a 100644
--- a/analytic_engine/src/sst/reader.rs
+++ b/analytic_engine/src/sst/reader.rs
@@ -15,20 +15,17 @@ pub mod error {
     #[derive(Debug, Snafu)]
     #[snafu(visibility(pub))]
     pub enum Error {
-        #[snafu(display("Try to read again, path:{}.\nBacktrace:\n{}", path, backtrace))]
+        #[snafu(display("Try to read again, path:{path}.\nBacktrace:\n{backtrace}"))]
         ReadAgain { backtrace: Backtrace, path: String },
 
-        #[snafu(display("Fail to read persisted file, path:{}, err:{}", path, source))]
+        #[snafu(display("Fail to read persisted file, path:{path}, err:{source}"))]
         ReadPersist { path: String, source: GenericError },
 
-        #[snafu(display("Failed to decode record batch, err:{}", source))]
+        #[snafu(display("Failed to decode record batch, err:{source}"))]
         DecodeRecordBatch { source: GenericError },
 
         #[snafu(display(
-            "Failed to decode sst meta data, file_path:{}, err:{}.\nBacktrace:\n{:?}",
-            file_path,
-            source,
-            backtrace
+            "Failed to decode sst meta data, file_path:{file_path}, err:{source}.\nBacktrace:\n{backtrace:?}",
         ))]
         FetchAndDecodeSstMeta {
             file_path: String,
@@ -36,43 +33,52 @@ pub mod error {
             backtrace: Backtrace,
         },
 
-        #[snafu(display("Failed to decode sst meta data, err:{}", source))]
+        #[snafu(display(
+            "Failed to decode page indexes for meta data, file_path:{file_path}, err:{source}.\nBacktrace:\n{backtrace:?}",
+        ))]
+        DecodePageIndexes {
+            file_path: String,
+            source: parquet::errors::ParquetError,
+            backtrace: Backtrace,
+        },
+
+        #[snafu(display("Failed to decode sst meta data, err:{source}"))]
         DecodeSstMeta { source: GenericError },
 
-        #[snafu(display("Sst meta data is not found.\nBacktrace:\n{}", backtrace))]
+        #[snafu(display("Sst meta data is not found.\nBacktrace:\n{backtrace}"))]
         SstMetaNotFound { backtrace: Backtrace },
 
-        #[snafu(display("Fail to projection, err:{}", source))]
+        #[snafu(display("Fail to projection, err:{source}"))]
         Projection { source: GenericError },
 
-        #[snafu(display("Sst meta data is empty.\nBacktrace:\n{}", backtrace))]
+        #[snafu(display("Sst meta data is empty.\nBacktrace:\n{backtrace}"))]
         EmptySstMeta { backtrace: Backtrace },
 
-        #[snafu(display("Invalid schema, err:{}", source))]
+        #[snafu(display("Invalid schema, err:{source}"))]
         InvalidSchema { source: common_types::schema::Error },
 
-        #[snafu(display("Meet a datafusion error, err:{}\nBacktrace:\n{}", source, backtrace))]
+        #[snafu(display("Meet a datafusion error, err:{source}\nBacktrace:\n{backtrace}"))]
         DataFusionError {
             source: datafusion::error::DataFusionError,
             backtrace: Backtrace,
         },
 
-        #[snafu(display("Meet a object store error, err:{}\nBacktrace:\n{}", source, backtrace))]
+        #[snafu(display("Meet a object store error, err:{source}\nBacktrace:\n{backtrace}"))]
         ObjectStoreError {
             source: object_store::ObjectStoreError,
             backtrace: Backtrace,
         },
 
-        #[snafu(display("Meet a parquet error, err:{}\nBacktrace:\n{}", source, backtrace))]
+        #[snafu(display("Meet a parquet error, err:{source}\nBacktrace:\n{backtrace}"))]
         ParquetError {
             source: parquet::errors::ParquetError,
             backtrace: Backtrace,
         },
 
-        #[snafu(display("Other kind of error:{}", source))]
+        #[snafu(display("Other kind of error:{source}"))]
         Other { source: GenericError },
 
-        #[snafu(display("Other kind of error, msg:{}.\nBacktrace:\n{}", msg, backtrace))]
+        #[snafu(display("Other kind of error, msg:{msg}.\nBacktrace:\n{backtrace}"))]
         OtherNoCause { msg: String, backtrace: Backtrace },
     }
diff --git a/components/parquet_ext/Cargo.toml b/components/parquet_ext/Cargo.toml
index 1b4b4b23c6..ba31703d18 100644
--- a/components/parquet_ext/Cargo.toml
+++ b/components/parquet_ext/Cargo.toml
@@ -17,6 +17,8 @@ async-trait = { workspace = true }
 bytes = { workspace = true }
 common_util = { workspace = true }
 datafusion = { workspace = true }
+futures = { workspace = true }
 log = { workspace = true }
+object_store = { workspace = true }
 parquet = { workspace = true }
 tokio = { workspace = true }
diff --git a/components/parquet_ext/src/lib.rs b/components/parquet_ext/src/lib.rs
index cd413c0afc..7264b38dd6 100644
--- a/components/parquet_ext/src/lib.rs
+++ b/components/parquet_ext/src/lib.rs
@@ -2,6 +2,7 @@
 
 pub mod meta_data;
 pub mod prune;
+pub mod reader;
 pub mod reverse_reader;
 #[cfg(test)]
 pub mod tests;
diff --git a/components/parquet_ext/src/meta_data.rs b/components/parquet_ext/src/meta_data.rs
index 5670bc93f0..56bf777ff8 100644
--- a/components/parquet_ext/src/meta_data.rs
+++ b/components/parquet_ext/src/meta_data.rs
@@ -1,15 +1,18 @@
 // Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.
 
-use std::ops::Range;
+use std::{ops::Range, sync::Arc};
 
 use async_trait::async_trait;
 use bytes::Bytes;
 use common_util::error::GenericResult;
 use parquet::{
+    arrow::{arrow_reader::ArrowReaderOptions, ParquetRecordBatchStreamBuilder},
     errors::{ParquetError, Result},
     file::{footer, metadata::ParquetMetaData},
 };
 
+use crate::reader::ObjectStoreReader;
+
 #[async_trait]
 pub trait ChunkReader: Sync + Send {
     async fn get_bytes(&self, range: Range<usize>) -> GenericResult<Bytes>;
@@ -65,3 +68,21 @@ pub async fn fetch_parquet_metadata(
 
     footer::decode_metadata(&metadata_bytes).map(|v| (v, metadata_len))
 }
+
+/// Build page indexes for the metadata.
+///
+/// TODO: Currently there is no direct way in `parquet` to build page indexes
+/// for existing metadata; maybe we can file an issue in `arrow-rs`.
+pub async fn meta_with_page_indexes(
+    object_store_reader: ObjectStoreReader,
+) -> Result<Arc<ParquetMetaData>> {
+    let read_options = ArrowReaderOptions::new().with_page_index(true);
+    let builder =
+        ParquetRecordBatchStreamBuilder::new_with_options(object_store_reader, read_options)
+            .await
+            .map_err(|e| {
+                let err_msg = format!("failed to build page indexes in metadata, err:{e}");
+                ParquetError::General(err_msg)
+            })?;
+    Ok(builder.metadata().clone())
+}
diff --git a/components/parquet_ext/src/reader.rs b/components/parquet_ext/src/reader.rs
new file mode 100644
index 0000000000..3a5cd5f170
--- /dev/null
+++ b/components/parquet_ext/src/reader.rs
@@ -0,0 +1,81 @@
+// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0.
+
+use std::{ops::Range, sync::Arc, time::Instant};
+
+use bytes::Bytes;
+use futures::{
+    future::{BoxFuture, FutureExt},
+    TryFutureExt,
+};
+use log::debug;
+use object_store::{ObjectStoreRef, Path};
+use parquet::{arrow::async_reader::AsyncFileReader, file::metadata::ParquetMetaData};
+
+/// Implementation of `AsyncFileReader` based on `ObjectStore`.
+///
+/// TODO: Perhaps we should avoid importing `object_store` in `parquet_ext` to
+/// keep the crate `parquet_ext` more pure.
+#[derive(Clone)]
+pub struct ObjectStoreReader {
+    storage: ObjectStoreRef,
+    path: Path,
+    meta_data: Arc<ParquetMetaData>,
+    begin: Instant,
+}
+
+impl ObjectStoreReader {
+    pub fn new(storage: ObjectStoreRef, path: Path, meta_data: Arc<ParquetMetaData>) -> Self {
+        Self {
+            storage,
+            path,
+            meta_data,
+            begin: Instant::now(),
+        }
+    }
+}
+
+impl Drop for ObjectStoreReader {
+    fn drop(&mut self) {
+        debug!(
+            "ObjectStoreReader dropped, path:{}, elapsed:{:?}",
+            &self.path,
+            self.begin.elapsed()
+        );
+    }
+}
+
+impl AsyncFileReader for ObjectStoreReader {
+    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
+        self.storage
+            .get_range(&self.path, range)
+            .map_err(|e| {
+                parquet::errors::ParquetError::General(format!(
+                    "Failed to fetch range from object store, err:{e}"
+                ))
+            })
+            .boxed()
+    }
+
+    fn get_byte_ranges(
+        &mut self,
+        ranges: Vec<Range<usize>>,
+    ) -> BoxFuture<'_, parquet::errors::Result<Vec<Bytes>>> {
+        async move {
+            self.storage
+                .get_ranges(&self.path, &ranges)
+                .map_err(|e| {
+                    parquet::errors::ParquetError::General(format!(
+                        "Failed to fetch ranges from object store, err:{e}"
+                    ))
+                })
+                .await
+        }
+        .boxed()
+    }
+
+    fn get_metadata(
+        &mut self,
+    ) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
+        Box::pin(async move { Ok(self.meta_data.clone()) })
+    }
+}
diff --git a/tools/src/bin/sst-metadata.rs b/tools/src/bin/sst-metadata.rs
index 95765f9b8d..a089ad2da5 100644
--- a/tools/src/bin/sst-metadata.rs
+++ b/tools/src/bin/sst-metadata.rs
@@ -13,7 +13,7 @@ use common_util::{
 };
 use futures::StreamExt;
 use object_store::{LocalFileSystem, ObjectMeta, ObjectStoreRef, Path};
-use parquet_ext::meta_data::fetch_parquet_metadata;
+use parquet_ext::{meta_data::fetch_parquet_metadata, reader::ObjectStoreReader};
 use tokio::{runtime::Handle, task::JoinSet};
 
 #[derive(Parser, Debug)]
@@ -30,6 +30,10 @@ struct Args {
     /// Thread num, 0 means cpu num
     #[clap(short, long, default_value_t = 0)]
     threads: usize,
+
+    /// Print page indexes
+    #[clap(short, long, required(false))]
+    page_indexes: bool,
 }
 
 #[derive(Default, Debug)]
@@ -92,6 +96,7 @@ async fn run(args: Args) -> Result<()> {
     let mut join_set = JoinSet::new();
     let mut ssts = storage.list(None).await?;
     let verbose = args.verbose;
+    let page_indexes = args.page_indexes;
     while let Some(object_meta) = ssts.next().await {
         let object_meta = object_meta?;
         let storage = storage.clone();
@@ -99,7 +104,8 @@
         join_set.spawn_on(
             async move {
                 let (metadata, metadata_size, kv_size) =
-                    parse_metadata(storage, location, object_meta.size, verbose).await?;
+                    parse_metadata(storage, location, object_meta.size, verbose, page_indexes)
+                        .await?;
                 Ok::<_, anyhow::Error>((object_meta, metadata, metadata_size, kv_size))
             },
             &handle,
@@ -195,9 +201,11 @@ async fn parse_metadata(
     path: Path,
     size: usize,
     verbose: bool,
+    page_indexes: bool,
 ) -> Result<(MetaData, usize, usize)> {
     let reader = ChunkReaderAdapter::new(&path, &storage);
     let (parquet_metadata, metadata_size) = fetch_parquet_metadata(size, &reader).await?;
+
     let kv_metadata = parquet_metadata.file_metadata().key_value_metadata();
     let kv_size = kv_metadata
         .map(|kvs| {
@@ -217,6 +225,15 @@
         })
         .unwrap_or(0);
 
-    let md = MetaData::try_new(&parquet_metadata, false)?;
+    let md = if page_indexes {
+        let object_store_reader =
+            ObjectStoreReader::new(storage, path.clone(), Arc::new(parquet_metadata));
+        let parquet_metadata =
+            parquet_ext::meta_data::meta_with_page_indexes(object_store_reader).await?;
+        MetaData::try_new(&parquet_metadata, false)?
+    } else {
+        MetaData::try_new(&parquet_metadata, false)?
+    };
+
     Ok((md, metadata_size, kv_size))
 }