Skip to content

Commit 510cd2a

Browse files
committed
[FLINK-35325][cdc-connector][paimon]Support for specifying column order when adding new columns to a table.
1 parent 0e8b2c7 commit 510cd2a

File tree

4 files changed

+286
-31
lines changed

4 files changed

+286
-31
lines changed

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/AddColumnEvent.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,24 @@ public AddColumnEvent(TableId tableId, List<ColumnWithPosition> addedColumns) {
4444
this.addedColumns = addedColumns;
4545
}
4646

47+
public static AddColumnEvent.ColumnWithPosition first(Column addColumn) {
48+
return new ColumnWithPosition(addColumn, ColumnPosition.FIRST, null);
49+
}
50+
51+
public static AddColumnEvent.ColumnWithPosition last(Column addColumn) {
52+
return new ColumnWithPosition(addColumn, ColumnPosition.LAST, null);
53+
}
54+
55+
public static AddColumnEvent.ColumnWithPosition before(
56+
Column addColumn, String existedColumnName) {
57+
return new ColumnWithPosition(addColumn, ColumnPosition.BEFORE, existedColumnName);
58+
}
59+
60+
public static AddColumnEvent.ColumnWithPosition after(
61+
Column addColumn, String existedColumnName) {
62+
return new ColumnWithPosition(addColumn, ColumnPosition.AFTER, existedColumnName);
63+
}
64+
4765
/** Returns the added columns. */
4866
public List<ColumnWithPosition> getAddedColumns() {
4967
return addedColumns;

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java

Lines changed: 82 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,16 @@
3434
import org.apache.paimon.flink.LogicalTypeConversion;
3535
import org.apache.paimon.options.Options;
3636
import org.apache.paimon.schema.SchemaChange;
37+
import org.apache.paimon.table.Table;
3738

3839
import java.util.ArrayList;
3940
import java.util.HashMap;
4041
import java.util.List;
4142
import java.util.Map;
4243

44+
import static org.apache.flink.cdc.common.utils.Preconditions.checkArgument;
45+
import static org.apache.flink.cdc.common.utils.Preconditions.checkNotNull;
46+
4347
/**
4448
* A {@code MetadataApplier} that applies metadata changes to Paimon. Support primary key table
4549
* only.
@@ -129,35 +133,90 @@ private void applyCreateTable(CreateTableEvent event)
129133
private void applyAddColumn(AddColumnEvent event)
130134
throws Catalog.TableNotExistException, Catalog.ColumnAlreadyExistException,
131135
Catalog.ColumnNotExistException {
132-
List<SchemaChange> tableChangeList = new ArrayList<>();
133-
event.getAddedColumns()
134-
.forEach(
135-
(column) -> {
136-
SchemaChange tableChange =
137-
SchemaChange.addColumn(
138-
column.getAddColumn().getName(),
139-
LogicalTypeConversion.toDataType(
140-
DataTypeUtils.toFlinkDataType(
141-
column.getAddColumn().getType())
142-
.getLogicalType()));
143-
tableChangeList.add(tableChange);
144-
});
136+
List<SchemaChange> tableChangeList = applyAddColumnEventWithPosition(event);
145137
catalog.alterTable(
146138
new Identifier(event.tableId().getSchemaName(), event.tableId().getTableName()),
147139
tableChangeList,
148140
true);
149141
}
150142

143+
private List<SchemaChange> applyAddColumnEventWithPosition(AddColumnEvent event)
144+
throws Catalog.TableNotExistException {
145+
List<SchemaChange> tableChangeList = new ArrayList<>();
146+
for (AddColumnEvent.ColumnWithPosition columnWithPosition : event.getAddedColumns()) {
147+
SchemaChange tableChange;
148+
switch (columnWithPosition.getPosition()) {
149+
case FIRST:
150+
tableChange =
151+
SchemaChangeProvider.add(
152+
columnWithPosition,
153+
SchemaChange.Move.first(
154+
columnWithPosition.getAddColumn().getName()));
155+
tableChangeList.add(tableChange);
156+
break;
157+
case LAST:
158+
SchemaChange schemaChangeWithLastPosition =
159+
SchemaChangeProvider.add(columnWithPosition);
160+
tableChangeList.add(schemaChangeWithLastPosition);
161+
break;
162+
case BEFORE:
163+
SchemaChange schemaChangeWithBeforePosition =
164+
applyAddColumnWithBeforePosition(
165+
event.tableId().getSchemaName(),
166+
event.tableId().getTableName(),
167+
columnWithPosition);
168+
tableChangeList.add(schemaChangeWithBeforePosition);
169+
break;
170+
case AFTER:
171+
checkNotNull(
172+
columnWithPosition.getExistedColumnName(),
173+
"Existing column name must be provided for AFTER position");
174+
SchemaChange.Move after =
175+
SchemaChange.Move.after(
176+
columnWithPosition.getAddColumn().getName(),
177+
columnWithPosition.getExistedColumnName());
178+
tableChange = SchemaChangeProvider.add(columnWithPosition, after);
179+
tableChangeList.add(tableChange);
180+
break;
181+
default:
182+
throw new IllegalArgumentException(
183+
"Unknown column position: " + columnWithPosition.getPosition());
184+
}
185+
}
186+
return tableChangeList;
187+
}
188+
189+
private SchemaChange applyAddColumnWithBeforePosition(
190+
String schemaName,
191+
String tableName,
192+
AddColumnEvent.ColumnWithPosition columnWithPosition)
193+
throws Catalog.TableNotExistException {
194+
String existedColumnName = columnWithPosition.getExistedColumnName();
195+
Table table = catalog.getTable(new Identifier(schemaName, tableName));
196+
List<String> columnNames = table.rowType().getFieldNames();
197+
int index = checkColumnPosition(existedColumnName, columnNames);
198+
SchemaChange.Move after =
199+
SchemaChange.Move.after(
200+
columnWithPosition.getAddColumn().getName(), columnNames.get(index - 1));
201+
202+
return SchemaChangeProvider.add(columnWithPosition, after);
203+
}
204+
205+
private int checkColumnPosition(String existedColumnName, List<String> columnNames) {
206+
if (existedColumnName == null) {
207+
return 0;
208+
}
209+
int index = columnNames.indexOf(existedColumnName);
210+
checkArgument(index != -1, "Column %s not found", existedColumnName);
211+
return index;
212+
}
213+
151214
private void applyDropColumn(DropColumnEvent event)
152215
throws Catalog.ColumnAlreadyExistException, Catalog.TableNotExistException,
153216
Catalog.ColumnNotExistException {
154217
List<SchemaChange> tableChangeList = new ArrayList<>();
155218
event.getDroppedColumnNames()
156-
.forEach(
157-
(column) -> {
158-
SchemaChange tableChange = SchemaChange.dropColumn(column);
159-
tableChangeList.add(tableChange);
160-
});
219+
.forEach((column) -> tableChangeList.add(SchemaChangeProvider.drop(column)));
161220
catalog.alterTable(
162221
new Identifier(event.tableId().getSchemaName(), event.tableId().getTableName()),
163222
tableChangeList,
@@ -170,10 +229,8 @@ private void applyRenameColumn(RenameColumnEvent event)
170229
List<SchemaChange> tableChangeList = new ArrayList<>();
171230
event.getNameMapping()
172231
.forEach(
173-
(oldName, newName) -> {
174-
SchemaChange tableChange = SchemaChange.renameColumn(oldName, newName);
175-
tableChangeList.add(tableChange);
176-
});
232+
(oldName, newName) ->
233+
tableChangeList.add(SchemaChangeProvider.rename(oldName, newName)));
177234
catalog.alterTable(
178235
new Identifier(event.tableId().getSchemaName(), event.tableId().getTableName()),
179236
tableChangeList,
@@ -186,15 +243,9 @@ private void applyAlterColumn(AlterColumnTypeEvent event)
186243
List<SchemaChange> tableChangeList = new ArrayList<>();
187244
event.getTypeMapping()
188245
.forEach(
189-
(oldName, newType) -> {
190-
SchemaChange tableChange =
191-
SchemaChange.updateColumnType(
192-
oldName,
193-
LogicalTypeConversion.toDataType(
194-
DataTypeUtils.toFlinkDataType(newType)
195-
.getLogicalType()));
196-
tableChangeList.add(tableChange);
197-
});
246+
(oldName, newType) ->
247+
tableChangeList.add(
248+
SchemaChangeProvider.updateColumnType(oldName, newType)));
198249
catalog.alterTable(
199250
new Identifier(event.tableId().getSchemaName(), event.tableId().getTableName()),
200251
tableChangeList,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.connectors.paimon.sink;
19+
20+
import org.apache.flink.cdc.common.event.AddColumnEvent;
21+
import org.apache.flink.cdc.common.types.DataType;
22+
import org.apache.flink.cdc.common.types.utils.DataTypeUtils;
23+
24+
import org.apache.paimon.flink.LogicalTypeConversion;
25+
import org.apache.paimon.schema.SchemaChange;
26+
27+
/**
28+
* The SchemaChangeProvider class provides static methods to create SchemaChange objects that
29+
* represent different types of schema modifications.
30+
*/
31+
public class SchemaChangeProvider {
32+
33+
/**
34+
* Creates a SchemaChange object for adding a column without specifying its position.
35+
*
36+
* @param columnWithPosition The ColumnWithPosition object containing the column details and its
37+
* intended position within the schema.
38+
* @return A SchemaChange object representing the addition of a column.
39+
*/
40+
public static SchemaChange add(AddColumnEvent.ColumnWithPosition columnWithPosition) {
41+
return SchemaChange.addColumn(
42+
columnWithPosition.getAddColumn().getName(),
43+
LogicalTypeConversion.toDataType(
44+
DataTypeUtils.toFlinkDataType(columnWithPosition.getAddColumn().getType())
45+
.getLogicalType()),
46+
columnWithPosition.getAddColumn().getComment());
47+
}
48+
49+
/**
50+
* Creates a SchemaChange object for adding a column with a specified position.
51+
*
52+
* @param columnWithPosition The ColumnWithPosition object containing the column details and its
53+
* intended position within the schema.
54+
* @param move The move operation to indicate the column's new position.
55+
* @return A SchemaChange object representing the addition of a column with position
56+
* information.
57+
*/
58+
public static SchemaChange add(
59+
AddColumnEvent.ColumnWithPosition columnWithPosition, SchemaChange.Move move) {
60+
return SchemaChange.addColumn(
61+
columnWithPosition.getAddColumn().getName(),
62+
LogicalTypeConversion.toDataType(
63+
DataTypeUtils.toFlinkDataType(columnWithPosition.getAddColumn().getType())
64+
.getLogicalType()),
65+
columnWithPosition.getAddColumn().getComment(),
66+
move);
67+
}
68+
69+
/**
70+
* Creates a SchemaChange object to update the data type of a column.
71+
*
72+
* @param oldColumnName The name of the column whose data type is to be updated.
73+
* @param newType The new DataType for the column.
74+
* @return A SchemaChange object representing the update of the column's data type.
75+
*/
76+
public static SchemaChange updateColumnType(String oldColumnName, DataType newType) {
77+
return SchemaChange.updateColumnType(
78+
oldColumnName,
79+
LogicalTypeConversion.toDataType(
80+
DataTypeUtils.toFlinkDataType(newType).getLogicalType()));
81+
}
82+
83+
/**
84+
* Creates a SchemaChange object for renaming a column.
85+
*
86+
* @param oldColumnName The current name of the column to be renamed.
87+
* @param newColumnName The new name for the column.
88+
* @return A SchemaChange object representing the renaming of a column.
89+
*/
90+
public static SchemaChange rename(String oldColumnName, String newColumnName) {
91+
return SchemaChange.renameColumn(oldColumnName, newColumnName);
92+
}
93+
94+
/**
95+
* Creates a SchemaChange object for dropping a column.
96+
*
97+
* @param columnName The name of the column to be dropped.
98+
* @return A SchemaChange object representing the deletion of a column.
99+
*/
100+
public static SchemaChange drop(String columnName) {
101+
return SchemaChange.dropColumn(columnName);
102+
}
103+
}

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -333,4 +333,87 @@ public void testCreateTableWithAllDataTypes(String metastore)
333333
Assertions.assertEquals(
334334
tableSchema, catalog.getTable(Identifier.fromString("test.table1")).rowType());
335335
}
336+
337+
@ParameterizedTest
338+
@ValueSource(strings = {"filesystem", "hive"})
339+
public void testAddColumnWithPosition(String metastore)
340+
throws Catalog.DatabaseNotEmptyException, Catalog.DatabaseNotExistException,
341+
Catalog.TableNotExistException {
342+
initialize(metastore);
343+
MetadataApplier metadataApplier = new PaimonMetadataApplier(catalogOptions);
344+
345+
CreateTableEvent createTableEvent =
346+
new CreateTableEvent(
347+
TableId.parse("test.table1"),
348+
org.apache.flink.cdc.common.schema.Schema.newBuilder()
349+
.physicalColumn(
350+
"col1",
351+
org.apache.flink.cdc.common.types.DataTypes.STRING()
352+
.notNull())
353+
.physicalColumn(
354+
"col2", org.apache.flink.cdc.common.types.DataTypes.INT())
355+
.primaryKey("col1")
356+
.build());
357+
metadataApplier.applySchemaChange(createTableEvent);
358+
359+
List<AddColumnEvent.ColumnWithPosition> addedColumns = new ArrayList<>();
360+
addedColumns.add(
361+
new AddColumnEvent.ColumnWithPosition(
362+
Column.physicalColumn(
363+
"col3",
364+
org.apache.flink.cdc.common.types.DataTypes
365+
.STRING()))); // default last position.
366+
AddColumnEvent addColumnEvent =
367+
new AddColumnEvent(TableId.parse("test.table1"), addedColumns);
368+
metadataApplier.applySchemaChange(addColumnEvent);
369+
RowType tableSchema =
370+
new RowType(
371+
Arrays.asList(
372+
new DataField(0, "col1", DataTypes.STRING().notNull()),
373+
new DataField(1, "col2", DataTypes.INT()),
374+
new DataField(2, "col3", DataTypes.STRING())));
375+
376+
Assertions.assertEquals(
377+
tableSchema, catalog.getTable(Identifier.fromString("test.table1")).rowType());
378+
379+
addedColumns.clear();
380+
addedColumns.add(
381+
AddColumnEvent.first(
382+
Column.physicalColumn(
383+
"col4_first",
384+
org.apache.flink.cdc.common.types.DataTypes.STRING())));
385+
addedColumns.add(
386+
AddColumnEvent.last(
387+
Column.physicalColumn(
388+
"col5_last",
389+
org.apache.flink.cdc.common.types.DataTypes.STRING())));
390+
addedColumns.add(
391+
AddColumnEvent.before(
392+
Column.physicalColumn(
393+
"col6_before",
394+
org.apache.flink.cdc.common.types.DataTypes.STRING()),
395+
"col2"));
396+
addedColumns.add(
397+
AddColumnEvent.after(
398+
Column.physicalColumn(
399+
"col7_after", org.apache.flink.cdc.common.types.DataTypes.STRING()),
400+
"col2"));
401+
402+
addColumnEvent = new AddColumnEvent(TableId.parse("test.table1"), addedColumns);
403+
metadataApplier.applySchemaChange(addColumnEvent);
404+
405+
tableSchema =
406+
new RowType(
407+
Arrays.asList(
408+
new DataField(3, "col4_first", DataTypes.STRING()),
409+
new DataField(0, "col1", DataTypes.STRING().notNull()),
410+
new DataField(5, "col6_before", DataTypes.STRING()),
411+
new DataField(1, "col2", DataTypes.INT()),
412+
new DataField(6, "col7_after", DataTypes.STRING()),
413+
new DataField(2, "col3", DataTypes.STRING()),
414+
new DataField(4, "col5_last", DataTypes.STRING())));
415+
416+
Assertions.assertEquals(
417+
tableSchema, catalog.getTable(Identifier.fromString("test.table1")).rowType());
418+
}
336419
}

0 commit comments

Comments
 (0)