chore: remove hybrid related logic (#1172)
## Rationale
The hybrid format has no obvious advantages, and maintaining it in our
codebase requires a lot of work, so we decided to remove it.

## Detailed Changes

- Remove `StorageFormat::Hybrid` handling from the SST factory and drop the `hybrid_encoding` flag from `ParquetSstWriter::new`.
- Remove the `collapsible_cols_idx` field from the custom parquet metadata and the `sst_meta` field from `RecordBatchProjector`; `ParquetDecoder::new()` no longer takes collapsible column indexes.
- Rename `decode_sst_meta_data_v1`/`decode_sst_meta_data_v2` to `decode_sst_meta_data_from_kv`/`decode_sst_meta_data_from_bytes`.
- Update the metadata cache tests to write metadata in the v2 layout: the parquet footer now carries only `META_PATH_KEY`/`META_VERSION_KEY`, and the encoded custom metadata lives in a sidecar `.metadata` file in the object store.

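To make the v2 layout concrete, here is a minimal sketch of what the updated cache.rs test now asserts, using only the stock `parquet` and `arrow` crates. The literal key strings and the sidecar path are illustrative stand-ins for `META_PATH_KEY`, `META_VERSION_KEY`, and `new_metadata_path` from this codebase:

```rust
use std::{fs::File, sync::Arc};

use arrow::{
    array::{ArrayRef, Int64Array},
    datatypes::{DataType, Field, Schema},
    record_batch::RecordBatch,
};
use parquet::{arrow::ArrowWriter, format::KeyValue};

fn main() {
    // A tiny single-column batch, just to have something to write.
    let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int64, false)]));
    let column = Arc::new(Int64Array::from(vec![1_i64, 2, 3])) as ArrayRef;
    let batch = RecordBatch::try_new(schema.clone(), vec![column]).unwrap();

    let file = File::create("/tmp/example.sst").unwrap();
    let mut writer = ArrowWriter::try_new(file, schema, None).unwrap();

    // v2 layout: the footer no longer embeds the encoded custom metadata;
    // it only records where the sidecar file lives and the format version.
    writer.append_key_value_metadata(KeyValue {
        key: "meta_path".to_string(), // illustrative stand-in for META_PATH_KEY
        value: Some("/tmp/example.sst.metadata".to_string()),
    });
    writer.append_key_value_metadata(KeyValue {
        key: "meta_version".to_string(), // illustrative stand-in for META_VERSION_KEY
        value: Some("2".to_string()),
    });
    writer.write(&batch).unwrap();
    writer.close().unwrap();

    // The encoded custom metadata bytes are then written to the sidecar path
    // through the object store, as the updated cache.rs test does with
    // `encode_sst_meta_data` and `store.put`.
}
```

On the read side, dispatch happens on `META_VERSION_KEY`: v1 metadata is still decoded from the footer kv pair (`decode_sst_meta_data_from_kv`), while v2 fetches and decodes the sidecar bytes (`decode_sst_meta_data_from_bytes`); see metadata_reader.rs below.
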
## Test Plan
Existing unit tests.
jiacai2050 authored Aug 28, 2023
1 parent 92abd71 commit 06e985f
Showing 19 changed files with 96 additions and 1,789 deletions.
1 change: 0 additions & 1 deletion analytic_engine/src/compaction/picker.rs
@@ -724,7 +724,6 @@ mod tests {
             max_sequence: 200,
             schema: build_schema(),
             parquet_filter: Default::default(),
-            collapsible_cols_idx: Vec::new(),
         };

         SstMetaData::Parquet(Arc::new(parquet_meta_data))
9 changes: 1 addition & 8 deletions analytic_engine/src/sst/factory.rs
@@ -171,7 +171,7 @@ impl Factory for FactoryImpl {
         };

         match storage_format {
-            StorageFormat::Columnar | StorageFormat::Hybrid => {
+            StorageFormat::Columnar => {
                 let reader = AsyncParquetReader::new(
                     path,
                     options,
@@ -197,16 +197,9 @@
         store_picker: &'a ObjectStorePickerRef,
         level: Level,
     ) -> Result<Box<dyn SstWriter + Send + 'a>> {
-        let hybrid_encoding = match options.storage_format_hint {
-            StorageFormatHint::Specific(format) => matches!(format, StorageFormat::Hybrid),
-            // `Auto` is mapped to columnar parquet format now, may change in future.
-            StorageFormatHint::Auto => false,
-        };
-
         Ok(Box::new(ParquetSstWriter::new(
             path,
             level,
-            hybrid_encoding,
             store_picker,
             options,
         )))
60 changes: 42 additions & 18 deletions analytic_engine/src/sst/meta_data/cache.rs
@@ -163,12 +163,18 @@ mod tests {
         schema::Builder as CustomSchemaBuilder,
         time::{TimeRange, Timestamp},
     };
-    use object_store::LocalFileSystem;
+    use object_store::{LocalFileSystem, ObjectStoreRef};
     use parquet::{arrow::ArrowWriter, file::footer};
     use parquet_ext::ParquetMetaData;

     use super::MetaData;
-    use crate::sst::parquet::{encoding, meta_data::ParquetMetaData as CustomParquetMetaData};
+    use crate::{
+        sst::parquet::{
+            encoding::{self, META_PATH_KEY, META_VERSION_KEY},
+            meta_data::ParquetMetaData as CustomParquetMetaData,
+        },
+        table::sst_util::new_metadata_path,
+    };

     // TODO: remove it and use the suggested api.
     #[allow(deprecated)]
@@ -202,21 +208,28 @@

         if let Some(kv_metas) = original_file_md.key_value_metadata() {
             let processed_kv_metas = processed_file_md.key_value_metadata().unwrap();
-            assert_eq!(kv_metas.len(), processed_kv_metas.len() + 1);
-            let mut idx_for_processed = 0;
+            assert_eq!(kv_metas.len(), processed_kv_metas.len() + 2);
             for kv in kv_metas {
-                if kv.key == encoding::META_KEY {
-                    continue;
-                }
+                match kv.key.as_str() {
+                    "ARROW:schema" => {
+                        // don't care this
+                    }
+                    encoding::META_KEY => assert!(kv.value.is_none()),
+                    encoding::META_VERSION_KEY => assert_eq!("2", kv.value.clone().unwrap()),
+                    encoding::META_PATH_KEY => {
+                        let meta_path = kv.value.as_ref().unwrap();
+                        assert!(meta_path.ends_with(".metadata"));
+                    }
+                    _ => panic!("Unknown parquet kv, value:{kv:?}"),
+                }
-                assert_eq!(kv, &processed_kv_metas[idx_for_processed]);
-                idx_for_processed += 1;
             }
         } else {
             assert!(processed_file_md.key_value_metadata().is_none());
         }
     }

-    fn write_parquet_file_with_metadata(
+    async fn write_parquet_file_with_metadata(
+        store: ObjectStoreRef,
         parquet_file_path: &Path,
         custom_meta_data: &CustomParquetMetaData,
     ) {
@@ -246,13 +259,21 @@
         )
         .unwrap();
         let mut writer = ArrowWriter::try_new(file, batch.schema(), None).unwrap();

-        let encoded_meta_data =
-            encoding::encode_sst_meta_data_v1(custom_meta_data.clone()).unwrap();
-        writer.append_key_value_metadata(encoded_meta_data);
-
+        let meta_path = new_metadata_path(parquet_file_path.to_str().unwrap());
+        writer.append_key_value_metadata(parquet::format::KeyValue {
+            key: META_PATH_KEY.to_string(),
+            value: Some(meta_path.clone()),
+        });
+        writer.append_key_value_metadata(parquet::format::KeyValue {
+            key: META_VERSION_KEY.to_string(),
+            value: Some("2".to_string()),
+        });
         writer.write(&batch).unwrap();
         writer.close().unwrap();

+        let bytes = encoding::encode_sst_meta_data(custom_meta_data.clone()).unwrap();
+        let meta_path = object_store::Path::from(meta_path);
+        store.put(&meta_path, bytes).await.unwrap();
     }

#[tokio::test]
@@ -288,14 +309,17 @@
             max_sequence: 1001,
             schema,
             parquet_filter: None,
-            collapsible_cols_idx: vec![],
         };
-        write_parquet_file_with_metadata(parquet_file_path.as_path(), &custom_meta_data);
+        let store = Arc::new(LocalFileSystem::new_with_prefix(temp_dir.path()).unwrap());
+        write_parquet_file_with_metadata(
+            store.clone(),
+            parquet_file_path.as_path(),
+            &custom_meta_data,
+        )
+        .await;

         let parquet_file = File::open(parquet_file_path.as_path()).unwrap();
         let parquet_meta_data = footer::parse_metadata(&parquet_file).unwrap();
-        let store =
-            Arc::new(LocalFileSystem::new_with_prefix(parquet_file_path.as_path()).unwrap());
         let meta_data = MetaData::try_new(&parquet_meta_data, false, store)
             .await
             .unwrap();
6 changes: 3 additions & 3 deletions analytic_engine/src/sst/meta_data/metadata_reader.rs
@@ -27,7 +27,7 @@ use crate::sst::{
         KvMetaPathEmpty,
     },
     parquet::{
-        encoding::{self, decode_sst_meta_data_v2, META_VERSION_CURRENT, META_VERSION_V1},
+        encoding::{self, decode_sst_meta_data_from_bytes, META_VERSION_CURRENT, META_VERSION_V1},
         meta_data::{ParquetMetaData, ParquetMetaDataRef},
     },
 };
@@ -54,7 +54,7 @@ impl CustomMetadataReader for MetaV1Reader<'_> {
     async fn get_metadata(&self) -> Result<ParquetMetaData> {
         let custom_kv_meta = self.custom_kv_meta.context(KvMetaDataNotFound)?;

-        encoding::decode_sst_meta_data_v1(custom_kv_meta).context(DecodeCustomMetaData)
+        encoding::decode_sst_meta_data_from_kv(custom_kv_meta).context(DecodeCustomMetaData)
     }
 }

@@ -88,7 +88,7 @@ impl CustomMetadataReader for MetaV2Reader {
                 file_path: meta_path.to_string(),
             })?;

-            decode_sst_meta_data_v2(metadata.as_bytes()).context(DecodeCustomMetaData)
+            decode_sst_meta_data_from_bytes(metadata.as_bytes()).context(DecodeCustomMetaData)
         }
     }
 }
18 changes: 2 additions & 16 deletions analytic_engine/src/sst/parquet/async_reader.rs
@@ -63,9 +63,7 @@ use crate::{
             SstMetaData,
         },
         parquet::{
-            encoding::ParquetDecoder,
-            meta_data::{ParquetFilter, ParquetMetaDataRef},
-            row_group_pruner::RowGroupPruner,
+            encoding::ParquetDecoder, meta_data::ParquetFilter, row_group_pruner::RowGroupPruner,
         },
         reader::{error::*, Result, SstReader},
     },
@@ -157,20 +155,12 @@ impl<'a> Reader<'a> {
             ArrowRecordBatchProjector::from(row_projector)
         };

-        let sst_meta_data = self
-            .meta_data
-            .as_ref()
-            // metadata must be inited after `init_if_necessary`.
-            .unwrap()
-            .custom();
-
         let streams: Vec<_> = streams
             .into_iter()
             .map(|stream| {
                 Box::new(RecordBatchProjector::new(
                     stream,
                     row_projector.clone(),
-                    sst_meta_data.clone(),
                     self.metrics.metrics_collector.clone(),
                 )) as _
             })
@@ -474,14 +464,12 @@ struct RecordBatchProjector {

     metrics: ProjectorMetrics,
     start_time: Instant,
-    sst_meta: ParquetMetaDataRef,
 }

 impl RecordBatchProjector {
     fn new(
         stream: SendableRecordBatchStream,
         row_projector: ArrowRecordBatchProjector,
-        sst_meta: ParquetMetaDataRef,
         metrics_collector: Option<MetricsCollector>,
     ) -> Self {
         let metrics = ProjectorMetrics {
@@ -494,7 +482,6 @@ impl RecordBatchProjector {
             row_projector,
             metrics,
             start_time: Instant::now(),
-            sst_meta,
         }
     }
 }
@@ -510,8 +497,7 @@ impl Stream for RecordBatchProjector {
             match record_batch.box_err().context(DecodeRecordBatch {}) {
                 Err(e) => Poll::Ready(Some(Err(e))),
                 Ok(record_batch) => {
-                    let parquet_decoder =
-                        ParquetDecoder::new(&projector.sst_meta.collapsible_cols_idx);
+                    let parquet_decoder = ParquetDecoder::new();
                     let record_batch = parquet_decoder
                         .decode_record_batch(record_batch)
                         .box_err()