Skip to content

Commit a1559d4

Browse files
committed
feat: custom execution handler
Signed-off-by: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com>
1 parent 6010edc commit a1559d4

26 files changed

+745
-137
lines changed

crates/core/src/operations/add_column.rs

+27-3
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
//! Add a new column to a table
22
3+
use std::sync::Arc;
4+
35
use delta_kernel::schema::StructType;
46
use futures::future::BoxFuture;
57
use itertools::Itertools;
68

79
use super::transaction::{CommitBuilder, CommitProperties, PROTOCOL};
10+
use super::{CustomExecuteHandler, Operation};
811
use crate::kernel::StructField;
912
use crate::logstore::LogStoreRef;
1013
use crate::operations::cast::merge_schema::merge_delta_struct;
@@ -22,9 +25,17 @@ pub struct AddColumnBuilder {
2225
log_store: LogStoreRef,
2326
/// Additional information to add to the commit
2427
commit_properties: CommitProperties,
28+
custom_execute_handler: Option<Arc<dyn CustomExecuteHandler>>,
2529
}
2630

27-
impl super::Operation<()> for AddColumnBuilder {}
31+
impl super::Operation<()> for AddColumnBuilder {
32+
fn log_store(&self) -> &LogStoreRef {
33+
&self.log_store
34+
}
35+
fn get_custom_execute_handler(&self) -> Option<Arc<dyn CustomExecuteHandler>> {
36+
self.custom_execute_handler.clone()
37+
}
38+
}
2839

2940
impl AddColumnBuilder {
3041
/// Create a new builder
@@ -34,6 +45,7 @@ impl AddColumnBuilder {
3445
log_store,
3546
fields: None,
3647
commit_properties: CommitProperties::default(),
48+
custom_execute_handler: None,
3749
}
3850
}
3951

@@ -47,6 +59,12 @@ impl AddColumnBuilder {
4759
self.commit_properties = commit_properties;
4860
self
4961
}
62+
63+
/// Set a custom execute handler, for pre and post execution
64+
pub fn with_custom_execute_handler(mut self, handler: Arc<dyn CustomExecuteHandler>) -> Self {
65+
self.custom_execute_handler = Some(handler);
66+
self
67+
}
5068
}
5169

