Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -213,28 +213,23 @@ private List<SchemaChange> applyAddColumnEventWithPosition(AddColumnEvent event)
try {
List<SchemaChange> tableChangeList = new ArrayList<>();
for (AddColumnEvent.ColumnWithPosition columnWithPosition : event.getAddedColumns()) {
SchemaChange tableChange;
switch (columnWithPosition.getPosition()) {
case FIRST:
tableChange =
tableChangeList.addAll(
SchemaChangeProvider.add(
columnWithPosition,
SchemaChange.Move.first(
columnWithPosition.getAddColumn().getName()));
tableChangeList.add(tableChange);
columnWithPosition.getAddColumn().getName())));
break;
case LAST:
SchemaChange schemaChangeWithLastPosition =
SchemaChangeProvider.add(columnWithPosition);
tableChangeList.add(schemaChangeWithLastPosition);
tableChangeList.addAll(SchemaChangeProvider.add(columnWithPosition));
break;
case BEFORE:
SchemaChange schemaChangeWithBeforePosition =
tableChangeList.addAll(
applyAddColumnWithBeforePosition(
event.tableId().getSchemaName(),
event.tableId().getTableName(),
columnWithPosition);
tableChangeList.add(schemaChangeWithBeforePosition);
columnWithPosition));
break;
case AFTER:
checkNotNull(
Expand All @@ -244,8 +239,7 @@ private List<SchemaChange> applyAddColumnEventWithPosition(AddColumnEvent event)
SchemaChange.Move.after(
columnWithPosition.getAddColumn().getName(),
columnWithPosition.getExistedColumnName());
tableChange = SchemaChangeProvider.add(columnWithPosition, after);
tableChangeList.add(tableChange);
tableChangeList.addAll(SchemaChangeProvider.add(columnWithPosition, after));
break;
default:
throw new SchemaEvolveException(
Expand All @@ -259,7 +253,7 @@ private List<SchemaChange> applyAddColumnEventWithPosition(AddColumnEvent event)
}
}

