feat: add page indexes for metadata #1027

Merged 1 commit on Jun 26, 2023

2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

114 changes: 33 additions & 81 deletions analytic_engine/src/sst/parquet/async_reader.rs
@@ -30,17 +30,14 @@ use datafusion::{
         metrics::ExecutionPlanMetricsSet,
     },
 };
-use futures::{future::BoxFuture, FutureExt, Stream, StreamExt, TryFutureExt};
+use futures::{Stream, StreamExt};
 use log::{debug, error};
 use object_store::{ObjectStoreRef, Path};
 use parquet::{
-    arrow::{
-        arrow_reader::RowSelection, async_reader::AsyncFileReader, ParquetRecordBatchStreamBuilder,
-        ProjectionMask,
-    },
+    arrow::{arrow_reader::RowSelection, ParquetRecordBatchStreamBuilder, ProjectionMask},
     file::metadata::RowGroupMetaData,
 };
-use parquet_ext::meta_data::ChunkReader;
+use parquet_ext::{meta_data::ChunkReader, reader::ObjectStoreReader};
 use snafu::ResultExt;
 use table_engine::predicate::PredicateRef;
 use tokio::sync::mpsc::{self, Receiver, Sender};
@@ -281,13 +278,23 @@ impl<'a> Reader<'a> {
 
         let mut streams = Vec::with_capacity(target_row_group_chunks.len());
         for chunk in target_row_group_chunks {
-            let object_store_reader =
-                ObjectStoreReader::new(self.store.clone(), self.path.clone(), meta_data.clone());
+            let object_store_reader = ObjectStoreReader::new(
+                self.store.clone(),
+                self.path.clone(),
+                parquet_metadata.clone(),
+            );
             let mut builder = ParquetRecordBatchStreamBuilder::new(object_store_reader)
                 .await
                 .with_context(|| ParquetError)?;
 
+            let row_selection =
+                self.build_row_selection(arrow_schema.clone(), &chunk, parquet_metadata)?;
+
+            debug!(
+                "Build row selection for file path:{}, result:{row_selection:?}, page indexes:{}",
+                self.path,
+                parquet_metadata.page_indexes().is_some()
+            );
             if let Some(selection) = row_selection {
                 builder = builder.with_row_selection(selection);
             };
@@ -340,18 +347,32 @@ impl<'a> Reader<'a> {
         Ok(file_size)
     }
 
-    async fn load_meta_data_from_storage(&self) -> Result<parquet_ext::ParquetMetaData> {
+    async fn load_meta_data_from_storage(&self, ignore_sst_filter: bool) -> Result<MetaData> {
         let file_size = self.load_file_size().await?;
         let chunk_reader_adapter = ChunkReaderAdapter::new(self.path, self.store);
 
-        let (meta_data, _) =
+        let (parquet_meta_data, _) =
             parquet_ext::meta_data::fetch_parquet_metadata(file_size, &chunk_reader_adapter)
                 .await
                 .with_context(|| FetchAndDecodeSstMeta {
                     file_path: self.path.to_string(),
                 })?;
 
-        Ok(meta_data)
+        let object_store_reader = parquet_ext::reader::ObjectStoreReader::new(
+            self.store.clone(),
+            self.path.clone(),
+            Arc::new(parquet_meta_data),
+        );
+
+        let parquet_meta_data = parquet_ext::meta_data::meta_with_page_indexes(object_store_reader)
+            .await
+            .with_context(|| DecodePageIndexes {
+                file_path: self.path.to_string(),
+            })?;
+
+        MetaData::try_new(&parquet_meta_data, ignore_sst_filter)
+            .box_err()
+            .context(DecodeSstMeta)
     }
 
     fn need_update_cache(&self) -> bool {
@@ -375,12 +396,8 @@ impl<'a> Reader<'a> {
         let empty_predicate = self.predicate.exprs().is_empty();
 
         let meta_data = {
-            let parquet_meta_data = self.load_meta_data_from_storage().await?;
-
             let ignore_sst_filter = avoid_update_cache && empty_predicate;
-            MetaData::try_new(&parquet_meta_data, ignore_sst_filter)
-                .box_err()
-                .context(DecodeSstMeta)?
+            self.load_meta_data_from_storage(ignore_sst_filter).await?
         };
 
         if avoid_update_cache || self.meta_cache.is_none() {
@@ -413,71 +430,6 @@ impl<'a> Drop for Reader<'a> {
     }
 }
 
-#[derive(Clone)]
-struct ObjectStoreReader {
-    storage: ObjectStoreRef,
-    path: Path,
-    meta_data: MetaData,
-    begin: Instant,
-}
-
-impl ObjectStoreReader {
-    fn new(storage: ObjectStoreRef, path: Path, meta_data: MetaData) -> Self {
-        Self {
-            storage,
-            path,
-            meta_data,
-            begin: Instant::now(),
-        }
-    }
-}
-
-impl Drop for ObjectStoreReader {
-    fn drop(&mut self) {
-        debug!(
-            "ObjectStoreReader dropped, path:{}, elapsed:{:?}",
-            &self.path,
-            self.begin.elapsed()
-        );
-    }
-}
-
-impl AsyncFileReader for ObjectStoreReader {
-    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
-        self.storage
-            .get_range(&self.path, range)
-            .map_err(|e| {
-                parquet::errors::ParquetError::General(format!(
-                    "Failed to fetch range from object store, err:{e}"
-                ))
-            })
-            .boxed()
-    }
-
-    fn get_byte_ranges(
-        &mut self,
-        ranges: Vec<Range<usize>>,
-    ) -> BoxFuture<'_, parquet::errors::Result<Vec<Bytes>>> {
-        async move {
-            self.storage
-                .get_ranges(&self.path, &ranges)
-                .map_err(|e| {
-                    parquet::errors::ParquetError::General(format!(
-                        "Failed to fetch ranges from object store, err:{e}"
-                    ))
-                })
-                .await
-        }
-        .boxed()
-    }
-
-    fn get_metadata(
-        &mut self,
-    ) -> BoxFuture<'_, parquet::errors::Result<Arc<parquet::file::metadata::ParquetMetaData>>> {
-        Box::pin(async move { Ok(self.meta_data.parquet().clone()) })
-    }
-}
-
 pub struct ChunkReaderAdapter<'a> {
     path: &'a Path,
     store: &'a ObjectStoreRef,
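
A note on what the new row-selection path buys: once the loaded metadata carries page indexes, the selection handed to with_row_selection can skip whole pages instead of decoding and discarding their rows. Below is a minimal sketch against the parquet crate's async API; read_with_selection is a hypothetical helper (not part of this PR), and the 100/900 split is an arbitrary illustration.

    use futures::TryStreamExt;
    use parquet::{
        arrow::{
            arrow_reader::{RowSelection, RowSelector},
            async_reader::AsyncFileReader,
            ParquetRecordBatchStreamBuilder,
        },
        errors::Result,
    };

    /// Hypothetical helper: stream a file, selecting the first 100 rows and
    /// skipping the next 900.
    async fn read_with_selection<R>(reader: R) -> Result<usize>
    where
        R: AsyncFileReader + Unpin + Send + 'static,
    {
        let selection = RowSelection::from(vec![RowSelector::select(100), RowSelector::skip(900)]);
        let stream = ParquetRecordBatchStreamBuilder::new(reader)
            .await?
            .with_row_selection(selection)
            .build()?;
        // With page indexes decoded, pages that fall entirely inside the
        // skipped range are never fetched from object storage at all.
        let batches: Vec<_> = stream.try_collect().await?;
        Ok(batches.iter().map(|b| b.num_rows()).sum())
    }
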
40 changes: 23 additions & 17 deletions analytic_engine/src/sst/reader.rs
@@ -15,64 +15,70 @@ pub mod error {
     #[derive(Debug, Snafu)]
     #[snafu(visibility(pub))]
     pub enum Error {
-        #[snafu(display("Try to read again, path:{}.\nBacktrace:\n{}", path, backtrace))]
+        #[snafu(display("Try to read again, path:{path}.\nBacktrace:\n{backtrace}"))]
         ReadAgain { backtrace: Backtrace, path: String },
 
-        #[snafu(display("Fail to read persisted file, path:{}, err:{}", path, source))]
+        #[snafu(display("Fail to read persisted file, path:{path}, err:{source}"))]
         ReadPersist { path: String, source: GenericError },
 
-        #[snafu(display("Failed to decode record batch, err:{}", source))]
+        #[snafu(display("Failed to decode record batch, err:{source}"))]
         DecodeRecordBatch { source: GenericError },
 
         #[snafu(display(
-            "Failed to decode sst meta data, file_path:{}, err:{}.\nBacktrace:\n{:?}",
-            file_path,
-            source,
-            backtrace
+            "Failed to decode sst meta data, file_path:{file_path}, err:{source}.\nBacktrace:\n{backtrace:?}",
         ))]
         FetchAndDecodeSstMeta {
             file_path: String,
             source: parquet::errors::ParquetError,
             backtrace: Backtrace,
         },
 
-        #[snafu(display("Failed to decode sst meta data, err:{}", source))]
+        #[snafu(display(
+            "Failed to decode page indexes for meta data, file_path:{file_path}, err:{source}.\nBacktrace:\n{backtrace:?}",
+        ))]
+        DecodePageIndexes {
+            file_path: String,
+            source: parquet::errors::ParquetError,
+            backtrace: Backtrace,
+        },
+
+        #[snafu(display("Failed to decode sst meta data, err:{source}"))]
         DecodeSstMeta { source: GenericError },
 
-        #[snafu(display("Sst meta data is not found.\nBacktrace:\n{}", backtrace))]
+        #[snafu(display("Sst meta data is not found.\nBacktrace:\n{backtrace}"))]
         SstMetaNotFound { backtrace: Backtrace },
 
-        #[snafu(display("Fail to projection, err:{}", source))]
+        #[snafu(display("Fail to projection, err:{source}"))]
         Projection { source: GenericError },
 
-        #[snafu(display("Sst meta data is empty.\nBacktrace:\n{}", backtrace))]
+        #[snafu(display("Sst meta data is empty.\nBacktrace:\n{backtrace}"))]
         EmptySstMeta { backtrace: Backtrace },
 
-        #[snafu(display("Invalid schema, err:{}", source))]
+        #[snafu(display("Invalid schema, err:{source}"))]
         InvalidSchema { source: common_types::schema::Error },
 
-        #[snafu(display("Meet a datafusion error, err:{}\nBacktrace:\n{}", source, backtrace))]
+        #[snafu(display("Meet a datafusion error, err:{source}\nBacktrace:\n{backtrace}"))]
         DataFusionError {
             source: datafusion::error::DataFusionError,
             backtrace: Backtrace,
         },
 
-        #[snafu(display("Meet a object store error, err:{}\nBacktrace:\n{}", source, backtrace))]
+        #[snafu(display("Meet a object store error, err:{source}\nBacktrace:\n{backtrace}"))]
         ObjectStoreError {
             source: object_store::ObjectStoreError,
             backtrace: Backtrace,
         },
 
-        #[snafu(display("Meet a parquet error, err:{}\nBacktrace:\n{}", source, backtrace))]
+        #[snafu(display("Meet a parquet error, err:{source}\nBacktrace:\n{backtrace}"))]
         ParquetError {
             source: parquet::errors::ParquetError,
             backtrace: Backtrace,
         },
 
-        #[snafu(display("Other kind of error:{}", source))]
+        #[snafu(display("Other kind of error:{source}"))]
         Other { source: GenericError },
 
-        #[snafu(display("Other kind of error, msg:{}.\nBacktrace:\n{}", msg, backtrace))]
+        #[snafu(display("Other kind of error, msg:{msg}.\nBacktrace:\n{backtrace}"))]
         OtherNoCause { msg: String, backtrace: Backtrace },
     }
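
The pattern behind this whole hunk: snafu display strings can capture fields by name with inline format args (Rust 1.58+ implicit captures), so the positional argument lists after each format string can be dropped. A self-contained before/after sketch (not from this repo):

    use snafu::{Backtrace, Snafu};

    #[derive(Debug, Snafu)]
    pub enum Error {
        // Before: fields listed positionally after the format string.
        // #[snafu(display("Try to read again, path:{}.\nBacktrace:\n{}", path, backtrace))]
        //
        // After: fields captured by name inside the format string itself.
        #[snafu(display("Try to read again, path:{path}.\nBacktrace:\n{backtrace}"))]
        ReadAgain { backtrace: Backtrace, path: String },
    }
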
2 changes: 2 additions & 0 deletions components/parquet_ext/Cargo.toml
@@ -17,6 +17,8 @@ async-trait = { workspace = true }
 bytes = { workspace = true }
 common_util = { workspace = true }
 datafusion = { workspace = true }
+futures = { workspace = true }
+log = { workspace = true }
 object_store = { workspace = true }
 parquet = { workspace = true }
 tokio = { workspace = true }
1 change: 1 addition & 0 deletions components/parquet_ext/src/lib.rs
@@ -2,6 +2,7 @@
 
 pub mod meta_data;
 pub mod prune;
+pub mod reader;
 pub mod reverse_reader;
 #[cfg(test)]
 pub mod tests;
23 changes: 22 additions & 1 deletion components/parquet_ext/src/meta_data.rs
@@ -1,15 +1,18 @@
 // Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.
 
-use std::ops::Range;
+use std::{ops::Range, sync::Arc};
 
 use async_trait::async_trait;
 use bytes::Bytes;
 use common_util::error::GenericResult;
 use parquet::{
+    arrow::{arrow_reader::ArrowReaderOptions, ParquetRecordBatchStreamBuilder},
     errors::{ParquetError, Result},
     file::{footer, metadata::ParquetMetaData},
 };
+
+use crate::reader::ObjectStoreReader;
 
 #[async_trait]
 pub trait ChunkReader: Sync + Send {
     async fn get_bytes(&self, range: Range<usize>) -> GenericResult<Bytes>;
@@ -65,3 +68,21 @@ pub async fn fetch_parquet_metadata(
 
     footer::decode_metadata(&metadata_bytes).map(|v| (v, metadata_len))
 }
+
+/// Build page indexes for meta data.
+///
+/// TODO: Currently there is no method to build page indexes for meta data in
+/// `parquet`; maybe we can file an issue in `arrow-rs`.
+pub async fn meta_with_page_indexes(
+    object_store_reader: ObjectStoreReader,
+) -> Result<Arc<ParquetMetaData>> {
+    let read_options = ArrowReaderOptions::new().with_page_index(true);
+    let builder =
+        ParquetRecordBatchStreamBuilder::new_with_options(object_store_reader, read_options)
+            .await
+            .map_err(|e| {
+                let err_msg = format!("failed to build page indexes in metadata, err:{e}");
+                ParquetError::General(err_msg)
+            })?;
+    Ok(builder.metadata().clone())
+}
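
Putting the new pieces together, the intended call sequence (mirroring Reader::load_meta_data_from_storage above) is: decode the plain footer first, then re-read the file through ObjectStoreReader so the arrow reader can fetch and decode the index pages. A hedged sketch; load_enriched_meta is a hypothetical wrapper, and the ObjectStoreReader::new signature is inferred from its call sites in this PR.

    use std::sync::Arc;

    use object_store::{ObjectStoreRef, Path};
    use parquet::{errors::Result, file::metadata::ParquetMetaData};
    use parquet_ext::{meta_data::meta_with_page_indexes, reader::ObjectStoreReader};

    /// Hypothetical wrapper: enrich already-fetched footer metadata (e.g. from
    /// fetch_parquet_metadata) with page indexes.
    async fn load_enriched_meta(
        store: ObjectStoreRef,
        path: Path,
        plain_meta: ParquetMetaData,
    ) -> Result<Arc<ParquetMetaData>> {
        let reader = ObjectStoreReader::new(store, path, Arc::new(plain_meta));
        let enriched = meta_with_page_indexes(reader).await?;
        // Same check the reader logs in async_reader.rs.
        debug_assert!(enriched.page_indexes().is_some());
        Ok(enriched)
    }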