@@ -20,7 +20,6 @@
 import org.apache.flink.cdc.common.utils.StringUtils;
 
 import java.util.Objects;
-import java.util.Optional;
 
 /**
  * Definition of a transformation.
@@ -75,24 +74,24 @@ public String getSourceTable() {
         return sourceTable;
     }
 
-    public Optional<String> getProjection() {
-        return Optional.ofNullable(projection);
+    public String getProjection() {
+        return projection;
     }
 
     public boolean isValidProjection() {
         return !StringUtils.isNullOrWhitespaceOnly(projection);
     }
 
-    public Optional<String> getFilter() {
-        return Optional.ofNullable(filter);
+    public String getFilter() {
+        return filter;
    }
 
     public boolean isValidFilter() {
         return !StringUtils.isNullOrWhitespaceOnly(filter);
     }
 
-    public Optional<String> getDescription() {
-        return Optional.ofNullable(description);
+    public String getDescription() {
+        return description;
     }
 
     public String getPrimaryKeys() {
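With `Optional` stripped from `TransformDef`, the getters hand back the raw, possibly-null strings, and callers check usability via `isValidProjection()` / `isValidFilter()` instead of unwrapping. A minimal sketch of the revised contract (hypothetical call site; the 8-argument constructor shape is mirrored from the test fixtures below, and the `org.apache.flink.cdc.composer.definition` package is assumed, not shown in this diff):

```java
import org.apache.flink.cdc.composer.definition.TransformDef;

class TransformDefContractSketch {
    static void sketch() {
        // Hypothetical fixture mirroring the tests in this PR:
        // (sourceTable, projection, filter, plus five optional fields).
        TransformDef def =
                new TransformDef(
                        "default_namespace.default_schema.mytable2",
                        null,       // projection deliberately absent
                        "age < 18", // filter
                        null, null, null, null, null);

        String projection = def.getProjection(); // may now be null; no Optional unwrapping
        if (def.isValidProjection()) {           // false for null or blank projections
            // reached only when a real projection expression was configured
        }
    }
}
```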
@@ -59,8 +59,8 @@ public DataStream<Event> translatePreTransform(
         for (TransformDef transform : transforms) {
             preTransformFunctionBuilder.addTransform(
                     transform.getSourceTable(),
-                    transform.getProjection().orElse(null),
-                    transform.getFilter().orElse(null),
+                    transform.getProjection(),
+                    transform.getFilter(),
                     transform.getPrimaryKeys(),
                     transform.getPartitionKeys(),
                     transform.getTableOptions(),
@@ -98,8 +98,8 @@ public DataStream<Event> translatePostTransform(
             if (transform.isValidProjection() || transform.isValidFilter()) {
                 postTransformFunctionBuilder.addTransform(
                         transform.getSourceTable(),
-                        transform.isValidProjection() ? transform.getProjection().get() : null,
-                        transform.isValidFilter() ? transform.getFilter().get() : null,
+                        transform.getProjection(),
+                        transform.getFilter(),
                         transform.getPrimaryKeys(),
                         transform.getPartitionKeys(),
                         transform.getTableOptions(),
@@ -77,6 +77,7 @@
 
 import static org.apache.flink.configuration.CoreOptions.ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Integration test for {@link FlinkPipelineComposer}. */
 class FlinkPipelineTransformITCase {
@@ -196,8 +197,8 @@ void testMultipleReferencedColumnsInFilter(ValuesDataSink.SinkApi sinkApi) throws
                                 null,
                                 null)),
                 Arrays.asList(
-                        "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT}, primaryKeys=id, options=()}",
-                        "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` STRING}, primaryKeys=id, options=()}",
+                        "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT NOT NULL,`name` STRING,`age` INT}, primaryKeys=id, options=()}",
+                        "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255),`age` TINYINT,`description` STRING}, primaryKeys=id, options=()}",
                         "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, Carol, 15, student], op=INSERT, meta=()}"));
     }
 
@@ -218,9 +219,9 @@ void testFilteringRules(ValuesDataSink.SinkApi sinkApi) throws Exception {
                                 null,
                                 null)),
                 Arrays.asList(
-                        "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT}, primaryKeys=id, options=()}",
+                        "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT NOT NULL,`name` STRING,`age` INT}, primaryKeys=id, options=()}",
                         "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 18], op=INSERT, meta=()}",
-                        "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` STRING}, primaryKeys=id, options=()}",
+                        "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255),`age` TINYINT,`description` STRING}, primaryKeys=id, options=()}",
                         "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, Carol, 15, student], op=INSERT, meta=()}",
                         "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, Derrida, 25, student], op=INSERT, meta=()}",
                         "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, Derrida, 25, student], after=[], op=DELETE, meta=()}"));
@@ -370,6 +371,151 @@ void testMultiTransformMissingProjection(ValuesDataSink.SinkApi sinkApi) throws
                         "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, DERRIDA, 25, student], after=[], op=DELETE, meta=()}"));
     }
 
+    @ParameterizedTest
+    @EnumSource
+    @Disabled("to be fixed in FLINK-37132")
+    void testMultiTransformSchemaColumnsCompatibilityWithNullProjection(
+            ValuesDataSink.SinkApi sinkApi) {
+        TransformDef nullProjection =
+                new TransformDef(
+                        "default_namespace.default_schema.mytable2",
+                        null,
+                        "age < 18",
+                        null,
+                        null,
+                        null,
+                        null,
+                        null);
+
+        assertThatThrownBy(
+                        () ->
+                                runGenericTransformTest(
+                                        sinkApi,
+                                        Arrays.asList(
+                                                nullProjection,
+                                                new TransformDef(
+                                                        "default_namespace.default_schema.mytable2",
+                                                        // references only part of the columns
+                                                        "id,UPPER(name) AS name",
+                                                        "age >= 18",
+                                                        null,
+                                                        null,
+                                                        null,
+                                                        null,
+                                                        null)),
+                                        Collections.emptyList()))
+                .rootCause()
+                .isExactlyInstanceOf(IllegalStateException.class)
+                .hasMessage(
+                        "Unable to merge schema columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` STRING}, primaryKeys=id, options=() "
+                                + "and columns={`id` BIGINT,`name` STRING}, primaryKeys=id, options=() with different column counts.");
+    }
+
+    @ParameterizedTest
+    @EnumSource
+    @Disabled("to be fixed in FLINK-37132")
+    void testMultiTransformSchemaColumnsCompatibilityWithEmptyProjection(
+            ValuesDataSink.SinkApi sinkApi) {
+        TransformDef emptyProjection =
+                new TransformDef(
+                        "default_namespace.default_schema.mytable2",
+                        "",
+                        "age < 18",
+                        null,
+                        null,
+                        null,
+                        null,
+                        null);
+
+        assertThatThrownBy(
+                        () ->
+                                runGenericTransformTest(
+                                        sinkApi,
+                                        Arrays.asList(
+                                                emptyProjection,
+                                                new TransformDef(
+                                                        "default_namespace.default_schema.mytable2",
+                                                        // references only part of the columns
+                                                        "id,UPPER(name) AS name",
+                                                        "age >= 18",
+                                                        null,
+                                                        null,
+                                                        null,
+                                                        null,
+                                                        null)),
+                                        Collections.emptyList()))
+                .rootCause()
+                .isExactlyInstanceOf(IllegalStateException.class)
+                .hasMessage(
+                        "Unable to merge schema columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` STRING}, primaryKeys=id, options=() "
+                                + "and columns={`id` BIGINT,`name` STRING}, primaryKeys=id, options=() with different column counts.");
+    }
+
+    @ParameterizedTest
+    @EnumSource
+    void testMultiTransformWithNullEmptyAsteriskProjections(ValuesDataSink.SinkApi sinkApi)
+            throws Exception {
+        TransformDef nullProjection =
+                new TransformDef(
+                        "default_namespace.default_schema.mytable2",
+                        null,
+                        "age < 18",
+                        null,
+                        null,
+                        null,
+                        null,
+                        null);
+
+        TransformDef emptyProjection =
+                new TransformDef(
+                        "default_namespace.default_schema.mytable2",
+                        "",
+                        "age < 18",
+                        null,
+                        null,
+                        null,
+                        null,
+                        null);
+
+        TransformDef asteriskProjection =
+                new TransformDef(
+                        "default_namespace.default_schema.mytable2",
+                        "*",
+                        "age < 18",
+                        null,
+                        null,
+                        null,
+                        null,
+                        null);
+
+        runGenericTransformTest(
+                sinkApi,
+                Arrays.asList(
+                        // Setting the projection to null, '', or '*' should be equivalent
+                        nullProjection,
+                        emptyProjection,
+                        asteriskProjection,
+                        new TransformDef(
+                                "default_namespace.default_schema.mytable2",
+                                // references all columns
+                                "id,UPPER(name) AS name,age,description",
+                                "age >= 18",
+                                null,
+                                null,
+                                null,
+                                null,
+                                null)),
+                Arrays.asList(
+                        "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT}, primaryKeys=id, options=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 18], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Bob, 20], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Bob, 20], after=[2, Bob, 30], op=UPDATE, meta=()}",
+                        "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT NOT NULL,`name` STRING,`age` TINYINT,`description` STRING}, primaryKeys=id, options=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, Carol, 15, student], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, DERRIDA, 25, student], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, DERRIDA, 25, student], after=[], op=DELETE, meta=()}"));
+    }
+
     /** This tests if transform generates metadata info correctly. */
     @ParameterizedTest
     @EnumSource
@@ -1447,7 +1593,7 @@ void testTransformWithFilterButNoProjection() throws Exception {
         assertThat(outputEvents)
                 .containsExactly(
                         // Initial stage
-                        "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT}, primaryKeys=id, options=()}",
+                        "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT NOT NULL,`name` STRING,`age` INT}, primaryKeys=id, options=()}",
                         "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Barcarolle, 22], op=INSERT, meta=()}",
                         "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[3, Cecily, 23], op=INSERT, meta=()}",
                         "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[3, Cecily, 23], after=[3, Colin, 24], op=UPDATE, meta=()}",
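The two @Disabled tests above pin down the failure tracked in FLINK-37132: when two transform rules match the same table and emit schemas with different column counts, field-wise schema merging cannot proceed. A toy illustration of that check (not the actual Flink CDC merging code; all names here are made up):

```java
import java.util.Arrays;
import java.util.List;

final class SchemaMergeSketch {
    // Toy stand-in for the merge step the disabled tests expect to fail:
    // schemas are merged column-by-column, so differing counts are fatal.
    static void merge(List<String> left, List<String> right) {
        if (left.size() != right.size()) {
            throw new IllegalStateException(
                    "Unable to merge schema ... with different column counts.");
        }
        // column-wise type widening would happen here
    }

    public static void main(String[] args) {
        merge(
                Arrays.asList("id", "name", "age", "description"), // full schema
                Arrays.asList("id", "name")); // projected subset -> throws
    }
}
```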
@@ -389,17 +389,8 @@ private void cachePreTransformProcessor(TableId tableId, Schema tableSchema) {
             if (!transform.getSelectors().isMatch(tableId)) {
                 continue;
             }
-            if (!transform.getProjection().isPresent()) {
-                processProjectionTransform(tableId, tableSchema, referencedColumnsSet, null);
-                hasMatchTransform = true;
-            } else {
-                TransformProjection transformProjection = transform.getProjection().get();
-                if (transformProjection.isValid()) {
-                    processProjectionTransform(
-                            tableId, tableSchema, referencedColumnsSet, transform);
-                    hasMatchTransform = true;
-                }
-            }
+            processProjectionTransform(tableId, tableSchema, referencedColumnsSet, transform);
+            hasMatchTransform = true;
         }
         if (!hasMatchTransform) {
             processProjectionTransform(tableId, tableSchema, referencedColumnsSet, null);
@@ -18,6 +18,7 @@
 package org.apache.flink.cdc.runtime.operators.transform;
 
 import org.apache.flink.cdc.common.source.SupportedMetadataColumn;
+import org.apache.flink.cdc.common.utils.StringUtils;
 
 import javax.annotation.Nullable;
 
@@ -48,7 +49,7 @@ public TransformRule(
             @Nullable String postTransformConverter,
             SupportedMetadataColumn[] supportedMetadataColumns) {
         this.tableInclusions = tableInclusions;
-        this.projection = projection;
+        this.projection = StringUtils.isNullOrWhitespaceOnly(projection) ? "*" : projection;
         this.filter = normalizeFilter(projection, filter);
         this.primaryKey = primaryKey;
         this.partitionKey = partitionKey;
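This normalization in the `TransformRule` constructor is what makes null, '' and '*' projections interchangeable (as asserted by testMultiTransformWithNullEmptyAsteriskProjections) and what lets the pre-transform operator in the previous hunk drop its Optional-presence branching. A minimal sketch of the behavior, using the same `StringUtils` helper shown in the diff (the wrapper class is hypothetical):

```java
import org.apache.flink.cdc.common.utils.StringUtils;

final class ProjectionNormalizationSketch {
    // Mirrors the constructor line above: blank projections collapse to "*",
    // i.e. "keep all columns".
    static String normalize(String projection) {
        return StringUtils.isNullOrWhitespaceOnly(projection) ? "*" : projection;
    }

    public static void main(String[] args) {
        System.out.println(normalize(null)); // *
        System.out.println(normalize(""));   // *
        System.out.println(normalize("  ")); // *
        System.out.println(normalize("id, UPPER(name) AS name")); // passed through unchanged
    }
}
```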