feat: init scan implementation for time merge storage (apache#1586)
## Rationale
Add an initial `scan` implementation for the time merge storage so that data written as parquet SSTs can be read back through DataFusion.

## Detailed Changes
- Add `read.rs` with a `DefaultParquetFileReaderFactory` that builds `ParquetObjectReader`s directly from the injected object store.
- Implement `TimeMergeStorage::scan` in `storage.rs`: relevant SSTs are located via the manifest, read with `ParquetExec` (with predicate pushdown), and re-sorted by the primary keys with `SortExec` (a usage sketch follows the test plan below).
- Add `Manifest::find_ssts` to return the SSTs whose time range overlaps the requested range.
- Record the file size in `FileMeta` and return it from `write_batch` via the new `WriteResult`.
- Replace the hand-rolled protobuf mappings with `From` conversions for `SstFile` and `FileMeta`.

## Test Plan
CI
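
A rough usage sketch of the new scan path, not part of this change: it assumes the `ScanRequest` fields are constructible from the call site (they are private in this diff), that the code lives inside the crate, and that `futures` is available.

```rust
use futures::TryStreamExt;

use crate::{
    storage::{CloudObjectStorage, ScanRequest, TimeMergeStorage},
    types::{TimeRange, Timestamp},
    Result,
};

// Hypothetical helper: scan all columns in a fixed time window with no extra filters.
async fn scan_window(storage: &CloudObjectStorage, start: i64, end: i64) -> Result<()> {
    let req = ScanRequest {
        range: TimeRange::new(Timestamp(start), Timestamp(end)),
        predicate: vec![], // pushed down to ParquetExec when non-empty
        projections: None, // `None` means all columns
    };
    let mut stream = storage.scan(req).await?;
    while let Some(batch) = stream.try_next().await.expect("poll scan stream") {
        println!("scanned {} rows", batch.num_rows());
    }
    Ok(())
}
```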
jiacai2050 authored Nov 5, 2024
1 parent 115b002 commit e65c504
Showing 7 changed files with 260 additions and 59 deletions.
1 change: 1 addition & 0 deletions horaedb/metric_engine/src/lib.rs
@@ -19,6 +19,7 @@

pub mod error;
mod manifest;
mod read;
mod sst;
pub mod storage;
pub mod types;
40 changes: 25 additions & 15 deletions horaedb/metric_engine/src/manifest.rs
@@ -23,7 +23,7 @@ use tokio::sync::RwLock;

use crate::{
sst::{FileId, FileMeta, SstFile},
types::ObjectStoreRef,
types::{ObjectStoreRef, TimeRange},
AnyhowError, Error, Result,
};

@@ -56,6 +56,18 @@ impl TryFrom<pb_types::Manifest> for Payload {
}
}

impl From<Payload> for pb_types::Manifest {
fn from(value: Payload) -> Self {
pb_types::Manifest {
files: value
.files
.into_iter()
.map(pb_types::SstFile::from)
.collect(),
}
}
}

impl Manifest {
pub async fn try_new(path: String, store: ObjectStoreRef) -> Result<Self> {
let snapshot_path = Path::from(format!("{path}/{SNAPSHOT_FILENAME}"));
@@ -97,20 +109,7 @@ impl Manifest {
let new_sst = SstFile { id, meta };
tmp_ssts.push(new_sst.clone());
let pb_manifest = pb_types::Manifest {
files: tmp_ssts
.into_iter()
.map(|f| pb_types::SstFile {
id: f.id,
meta: Some(pb_types::SstMeta {
max_sequence: f.meta.max_sequence,
num_rows: f.meta.num_rows,
time_range: Some(pb_types::TimeRange {
start: f.meta.time_range.start,
end: f.meta.time_range.end,
}),
}),
})
.collect::<Vec<_>>(),
files: tmp_ssts.into_iter().map(|f| f.into()).collect::<Vec<_>>(),
};

let mut buf = Vec::with_capacity(pb_manifest.encoded_len());
@@ -130,4 +129,15 @@

Ok(())
}

pub async fn find_ssts(&self, time_range: &TimeRange) -> Vec<SstFile> {
let payload = self.payload.read().await;

payload
.files
.iter()
.filter(move |f| f.meta.time_range.overlaps(time_range))
.cloned()
.collect()
}
}
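
`find_ssts` leans on `TimeRange::overlaps`, which lives in `types.rs` and is not part of this diff. Given that `storage.rs` builds ranges as `TimeRange::new(start, end + 1)`, a half-open-interval check along these lines is presumably what it does; the snippet below is only a sketch under that assumption.

```rust
// Sketch only: assumes TimeRange { start, end } is a half-open interval
// [start, end) over Timestamp, as `TimeRange::new(start, end + 1)` in
// storage.rs suggests. The real implementation lives in types.rs.
impl TimeRange {
    pub fn overlaps(&self, other: &TimeRange) -> bool {
        self.start < other.end && other.start < self.end
    }
}
```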
55 changes: 55 additions & 0 deletions horaedb/metric_engine/src/read.rs
@@ -0,0 +1,55 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use datafusion::{
datasource::physical_plan::{FileMeta, ParquetFileReaderFactory},
error::Result as DfResult,
parquet::arrow::async_reader::AsyncFileReader,
physical_plan::metrics::ExecutionPlanMetricsSet,
};
use parquet::arrow::async_reader::ParquetObjectReader;

use crate::types::ObjectStoreRef;

#[derive(Debug, Clone)]
pub struct DefaultParquetFileReaderFactory {
object_store: ObjectStoreRef,
}

/// Returns an `AsyncFileReader` factory.
impl DefaultParquetFileReaderFactory {
pub fn new(object_store: ObjectStoreRef) -> Self {
Self { object_store }
}
}

impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
fn create_reader(
&self,
_partition_index: usize,
file_meta: FileMeta,
metadata_size_hint: Option<usize>,
_metrics: &ExecutionPlanMetricsSet,
) -> DfResult<Box<dyn AsyncFileReader + Send>> {
let object_store = self.object_store.clone();
let mut reader = ParquetObjectReader::new(object_store, file_meta.object_meta);
if let Some(size) = metadata_size_hint {
reader = reader.with_footer_size_hint(size);
}
Ok(Box::new(reader))
}
}
30 changes: 26 additions & 4 deletions horaedb/metric_engine/src/sst.rs
@@ -49,10 +49,20 @@ impl TryFrom<pb_types::SstFile> for SstFile {
}
}

impl From<SstFile> for pb_types::SstFile {
fn from(value: SstFile) -> Self {
pb_types::SstFile {
id: value.id,
meta: Some(value.meta.into()),
}
}
}

#[derive(Clone, Debug)]
pub struct FileMeta {
pub max_sequence: u64,
pub num_rows: u32,
pub size: u32,
pub time_range: TimeRange,
}

@@ -66,14 +76,26 @@ impl TryFrom<pb_types::SstMeta> for FileMeta {
Ok(Self {
max_sequence: value.max_sequence,
num_rows: value.num_rows,
time_range: TimeRange {
start: time_range.start,
end: time_range.end,
},
size: value.size,
time_range: TimeRange::new(time_range.start.into(), time_range.end.into()),
})
}
}

impl From<FileMeta> for pb_types::SstMeta {
fn from(value: FileMeta) -> Self {
pb_types::SstMeta {
max_sequence: value.max_sequence,
num_rows: value.num_rows,
size: value.size,
time_range: Some(pb_types::TimeRange {
start: *value.time_range.start,
end: *value.time_range.end,
}),
}
}
}

// Used for sst file id allocation.
// This number mustn't go backwards on restarts, otherwise file id
// collisions are possible. So don't change time on the server
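
A quick round-trip check one might add for the new conversions. This is a sketch only: it assumes it sits in a `#[cfg(test)]` module inside `sst.rs` and that `Timestamp` is a public tuple struct (as `Timestamp(*v)` in `storage.rs` suggests).

```rust
#[cfg(test)]
mod conversion_tests {
    use super::*;
    use crate::types::{TimeRange, Timestamp};

    #[test]
    fn file_meta_pb_round_trip() {
        let meta = FileMeta {
            max_sequence: 1,
            num_rows: 100,
            size: 4096,
            time_range: TimeRange::new(Timestamp(0), Timestamp(1000)),
        };
        // Convert to the protobuf type and back, then compare a few fields
        // (FileMeta does not derive PartialEq in this diff).
        let pb: pb_types::SstMeta = meta.clone().into();
        let restored = FileMeta::try_from(pb).expect("convert back");
        assert_eq!(restored.num_rows, 100);
        assert_eq!(restored.size, 4096);
    }
}
```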
106 changes: 79 additions & 27 deletions horaedb/metric_engine/src/storage.rs
@@ -25,10 +25,13 @@ use arrow::{
use async_trait::async_trait;
use datafusion::{
common::DFSchema,
execution::{
context::ExecutionProps, SendableRecordBatchStream as DFSendableRecordBatchStream,
datasource::{
listing::PartitionedFile,
physical_plan::{FileScanConfig, ParquetExec},
},
logical_expr::Expr,
execution::{context::ExecutionProps, object_store::ObjectStoreUrl, SendableRecordBatchStream},
logical_expr::{utils::conjunction, Expr},
physical_expr::{create_physical_expr, LexOrdering},
physical_plan::{execute_stream, memory::MemoryExec, sorts::sort::SortExec},
physical_planner::create_physical_sort_exprs,
prelude::{ident, SessionContext},
Expand All @@ -43,8 +46,9 @@ use parquet::{

use crate::{
manifest::Manifest,
read::DefaultParquetFileReaderFactory,
sst::{allocate_id, FileId, FileMeta},
types::{ObjectStoreRef, SendableRecordBatchStream, TimeRange, Timestamp},
types::{ObjectStoreRef, TimeRange, Timestamp, WriteResult},
Result,
};

@@ -55,7 +59,7 @@ pub struct WriteRequest {

pub struct ScanRequest {
range: TimeRange,
predicate: Expr,
predicate: Vec<Expr>,
/// `None` means all columns.
projections: Option<Vec<usize>>,
}
@@ -84,6 +88,8 @@ pub struct CloudObjectStorage {
num_primary_key: usize,
timestamp_index: usize,
manifest: Manifest,

df_schema: DFSchema,
}

/// It will organize the data in the following way:
@@ -107,13 +113,15 @@ impl CloudObjectStorage {
let manifest_prefix = crate::manifest::PREFIX_PATH;
let manifest =
Manifest::try_new(format!("{root_path}/{manifest_prefix}"), store.clone()).await?;
let df_schema = DFSchema::try_from(arrow_schema.clone()).context("build DFSchema")?;
Ok(Self {
path: root_path,
num_primary_key,
timestamp_index,
store,
arrow_schema,
manifest,
df_schema,
})
}

@@ -123,11 +131,11 @@
format!("{root}/{prefix}/{id}")
}

async fn write_batch(&self, req: WriteRequest) -> Result<FileId> {
async fn write_batch(&self, req: WriteRequest) -> Result<WriteResult> {
let file_id = allocate_id();
let file_path = self.build_file_path(file_id);
let object_store_writer =
ParquetObjectWriter::new(self.store.clone(), Path::from(file_path));
let file_path = Path::from(file_path);
let object_store_writer = ParquetObjectWriter::new(self.store.clone(), file_path.clone());
let mut writer =
AsyncArrowWriter::try_new(object_store_writer, self.schema().clone(), req.props)
.context("create arrow writer")?;
@@ -139,27 +147,38 @@
writer.write(&batch).await.context("write arrow batch")?;
}
writer.close().await.context("close arrow writer")?;

Ok(file_id)
let object_meta = self
.store
.head(&file_path)
.await
.context("get object meta")?;

Ok(WriteResult {
id: file_id,
size: object_meta.size,
})
}

async fn sort_batch(&self, batch: RecordBatch) -> Result<DFSendableRecordBatchStream> {
let ctx = SessionContext::default();
let schema = batch.schema();
let df_schema = DFSchema::try_from(schema.clone()).context("build DFSchema")?;

fn build_sort_exprs(&self) -> Result<LexOrdering> {
let sort_exprs = (0..self.num_primary_key)
.collect::<Vec<_>>()
.iter()
.map(|i| ident(schema.clone().field(*i).name()).sort(true, true))
.map(|i| ident(self.schema().field(*i).name()).sort(true, true))
.collect::<Vec<_>>();
let physical_sort_exprs =
create_physical_sort_exprs(&sort_exprs, &df_schema, &ExecutionProps::default())
let sort_exprs =
create_physical_sort_exprs(&sort_exprs, &self.df_schema, &ExecutionProps::default())
.context("create physical sort exprs")?;

Ok(sort_exprs)
}

async fn sort_batch(&self, batch: RecordBatch) -> Result<SendableRecordBatchStream> {
let ctx = SessionContext::default();
let schema = batch.schema();
let sort_exprs = self.build_sort_exprs()?;
let batch_plan =
MemoryExec::try_new(&[vec![batch]], schema, None).context("build batch plan")?;
let physical_plan = Arc::new(SortExec::new(physical_sort_exprs, Arc::new(batch_plan)));
let physical_plan = Arc::new(SortExec::new(sort_exprs, Arc::new(batch_plan)));

let res =
execute_stream(physical_plan, ctx.task_ctx()).context("execute sort physical plan")?;
@@ -187,17 +206,18 @@ impl TimeMergeStorage for CloudObjectStorage {
let mut start = Timestamp::MAX;
let mut end = Timestamp::MIN;
for v in time_column.values() {
start = start.min(*v);
end = end.max(*v);
start = start.min(Timestamp(*v));
end = end.max(Timestamp(*v));
}
let time_range = TimeRange {
start,
end: end + 1,
};
let file_id = self.write_batch(req).await?;
let time_range = TimeRange::new(start, end + 1);
let WriteResult {
id: file_id,
size: file_size,
} = self.write_batch(req).await?;
let file_meta = FileMeta {
max_sequence: file_id, // Since file ids are allocated in increasing order, we can use the id as the sequence.
num_rows: num_rows as u32,
size: file_size as u32,
time_range,
};
self.manifest.add_file(file_id, file_meta).await?;
Expand All @@ -206,7 +226,39 @@ impl TimeMergeStorage for CloudObjectStorage {
}

async fn scan(&self, req: ScanRequest) -> Result<SendableRecordBatchStream> {
todo!()
let ssts = self.manifest.find_ssts(&req.range).await;
// We won't use the URL to select the object store.
let dummy_url = ObjectStoreUrl::parse("empty://").unwrap();
// TODO: we could group ssts based on time range.
// TODO: fetch using multiple threads, since reading parquet is CPU-intensive
// when converting between arrow and parquet.
let file_groups = ssts
.iter()
.map(|f| PartitionedFile::new(self.build_file_path(f.id), f.meta.size as u64))
.collect::<Vec<_>>();
let scan_config = FileScanConfig::new(dummy_url, self.schema().clone())
.with_file_group(file_groups)
.with_projection(req.projections);

let mut builder = ParquetExec::builder(scan_config).with_parquet_file_reader_factory(
Arc::new(DefaultParquetFileReaderFactory::new(self.store.clone())),
);
if let Some(expr) = conjunction(req.predicate) {
let filters = create_physical_expr(&expr, &self.df_schema, &ExecutionProps::new())
.context("create pyhsical expr")?;
builder = builder.with_predicate(filters);
}

let parquet_exec = builder.build();
let sort_exprs = self.build_sort_exprs()?;
let physical_plan = Arc::new(SortExec::new(sort_exprs, Arc::new(parquet_exec)));

let ctx = SessionContext::default();
// TODO: dedup record batches based on primary keys and sequence number.
let res =
execute_stream(physical_plan, ctx.task_ctx()).context("execute sort physical plan")?;

Ok(res)
}

async fn compact(&self, req: CompactRequest) -> Result<()> {
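
The `predicate` field of `ScanRequest` takes a list of DataFusion expressions; `scan` conjoins them with `conjunction` and hands the result to `ParquetExec` for pruning. A hedged sketch of how a caller might build one (the column names are invented for illustration):

```rust
use datafusion::logical_expr::Expr;
use datafusion::prelude::{col, lit};

// Hypothetical filters; "host" and "value" are made-up column names.
fn example_predicate() -> Vec<Expr> {
    vec![
        col("host").eq(lit("node-1")),
        col("value").gt(lit(0.5_f64)),
    ]
}
```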
