Skip to content
This repository was archived by the owner on Dec 14, 2022. It is now read-only.

Commit e9bd613

Browse files
author
gavingaozhangmin
committed
add newSqlClientConf method for DynamicTableSink
1 parent fbb8d17 commit e9bd613

File tree

3 files changed

+30
-2
lines changed

3 files changed

+30
-2
lines changed

pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/internal/PulsarClientUtils.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,20 @@
1818

1919
package org.apache.flink.streaming.connectors.pulsar.internal;
2020

21+
import org.apache.flink.shaded.curator4.com.google.common.collect.Maps;
22+
2123
import org.apache.pulsar.client.admin.PulsarAdmin;
2224
import org.apache.pulsar.client.api.Authentication;
2325
import org.apache.pulsar.client.api.AuthenticationFactory;
2426
import org.apache.pulsar.client.api.PulsarClientException;
2527
import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
2628
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
29+
import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;
2730
import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;
2831

32+
import java.util.Map;
2933
import java.util.Properties;
34+
import java.util.stream.Collectors;
3035

3136
/** Utility to create Pulsar Admin Client from adminUrl and clientConfigurationData. */
3237
public class PulsarClientUtils {
@@ -65,4 +70,27 @@ public static ClientConfigurationData newClientConf(String serviceUrl, Propertie
6570
}
6671
return clientConf;
6772
}
73+
74+
public static ClientConfigurationData newSqlClientConf(String serviceUrl,
75+
Properties properties) {
76+
Map<String, Object> clientConfData = getClientParams(Maps.fromProperties(properties));
77+
ClientConfigurationData clientConf = new ClientConfigurationData();
78+
clientConf =
79+
ConfigurationDataUtils.loadData(
80+
clientConfData, clientConf, ClientConfigurationData.class);
81+
clientConf.setServiceUrl(serviceUrl);
82+
return clientConf;
83+
}
84+
85+
public static Map<String, Object> getClientParams(Map<String, String> parameters) {
86+
return parameters.keySet().stream()
87+
.filter(k -> k.startsWith(PulsarOptions.PULSAR_CLIENT_OPTION_KEY_PREFIX))
88+
.collect(
89+
Collectors.toMap(
90+
k ->
91+
k.substring(
92+
PulsarOptions.PULSAR_CLIENT_OPTION_KEY_PREFIX
93+
.length()),
94+
k -> parameters.get(k)));
95+
}
6896
}

pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/table/PulsarDynamicTableSink.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,7 @@ private SinkFunction<RowData> createPulsarSink(
237237
Properties properties,
238238
PulsarSerializationSchema<RowData> pulsarSerializer) {
239239
final ClientConfigurationData configurationData =
240-
PulsarClientUtils.newClientConf(serviceUrl, properties);
240+
PulsarClientUtils.newSqlClientConf(serviceUrl, properties);
241241
return new FlinkPulsarSink<RowData>(
242242
adminUrl,
243243
Optional.ofNullable(topic),

pulsar-flink-connector/src/main/java/org/apache/flink/streaming/connectors/pulsar/table/PulsarDynamicTableSource.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
217217
createPulsarDeserialization(
218218
keyDeserialization, valueDeserialization, producedTypeInfo);
219219
final ClientConfigurationData clientConfigurationData =
220-
PulsarClientUtils.newClientConf(serviceUrl, properties);
220+
PulsarClientUtils.newSqlClientConf(serviceUrl, properties);
221221
FlinkPulsarSource<RowData> source =
222222
new FlinkPulsarSource<>(
223223
adminUrl, clientConfigurationData, deserializationSchema, properties);

0 commit comments

Comments
 (0)