Skip to content

Commit 7e08ebf

Browse files
ion-elgrecortyler
authored andcommitted
cdc delete
1 parent ea8f068 commit 7e08ebf

File tree

2 files changed

+90
-12
lines changed

2 files changed

+90
-12
lines changed

crates/core/src/operations/delete.rs

+40-11
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use core::panic;
2121
use std::sync::Arc;
2222
use std::time::{Instant, SystemTime, UNIX_EPOCH};
2323

24+
use super::writer::DeltaWriter;
2425
use crate::logstore::LogStoreRef;
2526
use datafusion::execution::context::{SessionContext, SessionState};
2627
use datafusion::physical_plan::filter::FilterExec;
@@ -29,18 +30,21 @@ use datafusion::prelude::Expr;
2930
use datafusion_common::scalar::ScalarValue;
3031
use datafusion_common::DFSchema;
3132
use futures::future::BoxFuture;
33+
use object_store::prefix::PrefixStore;
3234
use parquet::file::properties::WriterProperties;
3335
use serde::Serialize;
3436

37+
use super::cdc::should_write_cdc;
3538
use super::datafusion_utils::Expression;
3639
use super::transaction::{CommitBuilder, CommitProperties, PROTOCOL};
37-
use super::write::WriterStatsConfig;
40+
use super::write::{write_execution_plan_cdc, write_execution_plan_cdf, WriterStatsConfig};
41+
use super::writer::WriterConfig;
3842
use crate::delta_datafusion::expr::fmt_expr_to_sql;
3943
use crate::delta_datafusion::{
4044
find_files, register_store, DataFusionMixins, DeltaScanBuilder, DeltaSessionContext,
4145
};
4246
use crate::errors::DeltaResult;
43-
use crate::kernel::{Action, Add, Remove};
47+
use crate::kernel::{Action, Add, AddCDCFile, Remove};
4448
use crate::operations::write::write_execution_plan;
4549
use crate::protocol::DeltaOperation;
4650
use crate::table::state::DeltaTableState;
@@ -130,7 +134,7 @@ async fn excute_non_empty_expr(
130134
metrics: &mut DeleteMetrics,
131135
rewrite: &[Add],
132136
writer_properties: Option<WriterProperties>,
133-
) -> DeltaResult<Vec<Add>> {
137+
) -> DeltaResult<Vec<Action>> {
134138
// For each identified file perform a parquet scan + filter + limit (1) + count.
135139
// If returned count is not zero then append the file to be rewritten and removed from the log. Otherwise do nothing to the file.
136140

@@ -160,27 +164,53 @@ async fn excute_non_empty_expr(
160164
.map(|v| v.iter().map(|v| v.to_string()).collect::<Vec<String>>()),
161165
);
162166

163-
let add_actions = write_execution_plan(
167+
let mut actions: Vec<Action> = write_execution_plan(
164168
Some(snapshot),
165169
state.clone(),
166170
filter.clone(),
167171
table_partition_cols.clone(),
168172
log_store.object_store(),
169173
Some(snapshot.table_config().target_file_size() as usize),
170174
None,
171-
writer_properties,
175+
writer_properties.clone(),
172176
false,
173177
None,
174-
writer_stats_config,
178+
writer_stats_config.clone(),
175179
None,
176180
)
177181
.await?
178182
.into_iter()
179183
.map(|a| match a {
180184
Action::Add(a) => a,
181185
_ => panic!("Expected Add action"),
182-
})
183-
.collect::<Vec<Add>>();
186+
}).into_iter().map(Action::Add).collect();
187+
188+
// CDC logic, simply filters data with predicate and adds the _change_type="delete" as literal column
189+
match should_write_cdc(&snapshot) {
190+
Ok(true) => {
191+
let cdc_predicate_expr =
192+
state.create_physical_expr(expression.clone(), &input_dfschema)?;
193+
let cdc_scan: Arc<dyn ExecutionPlan> =
194+
Arc::new(FilterExec::try_new(cdc_predicate_expr, scan.clone())?);
195+
let cdc_actions = write_execution_plan_cdc(
196+
Some(snapshot),
197+
state.clone(),
198+
cdc_scan.clone(),
199+
table_partition_cols.clone(),
200+
log_store.object_store(),
201+
Some(snapshot.table_config().target_file_size() as usize),
202+
None,
203+
writer_properties,
204+
false,
205+
None,
206+
writer_stats_config,
207+
None,
208+
)
209+
.await?;
210+
actions.extend(cdc_actions)
211+
}
212+
_ => (),
213+
};
184214

