Skip to content

Commit 272d5ee

Browse files
committed
[FLINK-35325][cdc-connector][paimon] Support specifying column order when adding new columns to a table.
1 parent 0e8b2c7 commit 272d5ee

File tree

3 files changed

+206
-13
lines changed

3 files changed

+206
-13
lines changed

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/event/AddColumnEvent.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,24 @@ public AddColumnEvent(TableId tableId, List<ColumnWithPosition> addedColumns) {
4444
this.addedColumns = addedColumns;
4545
}
4646

47+
public static AddColumnEvent.ColumnWithPosition first(Column addColumn) {
48+
return new ColumnWithPosition(addColumn, ColumnPosition.FIRST, null);
49+
}
50+
51+
public static AddColumnEvent.ColumnWithPosition last(Column addColumn) {
52+
return new ColumnWithPosition(addColumn, ColumnPosition.LAST, null);
53+
}
54+
55+
public static AddColumnEvent.ColumnWithPosition before(
56+
Column addColumn, String existedColumnName) {
57+
return new ColumnWithPosition(addColumn, ColumnPosition.BEFORE, existedColumnName);
58+
}
59+
60+
public static AddColumnEvent.ColumnWithPosition after(
61+
Column addColumn, String existedColumnName) {
62+
return new ColumnWithPosition(addColumn, ColumnPosition.AFTER, existedColumnName);
63+
}
64+
4765
/** Returns the added columns. */
4866
public List<ColumnWithPosition> getAddedColumns() {
4967
return addedColumns;

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java

Lines changed: 105 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,16 @@
3434
import org.apache.paimon.flink.LogicalTypeConversion;
3535
import org.apache.paimon.options.Options;
3636
import org.apache.paimon.schema.SchemaChange;
37+
import org.apache.paimon.table.Table;
3738

3839
import java.util.ArrayList;
3940
import java.util.HashMap;
4041
import java.util.List;
4142
import java.util.Map;
4243

44+
import static org.apache.flink.cdc.common.utils.Preconditions.checkArgument;
45+
import static org.apache.flink.cdc.common.utils.Preconditions.checkNotNull;
46+
4347
/**
4448
* A {@code MetadataApplier} that applies metadata changes to Paimon. Support primary key table
4549
* only.
@@ -129,25 +133,113 @@ private void applyCreateTable(CreateTableEvent event)
129133
private void applyAddColumn(AddColumnEvent event)
130134
throws Catalog.TableNotExistException, Catalog.ColumnAlreadyExistException,
131135
Catalog.ColumnNotExistException {
132-
List<SchemaChange> tableChangeList = new ArrayList<>();
133-
event.getAddedColumns()
134-
.forEach(
135-
(column) -> {
136-
SchemaChange tableChange =
137-
SchemaChange.addColumn(
138-
column.getAddColumn().getName(),
139-
LogicalTypeConversion.toDataType(
140-
DataTypeUtils.toFlinkDataType(
141-
column.getAddColumn().getType())
142-
.getLogicalType()));
143-
tableChangeList.add(tableChange);
144-
});
136+
List<SchemaChange> tableChangeList = applyAddColumnEventWithPosition(event);
145137
catalog.alterTable(
146138
new Identifier(event.tableId().getSchemaName(), event.tableId().getTableName()),
147139
tableChangeList,
148140
true);
149141
}
150142

143+
private List<SchemaChange> applyAddColumnEventWithPosition(AddColumnEvent event)
144+
throws Catalog.TableNotExistException {
145+
List<SchemaChange> tableChangeList = new ArrayList<>();
146+
for (AddColumnEvent.ColumnWithPosition columnWithPosition : event.getAddedColumns()) {
147+
SchemaChange tableChange;
148+
switch (columnWithPosition.getPosition()) {
149+
case FIRST:
150+
tableChange =
151+
SchemaChange.addColumn(
152+
columnWithPosition.getAddColumn().getName(),
153+
LogicalTypeConversion.toDataType(
154+
DataTypeUtils.toFlinkDataType(
155+
columnWithPosition
156+
.getAddColumn()
157+
.getType())
158+
.getLogicalType()),
159+
columnWithPosition.getAddColumn().getComment(),
160+
SchemaChange.Move.first(
161+
columnWithPosition.getAddColumn().getName()));
162+
tableChangeList.add(tableChange);
163+
break;
164+
case LAST:
165+
SchemaChange schemaChangeWithLastPosition =
166+
applyAddColumnWithLastPosition(columnWithPosition);
167+
tableChangeList.add(schemaChangeWithLastPosition);
168+
break;
169+
case BEFORE:
170+
SchemaChange schemaChangeWithBeforePosition =
171+
applyAddColumnWithBeforePosition(
172+
event.tableId().getSchemaName(),
173+
event.tableId().getTableName(),
174+
columnWithPosition);
175+
tableChangeList.add(schemaChangeWithBeforePosition);
176+
break;
177+
case AFTER:
178+
checkNotNull(
179+
columnWithPosition.getExistedColumnName(),
180+
"Existing column name must be provided for AFTER position");
181+
tableChange =
182+
SchemaChange.addColumn(
183+
columnWithPosition.getAddColumn().getName(),
184+
LogicalTypeConversion.toDataType(
185+
DataTypeUtils.toFlinkDataType(
186+
columnWithPosition
187+
.getAddColumn()
188+
.getType())
189+
.getLogicalType()),
190+
columnWithPosition.getAddColumn().getComment(),
191+
SchemaChange.Move.after(
192+
columnWithPosition.getAddColumn().getName(),
193+
columnWithPosition.getExistedColumnName()));
194+
tableChangeList.add(tableChange);
195+
break;
196+
default:
197+
throw new IllegalArgumentException(
198+
"Unknown column position: " + columnWithPosition.getPosition());
199+
}
200+
}
201+
return tableChangeList;
202+
}
203+
204+
private SchemaChange applyAddColumnWithLastPosition(
205+
AddColumnEvent.ColumnWithPosition columnWithPosition) {
206+
return SchemaChange.addColumn(
207+
columnWithPosition.getAddColumn().getName(),
208+
LogicalTypeConversion.toDataType(
209+
DataTypeUtils.toFlinkDataType(columnWithPosition.getAddColumn().getType())
210+
.getLogicalType()),
211+
columnWithPosition.getAddColumn().getComment());
212+
}
213+
214+
private SchemaChange applyAddColumnWithBeforePosition(
215+
String schemaName,
216+
String tableName,
217+
AddColumnEvent.ColumnWithPosition columnWithPosition)
218+
throws Catalog.TableNotExistException {
219+
String existedColumnName = columnWithPosition.getExistedColumnName();
220+
Table table = catalog.getTable(new Identifier(schemaName, tableName));
221+
List<String> columnNames = table.rowType().getFieldNames();
222+
int index = checkColumnPosition(existedColumnName, columnNames);
223+
224+
return SchemaChange.addColumn(
225+
columnWithPosition.getAddColumn().getName(),
226+
LogicalTypeConversion.toDataType(
227+
DataTypeUtils.toFlinkDataType(columnWithPosition.getAddColumn().getType())
228+
.getLogicalType()),
229+
columnWithPosition.getAddColumn().getComment(),
230+
SchemaChange.Move.after(
231+
columnWithPosition.getAddColumn().getName(), columnNames.get(index - 1)));
232+
}
233+
234+
private int checkColumnPosition(String existedColumnName, List<String> columnNames) {
235+
if (existedColumnName == null) {
236+
return 0;
237+
}
238+
int index = columnNames.indexOf(existedColumnName);
239+
checkArgument(index == -1, "Column %s not found", existedColumnName);
240+
return index;
241+
}
242+
151243
private void applyDropColumn(DropColumnEvent event)
152244
throws Catalog.ColumnAlreadyExistException, Catalog.TableNotExistException,
153245
Catalog.ColumnNotExistException {

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -333,4 +333,87 @@ public void testCreateTableWithAllDataTypes(String metastore)
333333
Assertions.assertEquals(
334334
tableSchema, catalog.getTable(Identifier.fromString("test.table1")).rowType());
335335
}
336+
337+
@ParameterizedTest
338+
@ValueSource(strings = {"filesystem", "hive"})
339+
public void testAddColumnWithPosition(String metastore)
340+
throws Catalog.DatabaseNotEmptyException, Catalog.DatabaseNotExistException,
341+
Catalog.TableNotExistException {
342+
initialize(metastore);
343+
MetadataApplier metadataApplier = new PaimonMetadataApplier(catalogOptions);
344+
345+
CreateTableEvent createTableEvent =
346+
new CreateTableEvent(
347+
TableId.parse("test.table1"),
348+
org.apache.flink.cdc.common.schema.Schema.newBuilder()
349+
.physicalColumn(
350+
"col1",
351+
org.apache.flink.cdc.common.types.DataTypes.STRING()
352+
.notNull())
353+
.physicalColumn(
354+
"col2", org.apache.flink.cdc.common.types.DataTypes.INT())
355+
.primaryKey("col1")
356+
.build());
357+
metadataApplier.applySchemaChange(createTableEvent);
358+
359+
List<AddColumnEvent.ColumnWithPosition> addedColumns = new ArrayList<>();
360+
addedColumns.add(
361+
new AddColumnEvent.ColumnWithPosition(
362+
Column.physicalColumn(
363+
"col3",
364+
org.apache.flink.cdc.common.types.DataTypes
365+
.STRING()))); // default last position.
366+
AddColumnEvent addColumnEvent =
367+
new AddColumnEvent(TableId.parse("test.table1"), addedColumns);
368+
metadataApplier.applySchemaChange(addColumnEvent);
369+
RowType tableSchema =
370+
new RowType(
371+
Arrays.asList(
372+
new DataField(0, "col1", DataTypes.STRING().notNull()),
373+
new DataField(1, "col2", DataTypes.INT()),
374+
new DataField(2, "col3", DataTypes.STRING())));
375+
376+
Assertions.assertEquals(
377+
tableSchema, catalog.getTable(Identifier.fromString("test.table1")).rowType());
378+
379+
addedColumns.clear();
380+
addedColumns.add(
381+
AddColumnEvent.first(
382+
Column.physicalColumn(
383+
"col4_first",
384+
org.apache.flink.cdc.common.types.DataTypes.STRING())));
385+
addedColumns.add(
386+
AddColumnEvent.last(
387+
Column.physicalColumn(
388+
"col5_last",
389+
org.apache.flink.cdc.common.types.DataTypes.STRING())));
390+
addedColumns.add(
391+
AddColumnEvent.before(
392+
Column.physicalColumn(
393+
"col6_before",
394+
org.apache.flink.cdc.common.types.DataTypes.STRING()),
395+
"col2"));
396+
addedColumns.add(
397+
AddColumnEvent.after(
398+
Column.physicalColumn(
399+
"col7_after", org.apache.flink.cdc.common.types.DataTypes.STRING()),
400+
"col2"));
401+
402+
addColumnEvent = new AddColumnEvent(TableId.parse("test.table1"), addedColumns);
403+
metadataApplier.applySchemaChange(addColumnEvent);
404+
405+
tableSchema =
406+
new RowType(
407+
Arrays.asList(
408+
new DataField(3, "col4_first", DataTypes.STRING()),
409+
new DataField(0, "col1", DataTypes.STRING().notNull()),
410+
new DataField(5, "col6_before", DataTypes.STRING()),
411+
new DataField(1, "col2", DataTypes.INT()),
412+
new DataField(6, "col7_after", DataTypes.STRING()),
413+
new DataField(2, "col3", DataTypes.STRING()),
414+
new DataField(4, "col5_last", DataTypes.STRING())));
415+
416+
Assertions.assertEquals(
417+
tableSchema, catalog.getTable(Identifier.fromString("test.table1")).rowType());
418+
}
336419
}

0 commit comments

Comments
 (0)