Convert merge op to use logical datafusion ops
Blajda committed Oct 12, 2023
1 parent 8662e60 commit d0d0e63
Showing 10 changed files with 400 additions and 293 deletions.
32 changes: 16 additions & 16 deletions Cargo.toml
@@ -15,23 +15,23 @@ debug = "line-tables-only"
 
 [workspace.dependencies]
 # arrow
-arrow = { version = "46" }
-arrow-array = { version = "46" }
-arrow-buffer = { version = "46" }
-arrow-cast = { version = "46" }
-arrow-ord = { version = "46" }
-arrow-row = { version = "46" }
-arrow-schema = { version = "46" }
-arrow-select = { version = "46" }
-parquet = { version = "46" }
+arrow = { version = "47" }
+arrow-array = { version = "47" }
+arrow-buffer = { version = "47" }
+arrow-cast = { version = "47" }
+arrow-ord = { version = "47" }
+arrow-row = { version = "47" }
+arrow-schema = { version = "47" }
+arrow-select = { version = "47" }
+parquet = { version = "47" }
 
 # datafusion
-datafusion = { version = "31" }
-datafusion-expr = { version = "31" }
-datafusion-common = { version = "31" }
-datafusion-proto = { version = "31" }
-datafusion-sql = { version = "31" }
-datafusion-physical-expr = { version = "31" }
+datafusion = { git="https://github.com/apache/arrow-datafusion" }
+datafusion-expr = { git="https://github.com/apache/arrow-datafusion" }
+datafusion-common = { git="https://github.com/apache/arrow-datafusion" }
+datafusion-proto = { git="https://github.com/apache/arrow-datafusion" }
+datafusion-sql = { git="https://github.com/apache/arrow-datafusion" }
+datafusion-physical-expr = { git="https://github.com/apache/arrow-datafusion" }
 
 # serde
 serde = { version = "1", features = ["derive"] }
@@ -46,7 +46,7 @@ url = { version = "2" }
 uuid = { version = "1" }
 
 # runtime / async
-async-trait = { version = "0.1" }
+async-trait = { version = "0.1.73" }
 futures = { version = "0.3" }
 tokio = { version = "1" }
 num_cpus = { version = "1" }
4 changes: 2 additions & 2 deletions rust/Cargo.toml
@@ -107,7 +107,7 @@ reqwest = { version = "0.11.18", default-features = false, features = [
 # Datafusion
 dashmap = { version = "5", optional = true }
 
-sqlparser = { version = "0.37", optional = true }
+sqlparser = { version = "0.38", optional = true }
 
 # NOTE dependencies only for integration tests
 fs_extra = { version = "1.3.0", optional = true }
@@ -139,7 +139,7 @@ arrow = [
     "arrow-select",
     "arrow-buffer",
 ]
-default = ["arrow", "parquet"]
+default = ["arrow", "parquet", "datafusion"]
 datafusion = [
     "dep:datafusion",
     "datafusion-expr",
44 changes: 44 additions & 0 deletions rust/src/delta_datafusion/logical.rs
@@ -0,0 +1,44 @@
//! Logical Operations for DataFusion
use datafusion_expr::{LogicalPlan, UserDefinedLogicalNodeCore};
pub const METRIC_OBSERVER_NAME: &str = "MetricObserver";

#[derive(Debug, Hash, Eq, PartialEq)]
pub(crate) struct MetricObserver {
    // This acts as an anchor when converting to a physical operator
    pub anchor: String,
    pub input: LogicalPlan,
}

impl UserDefinedLogicalNodeCore for MetricObserver {
    fn name(&self) -> &str {
        METRIC_OBSERVER_NAME
    }

    fn inputs(&self) -> Vec<&datafusion_expr::LogicalPlan> {
        vec![&self.input]
    }

    fn schema(&self) -> &datafusion_common::DFSchemaRef {
        self.input.schema()
    }

    fn expressions(&self) -> Vec<datafusion_expr::Expr> {
        vec![]
    }

    fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "MetricObserver name={}", &self.anchor)
    }

    fn from_template(
        &self,
        _exprs: &[datafusion_expr::Expr],
        inputs: &[datafusion_expr::LogicalPlan],
    ) -> Self {
        MetricObserver {
            anchor: self.anchor.clone(),
            input: inputs[0].clone(),
        }
    }
}
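
Because MetricObserver implements UserDefinedLogicalNodeCore, it can be spliced into a plan as a LogicalPlan::Extension node. A minimal sketch of how a caller might wrap an input plan, assuming the DataFusion API of this era; the observe helper is illustrative, not part of this commit:

use std::sync::Arc;

use datafusion_expr::{Extension, LogicalPlan};

// Hypothetical helper: wrap `input` so that batches flowing past this
// point in the plan can be observed under the given anchor name.
fn observe(input: LogicalPlan, anchor: &str) -> LogicalPlan {
    LogicalPlan::Extension(Extension {
        node: Arc::new(MetricObserver {
            anchor: anchor.to_owned(),
            input,
        }),
    })
}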
2 changes: 2 additions & 0 deletions rust/src/delta_datafusion/mod.rs
@@ -79,6 +79,8 @@ use crate::{open_table, open_table_with_storage_options, DeltaTable, Invariant,
 const PATH_COLUMN: &str = "__delta_rs_path";
 
 pub mod expr;
+pub mod logical;
+pub mod physical;
 
 impl From<DeltaTableError> for DataFusionError {
     fn from(err: DeltaTableError) -> Self {
145 changes: 145 additions & 0 deletions rust/src/delta_datafusion/physical.rs
@@ -0,0 +1,145 @@
//! Physical Operations for DataFusion
use std::sync::Arc;

use arrow_schema::SchemaRef;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result as DataFusionResult;
use datafusion::physical_plan::DisplayAs;
use datafusion::physical_plan::{
    metrics::{ExecutionPlanMetricsSet, MetricsSet},
    ExecutionPlan, RecordBatchStream, SendableRecordBatchStream,
};
use futures::{Stream, StreamExt};

pub(crate) type MetricObserverFunction = fn(&RecordBatch, &ExecutionPlanMetricsSet) -> ();

pub(crate) struct MetricObserverExec {
    parent: Arc<dyn ExecutionPlan>,
    anchor: String,
    metrics: ExecutionPlanMetricsSet,
    update: MetricObserverFunction,
}

impl MetricObserverExec {
    pub fn new(anchor: String, parent: Arc<dyn ExecutionPlan>, f: MetricObserverFunction) -> Self {
        MetricObserverExec {
            parent,
            anchor,
            metrics: ExecutionPlanMetricsSet::new(),
            update: f,
        }
    }

    pub fn anchor(&self) -> &str {
        &self.anchor
    }
}

impl std::fmt::Debug for MetricObserverExec {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("MetricObserverExec")
            .field("anchor", &self.anchor)
            .field("metrics", &self.metrics)
            .finish()
    }
}

impl DisplayAs for MetricObserverExec {
    fn fmt_as(
        &self,
        _: datafusion::physical_plan::DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        write!(f, "MetricObserverExec anchor={}", self.anchor)
    }
}

impl ExecutionPlan for MetricObserverExec {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn schema(&self) -> arrow_schema::SchemaRef {
        self.parent.schema()
    }

    fn output_partitioning(&self) -> datafusion::physical_plan::Partitioning {
        self.parent.output_partitioning()
    }

    fn output_ordering(&self) -> Option<&[datafusion_physical_expr::PhysicalSortExpr]> {
        self.parent.output_ordering()
    }

    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
        vec![self.parent.clone()]
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<datafusion::execution::context::TaskContext>,
    ) -> datafusion_common::Result<datafusion::physical_plan::SendableRecordBatchStream> {
        let res = self.parent.execute(partition, context)?;
        Ok(Box::pin(MetricObserverStream {
            schema: self.schema(),
            input: res,
            metrics: self.metrics.clone(),
            update: self.update,
        }))
    }

    fn statistics(&self) -> datafusion_common::Statistics {
        self.parent.statistics()
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
        // TODO: Error on multiple children
        Ok(Arc::new(MetricObserverExec::new(
            self.anchor.clone(),
            children.get(0).unwrap().clone(),
            self.update,
        )))
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }
}

struct MetricObserverStream {
    schema: SchemaRef,
    input: SendableRecordBatchStream,
    metrics: ExecutionPlanMetricsSet,
    update: MetricObserverFunction,
}

impl Stream for MetricObserverStream {
    type Item = DataFusionResult<RecordBatch>;

    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        self.input.poll_next_unpin(cx).map(|x| match x {
            Some(Ok(batch)) => {
                (self.update)(&batch, &self.metrics);
                Some(Ok(batch))
            }
            other => other,
        })
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        self.input.size_hint()
    }
}

impl RecordBatchStream for MetricObserverStream {
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }
}
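
To connect the logical MetricObserver node to this physical operator, DataFusion's ExtensionPlanner hook is the natural seam: during physical planning it downcasts the user-defined node and substitutes a MetricObserverExec over its single physical input. A minimal sketch under that assumption; the planner type, metric name, and row-counting update function below are illustrative, not taken from this commit:

use std::sync::Arc;

use async_trait::async_trait;
use datafusion::error::Result as DataFusionResult;
use datafusion::execution::context::SessionState;
use datafusion::physical_plan::metrics::MetricBuilder;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_planner::{ExtensionPlanner, PhysicalPlanner};
use datafusion_expr::{LogicalPlan, UserDefinedLogicalNode};

use crate::delta_datafusion::{logical::MetricObserver, physical::MetricObserverExec};

// Hypothetical planner that lowers the logical MetricObserver node
// into a MetricObserverExec wrapping its single physical input.
struct MetricObserverPlanner;

#[async_trait]
impl ExtensionPlanner for MetricObserverPlanner {
    async fn plan_extension(
        &self,
        _planner: &dyn PhysicalPlanner,
        node: &dyn UserDefinedLogicalNode,
        _logical_inputs: &[&LogicalPlan],
        physical_inputs: &[Arc<dyn ExecutionPlan>],
        _session_state: &SessionState,
    ) -> DataFusionResult<Option<Arc<dyn ExecutionPlan>>> {
        if let Some(observer) = node.as_any().downcast_ref::<MetricObserver>() {
            return Ok(Some(Arc::new(MetricObserverExec::new(
                observer.anchor.clone(),
                physical_inputs[0].clone(),
                // Illustrative update function: count every row that
                // passes the observed point in the plan.
                |batch, metrics| {
                    MetricBuilder::new(metrics)
                        .global_counter("num_observed_rows")
                        .add(batch.num_rows());
                },
            ))));
        }
        Ok(None)
    }
}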