Skip to content

Commit ae986ab

Browse files
authored
[FLINK-36609][pipeline-connector][paimon] Add partition columns to primary keys
This closes #3641.
1 parent b5ab385 commit ae986ab

File tree

2 files changed

+45
-8
lines changed

2 files changed

+45
-8
lines changed

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplier.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -164,14 +164,22 @@ private void applyCreateTable(CreateTableEvent event) throws SchemaEvolveExcepti
164164
LogicalTypeConversion.toDataType(
165165
DataTypeUtils.toFlinkDataType(column.getType())
166166
.getLogicalType())));
167-
builder.primaryKey(schema.primaryKeys().toArray(new String[0]));
167+
List<String> partitionKeys = new ArrayList<>();
168+
List<String> primaryKeys = schema.primaryKeys();
168169
if (partitionMaps.containsKey(event.tableId())) {
169-
builder.partitionKeys(partitionMaps.get(event.tableId()));
170+
partitionKeys.addAll(partitionMaps.get(event.tableId()));
170171
} else if (schema.partitionKeys() != null && !schema.partitionKeys().isEmpty()) {
171-
builder.partitionKeys(schema.partitionKeys());
172+
partitionKeys.addAll(schema.partitionKeys());
172173
}
173-
builder.options(tableOptions);
174-
builder.options(schema.options());
174+
for (String partitionColumn : partitionKeys) {
175+
if (!primaryKeys.contains(partitionColumn)) {
176+
primaryKeys.add(partitionColumn);
177+
}
178+
}
179+
builder.partitionKeys(partitionKeys)
180+
.primaryKey(primaryKeys)
181+
.options(tableOptions)
182+
.options(schema.options());
175183
catalog.createTable(
176184
new Identifier(event.tableId().getSchemaName(), event.tableId().getTableName()),
177185
builder.build(),

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonMetadataApplierTest.java

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,35 @@ public void testApplySchemaChange(String metastore)
177177
new DataField(2, "newcol3", DataTypes.STRING())));
178178
Assertions.assertEquals(
179179
tableSchema, catalog.getTable(Identifier.fromString("test.table1")).rowType());
180+
181+
// Create table with partition column.
182+
createTableEvent =
183+
new CreateTableEvent(
184+
TableId.parse("test.table_with_partition"),
185+
org.apache.flink.cdc.common.schema.Schema.newBuilder()
186+
.physicalColumn(
187+
"col1",
188+
org.apache.flink.cdc.common.types.DataTypes.STRING()
189+
.notNull())
190+
.physicalColumn(
191+
"col2", org.apache.flink.cdc.common.types.DataTypes.INT())
192+
.physicalColumn(
193+
"dt",
194+
org.apache.flink.cdc.common.types.DataTypes.INT().notNull())
195+
.primaryKey("col1")
196+
.partitionKey("dt")
197+
.build());
198+
metadataApplier.applySchemaChange(createTableEvent);
199+
tableSchema =
200+
new RowType(
201+
Arrays.asList(
202+
new DataField(0, "col1", DataTypes.STRING().notNull()),
203+
new DataField(1, "col2", DataTypes.INT()),
204+
new DataField(2, "dt", DataTypes.INT().notNull())));
205+
Table tableWithPartition =
206+
catalog.getTable(Identifier.fromString("test.table_with_partition"));
207+
Assertions.assertEquals(tableSchema, tableWithPartition.rowType());
208+
Assertions.assertEquals(Arrays.asList("col1", "dt"), tableWithPartition.primaryKeys());
180209
}
181210

182211
@ParameterizedTest
@@ -217,10 +246,10 @@ public void testCreateTableWithOptions(String metastore)
217246
Arrays.asList(
218247
new DataField(0, "col1", DataTypes.STRING().notNull()),
219248
new DataField(1, "col2", DataTypes.STRING()),
220-
new DataField(2, "col3", DataTypes.STRING()),
221-
new DataField(3, "col4", DataTypes.STRING())));
249+
new DataField(2, "col3", DataTypes.STRING().notNull()),
250+
new DataField(3, "col4", DataTypes.STRING().notNull())));
222251
Assertions.assertEquals(tableSchema, table.rowType());
223-
Assertions.assertEquals(Collections.singletonList("col1"), table.primaryKeys());
252+
Assertions.assertEquals(Arrays.asList("col1", "col3", "col4"), table.primaryKeys());
224253
Assertions.assertEquals(Arrays.asList("col3", "col4"), table.partitionKeys());
225254
Assertions.assertEquals("-1", table.options().get("bucket"));
226255
}

0 commit comments

Comments
 (0)