private SchemaChange applyAddColumnWithBeforePosition(
private List<SchemaChange> applyAddColumnWithBeforePosition(
String schemaName,
String tableName,
AddColumnEvent.ColumnWithPosition columnWithPosition)
Expand Down Expand Up @@ -288,7 +282,7 @@ private void applyDropColumn(DropColumnEvent event) throws SchemaEvolveException
try {
List<SchemaChange> tableChangeList = new ArrayList<>();
event.getDroppedColumnNames()
.forEach((column) -> tableChangeList.add(SchemaChangeProvider.drop(column)));
.forEach((column) -> tableChangeList.addAll(SchemaChangeProvider.drop(column)));
catalog.alterTable(tableIdToIdentifier(event), tableChangeList, true);
} catch (Catalog.TableNotExistException
| Catalog.ColumnAlreadyExistException
Expand All @@ -299,12 +293,19 @@ private void applyDropColumn(DropColumnEvent event) throws SchemaEvolveException

private void applyRenameColumn(RenameColumnEvent event) throws SchemaEvolveException {
try {
Map<String, String> options =
catalog.getTable(
new Identifier(
event.tableId().getSchemaName(),
event.tableId().getTableName()))
.options();
List<SchemaChange> tableChangeList = new ArrayList<>();
event.getNameMapping()
.forEach(
(oldName, newName) ->
tableChangeList.add(
SchemaChangeProvider.rename(oldName, newName)));
tableChangeList.addAll(
SchemaChangeProvider.rename(
oldName, newName, options)));
catalog.alterTable(tableIdToIdentifier(event), tableChangeList, true);
} catch (Catalog.TableNotExistException
| Catalog.ColumnAlreadyExistException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,19 @@
package org.apache.flink.cdc.connectors.paimon.sink;

import org.apache.flink.cdc.common.event.AddColumnEvent;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.utils.DataTypeUtils;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.LogicalTypeConversion;
import org.apache.paimon.schema.SchemaChange;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/**
* The SchemaChangeProvider class provides static methods to create SchemaChange objects that
* represent different types of schema modifications.
Expand All @@ -37,13 +44,31 @@ public class SchemaChangeProvider {
* intended position within the schema.
* @return A SchemaChange object representing the addition of a column.
*/
public static SchemaChange add(AddColumnEvent.ColumnWithPosition columnWithPosition) {
return SchemaChange.addColumn(
columnWithPosition.getAddColumn().getName(),
LogicalTypeConversion.toDataType(
DataTypeUtils.toFlinkDataType(columnWithPosition.getAddColumn().getType())
.getLogicalType()),
columnWithPosition.getAddColumn().getComment());
public static List<SchemaChange> add(AddColumnEvent.ColumnWithPosition columnWithPosition) {
List<SchemaChange> result = new ArrayList<>();
result.add(
SchemaChange.addColumn(
columnWithPosition.getAddColumn().getName(),
LogicalTypeConversion.toDataType(
DataTypeUtils.toFlinkDataType(
columnWithPosition.getAddColumn().getType())
.getLogicalType()),
columnWithPosition.getAddColumn().getComment()));
// if default value express exists, we need to set the default value to the table
// option
Column column = columnWithPosition.getAddColumn();
Optional.ofNullable(column.getDefaultValueExpression())
.ifPresent(
value -> {
String key =
String.format(
"%s.%s.%s",
CoreOptions.FIELDS_PREFIX,
column.getName(),
CoreOptions.DEFAULT_VALUE_SUFFIX);
result.add(SchemaChangeProvider.setOption(key, value));
});
return result;
}

/**
Expand All @@ -55,15 +80,33 @@ public static SchemaChange add(AddColumnEvent.ColumnWithPosition columnWithPosit
* @return A SchemaChange object representing the addition of a column with position
* information.
*/
public static SchemaChange add(
public static List<SchemaChange> add(
AddColumnEvent.ColumnWithPosition columnWithPosition, SchemaChange.Move move) {
return SchemaChange.addColumn(
columnWithPosition.getAddColumn().getName(),
LogicalTypeConversion.toDataType(
DataTypeUtils.toFlinkDataType(columnWithPosition.getAddColumn().getType())
.getLogicalType()),
columnWithPosition.getAddColumn().getComment(),
move);
List<SchemaChange> result = new ArrayList<>();
result.add(
SchemaChange.addColumn(
columnWithPosition.getAddColumn().getName(),
LogicalTypeConversion.toDataType(
DataTypeUtils.toFlinkDataType(
columnWithPosition.getAddColumn().getType())
.getLogicalType()),
columnWithPosition.getAddColumn().getComment(),
move));
// if default value express exists, we need to set the default value to the table
// option
Column column = columnWithPosition.getAddColumn();
Optional.ofNullable(column.getDefaultValueExpression())
.ifPresent(
value -> {
String key =
String.format(
"%s.%s.%s",
CoreOptions.FIELDS_PREFIX,
column.getName(),
CoreOptions.DEFAULT_VALUE_SUFFIX);
result.add(SchemaChangeProvider.setOption(key, value));
});
return result;
}

/**
Expand All @@ -87,8 +130,16 @@ public static SchemaChange updateColumnType(String oldColumnName, DataType newTy
* @param newColumnName The new name for the column.
* @return A SchemaChange object representing the renaming of a column.
*/
public static SchemaChange rename(String oldColumnName, String newColumnName) {
return SchemaChange.renameColumn(oldColumnName, newColumnName);
public static List<SchemaChange> rename(
String oldColumnName, String newColumnName, Map<String, String> options) {
List<SchemaChange> result = new ArrayList<>();
result.add(SchemaChange.renameColumn(oldColumnName, newColumnName));
String defaultValue = options.get(defaultValueOptionKey(oldColumnName));
if (defaultValue != null) {
result.add(SchemaChange.removeOption(defaultValueOptionKey(oldColumnName)));
result.add(SchemaChange.setOption(defaultValueOptionKey(newColumnName), defaultValue));
}
return result;
}

/**
Expand All @@ -97,7 +148,27 @@ public static SchemaChange rename(String oldColumnName, String newColumnName) {
* @param columnName The name of the column to be dropped.
* @return A SchemaChange object representing the deletion of a column.
*/
public static SchemaChange drop(String columnName) {
return SchemaChange.dropColumn(columnName);
public static List<SchemaChange> drop(String columnName) {
List<SchemaChange> result = new ArrayList<>();
result.add(SchemaChange.dropColumn(columnName));
result.add(SchemaChange.removeOption(defaultValueOptionKey(columnName)));
return result;
}

public static String defaultValueOptionKey(String columnName) {
return String.format(
"%s.%s.%s",
CoreOptions.FIELDS_PREFIX, columnName, CoreOptions.DEFAULT_VALUE_SUFFIX);
}

/**
* Creates a SchemaChange object for setting an option.
*
* @param key The key of the option to be set.
* @param value The value of the option to be set.
* @return A SchemaChange object representing the setting of an option.
*/
public static SchemaChange setOption(String key, String value) {
return SchemaChange.setOption(key, value);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,10 @@ public void testApplySchemaChange(String metastore)
addedColumns.add(
new AddColumnEvent.ColumnWithPosition(
Column.physicalColumn(
"col3", org.apache.flink.cdc.common.types.DataTypes.STRING())));
"col3",
org.apache.flink.cdc.common.types.DataTypes.STRING(),
null,
"col3DefValue")));
AddColumnEvent addColumnEvent =
new AddColumnEvent(TableId.parse("test.table1"), addedColumns);
metadataApplier.applySchemaChange(addColumnEvent);
Expand All @@ -137,6 +140,12 @@ public void testApplySchemaChange(String metastore)
Assertions.assertEquals(
tableSchema, catalog.getTable(Identifier.fromString("test.table1")).rowType());

Assertions.assertEquals(
"col3DefValue",
catalog.getTable(Identifier.fromString("test.table1"))
.options()
.get("fields.col3.default-value"));

Map<String, String> nameMapping = new HashMap<>();
nameMapping.put("col2", "newcol2");
nameMapping.put("col3", "newcol3");
Expand Down
Loading