5270
impl std::future::IntoFuture for AddColumnBuilder {
@@ -59,10 +77,12 @@ impl std::future::IntoFuture for AddColumnBuilder {
5977

6078
Box::pin(async move {
6179
let mut metadata = this.snapshot.metadata().clone();
62-
let fields = match this.fields {
80+
let fields = match this.fields.clone() {
6381
Some(v) => v,
6482
None => return Err(DeltaTableError::Generic("No fields provided".to_string())),
6583
};
84+
let operation_id = this.get_operation_id();
85+
this.pre_execute(operation_id).await?;
6686

6787
let fields_right = &StructType::new(fields.clone());
6888
let table_schema = this.snapshot.schema();
@@ -99,11 +119,15 @@ impl std::future::IntoFuture for AddColumnBuilder {
99119
actions.push(new_protocol.into())
100120
}
101121

102-
let commit = CommitBuilder::from(this.commit_properties)
122+
let commit = CommitBuilder::from(this.commit_properties.clone())
103123
.with_actions(actions)
124+
.with_operation_id(operation_id)
125+
.with_post_commit_hook_handler(this.get_custom_execute_handler())
104126
.build(Some(&this.snapshot), this.log_store.clone(), operation)
105127
.await?;
106128

129+
this.post_execute(operation_id).await?;
130+
107131
Ok(DeltaTable::new_with_state(
108132
this.log_store,
109133
commit.snapshot(),

crates/core/src/operations/add_feature.rs

+31-4
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
//! Enable table features
22
3+
use std::sync::Arc;
4+
35
use futures::future::BoxFuture;
46
use itertools::Itertools;
57

68
use super::transaction::{CommitBuilder, CommitProperties};
9+
use super::{CustomExecuteHandler, Operation};
710
use crate::kernel::{ReaderFeatures, TableFeatures, WriterFeatures};
811
use crate::logstore::LogStoreRef;
912
use crate::protocol::DeltaOperation;
@@ -23,9 +26,17 @@ pub struct AddTableFeatureBuilder {
2326
log_store: LogStoreRef,
2427
/// Additional information to add to the commit
2528
commit_properties: CommitProperties,
29+
custom_execute_handler: Option<Arc<dyn CustomExecuteHandler>>,
2630
}
2731

28-
impl super::Operation<()> for AddTableFeatureBuilder {}
32+
impl super::Operation<()> for AddTableFeatureBuilder {
33+
fn log_store(&self) -> &LogStoreRef {
34+
&self.log_store
35+
}
36+
fn get_custom_execute_handler(&self) -> Option<Arc<dyn CustomExecuteHandler>> {
37+
self.custom_execute_handler.clone()
38+
}
39+
}
2940

3041
impl AddTableFeatureBuilder {
3142
/// Create a new builder
@@ -36,6 +47,7 @@ impl AddTableFeatureBuilder {
3647
snapshot,
3748
log_store,
3849
commit_properties: CommitProperties::default(),
50+
custom_execute_handler: None,
3951
}
4052
}
4153

@@ -63,6 +75,12 @@ impl AddTableFeatureBuilder {
6375
self.commit_properties = commit_properties;
6476
self
6577
}
78+
79+
/// Set a custom execute handler, for pre and post execution
80+
pub fn with_custom_execute_handler(mut self, handler: Arc<dyn CustomExecuteHandler>) -> Self {
81+
self.custom_execute_handler = Some(handler);
82+
self
83+
}
6684
}
6785

6886
impl std::future::IntoFuture for AddTableFeatureBuilder {
@@ -77,8 +95,11 @@ impl std::future::IntoFuture for AddTableFeatureBuilder {
7795
let name = if this.name.is_empty() {
7896
return Err(DeltaTableError::Generic("No features provided".to_string()));
7997
} else {
80-
this.name
98+
&this.name
8199
};
100+
let operation_id = this.get_operation_id();
101+
this.pre_execute(operation_id).await?;
102+
82103
let (reader_features, writer_features): (
83104
Vec<Option<ReaderFeatures>>,
84105
Vec<Option<WriterFeatures>>,
@@ -104,15 +125,21 @@ impl std::future::IntoFuture for AddTableFeatureBuilder {
104125
protocol = protocol.with_reader_features(reader_features);
105126
protocol = protocol.with_writer_features(writer_features);
106127

107-
let operation = DeltaOperation::AddFeature { name };
128+
let operation = DeltaOperation::AddFeature {
129+
name: name.to_vec(),
130+
};
108131

109132
let actions = vec![protocol.into()];
110133

111-
let commit = CommitBuilder::from(this.commit_properties)
134+
let commit = CommitBuilder::from(this.commit_properties.clone())
112135
.with_actions(actions)
136+
.with_operation_id(operation_id)
137+
.with_post_commit_hook_handler(this.get_custom_execute_handler())
113138
.build(Some(&this.snapshot), this.log_store.clone(), operation)
114139
.await?;
115140

141+
this.post_execute(operation_id).await?;
142+
116143
Ok(DeltaTable::new_with_state(
117144
this.log_store,
118145
commit.snapshot(),

crates/core/src/operations/constraints.rs

+25-1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use crate::{DeltaResult, DeltaTable, DeltaTableError};
2424

2525
use super::datafusion_utils::into_expr;
2626
use super::transaction::{CommitBuilder, CommitProperties};
27+
use super::{CustomExecuteHandler, Operation};
2728

2829
/// Build a constraint to add to a table
2930
pub struct ConstraintBuilder {
@@ -39,9 +40,17 @@ pub struct ConstraintBuilder {
3940
state: Option<SessionState>,
4041
/// Additional information to add to the commit
4142
commit_properties: CommitProperties,
43+
custom_execute_handler: Option<Arc<dyn CustomExecuteHandler>>,
4244
}
4345

44-
impl super::Operation<()> for ConstraintBuilder {}
46+
impl super::Operation<()> for ConstraintBuilder {
47+
fn log_store(&self) -> &LogStoreRef {
48+
&self.log_store
49+
}
50+
fn get_custom_execute_handler(&self) -> Option<Arc<dyn CustomExecuteHandler>> {
51+
self.custom_execute_handler.clone()
52+
}
53+
}
4554

4655
impl ConstraintBuilder {
4756
/// Create a new builder
@@ -53,6 +62,7 @@ impl ConstraintBuilder {
5362
log_store,
5463
state: None,
5564
commit_properties: CommitProperties::default(),
65+
custom_execute_handler: None,
5666
}
5767
}
5868

@@ -78,6 +88,12 @@ impl ConstraintBuilder {
7888
self.commit_properties = commit_properties;
7989
self
8090
}
91+
92+
/// Set a custom execute handler, for pre and post execution
93+
pub fn with_custom_execute_handler(mut self, handler: Arc<dyn CustomExecuteHandler>) -> Self {
94+
self.custom_execute_handler = Some(handler);
95+
self
96+
}
8197
}
8298

8399
impl std::future::IntoFuture for ConstraintBuilder {
@@ -94,6 +110,8 @@ impl std::future::IntoFuture for ConstraintBuilder {
94110
"ADD CONSTRAINTS".into(),
95111
));
96112
}
113+
let operation_id = this.get_operation_id();
114+
this.pre_execute(operation_id).await?;
97115

98116
let name = match this.name {
99117
Some(v) => v,
@@ -201,9 +219,15 @@ impl std::future::IntoFuture for ConstraintBuilder {
201219

202220
let commit = CommitBuilder::from(this.commit_properties)
203221
.with_actions(actions)
222+
.with_operation_id(operation_id)
223+
.with_post_commit_hook_handler(this.custom_execute_handler.clone())
204224
.build(Some(&this.snapshot), this.log_store.clone(), operation)
205225
.await?;
206226

227+
if let Some(handler) = this.custom_execute_handler {
228+
handler.post_execute(&this.log_store, operation_id).await?;
229+
}
230+
207231
Ok(DeltaTable::new_with_state(
208232
this.log_store,
209233
commit.snapshot(),

crates/core/src/operations/convert_to_delta.rs

+44-14
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use parquet::errors::ParquetError;
1515
use percent_encoding::percent_decode_str;
1616
use serde_json::{Map, Value};
1717
use tracing::debug;
18+
use uuid::Uuid;
1819

1920
use crate::operations::get_num_idx_cols_and_stats_columns;
2021
use crate::{
@@ -28,6 +29,8 @@ use crate::{
2829
DeltaResult, DeltaTable, DeltaTableError, ObjectStoreError, NULL_PARTITION_VALUE_DATA_PATH,
2930
};
3031

32+
use super::{CustomExecuteHandler, Operation};
33+
3134
/// Error converting a Parquet table to a Delta table
3235
#[derive(Debug, thiserror::Error)]
3336
enum Error {
@@ -107,6 +110,7 @@ pub struct ConvertToDeltaBuilder {
107110
comment: Option<String>,
108111
configuration: HashMap<String, Option<String>>,
109112
metadata: Option<Map<String, Value>>,
113+
custom_execute_handler: Option<Arc<dyn CustomExecuteHandler>>,
110114
}
111115

112116
impl Default for ConvertToDeltaBuilder {
@@ -115,7 +119,16 @@ impl Default for ConvertToDeltaBuilder {
115119
}
116120
}
117121

118-
impl super::Operation<()> for ConvertToDeltaBuilder {}
122+
impl super::Operation<()> for ConvertToDeltaBuilder {
123+
fn log_store(&self) -> &LogStoreRef {
124+
self.log_store
125+
.as_ref()
126+
.expect("Log store should be available at this stage.")
127+
}
128+
fn get_custom_execute_handler(&self) -> Option<Arc<dyn CustomExecuteHandler>> {
129+
self.custom_execute_handler.clone()
130+
}
131+
}
119132

120133
impl ConvertToDeltaBuilder {
121134
/// Create a new [`ConvertToDeltaBuilder`]
@@ -131,6 +144,7 @@ impl ConvertToDeltaBuilder {
131144
comment: None,
132145
configuration: Default::default(),
133146
metadata: Default::default(),
147+
custom_execute_handler: None,
134148
}
135149
}
136150

@@ -229,29 +243,38 @@ impl ConvertToDeltaBuilder {
229243
self
230244
}
231245

246+
/// Set a custom execute handler, for pre and post execution
247+
pub fn with_custom_execute_handler(mut self, handler: Arc<dyn CustomExecuteHandler>) -> Self {
248+
self.custom_execute_handler = Some(handler);
249+
self
250+
}
251+
232252
/// Consume self into CreateBuilder with corresponding add actions, schemas and operation meta
233-
async fn into_create_builder(self) -> Result<CreateBuilder, Error> {
253+
async fn into_create_builder(mut self) -> Result<(CreateBuilder, Uuid), Error> {
234254
// Use the specified log store. If a log store is not provided, create a new store from the specified path.
235255
// Return an error if neither log store nor path is provided
236-
let log_store = if let Some(log_store) = self.log_store {
237-
log_store
238-
} else if let Some(location) = self.location {
239-
crate::logstore::logstore_for(
256+
self.log_store = if let Some(log_store) = self.log_store {
257+
Some(log_store)
258+
} else if let Some(location) = self.location.clone() {
259+
Some(crate::logstore::logstore_for(
240260
ensure_table_uri(location)?,
241-
self.storage_options.unwrap_or_default(),
261+
self.storage_options.clone().unwrap_or_default(),
242262
None, // TODO: allow runtime to be passed into builder
243-
)?
263+
)?)
244264
} else {
245265
return Err(Error::MissingLocation);
246266
};
247267

268+
let operation_id = self.get_operation_id();
269+
self.pre_execute(operation_id).await?;
270+
248271
// Return an error if the location is already a Delta table location
249-
if log_store.is_delta_table_location().await? {
272+
if self.log_store().is_delta_table_location().await? {
250273
return Err(Error::DeltaTableAlready);
251274
}
252275
debug!(
253276
"Converting Parquet table in log store location: {:?}",
254-
log_store.root_uri()
277+
self.log_store().root_uri()
255278
);
256279

257280
// Get all the parquet files in the location
@@ -398,7 +421,7 @@ impl ConvertToDeltaBuilder {
398421

399422
// Generate CreateBuilder with corresponding add actions, schemas and operation meta
400423
let mut builder = CreateBuilder::new()
401-
.with_log_store(log_store)
424+
.with_log_store(self.log_store().clone())
402425
.with_columns(schema_fields.into_iter().cloned())
403426
.with_partition_columns(partition_columns.into_iter())
404427
.with_actions(actions)
@@ -413,8 +436,7 @@ impl ConvertToDeltaBuilder {
413436
if let Some(metadata) = self.metadata {
414437
builder = builder.with_metadata(metadata);
415438
}
416-
417-
Ok(builder)
439+
Ok((builder, operation_id))
418440
}
419441
}
420442

@@ -426,10 +448,18 @@ impl std::future::IntoFuture for ConvertToDeltaBuilder {
426448
let this = self;
427449

428450
Box::pin(async move {
429-
let builder = this
451+
let handler = this.custom_execute_handler.clone();
452+
let (builder, operation_id) = this
430453
.into_create_builder()
431454
.await
432455
.map_err(DeltaTableError::from)?;
456+
457+
if let Some(handler) = handler {
458+
handler
459+
.post_execute(builder.log_store(), operation_id)
460+
.await?;
461+
}
462+
433463
let table = builder.await?;
434464
Ok(table)
435465
})

0 commit comments

Comments
 (0)