----- File: YamlPipelineDefinitionParser.java (name inferred) -----
@@ -19,6 +19,8 @@

import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
+import org.apache.flink.cdc.common.event.SchemaChangeEventTypeFamily;
+import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.utils.StringUtils;
import org.apache.flink.cdc.composer.definition.PipelineDef;
import org.apache.flink.cdc.composer.definition.RouteDef;
@@ -35,11 +37,13 @@

import java.nio.file.Path;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

+import static org.apache.flink.cdc.common.pipeline.PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR;
import static org.apache.flink.cdc.common.utils.ChangeEventUtils.resolveSchemaEvolutionOptions;
import static org.apache.flink.cdc.common.utils.Preconditions.checkNotNull;

@@ -99,6 +103,19 @@ public PipelineDef parse(String pipelineDefText, Configuration globalPipelineConfig)

private PipelineDef parse(JsonNode pipelineDefJsonNode, Configuration globalPipelineConfig)
throws Exception {

+        // UDFs are optional. Parse and remove the UDF node from pipelineDefJsonNode first:
+        // its value is not a plain data type and must be removed before toPipelineConfig is
+        // called.
+        List<UdfDef> udfDefs = new ArrayList<>();
+        Optional.ofNullable(((ObjectNode) pipelineDefJsonNode.get(PIPELINE_KEY)).remove(UDF_KEY))
+                .ifPresent(node -> node.forEach(udf -> udfDefs.add(toUdfDef(udf))));
+
+        // Pipeline configs are optional
+        Configuration userPipelineConfig = toPipelineConfig(pipelineDefJsonNode.get(PIPELINE_KEY));
+
+        SchemaChangeBehavior schemaChangeBehavior =
+                userPipelineConfig.get(PIPELINE_SCHEMA_CHANGE_BEHAVIOR);
+
// Source is required
SourceDef sourceDef =
toSourceDef(
@@ -113,7 +130,8 @@
checkNotNull(
pipelineDefJsonNode.get(SINK_KEY),
"Missing required field \"%s\" in pipeline definition",
-                                SINK_KEY));
+                                SINK_KEY),
+                        schemaChangeBehavior);

// Transforms are optional
List<TransformDef> transformDefs = new ArrayList<>();
@@ -128,14 +146,6 @@ private PipelineDef parse(JsonNode pipelineDefJsonNode, Configuration globalPipelineConfig)
Optional.ofNullable(pipelineDefJsonNode.get(ROUTE_KEY))
.ifPresent(node -> node.forEach(route -> routeDefs.add(toRouteDef(route))));

-        // UDFs are optional
-        List<UdfDef> udfDefs = new ArrayList<>();
-        Optional.ofNullable(((ObjectNode) pipelineDefJsonNode.get(PIPELINE_KEY)).remove(UDF_KEY))
-                .ifPresent(node -> node.forEach(udf -> udfDefs.add(toUdfDef(udf))));
-
-        // Pipeline configs are optional
-        Configuration userPipelineConfig = toPipelineConfig(pipelineDefJsonNode.get(PIPELINE_KEY));
-
// Merge user config into global config
Configuration pipelineConfig = new Configuration();
pipelineConfig.addAll(globalPipelineConfig);
@@ -162,7 +172,7 @@ private SourceDef toSourceDef(JsonNode sourceNode) {
return new SourceDef(type, name, Configuration.fromMap(sourceMap));
}

-    private SinkDef toSinkDef(JsonNode sinkNode) {
+    private SinkDef toSinkDef(JsonNode sinkNode, SchemaChangeBehavior schemaChangeBehavior) {
List<String> includedSETypes = new ArrayList<>();
List<String> excludedSETypes = new ArrayList<>();

@@ -172,6 +182,14 @@ private SinkDef toSinkDef(JsonNode sinkNode) {
Optional.ofNullable(sinkNode.get(EXCLUDE_SCHEMA_EVOLUTION_TYPES))
.ifPresent(e -> e.forEach(tag -> excludedSETypes.add(tag.asText())));

+        if (includedSETypes.isEmpty()) {
+            // If no schema evolution types are specified, include all schema evolution types
+            // by default.
+            Arrays.stream(SchemaChangeEventTypeFamily.ALL)
+                    .map(SchemaChangeEventType::getTag)
+                    .forEach(includedSETypes::add);
+        }

Set<SchemaChangeEventType> declaredSETypes =
resolveSchemaEvolutionOptions(includedSETypes, excludedSETypes);
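Taken together with the ChangeEventUtils change further down, exclude-only sink configurations keep their old meaning: the parser now seeds the include list with every known tag before resolution, instead of ChangeEventUtils doing it implicitly. A minimal sketch of the end-to-end effect; the parser class name (YamlPipelineDefinitionParser) and the exclude.schema.changes key are inferred from context, not shown in this diff:

// Sketch only: class name and YAML key are assumptions.
String pipelineDefText =
        "source:\n"
                + "  type: mysql\n"
                + "sink:\n"
                + "  type: kafka\n"
                + "  exclude.schema.changes: [drop.column]\n";
PipelineDef parsed =
        new YamlPipelineDefinitionParser().parse(pipelineDefText, new Configuration());
// Expected: the sink's declared schema evolution types resolve to all five
// event types minus DROP_COLUMN.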

----- File: YamlPipelineDefinitionParserTest.java (name inferred) -----
@@ -27,6 +27,7 @@
import org.apache.flink.cdc.composer.definition.UdfDef;

import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
+import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableSet;
import org.apache.flink.shaded.guava31.com.google.common.io.Resources;

import org.junit.jupiter.api.Test;
@@ -37,6 +38,11 @@
import java.util.Arrays;
import java.util.Collections;

+import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ADD_COLUMN;
+import static org.apache.flink.cdc.common.event.SchemaChangeEventType.ALTER_COLUMN_TYPE;
+import static org.apache.flink.cdc.common.event.SchemaChangeEventType.CREATE_TABLE;
+import static org.apache.flink.cdc.common.event.SchemaChangeEventType.DROP_COLUMN;
+import static org.apache.flink.cdc.common.event.SchemaChangeEventType.RENAME_COLUMN;
import static org.apache.flink.cdc.common.pipeline.PipelineOptions.PIPELINE_LOCAL_TIME_ZONE;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
@@ -384,7 +390,13 @@ void testParsingFullDefinitionFromString() throws Exception {
Configuration.fromMap(
ImmutableMap.<String, String>builder()
.put("bootstrap-servers", "localhost:9092")
-                                        .build())),
+                                        .build()),
+                        ImmutableSet.of(
+                                DROP_COLUMN,
+                                ALTER_COLUMN_TYPE,
+                                ADD_COLUMN,
+                                CREATE_TABLE,
+                                RENAME_COLUMN)),
Collections.singletonList(
new RouteDef(
"mydb.default.app_order_.*",
@@ -401,7 +413,16 @@ void testParsingFullDefinitionFromString() throws Exception {
private final PipelineDef minimizedDef =
new PipelineDef(
new SourceDef("mysql", null, new Configuration()),
new SinkDef("kafka", null, new Configuration()),
new SinkDef(
"kafka",
null,
new Configuration(),
ImmutableSet.of(
DROP_COLUMN,
ALTER_COLUMN_TYPE,
ADD_COLUMN,
CREATE_TABLE,
RENAME_COLUMN)),
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList(),
@@ -474,7 +495,16 @@ void testParsingFullDefinitionFromString() throws Exception {
private final PipelineDef pipelineDefWithUdf =
new PipelineDef(
new SourceDef("values", null, new Configuration()),
new SinkDef("values", null, new Configuration()),
new SinkDef(
"values",
null,
new Configuration(),
ImmutableSet.of(
DROP_COLUMN,
ALTER_COLUMN_TYPE,
ADD_COLUMN,
CREATE_TABLE,
RENAME_COLUMN)),
Collections.emptyList(),
Collections.singletonList(
new TransformDef(
----- File: SchemaChangeEventType.java -----
@@ -22,11 +22,21 @@
/** An enumeration of schema change event types for {@link SchemaChangeEvent}. */
@PublicEvolving
public enum SchemaChangeEventType {
-    ADD_COLUMN,
-    ALTER_COLUMN_TYPE,
-    CREATE_TABLE,
-    DROP_COLUMN,
-    RENAME_COLUMN;
+    ADD_COLUMN("add.column"),
+    ALTER_COLUMN_TYPE("alter.column.type"),
+    CREATE_TABLE("create.table"),
+    DROP_COLUMN("drop.column"),
+    RENAME_COLUMN("rename.column");
+
+    private final String tag;
+
+    SchemaChangeEventType(String tag) {
+        this.tag = tag;
+    }
+
+    public String getTag() {
+        return tag;
+    }

public static SchemaChangeEventType ofEvent(SchemaChangeEvent event) {
if (event instanceof AddColumnEvent) {
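With each constant carrying a stable string tag, mapping YAML tags back to enum constants becomes trivial. A hypothetical reverse lookup, sketched only for illustration (this diff adds nothing beyond getTag()):

// Hypothetical helper, assuming java.util.Arrays and java.util.Optional:
public static Optional<SchemaChangeEventType> fromTag(String tag) {
    return Arrays.stream(values())
            .filter(type -> type.getTag().equals(tag))
            .findFirst();
}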
----- File: SchemaEvolveException.java -----
@@ -24,9 +24,9 @@

/** An exception that occurred during schema evolution. */
public class SchemaEvolveException extends FlinkRuntimeException {
-    private final SchemaChangeEvent applyingEvent;
-    private final String exceptionMessage;
-    private final @Nullable Throwable cause;
+    protected final SchemaChangeEvent applyingEvent;
+    protected final String exceptionMessage;
+    protected final @Nullable Throwable cause;

public SchemaEvolveException(SchemaChangeEvent applyingEvent, String exceptionMessage) {
this(applyingEvent, exceptionMessage, null);
----- File: UnsupportedSchemaChangeEventException.java -----
@@ -19,10 +19,36 @@

import org.apache.flink.cdc.common.event.SchemaChangeEvent;

+import javax.annotation.Nullable;

/** A special kind of {@link SchemaEvolveException} thrown when the sink doesn't support the given schema change event type. */
public class UnsupportedSchemaChangeEventException extends SchemaEvolveException {

public UnsupportedSchemaChangeEventException(SchemaChangeEvent applyingEvent) {
-        super(applyingEvent, "Sink doesn't support such schema change event.", null);
+        this(applyingEvent, "Sink doesn't support such schema change event.");
}

+    public UnsupportedSchemaChangeEventException(
+            SchemaChangeEvent applyingEvent, String exceptionMessage) {
+        this(applyingEvent, exceptionMessage, null);
+    }
+
+    public UnsupportedSchemaChangeEventException(
+            SchemaChangeEvent applyingEvent, String exceptionMessage, @Nullable Throwable cause) {
+        super(applyingEvent, exceptionMessage, cause);
+    }
+
+    @Override
+    public String toString() {
+        return "UnsupportedSchemaChangeEventException{"
+                + "applyingEvent="
+                + applyingEvent
+                + ", exceptionMessage='"
+                + exceptionMessage
+                + '\''
+                + ", cause='"
+                + cause
+                + '\''
+                + '}';
+    }
}
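The new constructors let sinks report which event failed and why, and the toString() override surfaces the protected fields inherited from SchemaEvolveException. A minimal usage sketch inside a sink's MetadataApplier; the interface and method name are assumed from flink-cdc-common, not part of this diff:

// Sketch only: assumes org.apache.flink.cdc.common.sink.MetadataApplier.
@Override
public void applySchemaChange(SchemaChangeEvent event) {
    if (event instanceof DropColumnEvent) {
        throw new UnsupportedSchemaChangeEventException(
                event, "This sink does not support dropping columns.");
    }
    // ... apply the supported event types ...
}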
----- File: PipelineOptions.java -----
@@ -48,7 +48,7 @@ public class PipelineOptions {
public static final ConfigOption<SchemaChangeBehavior> PIPELINE_SCHEMA_CHANGE_BEHAVIOR =
ConfigOptions.key("schema.change.behavior")
.enumType(SchemaChangeBehavior.class)
-                    .defaultValue(SchemaChangeBehavior.EVOLVE)
+                    .defaultValue(SchemaChangeBehavior.LENIENT)
.withDescription(
Description.builder()
.text("Behavior for handling schema change events. ")
----- File: ChangeEventUtils.java -----
@@ -95,12 +95,8 @@ public static Set<SchemaChangeEventType> resolveSchemaEvolutionOptions(
List<String> includedSchemaEvolutionTypes, List<String> excludedSchemaEvolutionTypes) {
List<SchemaChangeEventType> resultTypes = new ArrayList<>();

-        if (includedSchemaEvolutionTypes.isEmpty()) {
-            resultTypes.addAll(Arrays.asList(SchemaChangeEventTypeFamily.ALL));
-        } else {
-            for (String includeTag : includedSchemaEvolutionTypes) {
-                resultTypes.addAll(resolveSchemaEvolutionTag(includeTag));
-            }
+        for (String includeTag : includedSchemaEvolutionTypes) {
+            resultTypes.addAll(resolveSchemaEvolutionTag(includeTag));
}

for (String excludeTag : excludedSchemaEvolutionTypes) {
----- File: SchemaUtils.java (name inferred) -----
@@ -39,7 +39,9 @@
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

Expand Down Expand Up @@ -352,4 +354,73 @@ private static Schema applyAlterColumnTypeEvent(AlterColumnTypeEvent event, Sche
});
return oldSchema.copy(columns);
}

+    public static Optional<SchemaChangeEvent> transformSchemaChangeEvent(
+            boolean hasAsterisk, List<String> referencedColumns, SchemaChangeEvent event) {
+        Optional<SchemaChangeEvent> evolvedSchemaChangeEvent = Optional.empty();
+        if (event instanceof AddColumnEvent) {
+            // Send add column events downstream iff there's an asterisk
+            if (hasAsterisk) {
+                List<AddColumnEvent.ColumnWithPosition> addedColumns =
+                        ((AddColumnEvent) event)
+                                .getAddedColumns().stream()
+                                .map(
+                                        e -> {
+                                            if (AddColumnEvent.ColumnPosition.LAST.equals(
+                                                    e.getPosition())) {
+                                                // Rewrite LAST to AFTER the last referenced column
+                                                return new AddColumnEvent.ColumnWithPosition(
+                                                        e.getAddColumn(),
+                                                        AddColumnEvent.ColumnPosition.AFTER,
+                                                        referencedColumns.get(
+                                                                referencedColumns.size() - 1));
+                                            } else if (AddColumnEvent.ColumnPosition.FIRST.equals(
+                                                    e.getPosition())) {
+                                                // Rewrite FIRST to BEFORE the first referenced
+                                                // column
+                                                return new AddColumnEvent.ColumnWithPosition(
+                                                        e.getAddColumn(),
+                                                        AddColumnEvent.ColumnPosition.BEFORE,
+                                                        referencedColumns.get(0));
+                                            } else {
+                                                return e;
+                                            }
+                                        })
+                                .collect(Collectors.toList());
+                evolvedSchemaChangeEvent =
+                        Optional.of(new AddColumnEvent(event.tableId(), addedColumns));
+            }
+        } else if (event instanceof AlterColumnTypeEvent) {
+            AlterColumnTypeEvent alterColumnTypeEvent = (AlterColumnTypeEvent) event;
+            if (hasAsterisk) {
+                // In wildcard mode, all alter column type events should be sent downstream
+                evolvedSchemaChangeEvent = Optional.of(event);
+            } else {
+                // Otherwise, keep only the referenced columns and reconstruct the
+                // SchemaChangeEvent
+                Map<String, DataType> newDataTypeMap =
+                        alterColumnTypeEvent.getTypeMapping().entrySet().stream()
+                                .filter(e -> referencedColumns.contains(e.getKey()))
+                                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+                if (!newDataTypeMap.isEmpty()) {
+                    evolvedSchemaChangeEvent =
+                            Optional.of(
+                                    new AlterColumnTypeEvent(
+                                            alterColumnTypeEvent.tableId(), newDataTypeMap));
+                }
+            }
+        } else if (event instanceof RenameColumnEvent) {
+            if (hasAsterisk) {
+                evolvedSchemaChangeEvent = Optional.of(event);
+            }
+        } else if (event instanceof DropColumnEvent) {
+            if (hasAsterisk) {
+                evolvedSchemaChangeEvent = Optional.of(event);
+            }
+        } else {
+            evolvedSchemaChangeEvent = Optional.of(event);
+        }
+        return evolvedSchemaChangeEvent;
+    }
}
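A sketch of the intended semantics for AddColumnEvent, the interesting case above; TableId.parse, Column.physicalColumn, DataTypes.INT and the single-argument ColumnWithPosition constructor (implicit LAST position) are assumed from flink-cdc-common and are not shown in this diff:

// Sketch only: event/column factory methods are assumptions.
List<String> referencedColumns = Arrays.asList("id", "name");
AddColumnEvent appendAge =
        new AddColumnEvent(
                TableId.parse("mydb.orders"),
                Collections.singletonList(
                        new AddColumnEvent.ColumnWithPosition(
                                Column.physicalColumn("age", DataTypes.INT()))));
// Without an asterisk in the projection, the event is swallowed:
assert !transformSchemaChangeEvent(false, referencedColumns, appendAge).isPresent();
// With an asterisk, the implicit LAST position is rewritten to AFTER "name",
// the last column the projection references:
Optional<SchemaChangeEvent> rewritten =
        transformSchemaChangeEvent(true, referencedColumns, appendAge);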