Skip to content

Commit ec54e61

Browse files
committed
[FLINK-35325][cdc-connector][paimon]Support for specifying column order when adding new columns to a table.
1 parent 272d5ee commit ec54e61

File tree

2 files changed

+104
-59
lines changed

2 files changed

+104
-59
lines changed

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java

Lines changed: 18 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -148,22 +148,15 @@ private List<SchemaChange> applyAddColumnEventWithPosition(AddColumnEvent event)
148148
switch (columnWithPosition.getPosition()) {
149149
case FIRST:
150150
tableChange =
151-
SchemaChange.addColumn(
152-
columnWithPosition.getAddColumn().getName(),
153-
LogicalTypeConversion.toDataType(
154-
DataTypeUtils.toFlinkDataType(
155-
columnWithPosition
156-
.getAddColumn()
157-
.getType())
158-
.getLogicalType()),
159-
columnWithPosition.getAddColumn().getComment(),
151+
SchemaChangeProvider.add(
152+
columnWithPosition,
160153
SchemaChange.Move.first(
161154
columnWithPosition.getAddColumn().getName()));
162155
tableChangeList.add(tableChange);
163156
break;
164157
case LAST:
165158
SchemaChange schemaChangeWithLastPosition =
166-
applyAddColumnWithLastPosition(columnWithPosition);
159+
SchemaChangeProvider.add(columnWithPosition);
167160
tableChangeList.add(schemaChangeWithLastPosition);
168161
break;
169162
case BEFORE:
@@ -178,19 +171,11 @@ private List<SchemaChange> applyAddColumnEventWithPosition(AddColumnEvent event)
178171
checkNotNull(
179172
columnWithPosition.getExistedColumnName(),
180173
"Existing column name must be provided for AFTER position");
181-
tableChange =
182-
SchemaChange.addColumn(
174+
SchemaChange.Move after =
175+
SchemaChange.Move.after(
183176
columnWithPosition.getAddColumn().getName(),
184-
LogicalTypeConversion.toDataType(
185-
DataTypeUtils.toFlinkDataType(
186-
columnWithPosition
187-
.getAddColumn()
188-
.getType())
189-
.getLogicalType()),
190-
columnWithPosition.getAddColumn().getComment(),
191-
SchemaChange.Move.after(
192-
columnWithPosition.getAddColumn().getName(),
193-
columnWithPosition.getExistedColumnName()));
177+
columnWithPosition.getExistedColumnName());
178+
tableChange = SchemaChangeProvider.add(columnWithPosition, after);
194179
tableChangeList.add(tableChange);
195180
break;
196181
default:
@@ -201,16 +186,6 @@ private List<SchemaChange> applyAddColumnEventWithPosition(AddColumnEvent event)
201186
return tableChangeList;
202187
}
203188

204-
private SchemaChange applyAddColumnWithLastPosition(
205-
AddColumnEvent.ColumnWithPosition columnWithPosition) {
206-
return SchemaChange.addColumn(
207-
columnWithPosition.getAddColumn().getName(),
208-
LogicalTypeConversion.toDataType(
209-
DataTypeUtils.toFlinkDataType(columnWithPosition.getAddColumn().getType())
210-
.getLogicalType()),
211-
columnWithPosition.getAddColumn().getComment());
212-
}
213-
214189
private SchemaChange applyAddColumnWithBeforePosition(
215190
String schemaName,
216191
String tableName,
@@ -220,23 +195,19 @@ private SchemaChange applyAddColumnWithBeforePosition(
220195
Table table = catalog.getTable(new Identifier(schemaName, tableName));
221196
List<String> columnNames = table.rowType().getFieldNames();
222197
int index = checkColumnPosition(existedColumnName, columnNames);
223-
224-
return SchemaChange.addColumn(
225-
columnWithPosition.getAddColumn().getName(),
226-
LogicalTypeConversion.toDataType(
227-
DataTypeUtils.toFlinkDataType(columnWithPosition.getAddColumn().getType())
228-
.getLogicalType()),
229-
columnWithPosition.getAddColumn().getComment(),
198+
SchemaChange.Move after =
230199
SchemaChange.Move.after(
231-
columnWithPosition.getAddColumn().getName(), columnNames.get(index - 1)));
200+
columnWithPosition.getAddColumn().getName(), columnNames.get(index - 1));
201+
202+
return SchemaChangeProvider.add(columnWithPosition, after);
232203
}
233204

234205
private int checkColumnPosition(String existedColumnName, List<String> columnNames) {
235206
if (existedColumnName == null) {
236207
return 0;
237208
}
238209
int index = columnNames.indexOf(existedColumnName);
239-
checkArgument(index == -1, "Column %s not found", existedColumnName);
210+
checkArgument(index != -1, "Column %s not found", existedColumnName);
240211
return index;
241212
}
242213

@@ -245,11 +216,7 @@ private void applyDropColumn(DropColumnEvent event)
245216
Catalog.ColumnNotExistException {
246217
List<SchemaChange> tableChangeList = new ArrayList<>();
247218
event.getDroppedColumnNames()
248-
.forEach(
249-
(column) -> {
250-
SchemaChange tableChange = SchemaChange.dropColumn(column);
251-
tableChangeList.add(tableChange);
252-
});
219+
.forEach((column) -> tableChangeList.add(SchemaChangeProvider.drop(column)));
253220
catalog.alterTable(
254221
new Identifier(event.tableId().getSchemaName(), event.tableId().getTableName()),
255222
tableChangeList,
@@ -262,10 +229,8 @@ private void applyRenameColumn(RenameColumnEvent event)
262229
List<SchemaChange> tableChangeList = new ArrayList<>();
263230
event.getNameMapping()
264231
.forEach(
265-
(oldName, newName) -> {
266-
SchemaChange tableChange = SchemaChange.renameColumn(oldName, newName);
267-
tableChangeList.add(tableChange);
268-
});
232+
(oldName, newName) ->
233+
tableChangeList.add(SchemaChangeProvider.rename(oldName, newName)));
269234
catalog.alterTable(
270235
new Identifier(event.tableId().getSchemaName(), event.tableId().getTableName()),
271236
tableChangeList,
@@ -278,15 +243,9 @@ private void applyAlterColumn(AlterColumnTypeEvent event)
278243
List<SchemaChange> tableChangeList = new ArrayList<>();
279244
event.getTypeMapping()
280245
.forEach(
281-
(oldName, newType) -> {
282-
SchemaChange tableChange =
283-
SchemaChange.updateColumnType(
284-
oldName,
285-
LogicalTypeConversion.toDataType(
286-
DataTypeUtils.toFlinkDataType(newType)
287-
.getLogicalType()));
288-
tableChangeList.add(tableChange);
289-
});
246+
(oldName, newType) ->
247+
tableChangeList.add(
248+
SchemaChangeProvider.updateColumnType(oldName, newType)));
290249
catalog.alterTable(
291250
new Identifier(event.tableId().getSchemaName(), event.tableId().getTableName()),
292251
tableChangeList,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
package org.apache.flink.cdc.connectors.paimon.sink;
2+
3+
import org.apache.flink.cdc.common.event.AddColumnEvent;
4+
import org.apache.flink.cdc.common.types.DataType;
5+
import org.apache.flink.cdc.common.types.utils.DataTypeUtils;
6+
7+
import org.apache.paimon.flink.LogicalTypeConversion;
8+
import org.apache.paimon.schema.SchemaChange;
9+
10+
/**
11+
* The SchemaChangeProvider class provides static methods to create SchemaChange objects that
12+
* represent different types of schema modifications.
13+
*/
14+
public class SchemaChangeProvider {
15+
16+
/**
17+
* Creates a SchemaChange object for adding a column without specifying its position.
18+
*
19+
* @param columnWithPosition The ColumnWithPosition object containing the column details and its
20+
* intended position within the schema.
21+
* @return A SchemaChange object representing the addition of a column.
22+
*/
23+
public static SchemaChange add(AddColumnEvent.ColumnWithPosition columnWithPosition) {
24+
return SchemaChange.addColumn(
25+
columnWithPosition.getAddColumn().getName(),
26+
LogicalTypeConversion.toDataType(
27+
DataTypeUtils.toFlinkDataType(columnWithPosition.getAddColumn().getType())
28+
.getLogicalType()),
29+
columnWithPosition.getAddColumn().getComment());
30+
}
31+
32+
/**
33+
* Creates a SchemaChange object for adding a column with a specified position.
34+
*
35+
* @param columnWithPosition The ColumnWithPosition object containing the column details and its
36+
* intended position within the schema.
37+
* @param move The move operation to indicate the column's new position.
38+
* @return A SchemaChange object representing the addition of a column with position
39+
* information.
40+
*/
41+
public static SchemaChange add(
42+
AddColumnEvent.ColumnWithPosition columnWithPosition, SchemaChange.Move move) {
43+
return SchemaChange.addColumn(
44+
columnWithPosition.getAddColumn().getName(),
45+
LogicalTypeConversion.toDataType(
46+
DataTypeUtils.toFlinkDataType(columnWithPosition.getAddColumn().getType())
47+
.getLogicalType()),
48+
columnWithPosition.getAddColumn().getComment(),
49+
move);
50+
}
51+
52+
/**
53+
* Creates a SchemaChange object to update the data type of a column.
54+
*
55+
* @param oldColumnName The name of the column whose data type is to be updated.
56+
* @param newType The new DataType for the column.
57+
* @return A SchemaChange object representing the update of the column's data type.
58+
*/
59+
public static SchemaChange updateColumnType(String oldColumnName, DataType newType) {
60+
return SchemaChange.updateColumnType(
61+
oldColumnName,
62+
LogicalTypeConversion.toDataType(
63+
DataTypeUtils.toFlinkDataType(newType).getLogicalType()));
64+
}
65+
66+
/**
67+
* Creates a SchemaChange object for renaming a column.
68+
*
69+
* @param oldColumnName The current name of the column to be renamed.
70+
* @param newColumnName The new name for the column.
71+
* @return A SchemaChange object representing the renaming of a column.
72+
*/
73+
public static SchemaChange rename(String oldColumnName, String newColumnName) {
74+
return SchemaChange.renameColumn(oldColumnName, newColumnName);
75+
}
76+
77+
/**
78+
* Creates a SchemaChange object for dropping a column.
79+
*
80+
* @param columnName The name of the column to be dropped.
81+
* @return A SchemaChange object representing the deletion of a column.
82+
*/
83+
public static SchemaChange drop(String columnName) {
84+
return SchemaChange.dropColumn(columnName);
85+
}
86+
}

0 commit comments

Comments
 (0)