185215
let read_records = scan.parquet_scan.metrics().and_then(|m| m.output_rows());
186216
let filter_records = filter.metrics().and_then(|m| m.output_rows());
@@ -189,7 +219,7 @@ async fn excute_non_empty_expr(
189219
.zip(filter_records)
190220
.map(|(read, filter)| read - filter);
191221

192-
Ok(add_actions)
222+
Ok(actions)
193223
}
194224

195225
async fn execute(
@@ -209,7 +239,7 @@ async fn execute(
209239

210240
let predicate = predicate.unwrap_or(Expr::Literal(ScalarValue::Boolean(Some(true))));
211241

212-
let add = if candidates.partition_scan {
242+
let mut actions = if candidates.partition_scan {
213243
Vec::new()
214244
} else {
215245
let write_start = Instant::now();
@@ -233,7 +263,6 @@ async fn execute(
233263
.unwrap()
234264
.as_millis() as i64;
235265

236-
let mut actions: Vec<Action> = add.into_iter().map(Action::Add).collect();
237266
metrics.num_removed_files = remove.len();
238267
metrics.num_added_files = actions.len();
239268

crates/core/src/operations/write.rs

+50-1
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ use datafusion_common::DFSchema;
4040
use datafusion_expr::Expr;
4141
use futures::future::BoxFuture;
4242
use futures::StreamExt;
43+
use object_store::prefix::PrefixStore;
4344
use parquet::file::properties::WriterProperties;
4445
use tracing::log::*;
4546

@@ -52,7 +53,7 @@ use crate::delta_datafusion::expr::parse_predicate_expression;
5253
use crate::delta_datafusion::{find_files, register_store, DeltaScanBuilder};
5354
use crate::delta_datafusion::{DataFusionMixins, DeltaDataChecker};
5455
use crate::errors::{DeltaResult, DeltaTableError};
55-
use crate::kernel::{Action, Add, Metadata, PartitionsExt, Remove, StructType};
56+
use crate::kernel::{Action, Add, AddCDCFile, Metadata, PartitionsExt, Remove, StructType};
5657
use crate::logstore::LogStoreRef;
5758
use crate::operations::cast::{cast_record_batch, merge_schema};
5859
use crate::protocol::{DeltaOperation, SaveMode};
@@ -461,6 +462,54 @@ async fn write_execution_plan_with_predicate(
461462
Ok(actions)
462463
}
463464

465+
pub(crate) async fn write_execution_plan_cdc(
466+
snapshot: Option<&DeltaTableState>,
467+
state: SessionState,
468+
plan: Arc<dyn ExecutionPlan>,
469+
partition_columns: Vec<String>,
470+
object_store: ObjectStoreRef,
471+
target_file_size: Option<usize>,
472+
write_batch_size: Option<usize>,
473+
writer_properties: Option<WriterProperties>,
474+
safe_cast: bool,
475+
schema_mode: Option<SchemaMode>,
476+
writer_stats_config: WriterStatsConfig,
477+
sender: Option<Sender<RecordBatch>>,
478+
) -> DeltaResult<Vec<Action>> {
479+
let cdc_store = Arc::new(PrefixStore::new(
480+
object_store,
481+
"_change_data",
482+
));
483+
Ok(write_execution_plan(
484+
snapshot,
485+
state,
486+
plan,
487+
partition_columns,
488+
cdc_store,
489+
target_file_size,
490+
write_batch_size,
491+
writer_properties,
492+
safe_cast,
493+
schema_mode,
494+
writer_stats_config,
495+
sender).await?.into_iter().map(|add| {
496+
// Modify add actions into CDC actions
497+
match add {
498+
Action::Add(add) => { Action::Cdc(AddCDCFile {
499+
// This is a gnarly hack, but the action needs the nested path, not the
500+
// path isnide the prefixed store
501+
path: format!("_change_data/{}", add.path),
502+
size: add.size,
503+
partition_values: add.partition_values,
504+
data_change: false,
505+
tags: add.tags,
506+
})},
507+
_ => panic!("Expected Add action"),
508+
}
509+
}).collect::<Vec<_>>())
510+
}
511+
512+
464513
#[allow(clippy::too_many_arguments)]
465514
pub(crate) async fn write_execution_plan(
466515
snapshot: Option<&DeltaTableState>,

0 commit comments

Comments
 (0)