[FLINK-22748] Allow dynamic target topic selection in SQL Kafka sinks (#109)

[FLINK-22748][connector-kafka] Allow dynamic target topic selection in SQL Kafka sinks

Allows writing to different Kafka topics based on the value of the `topic` metadata column in SQL, and updates the Table API's KafkaDynamicSink to accept a List<String> of topics instead of a single String topic, as well as to support topic-pattern. The list acts as an allow-list of acceptable values for the `topic` metadata column; topic-pattern for sinks is a pattern that topic metadata column values must match, or else an error is thrown.

- If a single topic is provided, it is used as the default target topic to produce to.
- If a list is provided, only the topics in that list can be produced to.
- If a topic pattern is provided, topic metadata column values must match it (see the SQL sketch below).
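A minimal Flink SQL sketch of the topic-list case (table names, topics, and columns below are hypothetical, not part of this change):

```sql
-- Hypothetical sink table: 'topic' declares an allow-list of two topics and the
-- writable 'topic' metadata column selects the target topic per row.
CREATE TABLE orders_sink (
  `topic` STRING METADATA,   -- must be set and must be one of the allow-listed topics
  `order_id` STRING,
  `amount` DOUBLE
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders-eu;orders-us',   -- allow-list of valid 'topic' metadata values
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json'
);

-- Route each row to a topic based on a column of the (hypothetical) source table.
INSERT INTO orders_sink
SELECT
  CASE WHEN region = 'EU' THEN 'orders-eu' ELSE 'orders-us' END AS `topic`,
  order_id,
  amount
FROM orders;
```

If only a single topic is configured, the metadata column can be omitted and that topic is used as the default target.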
klam-shop authored Sep 6, 2024
1 parent 268f6b5 commit 3730005
Showing 14 changed files with 648 additions and 132 deletions.
8 changes: 4 additions & 4 deletions docs/content.zh/docs/connectors/table/kafka.md
@@ -81,7 +81,7 @@ CREATE TABLE KafkaTable (
<td><code>topic</code></td>
<td><code>STRING NOT NULL</code></td>
<td>Topic name of the Kafka record.</td>
<td><code>R</code></td>
<td><code>R/W</code></td>
</tr>
<tr>
<td><code>partition</code></td>
@@ -191,17 +191,17 @@ CREATE TABLE KafkaTable (
</tr>
<tr>
<td><h5>topic</h5></td>
<td>required for sink</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Topic name(s) to read data from when the table is used as source. It also supports a semicolon-separated topic list like <code>'topic-1;topic-2'</code>. Note that for source tables, only one of 'topic' and 'topic-pattern' can be specified. When the table is used as sink, this option is the topic to write data to. Note that topic lists are not supported for sinks.</td>
<td>Topic name(s) to read data from when the table is used as source, or to write data to when the table is used as sink. It also supports a semicolon-separated topic list like <code>'topic-1;topic-2'</code> as the source topic list. Note that only one of "topic-pattern" and "topic" can be specified. For sinks, the topic name is the topic to write data to; a topic list is also supported, and the provided list is treated as an allow-list of valid values for the `topic` metadata column. If a list is provided, the `topic` metadata column of the sink table is writable and must be specified.</td>
</tr>
<tr>
<td><h5>topic-pattern</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>The regular expression for a pattern of topic names to read from. All topics whose names match the regular expression will be subscribed by the Kafka consumer when the job starts running. Note that for source tables, only one of 'topic' and 'topic-pattern' can be specified.</td>
<td>The regular expression for a pattern of topic names to read from or write to. All topics whose names match the regular expression will be subscribed by the consumer when the job starts running. For sinks, the `topic` metadata column is writable, must be provided, and must match the `topic-pattern` regex. Note that only one of "topic-pattern" and "topic" can be specified.</td>
</tr>
<tr>
<td><h5>properties.bootstrap.servers</h5></td>
2 changes: 1 addition & 1 deletion docs/content.zh/docs/connectors/table/upsert-kafka.md
@@ -119,7 +119,7 @@ of all available metadata fields.
<td>required</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>The Kafka topic name to read from and write to.</td>
<td>Topic name(s) to read data from when the table is used as source, or to write data to when the table is used as sink. It also supports a semicolon-separated topic list like <code>'topic-1;topic-2'</code> as the source topic list. Note that only one of "topic-pattern" and "topic" can be specified. For sinks, the topic name is the topic to write data to; a topic list is also supported, and the provided list is treated as an allow-list of valid values for the `topic` metadata column. If a list is provided, the `topic` metadata column of the sink table is writable and must be specified.</td>
</tr>
<tr>
<td><h5>properties.bootstrap.servers</h5></td>
8 changes: 4 additions & 4 deletions docs/content/docs/connectors/table/kafka.md
@@ -83,7 +83,7 @@ Read-only columns must be declared `VIRTUAL` to exclude them during an `INSERT I
<td><code>topic</code></td>
<td><code>STRING NOT NULL</code></td>
<td>Topic name of the Kafka record.</td>
<td><code>R</code></td>
<td><code>R/W</code></td>
</tr>
<tr>
<td><code>partition</code></td>
@@ -196,19 +196,19 @@ Connector Options
</tr>
<tr>
<td><h5>topic</h5></td>
<td>required for sink</td>
<td>optional</td>
<td>yes</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Topic name(s) to read data from when the table is used as source. It also supports topic list for source by separating topic by semicolon like <code>'topic-1;topic-2'</code>. Note, only one of "topic-pattern" and "topic" can be specified for sources. When the table is used as sink, the topic name is the topic to write data to. Note topic list is not supported for sinks.</td>
<td>Topic name(s) to read data from when the table is used as source, or to write data to when the table is used as sink. It also supports a semicolon-separated topic list for sources, like <code>'topic-1;topic-2'</code>. Note, only one of "topic-pattern" and "topic" can be specified. For sinks, the topic name is the topic to write data to; a topic list is also supported, and the provided list is treated as an allow-list of valid values for the `topic` metadata column. If a list is provided, the `topic` metadata column of the sink table is writable and must be specified.</td>
</tr>
<tr>
<td><h5>topic-pattern</h5></td>
<td>optional</td>
<td>yes</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>The regular expression for a pattern of topic names to read from. All topics with names that match the specified regular expression will be subscribed by the consumer when the job starts running. Note, only one of "topic-pattern" and "topic" can be specified for sources.</td>
<td>The regular expression for a pattern of topic names to read from or write to. All topics with names that match the specified regular expression will be subscribed by the consumer when the job starts running. For sinks, the `topic` metadata column is writable, must be provided, and must match the `topic-pattern` regex. Note, only one of "topic-pattern" and "topic" can be specified.</td>
</tr>
<tr>
<td><h5>properties.bootstrap.servers</h5></td>
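A hedged sketch of the topic-pattern sink case described above (pattern, table, and columns are hypothetical):

```sql
-- Hypothetical: any topic matching 'logs-.*' may be targeted. The writable
-- 'topic' metadata column must be set on every row and must match the pattern,
-- otherwise the sink raises an error at runtime.
CREATE TABLE logs_sink (
  `topic` STRING METADATA,
  `message` STRING
) WITH (
  'connector' = 'kafka',
  'topic-pattern' = 'logs-.*',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json'
);

INSERT INTO logs_sink
SELECT CONCAT('logs-', service) AS `topic`, message
FROM app_logs;
```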
2 changes: 1 addition & 1 deletion docs/content/docs/connectors/table/upsert-kafka.md
@@ -129,7 +129,7 @@ Connector Options
<td>required</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>The Kafka topic name to read from and write to.</td>
<td>Topic name(s) to read data from when the table is used as source, or to write data to when the table is used as sink. It also supports a semicolon-separated topic list for sources, like <code>'topic-1;topic-2'</code>. Note, only one of "topic-pattern" and "topic" can be specified. For sinks, the topic name is the topic to write data to; a topic list is also supported, and the provided list is treated as an allow-list of valid values for the `topic` metadata column. If a list is provided, the `topic` metadata column of the sink table is writable and must be specified.</td>
</tr>
<tr>
<td><h5>properties.bootstrap.servers</h5></td>
@@ -30,12 +30,20 @@

import javax.annotation.Nullable;

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;

import static org.apache.flink.util.Preconditions.checkNotNull;

/** SerializationSchema used by {@link KafkaDynamicSink} to configure a {@link KafkaSink}. */
class DynamicKafkaRecordSerializationSchema implements KafkaRecordSerializationSchema<RowData> {

private final String topic;
private final Set<String> topics;
private final Pattern topicPattern;
private final FlinkKafkaPartitioner<RowData> partitioner;
@Nullable private final SerializationSchema<RowData> keySerialization;
private final SerializationSchema<RowData> valueSerialization;
@@ -44,9 +52,11 @@ class DynamicKafkaRecordSerializationSchema implements KafkaRecordSerializationSchema<RowData> {
private final boolean hasMetadata;
private final int[] metadataPositions;
private final boolean upsertMode;
private final Map<String, Boolean> topicPatternMatches;

DynamicKafkaRecordSerializationSchema(
String topic,
@Nullable List<String> topics,
@Nullable Pattern topicPattern,
@Nullable FlinkKafkaPartitioner<RowData> partitioner,
@Nullable SerializationSchema<RowData> keySerialization,
SerializationSchema<RowData> valueSerialization,
@@ -60,7 +70,16 @@ class DynamicKafkaRecordSerializationSchema implements KafkaRecordSerializationSchema<RowData> {
keySerialization != null && keyFieldGetters.length > 0,
"Key must be set in upsert mode for serialization schema.");
}
this.topic = checkNotNull(topic);
Preconditions.checkArgument(
(topics != null && topicPattern == null && topics.size() > 0)
|| (topics == null && topicPattern != null),
"Either Topic or Topic Pattern must be set.");
if (topics != null) {
this.topics = new HashSet<>(topics);
} else {
this.topics = null;
}
this.topicPattern = topicPattern;
this.partitioner = partitioner;
this.keySerialization = keySerialization;
this.valueSerialization = checkNotNull(valueSerialization);
@@ -69,6 +88,8 @@ class DynamicKafkaRecordSerializationSchema implements KafkaRecordSerializationSchema<RowData> {
this.hasMetadata = hasMetadata;
this.metadataPositions = metadataPositions;
this.upsertMode = upsertMode;
// Cache results of topic pattern matches to avoid re-evaluating the pattern for each record
this.topicPatternMatches = new HashMap<>();
}

@Override
@@ -77,13 +98,15 @@ public ProducerRecord<byte[], byte[]> serialize(
// shortcut in case no input projection is required
if (keySerialization == null && !hasMetadata) {
final byte[] valueSerialized = valueSerialization.serialize(consumedRow);
final String targetTopic = getTargetTopic(consumedRow);
return new ProducerRecord<>(
topic,
targetTopic,
extractPartition(
consumedRow,
targetTopic,
null,
valueSerialized,
context.getPartitionsForTopic(topic)),
context.getPartitionsForTopic(targetTopic)),
null,
valueSerialized);
}
@@ -115,14 +138,15 @@ public ProducerRecord<byte[], byte[]> serialize(
consumedRow, kind, valueFieldGetters);
valueSerialized = valueSerialization.serialize(valueRow);
}

final String targetTopic = getTargetTopic(consumedRow);
return new ProducerRecord<>(
topic,
targetTopic,
extractPartition(
consumedRow,
targetTopic,
keySerialized,
valueSerialized,
context.getPartitionsForTopic(topic)),
context.getPartitionsForTopic(targetTopic)),
readMetadata(consumedRow, KafkaDynamicSink.WritableMetadata.TIMESTAMP),
keySerialized,
valueSerialized,
@@ -144,14 +168,42 @@ public void open(
valueSerialization.open(context);
}

private String getTargetTopic(RowData element) {
if (topics != null && topics.size() == 1) {
// If topics is a singleton list, we only return the provided topic.
return topics.stream().findFirst().get();
}
final String targetTopic = readMetadata(element, KafkaDynamicSink.WritableMetadata.TOPIC);
if (targetTopic == null) {
throw new IllegalArgumentException(
"The topic of the sink record is not valid. Expected a single topic but no topic is set.");
} else if (topics != null && !topics.contains(targetTopic)) {
throw new IllegalArgumentException(
String.format(
"The topic of the sink record is not valid. Expected topic to be in: %s but was: %s",
topics, targetTopic));
} else if (topicPattern != null && !cachedTopicPatternMatch(targetTopic)) {
throw new IllegalArgumentException(
String.format(
"The topic of the sink record is not valid. Expected topic to match: %s but was: %s",
topicPattern, targetTopic));
}
return targetTopic;
}

private boolean cachedTopicPatternMatch(String topic) {
return topicPatternMatches.computeIfAbsent(topic, t -> topicPattern.matcher(t).matches());
}

private Integer extractPartition(
RowData consumedRow,
String targetTopic,
@Nullable byte[] keySerialized,
byte[] valueSerialized,
int[] partitions) {
if (partitioner != null) {
return partitioner.partition(
consumedRow, keySerialized, valueSerialized, topic, partitions);
consumedRow, keySerialized, valueSerialized, targetTopic, partitions);
}
return null;
}
@@ -118,15 +118,15 @@ public class KafkaConnectorOptions {
.asList()
.noDefaultValue()
.withDescription(
"Topic names from which the table is read. Either 'topic' or 'topic-pattern' must be set for source. "
+ "Option 'topic' is required for sink.");
"Topic name(s) to read data from when the table is used as source. It also supports topic list for source by separating topic by semicolon like 'topic-1;topic-2'. Note, only one of 'topic-pattern' and 'topic' can be specified for sources. "
+ "When the table is used as sink, the topic name is the topic to write data. It also supports topic list for sinks. The provided topic-list is treated as a allow list of valid values for the `topic` metadata column. If a list is provided, for sink table, 'topic' metadata column is writable and must be specified.");

public static final ConfigOption<String> TOPIC_PATTERN =
ConfigOptions.key("topic-pattern")
.stringType()
.noDefaultValue()
.withDescription(
"Optional topic pattern from which the table is read for source. Either 'topic' or 'topic-pattern' must be set.");
"Optional topic pattern from which the table is read for source, or topic pattern that must match the provided `topic` metadata column for sink. Either 'topic' or 'topic-pattern' must be set.");

public static final ConfigOption<String> PROPS_BOOTSTRAP_SERVERS =
ConfigOptions.key("properties.bootstrap.servers")
@@ -98,23 +98,22 @@ class KafkaConnectorOptionsUtil {
protected static final String DEBEZIUM_AVRO_CONFLUENT = "debezium-avro-confluent";
private static final List<String> SCHEMA_REGISTRY_FORMATS =
Arrays.asList(AVRO_CONFLUENT, DEBEZIUM_AVRO_CONFLUENT);

// --------------------------------------------------------------------------------------------
// Validation
// --------------------------------------------------------------------------------------------

public static void validateTableSourceOptions(ReadableConfig tableOptions) {
validateSourceTopic(tableOptions);
validateTopic(tableOptions);
validateScanStartupMode(tableOptions);
validateScanBoundedMode(tableOptions);
}

public static void validateTableSinkOptions(ReadableConfig tableOptions) {
validateSinkTopic(tableOptions);
validateTopic(tableOptions);
validateSinkPartitioner(tableOptions);
}

public static void validateSourceTopic(ReadableConfig tableOptions) {
public static void validateTopic(ReadableConfig tableOptions) {
Optional<List<String>> topic = tableOptions.getOptional(TOPIC);
Optional<String> pattern = tableOptions.getOptional(TOPIC_PATTERN);

@@ -128,23 +127,6 @@ public static void validateSourceTopic(ReadableConfig tableOptions) {
}
}

public static void validateSinkTopic(ReadableConfig tableOptions) {
String errorMessageTemp =
"Flink Kafka sink currently only supports single topic, but got %s: %s.";
if (!isSingleTopic(tableOptions)) {
if (tableOptions.getOptional(TOPIC_PATTERN).isPresent()) {
throw new ValidationException(
String.format(
errorMessageTemp,
"'topic-pattern'",
tableOptions.get(TOPIC_PATTERN)));
} else {
throw new ValidationException(
String.format(errorMessageTemp, "'topic'", tableOptions.get(TOPIC)));
}
}
}

private static void validateScanStartupMode(ReadableConfig tableOptions) {
tableOptions
.getOptional(SCAN_STARTUP_MODE)
@@ -254,11 +236,11 @@ private static void validateSinkPartitioner(ReadableConfig tableOptions) {
// Utilities
// --------------------------------------------------------------------------------------------

public static List<String> getSourceTopics(ReadableConfig tableOptions) {
public static List<String> getTopics(ReadableConfig tableOptions) {
return tableOptions.getOptional(TOPIC).orElse(null);
}

public static Pattern getSourceTopicPattern(ReadableConfig tableOptions) {
public static Pattern getTopicPattern(ReadableConfig tableOptions) {
return tableOptions.getOptional(TOPIC_PATTERN).map(Pattern::compile).orElse(null);
}

@@ -636,21 +618,25 @@ public static DynamicTableFactory.Context autoCompleteSchemaRegistrySubject(
private static Map<String, String> autoCompleteSchemaRegistrySubject(
Map<String, String> options) {
Configuration configuration = Configuration.fromMap(options);
// the subject autoComplete should only be used in sink, check the topic first
validateSinkTopic(configuration);
final Optional<String> valueFormat = configuration.getOptional(VALUE_FORMAT);
final Optional<String> keyFormat = configuration.getOptional(KEY_FORMAT);
final Optional<String> format = configuration.getOptional(FORMAT);
final String topic = configuration.get(TOPIC).get(0);

if (format.isPresent() && SCHEMA_REGISTRY_FORMATS.contains(format.get())) {
autoCompleteSubject(configuration, format.get(), topic + "-value");
} else if (valueFormat.isPresent() && SCHEMA_REGISTRY_FORMATS.contains(valueFormat.get())) {
autoCompleteSubject(configuration, "value." + valueFormat.get(), topic + "-value");
}
// the subject autoComplete should only be used in sink with a single topic, check the topic
// option first
validateTopic(configuration);
if (configuration.contains(TOPIC) && isSingleTopic(configuration)) {
final Optional<String> valueFormat = configuration.getOptional(VALUE_FORMAT);
final Optional<String> keyFormat = configuration.getOptional(KEY_FORMAT);
final Optional<String> format = configuration.getOptional(FORMAT);
final String topic = configuration.get(TOPIC).get(0);

if (format.isPresent() && SCHEMA_REGISTRY_FORMATS.contains(format.get())) {
autoCompleteSubject(configuration, format.get(), topic + "-value");
} else if (valueFormat.isPresent()
&& SCHEMA_REGISTRY_FORMATS.contains(valueFormat.get())) {
autoCompleteSubject(configuration, "value." + valueFormat.get(), topic + "-value");
}

if (keyFormat.isPresent() && SCHEMA_REGISTRY_FORMATS.contains(keyFormat.get())) {
autoCompleteSubject(configuration, "key." + keyFormat.get(), topic + "-key");
if (keyFormat.isPresent() && SCHEMA_REGISTRY_FORMATS.contains(keyFormat.get())) {
autoCompleteSubject(configuration, "key." + keyFormat.get(), topic + "-key");
}
}
return configuration.toMap();
}
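The subject auto-completion above now applies only when a single sink topic is configured. A hedged sketch of that single-topic case (the table is hypothetical and option keys follow recent Flink releases; verify against your version):

```sql
-- Hypothetical: with one sink topic and a schema-registry format, the value
-- subject is auto-completed to 'metrics-value' (and a key format would get
-- 'metrics-key'), so no explicit subject option is needed. With a topic list
-- or topic-pattern, no subject is derived from the topic, so set it explicitly.
CREATE TABLE metrics_sink (
  `name`  STRING,
  `value` DOUBLE
) WITH (
  'connector' = 'kafka',
  'topic' = 'metrics',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'avro-confluent',
  'avro-confluent.url' = 'http://localhost:8081'
);
```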