Revert "feat: add page indexes for metadata (apache#1027)"
This reverts commit 1fd7857.
Rachelint committed Jul 11, 2023
1 parent cd2b688 commit b53ba7c
Showing 8 changed files with 102 additions and 184 deletions.
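For context on what is being undone: the reverted change (apache#1027) decoded the parquet footer and then rebuilt the metadata with page indexes, so that row selections could prune at page granularity. A minimal sketch of that idea, mirroring the `meta_with_page_indexes` helper removed at the bottom of this diff (the function name here is illustrative):

use std::sync::Arc;

use parquet::{
    arrow::{
        arrow_reader::ArrowReaderOptions, async_reader::AsyncFileReader,
        ParquetRecordBatchStreamBuilder,
    },
    errors::Result,
    file::metadata::ParquetMetaData,
};

// Sketch only: rebuild the metadata with page indexes included.
async fn metadata_with_page_indexes<R: AsyncFileReader + Send + 'static>(
    reader: R,
) -> Result<Arc<ParquetMetaData>> {
    // `with_page_index(true)` asks the builder to fetch and decode the
    // column/offset indexes in addition to the footer metadata.
    let options = ArrowReaderOptions::new().with_page_index(true);
    let builder = ParquetRecordBatchStreamBuilder::new_with_options(reader, options).await?;
    Ok(builder.metadata().clone())
}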
2 changes: 0 additions & 2 deletions Cargo.lock


114 changes: 81 additions & 33 deletions analytic_engine/src/sst/parquet/async_reader.rs
@@ -30,14 +30,17 @@ use datafusion::{
metrics::ExecutionPlanMetricsSet,
},
};
use futures::{Stream, StreamExt};
use futures::{future::BoxFuture, FutureExt, Stream, StreamExt, TryFutureExt};
use log::{debug, error};
use object_store::{ObjectStoreRef, Path};
use parquet::{
arrow::{arrow_reader::RowSelection, ParquetRecordBatchStreamBuilder, ProjectionMask},
arrow::{
arrow_reader::RowSelection, async_reader::AsyncFileReader, ParquetRecordBatchStreamBuilder,
ProjectionMask,
},
file::metadata::RowGroupMetaData,
};
use parquet_ext::{meta_data::ChunkReader, reader::ObjectStoreReader};
use parquet_ext::meta_data::ChunkReader;
use snafu::ResultExt;
use table_engine::predicate::PredicateRef;
use tokio::sync::mpsc::{self, Receiver, Sender};
@@ -278,23 +281,13 @@ impl<'a> Reader<'a> {

let mut streams = Vec::with_capacity(target_row_group_chunks.len());
for chunk in target_row_group_chunks {
let object_store_reader = ObjectStoreReader::new(
self.store.clone(),
self.path.clone(),
parquet_metadata.clone(),
);
let object_store_reader =
ObjectStoreReader::new(self.store.clone(), self.path.clone(), meta_data.clone());
let mut builder = ParquetRecordBatchStreamBuilder::new(object_store_reader)
.await
.with_context(|| ParquetError)?;

let row_selection =
self.build_row_selection(arrow_schema.clone(), &chunk, parquet_metadata)?;

debug!(
"Build row selection for file path:{}, result:{row_selection:?}, page indexes:{}",
self.path,
parquet_metadata.page_indexes().is_some()
);
if let Some(selection) = row_selection {
builder = builder.with_row_selection(selection);
};
@@ -347,32 +340,18 @@ impl<'a> Reader<'a> {
Ok(file_size)
}

async fn load_meta_data_from_storage(&self, ignore_sst_filter: bool) -> Result<MetaData> {
async fn load_meta_data_from_storage(&self) -> Result<parquet_ext::ParquetMetaData> {
let file_size = self.load_file_size().await?;
let chunk_reader_adapter = ChunkReaderAdapter::new(self.path, self.store);

let (parquet_meta_data, _) =
let (meta_data, _) =
parquet_ext::meta_data::fetch_parquet_metadata(file_size, &chunk_reader_adapter)
.await
.with_context(|| FetchAndDecodeSstMeta {
file_path: self.path.to_string(),
})?;

let object_store_reader = parquet_ext::reader::ObjectStoreReader::new(
self.store.clone(),
self.path.clone(),
Arc::new(parquet_meta_data),
);

let parquet_meta_data = parquet_ext::meta_data::meta_with_page_indexes(object_store_reader)
.await
.with_context(|| DecodePageIndexes {
file_path: self.path.to_string(),
})?;

MetaData::try_new(&parquet_meta_data, ignore_sst_filter)
.box_err()
.context(DecodeSstMeta)
Ok(meta_data)
}

fn need_update_cache(&self) -> bool {
@@ -396,8 +375,12 @@ impl<'a> Reader<'a> {
let empty_predicate = self.predicate.exprs().is_empty();

let meta_data = {
let parquet_meta_data = self.load_meta_data_from_storage().await?;

let ignore_sst_filter = avoid_update_cache && empty_predicate;
self.load_meta_data_from_storage(ignore_sst_filter).await?
MetaData::try_new(&parquet_meta_data, ignore_sst_filter)
.box_err()
.context(DecodeSstMeta)?
};

if avoid_update_cache || self.meta_cache.is_none() {
@@ -430,6 +413,71 @@ impl<'a> Drop for Reader<'a> {
}
}

#[derive(Clone)]
struct ObjectStoreReader {
storage: ObjectStoreRef,
path: Path,
meta_data: MetaData,
begin: Instant,
}

impl ObjectStoreReader {
fn new(storage: ObjectStoreRef, path: Path, meta_data: MetaData) -> Self {
Self {
storage,
path,
meta_data,
begin: Instant::now(),
}
}
}

impl Drop for ObjectStoreReader {
fn drop(&mut self) {
debug!(
"ObjectStoreReader dropped, path:{}, elapsed:{:?}",
&self.path,
self.begin.elapsed()
);
}
}

impl AsyncFileReader for ObjectStoreReader {
fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
self.storage
.get_range(&self.path, range)
.map_err(|e| {
parquet::errors::ParquetError::General(format!(
"Failed to fetch range from object store, err:{e}"
))
})
.boxed()
}

fn get_byte_ranges(
&mut self,
ranges: Vec<Range<usize>>,
) -> BoxFuture<'_, parquet::errors::Result<Vec<Bytes>>> {
async move {
self.storage
.get_ranges(&self.path, &ranges)
.map_err(|e| {
parquet::errors::ParquetError::General(format!(
"Failed to fetch ranges from object store, err:{e}"
))
})
.await
}
.boxed()
}

fn get_metadata(
&mut self,
) -> BoxFuture<'_, parquet::errors::Result<Arc<parquet::file::metadata::ParquetMetaData>>> {
Box::pin(async move { Ok(self.meta_data.parquet().clone()) })
}
}

pub struct ChunkReaderAdapter<'a> {
path: &'a Path,
store: &'a ObjectStoreRef,
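The restored `ObjectStoreReader` implements parquet's `AsyncFileReader`, so the stream builder fetches byte ranges through the object store and reuses the already-decoded footer via `get_metadata`. A hedged usage sketch (the caller function is hypothetical, not part of this commit):

use futures::StreamExt;
use parquet::arrow::ParquetRecordBatchStreamBuilder;

// Sketch only: drive the restored reader through the async builder.
async fn read_all(reader: ObjectStoreReader) -> parquet::errors::Result<usize> {
    // The builder calls `get_metadata`, which returns the cached
    // Arc<ParquetMetaData> instead of re-reading the footer.
    let builder = ParquetRecordBatchStreamBuilder::new(reader).await?;
    let mut stream = builder.build()?;
    let mut rows = 0;
    while let Some(batch) = stream.next().await {
        rows += batch?.num_rows();
    }
    Ok(rows)
}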
40 changes: 17 additions & 23 deletions analytic_engine/src/sst/reader.rs
@@ -15,70 +15,64 @@ pub mod error {
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
#[snafu(display("Try to read again, path:{path}.\nBacktrace:\n{backtrace}"))]
#[snafu(display("Try to read again, path:{}.\nBacktrace:\n{}", path, backtrace))]
ReadAgain { backtrace: Backtrace, path: String },

#[snafu(display("Fail to read persisted file, path:{path}, err:{source}"))]
#[snafu(display("Fail to read persisted file, path:{}, err:{}", path, source))]
ReadPersist { path: String, source: GenericError },

#[snafu(display("Failed to decode record batch, err:{source}"))]
#[snafu(display("Failed to decode record batch, err:{}", source))]
DecodeRecordBatch { source: GenericError },

#[snafu(display(
"Failed to decode sst meta data, file_path:{file_path}, err:{source}.\nBacktrace:\n{backtrace:?}",
"Failed to decode sst meta data, file_path:{}, err:{}.\nBacktrace:\n{:?}",
file_path,
source,
backtrace
))]
FetchAndDecodeSstMeta {
file_path: String,
source: parquet::errors::ParquetError,
backtrace: Backtrace,
},

#[snafu(display(
"Failed to decode page indexes for meta data, file_path:{file_path}, err:{source}.\nBacktrace:\n{backtrace:?}",
))]
DecodePageIndexes {
file_path: String,
source: parquet::errors::ParquetError,
backtrace: Backtrace,
},

#[snafu(display("Failed to decode sst meta data, err:{source}"))]
#[snafu(display("Failed to decode sst meta data, err:{}", source))]
DecodeSstMeta { source: GenericError },

#[snafu(display("Sst meta data is not found.\nBacktrace:\n{backtrace}"))]
#[snafu(display("Sst meta data is not found.\nBacktrace:\n{}", backtrace))]
SstMetaNotFound { backtrace: Backtrace },

#[snafu(display("Fail to projection, err:{source}"))]
#[snafu(display("Fail to projection, err:{}", source))]
Projection { source: GenericError },

#[snafu(display("Sst meta data is empty.\nBacktrace:\n{backtrace}"))]
#[snafu(display("Sst meta data is empty.\nBacktrace:\n{}", backtrace))]
EmptySstMeta { backtrace: Backtrace },

#[snafu(display("Invalid schema, err:{source}"))]
#[snafu(display("Invalid schema, err:{}", source))]
InvalidSchema { source: common_types::schema::Error },

#[snafu(display("Meet a datafusion error, err:{source}\nBacktrace:\n{backtrace}"))]
#[snafu(display("Meet a datafusion error, err:{}\nBacktrace:\n{}", source, backtrace))]
DataFusionError {
source: datafusion::error::DataFusionError,
backtrace: Backtrace,
},

#[snafu(display("Meet a object store error, err:{source}\nBacktrace:\n{backtrace}"))]
#[snafu(display("Meet a object store error, err:{}\nBacktrace:\n{}", source, backtrace))]
ObjectStoreError {
source: object_store::ObjectStoreError,
backtrace: Backtrace,
},

#[snafu(display("Meet a parquet error, err:{source}\nBacktrace:\n{backtrace}"))]
#[snafu(display("Meet a parquet error, err:{}\nBacktrace:\n{}", source, backtrace))]
ParquetError {
source: parquet::errors::ParquetError,
backtrace: Backtrace,
},

#[snafu(display("Other kind of error:{source}"))]
#[snafu(display("Other kind of error:{}", source))]
Other { source: GenericError },

#[snafu(display("Other kind of error, msg:{msg}.\nBacktrace:\n{backtrace}"))]
#[snafu(display("Other kind of error, msg:{}.\nBacktrace:\n{}", msg, backtrace))]
OtherNoCause { msg: String, backtrace: Backtrace },
}

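Apart from dropping the now-unused `DecodePageIndexes` variant, the hunk above only swaps snafu's inline format captures (e.g. `{path}`) back to positional arguments; the rendered messages are identical. A small illustration with a hypothetical error type:

use snafu::Snafu;

#[derive(Debug, Snafu)]
enum DemoError {
    // Positional form, as restored by this revert.
    #[snafu(display("Fail to read persisted file, path:{}, err:{}", path, source))]
    Positional { path: String, source: std::io::Error },

    // Inline-capture shorthand, as removed by this revert; same output.
    #[snafu(display("Fail to read persisted file, path:{path}, err:{source}"))]
    Inline { path: String, source: std::io::Error },
}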
2 changes: 0 additions & 2 deletions components/parquet_ext/Cargo.toml
@@ -17,8 +17,6 @@ async-trait = { workspace = true }
bytes = { workspace = true }
common_util = { workspace = true }
datafusion = { workspace = true }
futures = { workspace = true }
log = { workspace = true }
object_store = { workspace = true }
parquet = { workspace = true }
tokio = { workspace = true }
1 change: 0 additions & 1 deletion components/parquet_ext/src/lib.rs
@@ -2,7 +2,6 @@

pub mod meta_data;
pub mod prune;
pub mod reader;
pub mod reverse_reader;
#[cfg(test)]
pub mod tests;
23 changes: 1 addition & 22 deletions components/parquet_ext/src/meta_data.rs
@@ -1,18 +1,15 @@
// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0.

use std::{ops::Range, sync::Arc};
use std::ops::Range;

use async_trait::async_trait;
use bytes::Bytes;
use common_util::error::GenericResult;
use parquet::{
arrow::{arrow_reader::ArrowReaderOptions, ParquetRecordBatchStreamBuilder},
errors::{ParquetError, Result},
file::{footer, metadata::ParquetMetaData},
};

use crate::reader::ObjectStoreReader;

#[async_trait]
pub trait ChunkReader: Sync + Send {
async fn get_bytes(&self, range: Range<usize>) -> GenericResult<Bytes>;
@@ -68,21 +65,3 @@ pub async fn fetch_parquet_metadata(

footer::decode_metadata(&metadata_bytes).map(|v| (v, metadata_len))
}

/// Build page indexes for meta data
///
/// TODO: Currently there is no method to build page indexes for meta data in
/// `parquet`; maybe we can file an issue in `arrow-rs`.
pub async fn meta_with_page_indexes(
object_store_reader: ObjectStoreReader,
) -> Result<Arc<ParquetMetaData>> {
let read_options = ArrowReaderOptions::new().with_page_index(true);
let builder =
ParquetRecordBatchStreamBuilder::new_with_options(object_store_reader, read_options)
.await
.map_err(|e| {
let err_msg = format!("failed to build page indexes in metadata, err:{e}");
ParquetError::General(err_msg)
})?;
Ok(builder.metadata().clone())
}
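For reference, `fetch_parquet_metadata` remains driven through the `ChunkReader` trait declared near the top of this file. A minimal sketch of a type satisfying it, assuming `get_bytes` is the trait's only required method (the in-memory reader is illustrative; the real implementation is `ChunkReaderAdapter` over an object store):

use std::ops::Range;

use async_trait::async_trait;
use bytes::Bytes;
use common_util::error::GenericResult;

// Illustrative only: serve byte ranges from a buffer already in memory.
struct InMemoryChunk(Bytes);

#[async_trait]
impl ChunkReader for InMemoryChunk {
    async fn get_bytes(&self, range: Range<usize>) -> GenericResult<Bytes> {
        Ok(self.0.slice(range))
    }
}

// Assumed call shape, matching its use in async_reader.rs:
// let (meta, metadata_len) =
//     fetch_parquet_metadata(file_size, &InMemoryChunk(data)).await?;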