Skip to content

POC: Merge extension #1022

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 16 commits into
base: main
Choose a base branch
from
Draft
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/core-executor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ tokio-stream = "0.1.17"
url = { workspace = true }
uuid = { workspace = true }
indexmap = { workspace = true }
pin-project-lite = "0.2.16"

[dev-dependencies]
bytes = { workspace = true }
Expand Down
23 changes: 19 additions & 4 deletions crates/core-executor/src/datafusion/extension_planner.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,36 @@
use std::sync::Arc;

use async_trait::async_trait;
use datafusion::physical_planner::ExtensionPlanner;

use super::{logical_plan::merge::MergeIntoSink, physical_plan::merge::MergeIntoSinkExec};

#[derive(Debug, Default)]
pub struct CustomExtensionPlanner {}

#[async_trait]
impl ExtensionPlanner for CustomExtensionPlanner {
async fn plan_extension(
&self,
_planner: &dyn datafusion::physical_planner::PhysicalPlanner,
_node: &dyn datafusion_expr::UserDefinedLogicalNode,
planner: &dyn datafusion::physical_planner::PhysicalPlanner,
node: &dyn datafusion_expr::UserDefinedLogicalNode,
_logical_inputs: &[&datafusion_expr::LogicalPlan],
_physical_inputs: &[std::sync::Arc<dyn datafusion_physical_plan::ExecutionPlan>],
_session_state: &datafusion::execution::SessionState,
session_state: &datafusion::execution::SessionState,
) -> datafusion_common::Result<
Option<std::sync::Arc<dyn datafusion_physical_plan::ExecutionPlan>>,
> {
Ok(None)
if let Some(merge) = node.as_any().downcast_ref::<MergeIntoSink>() {
let input = planner
.create_physical_plan(&merge.input, session_state)
.await?;
Ok(Some(Arc::new(MergeIntoSinkExec::new(
merge.schema.clone(),
input,
merge.target.clone(),
))))
} else {
Ok(None)
}
}
}
110 changes: 110 additions & 0 deletions crates/core-executor/src/datafusion/logical_plan/merge.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
use std::hash::Hasher;
use std::sync::Arc;

use datafusion::arrow::datatypes::{DataType, Field};
use datafusion::physical_expr_common::physical_expr::DynHash;
use datafusion_common::{DFSchema, DFSchemaRef};
use datafusion_expr::{Expr, InvariantLevel, LogicalPlan, UserDefinedLogicalNode};
use datafusion_iceberg::DataFusionTable;

#[derive(Debug, Clone)]
// The MergeIntoSink performs the final writing step of the "MERGE INTO" statement. It assumes
// that the typical Join of the target and source tables is already performed and that the Recordbatches
// have a particular form. As such the Recordbatches must contain a "__target_exists" and
// "__source_exists" column that indicate whether the given row exists in the target and source
// tables respectivaly. Additionally, it requires a "__data_file_path" column to keep track of which files to overwrite.
// It then writes the resulting data to parquet files and updates the target
// table accordingly.
pub struct MergeIntoSink {
pub input: Arc<LogicalPlan>,
pub target: DataFusionTable,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jonathanc-n The target could also be an Arc<dyn TableProvoder>

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can also pass the target as LogicalPlan

pub schema: DFSchemaRef,
}

impl MergeIntoSink {
pub fn new(
input: Arc<LogicalPlan>,
target: DataFusionTable,
) -> datafusion_common::Result<Self> {
let field = Field::new("number of rows updated", DataType::Int64, false);
let schema = DFSchema::new_with_metadata(
vec![(None, Arc::new(field))],
std::collections::HashMap::new(),
)?;

Ok(Self {
input,
target,
schema: Arc::new(schema),
})
}
}

impl UserDefinedLogicalNode for MergeIntoSink {
fn as_any(&self) -> &dyn std::any::Any {
self
}

fn name(&self) -> &'static str {
"MergeIntoSink"
}

fn inputs(&self) -> Vec<&LogicalPlan> {
vec![&self.input]
}

fn schema(&self) -> &datafusion_common::DFSchemaRef {
&self.schema
}

fn check_invariants(
&self,
_check: InvariantLevel,
_plan: &LogicalPlan,
) -> datafusion_common::Result<()> {
Ok(())
}

fn expressions(&self) -> Vec<Expr> {
vec![]
}

fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "MergeIntoSink")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can specify the catalog, schema, and table here

}

fn with_exprs_and_inputs(
&self,
_exprs: Vec<Expr>,
inputs: Vec<LogicalPlan>,
) -> datafusion_common::Result<Arc<dyn UserDefinedLogicalNode>> {
if inputs.len() != 1 {
return Err(datafusion_common::DataFusionError::Internal(
"MergeIntoSink requires exactly one input".to_string(),
));
}

Ok(Arc::new(Self {
input: Arc::new(inputs.into_iter().next().unwrap()),
target: self.target.clone(),
schema: self.schema.clone(),
}))
}

fn dyn_hash(&self, state: &mut dyn Hasher) {
"MergeIntoSink".dyn_hash(state);
self.input.dyn_hash(state);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should hash target as well, if two sinks point at different tables can lead to errors

}

fn dyn_eq(&self, other: &dyn UserDefinedLogicalNode) -> bool {
if let Some(other) = other.as_any().downcast_ref::<Self>() {
self.input == other.input && self.schema == other.schema
} else {
false
}
}

fn dyn_ord(&self, _other: &dyn UserDefinedLogicalNode) -> Option<std::cmp::Ordering> {
None
}
}
1 change: 1 addition & 0 deletions crates/core-executor/src/datafusion/logical_plan/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod merge;
2 changes: 2 additions & 0 deletions crates/core-executor/src/datafusion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
pub mod analyzer;
pub mod error;
pub mod extension_planner;
pub mod logical_plan;
pub mod physical_optimizer;
pub mod physical_plan;
pub mod planner;
pub mod query_planner;
pub mod rewriters;
Expand Down
Loading
Loading