5858import org .junit .jupiter .api .Assertions ;
5959import org .junit .jupiter .api .io .TempDir ;
6060import org .junit .jupiter .params .ParameterizedTest ;
61- import org .junit .jupiter .params .provider .ValueSource ;
61+ import org .junit .jupiter .params .provider .CsvSource ;
6262
6363import java .io .File ;
6464import java .io .IOException ;
@@ -140,7 +140,7 @@ private void initialize(String metastore)
140140 .dropDatabase (TEST_DATABASE , true , true );
141141 }
142142
143- private List <Event > createTestEvents () throws SchemaEvolveException {
143+ private List <Event > createTestEvents (boolean enableDeleteVectors ) throws SchemaEvolveException {
144144 List <Event > testEvents = new ArrayList <>();
145145 // create table
146146 Schema schema =
@@ -149,6 +149,7 @@ private List<Event> createTestEvents() throws SchemaEvolveException {
149149 .physicalColumn ("col2" , DataTypes .STRING ())
150150 .primaryKey ("col1" )
151151 .option ("bucket" , "1" )
152+ .option ("deletion-vectors.enabled" , String .valueOf (enableDeleteVectors ))
152153 .build ();
153154 CreateTableEvent createTableEvent = new CreateTableEvent (table1 , schema );
154155 testEvents .add (createTableEvent );
@@ -180,8 +181,8 @@ private List<Event> createTestEvents() throws SchemaEvolveException {
180181 }
181182
182183 @ ParameterizedTest
183- @ ValueSource ( strings = { "filesystem" , "hive" })
184- public void testSinkWithDataChange (String metastore )
184+ @ CsvSource ({ "filesystem, true" , "filesystem, false " , "hive, true" , "hive, false " })
185+ public void testSinkWithDataChange (String metastore , boolean enableDeleteVector )
185186 throws IOException , InterruptedException , Catalog .DatabaseNotEmptyException ,
186187 Catalog .DatabaseNotExistException , SchemaEvolveException {
187188 initialize (metastore );
@@ -192,7 +193,7 @@ public void testSinkWithDataChange(String metastore)
192193 Committer <MultiTableCommittable > committer = paimonSink .createCommitter ();
193194
194195 // insert
195- for (Event event : createTestEvents ()) {
196+ for (Event event : createTestEvents (enableDeleteVector )) {
196197 writer .write (event , null );
197198 }
198199 writer .flush (false );
@@ -215,7 +216,7 @@ public void testSinkWithDataChange(String metastore)
215216 // delete
216217 Event event =
217218 DataChangeEvent .deleteEvent (
218- TableId . tableId ( "test" , " table1" ) ,
219+ table1 ,
219220 generator .generate (
220221 new Object [] {
221222 BinaryStringData .fromString ("1" ),
@@ -240,7 +241,7 @@ public void testSinkWithDataChange(String metastore)
240241 // update
241242 event =
242243 DataChangeEvent .updateEvent (
243- TableId . tableId ( "test" , " table1" ) ,
244+ table1 ,
244245 generator .generate (
245246 new Object [] {
246247 BinaryStringData .fromString ("2" ),
@@ -273,17 +274,19 @@ public void testSinkWithDataChange(String metastore)
273274 .collect ()
274275 .forEachRemaining (result ::add );
275276 // Each commit will generate one sequence number(equal to checkpointId).
276- Assertions .assertEquals (
277- Arrays .asList (
278- Row .ofKind (RowKind .INSERT , 1L ),
279- Row .ofKind (RowKind .INSERT , 2L ),
280- Row .ofKind (RowKind .INSERT , 3L )),
281- result );
277+ List <Row > expected =
278+ enableDeleteVector
279+ ? Collections .singletonList (Row .ofKind (RowKind .INSERT , 3L ))
280+ : Arrays .asList (
281+ Row .ofKind (RowKind .INSERT , 1L ),
282+ Row .ofKind (RowKind .INSERT , 2L ),
283+ Row .ofKind (RowKind .INSERT , 3L ));
284+ Assertions .assertEquals (expected , result );
282285 }
283286
284287 @ ParameterizedTest
285- @ ValueSource ( strings = { "filesystem" , "hive" })
286- public void testSinkWithSchemaChange (String metastore )
288+ @ CsvSource ({ "filesystem, true" , "filesystem, false " , "hive, true" , "hive, false " })
289+ public void testSinkWithSchemaChange (String metastore , boolean enableDeleteVector )
287290 throws IOException , InterruptedException , Catalog .DatabaseNotEmptyException ,
288291 Catalog .DatabaseNotExistException , SchemaEvolveException {
289292 initialize (metastore );
@@ -294,7 +297,7 @@ public void testSinkWithSchemaChange(String metastore)
294297 Committer <MultiTableCommittable > committer = paimonSink .createCommitter ();
295298
296299 // 1. receive only DataChangeEvents during one checkpoint
297- for (Event event : createTestEvents ()) {
300+ for (Event event : createTestEvents (enableDeleteVector )) {
298301 writer .write (event , null );
299302 }
300303 writer .flush (false );
@@ -427,8 +430,8 @@ public void testSinkWithSchemaChange(String metastore)
427430 }
428431
429432 @ ParameterizedTest
430- @ ValueSource ( strings = { "filesystem" , "hive" })
431- public void testSinkWithMultiTables (String metastore )
433+ @ CsvSource ({ "filesystem, true" , "filesystem, false " , "hive, true" , "hive, false " })
434+ public void testSinkWithMultiTables (String metastore , boolean enableDeleteVector )
432435 throws IOException , InterruptedException , Catalog .DatabaseNotEmptyException ,
433436 Catalog .DatabaseNotExistException , SchemaEvolveException {
434437 initialize (metastore );
@@ -437,7 +440,7 @@ public void testSinkWithMultiTables(String metastore)
437440 catalogOptions , new PaimonRecordEventSerializer (ZoneId .systemDefault ()));
438441 PaimonWriter <Event > writer = paimonSink .createWriter (new MockInitContext ());
439442 Committer <MultiTableCommittable > committer = paimonSink .createCommitter ();
440- List <Event > testEvents = createTestEvents ();
443+ List <Event > testEvents = createTestEvents (enableDeleteVector );
441444 // create table
442445 TableId table2 = TableId .tableId ("test" , "table2" );
443446 Schema schema =
@@ -492,8 +495,8 @@ public void testSinkWithMultiTables(String metastore)
492495 }
493496
494497 @ ParameterizedTest
495- @ ValueSource ( strings = { "filesystem" , "hive" })
496- public void testDuplicateCommitAfterRestore (String metastore )
498+ @ CsvSource ({ "filesystem, true" , "filesystem, false " , "hive, true" , "hive, false " })
499+ public void testDuplicateCommitAfterRestore (String metastore , boolean enableDeleteVector )
497500 throws IOException , InterruptedException , Catalog .DatabaseNotEmptyException ,
498501 Catalog .DatabaseNotExistException , SchemaEvolveException {
499502 initialize (metastore );
@@ -504,7 +507,7 @@ public void testDuplicateCommitAfterRestore(String metastore)
504507 Committer <MultiTableCommittable > committer = paimonSink .createCommitter ();
505508
506509 // insert
507- for (Event event : createTestEvents ()) {
510+ for (Event event : createTestEvents (enableDeleteVector )) {
508511 writer .write (event , null );
509512 }
510513 writer .flush (false );
@@ -553,8 +556,13 @@ public void testDuplicateCommitAfterRestore(String metastore)
553556 .execute ()
554557 .collect ()
555558 .forEachRemaining (result ::add );
556- // 8 APPEND and 1 COMPACT
557- Assertions .assertEquals (result .size (), 9 );
559+ if (enableDeleteVector ) {
560+ // Each APPEND will trigger COMPACT once enable deletion-vectors.
561+ Assertions .assertEquals (16 , result .size ());
562+ } else {
563+ // 8 APPEND and 1 COMPACT
564+ Assertions .assertEquals (9 , result .size ());
565+ }
558566 result .clear ();
559567
560568 tEnv .sqlQuery ("select * from paimon_catalog.test.`table1`" )
0 commit comments