[Feature][Connector-v2][PulsarSink]Add Pulsar Sink Connector. #4382

Merged: 35 commits, merged on Jan 11, 2024
Showing changes from 1 commit

Commits
3491092
Add Pulsar sink connector.
Mar 21, 2023
3046c38
Add license header.
Mar 22, 2023
43aad76
Merge branch 'dev' into create-pulsar-sink
Mar 23, 2023
ab1e71c
Modify asynchronous write mode to improve write throughput performance
Mar 24, 2023
0b8d97f
update doc.
Mar 27, 2023
5584b72
Merge branch 'dev' into create-pulsar-sink
Mar 27, 2023
b7cde11
Merge branch 'dev' into create-pulsar-sink
May 24, 2023
0a94a8d
Merge branch 'dev' into create-pulsar-sink
May 25, 2023
50d3ad7
Merge branch 'dev' into create-pulsar-sink
hailin0 May 25, 2023
ce34460
add e2e test.
May 26, 2023
86f1c0d
Merge branch 'create-pulsar-sink' of github.com:lightzhao/incubator-s…
May 26, 2023
273452d
Merge branch 'dev' into create-pulsar-sink
Aug 14, 2023
89157b8
add pulsar sink config.
Aug 15, 2023
3c80879
Merge branch 'dev' into create-pulsar-sink
Aug 28, 2023
5ee01e1
fix sink test and miss class.
Aug 28, 2023
de45934
Merge branch 'dev' into create-pulsar-sink
Sep 1, 2023
8a08656
Merge branch 'dev' into create-pulsar-sink
Sep 12, 2023
8f1a1ec
Merge branch 'dev' into create-pulsar-sink
Sep 18, 2023
e16c7dd
Merge branch 'dev' into create-pulsar-sink
Sep 21, 2023
64da469
Merge branch 'dev' into create-pulsar-sink
Nov 22, 2023
d00a87b
update doc and add e2e test case.
Nov 22, 2023
7200d1f
update test.
Nov 24, 2023
78bfe21
update test.
Nov 24, 2023
7f67ac5
update test.
Nov 25, 2023
49b0a69
update test.
Nov 25, 2023
9438f90
update e2e test.
Nov 25, 2023
a7a6dff
update e2e test.
Nov 25, 2023
918729d
Merge branch 'dev' into create-pulsar-sink
Dec 1, 2023
a80e32c
Merge branch 'dev' into create-pulsar-sink
Dec 8, 2023
c9ca686
Support multi-table sink feature and update doc.
Dec 11, 2023
612d000
Support multi-table sink feature and update doc.
Dec 12, 2023
994242b
Merge branch 'dev' into create-pulsar-sink
Dec 13, 2023
ba5a187
remove unless code.
Dec 19, 2023
0122d77
Merge branch 'dev' into create-pulsar-sink
Dec 20, 2023
126874d
Update seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/…
EricJoy2048 Jan 5, 2024
Support multi-table sink feature and update doc.
lightzhao committed Dec 11, 2023
commit c9ca686a25c2a6063889236d4bd1c51b08e64fb8
@@ -1,6 +1,6 @@
# Apache Pulsar
# Pulsar

> Apache Pulsar sink connector
> Pulsar sink connector

## Support Those Engines

@@ -35,7 +35,7 @@ Sink connector for Apache Pulsar.
| field_delimiter | String | No | , | Customize the field delimiter for data format. |
| semantics | Enum | No | AT_LEAST_ONCE | Consistency semantics for writing to pulsar. |
| transaction_timeout | Int | No | 600 | The transaction timeout is specified as 10 minutes by default. |
| pulsar. | String | No | - | In addition to the above parameters that must be specified by the Pulsar producer client. |
| pulsar.config | Map | No | - | In addition to the above parameters that must be specified by the Pulsar producer client. |
| message.routing.mode | Enum | No | RoundRobinPartition | Default routing mode for messages to partition. |
| partition_key_fields | array | No | - | Configure which fields are used as the key of the pulsar message. |
| common-options | config | no | - | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details. |
@@ -89,7 +89,7 @@ The transaction timeout is specified as 10 minutes by default.
If the transaction does not commit within the specified timeout, the transaction will be automatically aborted.
So you need to ensure that the timeout is greater than the checkpoint interval.

### pulsar. [String]
### pulsar.config [Map]

In addition to the above parameters that must be specified by the Pulsar producer client,
the user can also specify multiple non-mandatory parameters for the producer client,
@@ -162,6 +162,9 @@ sink {
client.service-url = "localhost:pulsar://localhost:6650"
admin.service-url = "http://my-broker.example.com:8080"
result_table_name = "test"
pulsar.config = {
sendTimeoutMs = 30000
}
}
}
```
Expand Down
@@ -17,9 +17,7 @@

package org.apache.seatunnel.connectors.seatunnel.pulsar.config;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.common.config.TypesafeConfigUtils;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorException;

@@ -50,7 +48,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.PULSAR_CONFIG_PREFIX;
import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.PULSAR_CONFIG;

public class PulsarConfigUtil {

@@ -166,24 +164,19 @@ public static Producer<byte[]> createProducer(
PulsarClient pulsarClient,
String topic,
PulsarSemantics pulsarSemantics,
Config pluginConfig,
ReadonlyConfig pluginConfig,
MessageRoutingMode messageRoutingMode)
throws PulsarClientException {
ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer(Schema.BYTES);
producerBuilder.topic(topic);
producerBuilder.messageRoutingMode(messageRoutingMode);
producerBuilder.blockIfQueueFull(true);
if (pluginConfig.hasPath(PULSAR_CONFIG_PREFIX.key())) {
Config pulsarConfig =
TypesafeConfigUtils.extractSubConfig(
pluginConfig, PULSAR_CONFIG_PREFIX.key(), false);

if (pluginConfig.get(PULSAR_CONFIG) != null) {
Map<String, String> pulsarProperties = new HashMap<>();
pulsarConfig
.entrySet()
.forEach(
entry -> {
pulsarProperties.put(entry.getKey(), entry.getValue().render());
});
pluginConfig
.get(PULSAR_CONFIG)
.forEach((key, value) -> pulsarProperties.put(key, value));
producerBuilder.properties(pulsarProperties);
}
if (PulsarSemantics.EXACTLY_ONCE == pulsarSemantics) {
Expand Down
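
The `pulsar.config` handling in `createProducer` above boils down to a null-safe copy of a string map before it is handed to the producer builder. A minimal, dependency-free sketch of that step follows. `ProducerPropsSketch` and `toProducerProperties` are hypothetical names used only for illustration, and no Pulsar classes are involved.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper; not part of the SeaTunnel or Pulsar code base. */
class ProducerPropsSketch {

    /** Returns a mutable copy of the user-supplied map, or an empty map if none was set. */
    static Map<String, String> toProducerProperties(Map<String, String> pulsarConfig) {
        Map<String, String> properties = new HashMap<>();
        if (pulsarConfig != null) {
            properties.putAll(pulsarConfig);
        }
        return properties;
    }

    public static void main(String[] args) {
        Map<String, String> userConfig = new HashMap<>();
        userConfig.put("sendTimeoutMs", "30000");
        System.out.println(toProducerProperties(userConfig));
        System.out.println(toProducerProperties(null));
    }
}
```

One point that may be worth verifying against the Pulsar client documentation: `ProducerBuilder.properties` is documented as attaching application-defined metadata to the producer, while `loadConf` is the call that applies configuration keys such as `sendTimeoutMs`.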
@@ -23,6 +23,7 @@
import org.apache.pulsar.client.api.MessageRoutingMode;

import java.util.List;
import java.util.Map;

public class SinkProperties {

@@ -66,13 +67,14 @@ public class SinkProperties {
.defaultValue(600)
.withDescription(
"The transaction timeout is specified as 10 minutes by default. If the transaction does not commit within the specified timeout, the transaction will be automatically aborted. So you need to ensure that the timeout is greater than the checkpoint interval");
public static final Option<String> PULSAR_CONFIG_PREFIX =
Options.key("pulsar.")
.stringType()

public static final Option<Map<String, String>> PULSAR_CONFIG =
Options.key("pulsar.config")
.mapType()
.noDefaultValue()
.withDescription(
"In addition to the above parameters that must be specified by the Pulsar producer client, "
+ "the user can also specify multiple non-mandatory parameters for the producer client, "
"In addition to the above parameters that must be specified by the Pulsar producer or consumer client, "
+ "the user can also specify multiple non-mandatory parameters for the producer or consumer client, "
+ "covering all the producer parameters specified in the official Pulsar document.");

public static final Option<MessageRoutingMode> MESSAGE_ROUTING_MODE =
Expand Down
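
Replacing the `pulsar.` string prefix with a typed `Map<String, String>` option means each lookup carries its value type and default with it. The sketch below illustrates that general pattern with two hypothetical classes, `TypedOption` and `SimpleConfig`. They are stand-ins written for this example and are not SeaTunnel's `Option` or `ReadonlyConfig` implementations.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical typed option key: a name plus an optional default value. */
final class TypedOption<T> {
    final String key;
    final T defaultValue; // null means "no default"

    TypedOption(String key, T defaultValue) {
        this.key = key;
        this.defaultValue = defaultValue;
    }
}

/** Hypothetical read-only config backed by a plain map. */
final class SimpleConfig {
    private final Map<String, Object> values;

    SimpleConfig(Map<String, Object> values) {
        this.values = new HashMap<>(values);
    }

    /** Returns the configured value, or the option's default when the key is absent. */
    @SuppressWarnings("unchecked")
    <T> T get(TypedOption<T> option) {
        Object value = values.get(option.key);
        return value == null ? option.defaultValue : (T) value;
    }
}

class TypedOptionDemo {
    static final TypedOption<Integer> TRANSACTION_TIMEOUT =
            new TypedOption<>("transaction_timeout", 600);
    static final TypedOption<Map<String, String>> PULSAR_CONFIG =
            new TypedOption<>("pulsar.config", null);

    public static void main(String[] args) {
        Map<String, Object> raw = new HashMap<>();
        Map<String, String> producerConf = new HashMap<>();
        producerConf.put("sendTimeoutMs", "30000");
        raw.put("pulsar.config", producerConf);

        SimpleConfig config = new SimpleConfig(raw);
        System.out.println(config.get(TRANSACTION_TIMEOUT)); // falls back to the default
        System.out.println(config.get(PULSAR_CONFIG));
    }
}
```

With this shape the caller no longer needs a separate `hasPath` check followed by a typed getter, which is the kind of boilerplate the writer constructor in this commit drops.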
@@ -17,116 +17,64 @@

package org.apache.seatunnel.connectors.seatunnel.pulsar.sink;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.serialization.DefaultSerializer;
import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkCommitter;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarAdminConfig;
import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarClientConfig;
import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConfigUtil;
import org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorException;
import org.apache.seatunnel.connectors.seatunnel.pulsar.state.PulsarAggregatedCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.pulsar.state.PulsarCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.pulsar.state.PulsarSinkState;

import com.google.auto.service.AutoService;

import java.util.Collections;
import java.util.List;
import java.util.Optional;

import static org.apache.seatunnel.common.PropertiesUtil.setOption;
import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.ADMIN_SERVICE_URL;
import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.AUTH_PARAMS;
import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.AUTH_PLUGIN_CLASS;
import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.CLIENT_SERVICE_URL;
import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SourceProperties.TOPIC;

/**
* Pulsar Sink implementation by using SeaTunnel sink API. This class contains the method to create
* {@link PulsarSinkWriter} and {@link PulsarSinkCommitter}.
*/
@AutoService(SeaTunnelSink.class)
public class PulsarSink
Review comment (Member): Support the multi-table sink feature. Please refer to #5652 for the upgrade instructions.

Reply (Contributor Author): Do Kafka and Pulsar need multi-table sink support? Multiple topics are already supported.

Reply (Member): Multi-table support is mainly about accepting multiple upstream tables, and it can be implemented separately in the future.

implements SeaTunnelSink<
SeaTunnelRow, PulsarSinkState, PulsarCommitInfo, PulsarAggregatedCommitInfo> {

private PulsarAdminConfig adminConfig;
private Config pluginConfig;
private SeaTunnelRowType seaTunnelRowType;
private PulsarClientConfig clientConfig;
private ReadonlyConfig readonlyConfig;

@Override
public void prepare(Config config) throws PrepareFailException {
this.pluginConfig = config;
CheckResult result =
CheckConfigUtil.checkAllExists(
pluginConfig,
TOPIC.key(),
CLIENT_SERVICE_URL.key(),
ADMIN_SERVICE_URL.key());
if (!result.isSuccess()) {
throw new PulsarConnectorException(
SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
String.format(
"PluginName: %s, PluginType: %s, Message: %s",
getPluginName(), PluginType.SINK, result.getMsg()));
}
// admin config
PulsarAdminConfig.Builder adminConfigBuilder =
PulsarAdminConfig.builder().adminUrl(config.getString(ADMIN_SERVICE_URL.key()));
setOption(
config,
AUTH_PLUGIN_CLASS.key(),
config::getString,
adminConfigBuilder::authPluginClassName);
setOption(config, AUTH_PARAMS.key(), config::getString, adminConfigBuilder::authParams);
this.adminConfig = adminConfigBuilder.build();
public PulsarSink(ReadonlyConfig readonlyConfig, SeaTunnelRowType seaTunnelRowType) {
this.readonlyConfig = readonlyConfig;
this.seaTunnelRowType = seaTunnelRowType;

// client config
/** client config */
PulsarClientConfig.Builder clientConfigBuilder =
PulsarClientConfig.builder().serviceUrl(config.getString(CLIENT_SERVICE_URL.key()));
setOption(
config,
AUTH_PLUGIN_CLASS.key(),
config::getString,
clientConfigBuilder::authPluginClassName);
setOption(config, AUTH_PARAMS.key(), config::getString, clientConfigBuilder::authParams);
PulsarClientConfig.builder().serviceUrl(readonlyConfig.get(CLIENT_SERVICE_URL));
clientConfigBuilder.authPluginClassName(readonlyConfig.get(AUTH_PLUGIN_CLASS));
clientConfigBuilder.authParams(readonlyConfig.get(AUTH_PARAMS));
this.clientConfig = clientConfigBuilder.build();
}

@Override
public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
this.seaTunnelRowType = seaTunnelRowType;
}

@Override
public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
return this.seaTunnelRowType;
}

@Override
public SinkWriter<SeaTunnelRow, PulsarCommitInfo, PulsarSinkState> createWriter(
SinkWriter.Context context) {
return new PulsarSinkWriter(
context, clientConfig, seaTunnelRowType, pluginConfig, Collections.emptyList());
context, clientConfig, seaTunnelRowType, readonlyConfig, Collections.emptyList());
}

@Override
public SinkWriter<SeaTunnelRow, PulsarCommitInfo, PulsarSinkState> restoreWriter(
SinkWriter.Context context, List<PulsarSinkState> states) {
return new PulsarSinkWriter(context, clientConfig, seaTunnelRowType, pluginConfig, states);
return new PulsarSinkWriter(
context, clientConfig, seaTunnelRowType, readonlyConfig, states);
}

@Override
Expand Down
@@ -18,8 +18,10 @@
package org.apache.seatunnel.connectors.seatunnel.pulsar.sink;

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConfigUtil;

import com.google.auto.service.AutoService;
@@ -48,4 +50,12 @@ public OptionRule optionRule() {
.bundled(AUTH_PLUGIN_CLASS, AUTH_PARAMS)
.build();
}

@Override
public TableSink createSink(TableSinkFactoryContext context) {
return () ->
new PulsarSink(
context.getOptions(),
context.getCatalogTable().getTableSchema().toPhysicalRowDataType());
}
}
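
`createSink` returns a lambda rather than a sink instance, so the sink is only constructed when the engine invokes the returned object. A rough illustration of that deferred-construction pattern follows, using `java.util.function.Supplier` as a stand-in for `TableSink` and a hypothetical `DemoSink` class. Neither `DemoSink` nor `DeferredSinkFactory` exists in the connector.

```java
import java.util.function.Supplier;

/** Hypothetical sink used only to show when construction happens. */
class DemoSink {
    static int instancesCreated = 0;
    final String topic;

    DemoSink(String topic) {
        this.topic = topic;
        instancesCreated++;
    }
}

class DeferredSinkFactory {

    /** Returns a supplier; no DemoSink exists until get() is called. */
    static Supplier<DemoSink> createSink(String topic) {
        return () -> new DemoSink(topic);
    }

    public static void main(String[] args) {
        Supplier<DemoSink> tableSink = createSink("test");
        System.out.println(DemoSink.instancesCreated); // nothing constructed yet
        DemoSink sink = tableSink.get();
        System.out.println(sink.topic + " " + DemoSink.instancesCreated);
    }
}
```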
@@ -17,8 +17,7 @@

package org.apache.seatunnel.connectors.seatunnel.pulsar.sink;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.serialization.SerializationSchema;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
@@ -29,7 +28,6 @@
import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarClientConfig;
import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConfigUtil;
import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarSemantics;
import org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties;
import org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorException;
import org.apache.seatunnel.connectors.seatunnel.pulsar.state.PulsarCommitInfo;
@@ -64,6 +62,7 @@
import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.PARTITION_KEY_FIELDS;
import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.SEMANTICS;
import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.TEXT_FORMAT;
import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.TOPIC;
import static org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties.TRANSACTION_TIMEOUT;

public class PulsarSinkWriter
@@ -77,36 +76,21 @@ public class PulsarSinkWriter
private TransactionImpl transaction;
private int transactionTimeout = TRANSACTION_TIMEOUT.defaultValue();
private PulsarSemantics pulsarSemantics = SEMANTICS.defaultValue();
private MessageRoutingMode messageRoutingMode = MESSAGE_ROUTING_MODE.defaultValue();
private final AtomicLong pendingMessages;

public PulsarSinkWriter(
Context context,
PulsarClientConfig clientConfig,
SeaTunnelRowType seaTunnelRowType,
Config pluginConfig,
ReadonlyConfig pluginConfig,
List<PulsarSinkState> pulsarStates) {
this.context = context;
String topic = pluginConfig.getString(SinkProperties.TOPIC.key());
String format = FORMAT.defaultValue();
if (pluginConfig.hasPath(FORMAT.key())) {
format = pluginConfig.getString(FORMAT.key());
}
String delimiter = FIELD_DELIMITER.defaultValue();
if (pluginConfig.hasPath(FIELD_DELIMITER.key())) {
delimiter = pluginConfig.getString(FIELD_DELIMITER.key());
}

if (pluginConfig.hasPath(TRANSACTION_TIMEOUT.key())) {
transactionTimeout = pluginConfig.getInt(TRANSACTION_TIMEOUT.key());
}
if (pluginConfig.hasPath(SEMANTICS.key())) {
pulsarSemantics = pluginConfig.getEnum(PulsarSemantics.class, SEMANTICS.key());
}
if (pluginConfig.hasPath(MESSAGE_ROUTING_MODE.key())) {
messageRoutingMode =
pluginConfig.getEnum(MessageRoutingMode.class, MESSAGE_ROUTING_MODE.key());
}
String topic = pluginConfig.get(TOPIC);
String format = pluginConfig.get(FORMAT);
String delimiter = pluginConfig.get(FIELD_DELIMITER);
Integer transactionTimeout = pluginConfig.get(TRANSACTION_TIMEOUT);
PulsarSemantics pulsarSemantics = pluginConfig.get(SEMANTICS);
MessageRoutingMode messageRoutingMode = pluginConfig.get(MESSAGE_ROUTING_MODE);
this.serializationSchema = createSerializationSchema(seaTunnelRowType, format, delimiter);
List<String> partitionKeyList = getPartitionKeyFields(pluginConfig, seaTunnelRowType);
this.keySerializationSchema =
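
A detail that may deserve a second look in the constructor above: the new lines declare local variables named `transactionTimeout` and `pulsarSemantics`, which are also the names of fields initialized from the option defaults. In Java a local declaration shadows the field, so the field keeps its default unless it is assigned through `this.`. Whether that matters depends on how the elided parts of the class read those values. The behaviour can be reproduced with a small hypothetical class, `ShadowingDemo`, which is not connector code:

```java
/** Hypothetical class demonstrating field shadowing; not the connector code. */
class ShadowingDemo {
    private int timeout = 600; // field initialized with a default

    /** Declares a local with the same name, so the field is left untouched. */
    void configureWithLocal(int configured) {
        int timeout = configured; // shadows the field
        System.out.println("local timeout = " + timeout);
    }

    /** Assigns through this., so the field is updated. */
    void configureField(int configured) {
        this.timeout = configured;
    }

    int getTimeout() {
        return timeout;
    }

    public static void main(String[] args) {
        ShadowingDemo demo = new ShadowingDemo();
        demo.configureWithLocal(30);
        System.out.println(demo.getTimeout()); // still the default
        demo.configureField(30);
        System.out.println(demo.getTimeout()); // now the configured value
    }
}
```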
@@ -281,12 +265,12 @@ public static SerializationSchema createKeySerializationSchema(
* @return
*/
private List<String> getPartitionKeyFields(
Config pluginConfig, SeaTunnelRowType seaTunnelRowType) {
if (pluginConfig.hasPath(PARTITION_KEY_FIELDS.key())) {
List<String> partitionKeyFields =
pluginConfig.getStringList(PARTITION_KEY_FIELDS.key());
ReadonlyConfig pluginConfig, SeaTunnelRowType seaTunnelRowType) {
if (pluginConfig.getOptional(PARTITION_KEY_FIELDS) != null) {
Optional<List<String>> partitionKeyFields =
pluginConfig.getOptional(PARTITION_KEY_FIELDS);
List<String> rowTypeFieldNames = Arrays.asList(seaTunnelRowType.getFieldNames());
for (String partitionKeyField : partitionKeyFields) {
for (String partitionKeyField : partitionKeyFields.get()) {
if (!rowTypeFieldNames.contains(partitionKeyField)) {
throw new PulsarConnectorException(
CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
@@ -295,7 +279,7 @@ private List<String> getPartitionKeyFields(
partitionKeyField, rowTypeFieldNames));
}
}
return partitionKeyFields;
return partitionKeyFields.get();
}
return Collections.emptyList();
}
Expand Down
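
In `getPartitionKeyFields` above, the result of `getOptional(...)` is compared against `null`. Assuming `getOptional` returns a `java.util.Optional` (as the `Optional<List<String>>` assignment suggests) and never a null reference, that comparison is always true, and calling `get()` on an empty value throws `NoSuchElementException`. A minimal sketch of a presence-aware alternative using only the JDK follows. `PartitionKeySketch` and `keysOrEmpty` are hypothetical names.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

/** Hypothetical helper; illustrates Optional handling only. */
class PartitionKeySketch {

    /** Returns the configured key fields, or an empty list when none are set. */
    static List<String> keysOrEmpty(Optional<List<String>> configured) {
        // orElse() inspects the content; "!= null" would only inspect the wrapper.
        return configured.orElse(Collections.emptyList());
    }

    public static void main(String[] args) {
        Optional<List<String>> missing = Optional.empty();
        System.out.println(missing != null); // true even though nothing is set
        System.out.println(keysOrEmpty(missing));
        System.out.println(keysOrEmpty(Optional.of(Arrays.asList("id", "name"))));
    }
}
```

In the connector method, the same idea would mean guarding the validation loop with `isPresent()` (or iterating over the `orElse` result) instead of the null comparison.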