Skip to content

Commit bb97ae9

Browse files
ion-elgreco authored and rtyler committed
add _change_type projection
1 parent 4ea2e7a commit bb97ae9

File tree

1 file changed

+41
-10
lines changed

1 file changed

+41
-10
lines changed

crates/core/src/operations/delete.rs

+41-10
Original file line numberDiff line numberDiff line change
@@ -17,34 +17,39 @@
1717
//! .await?;
1818
//! ````
1919
20-
use core::panic;
21-
use std::sync::Arc;
22-
use std::time::{Instant, SystemTime, UNIX_EPOCH};
23-
24-
use super::writer::DeltaWriter;
2520
use crate::logstore::LogStoreRef;
21+
use core::panic;
2622
use datafusion::execution::context::{SessionContext, SessionState};
2723
use datafusion::physical_plan::filter::FilterExec;
24+
use datafusion::physical_plan::projection::ProjectionExec;
2825
use datafusion::physical_plan::ExecutionPlan;
2926
use datafusion::prelude::Expr;
3027
use datafusion_common::scalar::ScalarValue;
3128
use datafusion_common::DFSchema;
29+
use datafusion_expr::lit;
30+
use datafusion_physical_expr::{
31+
expressions::{self},
32+
PhysicalExpr,
33+
};
3234
use futures::future::BoxFuture;
33-
use object_store::prefix::PrefixStore;
35+
use std::iter;
36+
use std::sync::Arc;
37+
use std::time::{Instant, SystemTime, UNIX_EPOCH};
38+
3439
use parquet::file::properties::WriterProperties;
3540
use serde::Serialize;
3641

3742
use super::cdc::should_write_cdc;
3843
use super::datafusion_utils::Expression;
3944
use super::transaction::{CommitBuilder, CommitProperties, PROTOCOL};
40-
use super::write::{write_execution_plan_cdc, write_execution_plan_cdf, WriterStatsConfig};
41-
use super::writer::WriterConfig;
45+
use super::write::{write_execution_plan_cdc, WriterStatsConfig};
46+
4247
use crate::delta_datafusion::expr::fmt_expr_to_sql;
4348
use crate::delta_datafusion::{
4449
find_files, register_store, DataFusionMixins, DeltaScanBuilder, DeltaSessionContext,
4550
};
4651
use crate::errors::DeltaResult;
47-
use crate::kernel::{Action, Add, AddCDCFile, Remove};
52+
use crate::kernel::{Action, Add, Remove};
4853
use crate::operations::write::write_execution_plan;
4954
use crate::protocol::DeltaOperation;
5055
use crate::table::state::DeltaTableState;
@@ -191,14 +196,40 @@ async fn excute_non_empty_expr(
191196
// CDC logic: filter the data with the predicate and add `_change_type = "delete"` as a literal column
192197
match should_write_cdc(&snapshot) {
193198
Ok(true) => {
199+
// Create CDC scan
194200
let cdc_predicate_expr =
195201
state.create_physical_expr(expression.clone(), &input_dfschema)?;
196202
let cdc_scan: Arc<dyn ExecutionPlan> =
197203
Arc::new(FilterExec::try_new(cdc_predicate_expr, scan.clone())?);
204+
205+
// Add literal column "_change_type"
206+
let change_type_lit = lit(ScalarValue::Utf8(Some("delete".to_string())));
207+
let change_type_expr = state.create_physical_expr(change_type_lit, &input_dfschema)?;
208+
209+
// Project columns and lit
210+
let project_expressions: Vec<(Arc<dyn PhysicalExpr>, String)> = scan
211+
.schema()
212+
.fields()
213+
.into_iter()
214+
.enumerate()
215+
.map(|(idx, field)| -> (Arc<dyn PhysicalExpr>, String) {
216+
(
217+
Arc::new(expressions::Column::new(field.name(), idx)),
218+
field.name().to_owned(),
219+
)
220+
})
221+
.chain(iter::once((change_type_expr, "_change_type".to_owned())))
222+
.collect();
223+
224+
let projected_scan: Arc<dyn ExecutionPlan> = Arc::new(ProjectionExec::try_new(
225+
project_expressions,
226+
cdc_scan.clone(),
227+
)?);
228+
198229
let cdc_actions = write_execution_plan_cdc(
199230
Some(snapshot),
200231
state.clone(),
201-
cdc_scan.clone(),
232+
projected_scan.clone(),
202233
table_partition_cols.clone(),
203234
log_store.object_store(),
204235
Some(snapshot.table_config().target_file_size() as usize),

0 commit comments

Comments
 (0)