feat: initial MergeExec
jiacai2050 committed Nov 19, 2024
1 parent af1d5e0 commit 1ae3b60
Showing 2 changed files with 185 additions and 13 deletions.
196 changes: 184 additions & 12 deletions horaedb/metric_engine/src/read.rs
@@ -15,16 +15,31 @@
// specific language governing permissions and limitations
// under the License.

use std::{any::Any, sync::Arc};
use std::{
any::Any,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};

use arrow::{
array::{AsArray, BinaryArray, PrimitiveArray, RecordBatch},
compute::concat_batches,
datatypes::{GenericBinaryType, Int8Type, UInt64Type, UInt8Type},
};
use arrow_schema::SchemaRef;
use datafusion::{
common::internal_err,
datasource::physical_plan::{FileMeta, ParquetFileReaderFactory},
error::Result as DfResult,
execution::{SendableRecordBatchStream, TaskContext},
error::{DataFusionError, Result as DfResult},
execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext},
logical_expr::AggregateUDFImpl,
parquet::arrow::async_reader::AsyncFileReader,
physical_plan::{metrics::ExecutionPlanMetricsSet, DisplayAs, ExecutionPlan, PlanProperties},
physical_plan::{
metrics::ExecutionPlanMetricsSet, DisplayAs, Distribution, ExecutionPlan, PlanProperties,
},
};
use futures::{ready, Stream, StreamExt};
use parquet::arrow::async_reader::ParquetObjectReader;

use crate::types::ObjectStoreRef;
@@ -64,21 +79,46 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory
/// column.
#[derive(Debug)]
struct MergeExec {
/// Input plan
input: Arc<dyn ExecutionPlan>,
/// (0..num_primary_keys) are primary key columns
num_primary_keys: usize,
/// Sequence column index
seq_idx: usize,
// (idx, merge_op)
values: Vec<(usize, Arc<dyn AggregateUDFImpl>)>,
/// Input plan
input: Arc<dyn ExecutionPlan>,
value_idx: usize,
value_op: Arc<dyn AggregateUDFImpl>,
}

impl MergeExec {
fn new(
input: Arc<dyn ExecutionPlan>,
num_primary_keys: usize,
seq_idx: usize,
value_idx: usize,
value_op: Arc<dyn AggregateUDFImpl>,
) -> Self {
Self {
input,
num_primary_keys,
seq_idx,
value_idx,
value_op,
}
}
}
impl DisplayAs for MergeExec {
fn fmt_as(
&self,
t: datafusion::physical_plan::DisplayFormatType,
_t: datafusion::physical_plan::DisplayFormatType,
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
todo!()
write!(
f,
"MergeExec: [primary_keys: {}, seq_idx: {}]",
self.num_primary_keys, self.seq_idx
)?;
Ok(())
}
}

@@ -92,29 +92,161 @@ impl ExecutionPlan for MergeExec
}

fn properties(&self) -> &PlanProperties {
todo!()
self.input.properties()
}

fn required_input_distribution(&self) -> Vec<Distribution> {
vec![Distribution::SinglePartition; self.children().len()]
}

fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![&self.input]
}

fn maintains_input_order(&self) -> Vec<bool> {
vec![true]
vec![true; self.children().len()]
}

fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> DfResult<Arc<dyn ExecutionPlan>> {
todo!()
Ok(Arc::new(MergeExec::new(
Arc::clone(&children[0]),
self.num_primary_keys,
self.seq_idx,
self.value_idx,
self.value_op.clone(),
)))
}

fn execute(
&self,
partition: usize,
context: Arc<TaskContext>,
) -> DfResult<SendableRecordBatchStream> {
if 0 != partition {
return internal_err!("MergeExec invalid partition {partition}");
}

Ok(Box::pin(MergeStream::new(
self.input.execute(partition, context)?,
self.num_primary_keys,
self.seq_idx,
self.value_idx,
self.value_op.clone(),
)))
}
}

struct MergeStream {
stream: SendableRecordBatchStream,
num_primary_keys: usize,
seq_idx: usize,
value_idx: usize,
value_op: Arc<dyn AggregateUDFImpl>,

pending_batch: Option<RecordBatch>,
}

impl MergeStream {
fn new(
stream: SendableRecordBatchStream,
num_primary_keys: usize,
seq_idx: usize,
value_idx: usize,
value_op: Arc<dyn AggregateUDFImpl>,
) -> Self {
Self {
stream,
num_primary_keys,
seq_idx,
value_idx,
value_op,
pending_batch: None,
}
}

fn primary_key_eq2(
&self,
lhs: &RecordBatch,
lhs_idx: usize,
rhs: &RecordBatch,
rhs_idx: usize,
) -> bool {
for k in 0..self.num_primary_keys {
let lhs_col = lhs.column(k);
let rhs_col = rhs.column(k);
if let Some(lhs_col) = lhs_col.as_primitive_opt::<UInt8Type>() {
let rhs_col = rhs_col.as_primitive::<UInt8Type>();
if !lhs_col.value(lhs_idx).eq(&rhs_col.value(rhs_idx)) {
return false;
}
} else if let Some(lhs_col) = lhs_col.as_primitive_opt::<UInt64Type>() {
let rhs_col = rhs_col.as_primitive::<UInt64Type>();
if !lhs_col.value(lhs_idx).eq(&rhs_col.value(rhs_idx)) {
return false;
}
} else if let Some(lhs_col) = lhs_col.as_bytes_opt::<GenericBinaryType<i32>>() {
let rhs_col = rhs_col.as_bytes::<GenericBinaryType<i32>>();
if !rhs_col.value(rhs_idx).eq(lhs_col.value(lhs_idx)) {
return false;
}
} else {
unreachable!("unsupported column type: {:?}", lhs_col.data_type())
}
}

true
}

fn primary_key_eq(&self, batch: &RecordBatch, i: usize, j: usize) -> bool {
self.primary_key_eq2(batch, i, batch, j)
}

    // TODO: only deduplication is supported for now; merge operations will be added later.
fn merge_batch(&mut self, batch: RecordBatch) -> DfResult<RecordBatch> {
let mut row_idx = 0;
let mut batches = vec![];
while row_idx < batch.num_rows() {
let mut cursor = row_idx + 1;
            // Advance while we stay inside the batch and the primary key is unchanged,
            // otherwise the last group would index past the end of the batch.
            while cursor < batch.num_rows() && self.primary_key_eq(&batch, row_idx, cursor) {
cursor += 1;
}

let same_pk_batch = batch.slice(row_idx, cursor - row_idx);
if let Some(pending) = self.pending_batch.take() {
if !self.primary_key_eq2(&pending, pending.num_rows() - 1, &same_pk_batch, 0) {
// only keep the last row in this batch
batches.push(pending.slice(pending.num_rows() - 1, 1));
}
}
batches.push(same_pk_batch.slice(same_pk_batch.num_rows() - 1, 1));

row_idx = cursor;
}
self.pending_batch = batches.pop();

        concat_batches(&self.stream.schema(), batches.iter())
.map_err(|e| DataFusionError::ArrowError(e, None))
}
}

impl Stream for MergeStream {
type Item = DfResult<RecordBatch>;

    fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
        match ready!(self.stream.poll_next_unpin(ctx)) {
            Some(r) => Poll::Ready(Some(r.and_then(|batch| self.merge_batch(batch)))),
            // Input exhausted: emit the last primary-key group still held in
            // `pending_batch`, then finish.
            None => Poll::Ready(self.pending_batch.take().map(Ok)),
        }
    }
}

impl RecordBatchStream for MergeStream {
fn schema(&self) -> SchemaRef {
        // Deduplication only drops rows, so the output schema matches the input stream.
        self.stream.schema()
}
}
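
For readers skimming the diff, here is a minimal, self-contained sketch (not part of the commit) of the deduplication rule MergeStream applies: input rows are assumed to arrive sorted by primary key, only the last row of each consecutive key group is kept, and the still-open group is carried across batch boundaries the way pending_batch is. Plain (pk, value) tuples stand in for RecordBatch rows; merge_batch below is an illustrative stand-in, not the committed function.

// Sketch only: tuples instead of RecordBatch, a single u8 primary key,
// and `pending` playing the role of MergeStream::pending_batch.
fn merge_batch(pending: &mut Option<(u8, i64)>, batch: &[(u8, i64)]) -> Vec<(u8, i64)> {
    let mut out = Vec::new();
    for &row in batch {
        match pending.take() {
            // The primary key changed, so the previous group is complete: emit it.
            Some(prev) if prev.0 != row.0 => out.push(prev),
            // Same key (or nothing pending): the earlier row is superseded.
            _ => {}
        }
        *pending = Some(row);
    }
    out
}

fn main() {
    let mut pending = None;
    // Duplicates of pk = 1 span two batches; the last one wins.
    let first = merge_batch(&mut pending, &[(1, 100), (1, 101)]);
    let second = merge_batch(&mut pending, &[(1, 102), (2, 200)]);
    assert_eq!(first, Vec::<(u8, i64)>::new());
    assert_eq!(second, vec![(1, 102)]);
    // The open pk = 2 group must still be flushed when the stream ends.
    assert_eq!(pending, Some((2, 200)));
}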
2 changes: 1 addition & 1 deletion horaedb/metric_engine/src/storage.rs
@@ -454,7 +454,7 @@ mod tests {
}

#[tokio::test]
#[ignore = "Depend on MergeExec"]
// #[ignore = "Depend on MergeExec"]
async fn test_storage_write_and_scan() {
let schema = arrow_schema!(("pk1", UInt8), ("pk2", UInt8), ("value", Int64));
let root_dir = temp_dir::TempDir::new().unwrap();

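Finally, a hypothetical sketch of how the new operator is meant to be wired into a plan, inferred from the signatures above: the child plan must already produce primary-key-sorted rows and be collapsed to a single partition, which is what required_input_distribution and maintains_input_order encode. build_dedup_plan and its placement inside read.rs are assumptions for illustration, not APIs added by this commit.

use std::sync::Arc;

use datafusion::{logical_expr::AggregateUDFImpl, physical_plan::ExecutionPlan};

// Assumed to live in read.rs next to MergeExec; the constructor call below
// mirrors MergeExec::new as introduced by this commit.
fn build_dedup_plan(
    child: Arc<dyn ExecutionPlan>, // sorted by primary key, single partition
    num_primary_keys: usize,
    seq_idx: usize,
    value_idx: usize,
    value_op: Arc<dyn AggregateUDFImpl>,
) -> Arc<dyn ExecutionPlan> {
    Arc::new(MergeExec::new(
        child,
        num_primary_keys,
        seq_idx,
        value_idx,
        value_op,
    ))
}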