@@ -395,6 +395,9 @@ impl std::future::IntoFuture for DeleteBuilder {
395
395
396
396
#[ cfg( test) ]
397
397
mod tests {
398
+ use crate :: delta_datafusion:: cdf:: DeltaCdfScan ;
399
+ use crate :: kernel:: DataType as DeltaDataType ;
400
+ use crate :: operations:: collect_sendable_stream;
398
401
use crate :: operations:: DeltaOps ;
399
402
use crate :: protocol:: * ;
400
403
use crate :: writer:: test_utils:: datafusion:: get_data;
@@ -408,11 +411,15 @@ mod tests {
408
411
use arrow:: datatypes:: { Field , Schema } ;
409
412
use arrow:: record_batch:: RecordBatch ;
410
413
use arrow_array:: ArrayRef ;
414
+ use arrow_array:: StringArray ;
411
415
use arrow_array:: StructArray ;
412
416
use arrow_buffer:: NullBuffer ;
417
+ use arrow_schema:: DataType ;
413
418
use arrow_schema:: Fields ;
414
419
use datafusion:: assert_batches_sorted_eq;
420
+ use datafusion:: physical_plan:: ExecutionPlan ;
415
421
use datafusion:: prelude:: * ;
422
+ use delta_kernel:: schema:: PrimitiveType ;
416
423
use serde_json:: json;
417
424
use std:: sync:: Arc ;
418
425
@@ -868,4 +875,174 @@ mod tests {
868
875
. await ;
869
876
assert ! ( res. is_err( ) ) ;
870
877
}
878
+
879
+ #[ tokio:: test]
880
+ async fn test_delete_cdc_enabled ( ) {
881
+ let table: DeltaTable = DeltaOps :: new_in_memory ( )
882
+ . create ( )
883
+ . with_column (
884
+ "value" ,
885
+ DeltaDataType :: Primitive ( PrimitiveType :: Integer ) ,
886
+ true ,
887
+ None ,
888
+ )
889
+ . with_configuration_property ( DeltaConfigKey :: EnableChangeDataFeed , Some ( "true" ) )
890
+ . await
891
+ . unwrap ( ) ;
892
+ assert_eq ! ( table. version( ) , 0 ) ;
893
+
894
+ let schema = Arc :: new ( Schema :: new ( vec ! [ Field :: new(
895
+ "value" ,
896
+ arrow:: datatypes:: DataType :: Int32 ,
897
+ true ,
898
+ ) ] ) ) ;
899
+
900
+ let batch = RecordBatch :: try_new (
901
+ Arc :: clone ( & schema) ,
902
+ vec ! [ Arc :: new( Int32Array :: from( vec![ Some ( 1 ) , Some ( 2 ) , Some ( 3 ) ] ) ) ] ,
903
+ )
904
+ . unwrap ( ) ;
905
+ let table = DeltaOps ( table)
906
+ . write ( vec ! [ batch] )
907
+ . await
908
+ . expect ( "Failed to write first batch" ) ;
909
+ assert_eq ! ( table. version( ) , 1 ) ;
910
+
911
+ let ( table, _metrics) = DeltaOps ( table)
912
+ . delete ( )
913
+ . with_predicate ( col ( "value" ) . eq ( lit ( 2 ) ) )
914
+ . await
915
+ . unwrap ( ) ;
916
+ assert_eq ! ( table. version( ) , 2 ) ;
917
+
918
+ let ctx = SessionContext :: new ( ) ;
919
+ let table = DeltaOps ( table)
920
+ . load_cdf ( )
921
+ . with_session_ctx ( ctx. clone ( ) )
922
+ . with_starting_version ( 0 )
923
+ . build ( )
924
+ . await
925
+ . expect ( "Failed to load CDF" ) ;
926
+
927
+ let mut batches = collect_batches (
928
+ table. properties ( ) . output_partitioning ( ) . partition_count ( ) ,
929
+ table,
930
+ ctx,
931
+ )
932
+ . await
933
+ . expect ( "Failed to collect batches" ) ;
934
+
935
+ // The batches will contain a current _commit_timestamp which shouldn't be check_append_only
936
+ let _: Vec < _ > = batches. iter_mut ( ) . map ( |b| b. remove_column ( 3 ) ) . collect ( ) ;
937
+
938
+ assert_batches_sorted_eq ! { [
939
+ "+-------+--------------+-----------------+" ,
940
+ "| value | _change_type | _commit_version |" ,
941
+ "+-------+--------------+-----------------+" ,
942
+ "| 1 | insert | 1 |" ,
943
+ "| 2 | delete | 2 |" ,
944
+ "| 2 | insert | 1 |" ,
945
+ "| 3 | insert | 1 |" ,
946
+ "+-------+--------------+-----------------+" ,
947
+ ] , & batches }
948
+ }
949
+
950
+ #[ tokio:: test]
951
+ async fn test_delete_cdc_enabled_partitioned ( ) {
952
+ let table: DeltaTable = DeltaOps :: new_in_memory ( )
953
+ . create ( )
954
+ . with_column (
955
+ "year" ,
956
+ DeltaDataType :: Primitive ( PrimitiveType :: String ) ,
957
+ true ,
958
+ None ,
959
+ )
960
+ . with_column (
961
+ "value" ,
962
+ DeltaDataType :: Primitive ( PrimitiveType :: Integer ) ,
963
+ true ,
964
+ None ,
965
+ )
966
+ . with_partition_columns ( vec ! [ "year" ] )
967
+ . with_configuration_property ( DeltaConfigKey :: EnableChangeDataFeed , Some ( "true" ) )
968
+ . await
969
+ . unwrap ( ) ;
970
+ assert_eq ! ( table. version( ) , 0 ) ;
971
+
972
+ let schema = Arc :: new ( Schema :: new ( vec ! [
973
+ Field :: new( "year" , DataType :: Utf8 , true ) ,
974
+ Field :: new( "value" , DataType :: Int32 , true ) ,
975
+ ] ) ) ;
976
+
977
+ let batch = RecordBatch :: try_new (
978
+ Arc :: clone ( & schema) ,
979
+ vec ! [
980
+ Arc :: new( StringArray :: from( vec![
981
+ Some ( "2020" ) ,
982
+ Some ( "2020" ) ,
983
+ Some ( "2024" ) ,
984
+ ] ) ) ,
985
+ Arc :: new( Int32Array :: from( vec![ Some ( 1 ) , Some ( 2 ) , Some ( 3 ) ] ) ) ,
986
+ ] ,
987
+ )
988
+ . unwrap ( ) ;
989
+
990
+ let table = DeltaOps ( table)
991
+ . write ( vec ! [ batch] )
992
+ . await
993
+ . expect ( "Failed to write first batch" ) ;
994
+ assert_eq ! ( table. version( ) , 1 ) ;
995
+
996
+ let ( table, _metrics) = DeltaOps ( table)
997
+ . delete ( )
998
+ . with_predicate ( col ( "value" ) . eq ( lit ( 2 ) ) )
999
+ . await
1000
+ . unwrap ( ) ;
1001
+ assert_eq ! ( table. version( ) , 2 ) ;
1002
+
1003
+ let ctx = SessionContext :: new ( ) ;
1004
+ let table = DeltaOps ( table)
1005
+ . load_cdf ( )
1006
+ . with_session_ctx ( ctx. clone ( ) )
1007
+ . with_starting_version ( 0 )
1008
+ . build ( )
1009
+ . await
1010
+ . expect ( "Failed to load CDF" ) ;
1011
+
1012
+ let mut batches = collect_batches (
1013
+ table. properties ( ) . output_partitioning ( ) . partition_count ( ) ,
1014
+ table,
1015
+ ctx,
1016
+ )
1017
+ . await
1018
+ . expect ( "Failed to collect batches" ) ;
1019
+
1020
+ // The batches will contain a current _commit_timestamp which shouldn't be check_append_only
1021
+ let _: Vec < _ > = batches. iter_mut ( ) . map ( |b| b. remove_column ( 3 ) ) . collect ( ) ;
1022
+
1023
+ assert_batches_sorted_eq ! { [
1024
+ "+-------+--------------+-----------------+------+" ,
1025
+ "| value | _change_type | _commit_version | year |" ,
1026
+ "+-------+--------------+-----------------+------+" ,
1027
+ "| 1 | insert | 1 | 2020 |" ,
1028
+ "| 2 | delete | 2 | 2020 |" ,
1029
+ "| 2 | insert | 1 | 2020 |" ,
1030
+ "| 3 | insert | 1 | 2024 |" ,
1031
+ "+-------+--------------+-----------------+------+" ,
1032
+ ] , & batches }
1033
+ }
1034
+
1035
+ async fn collect_batches (
1036
+ num_partitions : usize ,
1037
+ stream : DeltaCdfScan ,
1038
+ ctx : SessionContext ,
1039
+ ) -> Result < Vec < RecordBatch > , Box < dyn std:: error:: Error > > {
1040
+ let mut batches = vec ! [ ] ;
1041
+ for p in 0 ..num_partitions {
1042
+ let data: Vec < RecordBatch > =
1043
+ collect_sendable_stream ( stream. execute ( p, ctx. task_ctx ( ) ) ?) . await ?;
1044
+ batches. extend_from_slice ( & data) ;
1045
+ }
1046
+ Ok ( batches)
1047
+ }
871
1048
}
0 commit comments