-
Notifications
You must be signed in to change notification settings - Fork 3
POC: Merge extension #1022
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
POC: Merge extension #1022
Changes from all commits
a3f92ba
6d6b9f8
949f3b3
dcafec4
5d4707c
8c9b648
e7c5f76
4aa51e3
1b7386c
9f50bd1
2481dc8
94595bd
d963d8c
2858984
38766ab
7f4b203
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,21 +1,36 @@ | ||
use std::sync::Arc; | ||
|
||
use async_trait::async_trait; | ||
use datafusion::physical_planner::ExtensionPlanner; | ||
|
||
use super::{logical_plan::merge::MergeIntoSink, physical_plan::merge::MergeIntoSinkExec}; | ||
|
||
#[derive(Debug, Default)] | ||
pub struct CustomExtensionPlanner {} | ||
|
||
#[async_trait] | ||
impl ExtensionPlanner for CustomExtensionPlanner { | ||
async fn plan_extension( | ||
&self, | ||
_planner: &dyn datafusion::physical_planner::PhysicalPlanner, | ||
_node: &dyn datafusion_expr::UserDefinedLogicalNode, | ||
planner: &dyn datafusion::physical_planner::PhysicalPlanner, | ||
node: &dyn datafusion_expr::UserDefinedLogicalNode, | ||
_logical_inputs: &[&datafusion_expr::LogicalPlan], | ||
_physical_inputs: &[std::sync::Arc<dyn datafusion_physical_plan::ExecutionPlan>], | ||
_session_state: &datafusion::execution::SessionState, | ||
session_state: &datafusion::execution::SessionState, | ||
) -> datafusion_common::Result< | ||
Option<std::sync::Arc<dyn datafusion_physical_plan::ExecutionPlan>>, | ||
> { | ||
Ok(None) | ||
if let Some(merge) = node.as_any().downcast_ref::<MergeIntoSink>() { | ||
let input = planner | ||
.create_physical_plan(&merge.input, session_state) | ||
.await?; | ||
Ok(Some(Arc::new(MergeIntoSinkExec::new( | ||
merge.schema.clone(), | ||
input, | ||
merge.target.clone(), | ||
)))) | ||
} else { | ||
Ok(None) | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,110 @@ | ||
use std::hash::Hasher; | ||
use std::sync::Arc; | ||
|
||
use datafusion::arrow::datatypes::{DataType, Field}; | ||
use datafusion::physical_expr_common::physical_expr::DynHash; | ||
use datafusion_common::{DFSchema, DFSchemaRef}; | ||
use datafusion_expr::{Expr, InvariantLevel, LogicalPlan, UserDefinedLogicalNode}; | ||
use datafusion_iceberg::DataFusionTable; | ||
|
||
#[derive(Debug, Clone)] | ||
// The MergeIntoSink performs the final writing step of the "MERGE INTO" statement. It assumes | ||
// that the typical Join of the target and source tables is already performed and that the Recordbatches | ||
// have a particular form. As such the Recordbatches must contain a "__target_exists" and | ||
// "__source_exists" column that indicate whether the given row exists in the target and source | ||
// tables respectivaly. Additionally, it requires a "__data_file_path" column to keep track of which files to overwrite. | ||
// It then writes the resulting data to parquet files and updates the target | ||
// table accordingly. | ||
pub struct MergeIntoSink { | ||
pub input: Arc<LogicalPlan>, | ||
pub target: DataFusionTable, | ||
pub schema: DFSchemaRef, | ||
} | ||
|
||
impl MergeIntoSink { | ||
pub fn new( | ||
input: Arc<LogicalPlan>, | ||
target: DataFusionTable, | ||
) -> datafusion_common::Result<Self> { | ||
let field = Field::new("number of rows updated", DataType::Int64, false); | ||
let schema = DFSchema::new_with_metadata( | ||
vec![(None, Arc::new(field))], | ||
std::collections::HashMap::new(), | ||
)?; | ||
|
||
Ok(Self { | ||
input, | ||
target, | ||
schema: Arc::new(schema), | ||
}) | ||
} | ||
} | ||
|
||
impl UserDefinedLogicalNode for MergeIntoSink { | ||
fn as_any(&self) -> &dyn std::any::Any { | ||
self | ||
} | ||
|
||
fn name(&self) -> &'static str { | ||
"MergeIntoSink" | ||
} | ||
|
||
fn inputs(&self) -> Vec<&LogicalPlan> { | ||
vec![&self.input] | ||
} | ||
|
||
fn schema(&self) -> &datafusion_common::DFSchemaRef { | ||
&self.schema | ||
} | ||
|
||
fn check_invariants( | ||
&self, | ||
_check: InvariantLevel, | ||
_plan: &LogicalPlan, | ||
) -> datafusion_common::Result<()> { | ||
Ok(()) | ||
} | ||
|
||
fn expressions(&self) -> Vec<Expr> { | ||
vec![] | ||
} | ||
|
||
fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { | ||
write!(f, "MergeIntoSink") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe we can specify the catalog, schema, and table here |
||
} | ||
|
||
fn with_exprs_and_inputs( | ||
&self, | ||
_exprs: Vec<Expr>, | ||
inputs: Vec<LogicalPlan>, | ||
) -> datafusion_common::Result<Arc<dyn UserDefinedLogicalNode>> { | ||
if inputs.len() != 1 { | ||
return Err(datafusion_common::DataFusionError::Internal( | ||
"MergeIntoSink requires exactly one input".to_string(), | ||
)); | ||
} | ||
|
||
Ok(Arc::new(Self { | ||
input: Arc::new(inputs.into_iter().next().unwrap()), | ||
target: self.target.clone(), | ||
schema: self.schema.clone(), | ||
})) | ||
} | ||
|
||
fn dyn_hash(&self, state: &mut dyn Hasher) { | ||
"MergeIntoSink".dyn_hash(state); | ||
self.input.dyn_hash(state); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should hash target as well, if two sinks point at different tables can lead to errors |
||
} | ||
|
||
fn dyn_eq(&self, other: &dyn UserDefinedLogicalNode) -> bool { | ||
if let Some(other) = other.as_any().downcast_ref::<Self>() { | ||
self.input == other.input && self.schema == other.schema | ||
} else { | ||
false | ||
} | ||
} | ||
|
||
fn dyn_ord(&self, _other: &dyn UserDefinedLogicalNode) -> Option<std::cmp::Ordering> { | ||
None | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
pub mod merge; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jonathanc-n The target could also be an
Arc<dyn TableProvoder>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can also pass the target as LogicalPlan