diff --git a/docs/content/connectors/mongodb-cdc.md b/docs/content/connectors/mongodb-cdc.md
index f1aaa3e63b7..2eccc288917 100644
--- a/docs/content/connectors/mongodb-cdc.md
+++ b/docs/content/connectors/mongodb-cdc.md
@@ -146,6 +146,13 @@ Connector Options
String |
Specify what connector to use, here should be mongodb-cdc . |
+
+ scheme |
+ optional |
+ mongodb |
+ String |
+ The protocol connected to MongoDB. eg. mongodb or mongodb+srv. |
+
hosts |
required |
diff --git a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/MongoDBSource.java b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/MongoDBSource.java
index 187279194c5..c940e7bdc93 100644
--- a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/MongoDBSource.java
+++ b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/MongoDBSource.java
@@ -36,6 +36,8 @@
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBConnectorSourceTask.COLLECTION_INCLUDE_LIST;
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBConnectorSourceTask.DATABASE_INCLUDE_LIST;
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.HEARTBEAT_TOPIC_NAME;
+import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.MONGODB_SCHEME;
+import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.MONGODB_SRV_SCHEME;
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.OUTPUT_SCHEMA;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.BATCH_SIZE;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.COPY_EXISTING;
@@ -63,7 +65,7 @@ public static Builder builder() {
/** Builder class of {@link MongoDBSource}. */
public static class Builder {
-
+ private String scheme;
private String hosts;
private String username;
private String password;
@@ -81,6 +83,17 @@ public static class Builder {
private Integer heartbeatIntervalMillis = HEARTBEAT_INTERVAL_MILLIS.defaultValue();
private DebeziumDeserializationSchema deserializer;
+ /** The protocol connected to MongoDB. For example mongodb or mongodb+srv. */
+ public Builder scheme(String scheme) {
+ checkArgument(
+ MONGODB_SCHEME.equals(scheme) || MONGODB_SRV_SCHEME.equals(scheme),
+ String.format(
+ "The scheme should either be %s or %s",
+ MONGODB_SCHEME, MONGODB_SRV_SCHEME));
+ this.scheme = scheme;
+ return this;
+ }
+
/** The comma-separated list of hostname and port pairs of mongodb servers. */
public Builder hosts(String hosts) {
this.hosts = hosts;
@@ -261,7 +274,8 @@ public DebeziumSourceFunction build() {
props.setProperty(
MongoSourceConfig.CONNECTION_URI_CONFIG,
String.valueOf(
- buildConnectionString(username, password, hosts, connectionOptions)));
+ buildConnectionString(
+ username, password, scheme, hosts, connectionOptions)));
if (databaseList != null) {
props.setProperty(DATABASE_INCLUDE_LIST, String.join(",", databaseList));
diff --git a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/internal/MongoDBEnvelope.java b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/internal/MongoDBEnvelope.java
index 312018d9983..a95bf9213d3 100644
--- a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/internal/MongoDBEnvelope.java
+++ b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/internal/MongoDBEnvelope.java
@@ -41,6 +41,8 @@ public class MongoDBEnvelope {
public static final String MONGODB_SCHEME = "mongodb";
+ public static final String MONGODB_SRV_SCHEME = "mongodb+srv";
+
public static final String ID_FIELD = "_id";
public static final String DATA_FIELD = "_data";
diff --git a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/MongoDBSourceBuilder.java b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/MongoDBSourceBuilder.java
index 54281e14085..8a2f76b6afd 100644
--- a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/MongoDBSourceBuilder.java
+++ b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/MongoDBSourceBuilder.java
@@ -52,6 +52,12 @@ public class MongoDBSourceBuilder {
private final MongoDBSourceConfigFactory configFactory = new MongoDBSourceConfigFactory();
private DebeziumDeserializationSchema deserializer;
+ /** The protocol connected to MongoDB. For example mongodb or mongodb+srv. */
+ public MongoDBSourceBuilder scheme(String scheme) {
+ this.configFactory.scheme(scheme);
+ return this;
+ }
+
/** The comma-separated list of hostname and port pairs of mongodb servers. */
public MongoDBSourceBuilder hosts(String hosts) {
this.configFactory.hosts(hosts);
diff --git a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/config/MongoDBSourceConfig.java b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/config/MongoDBSourceConfig.java
index 6e8134204be..34e5a18423b 100644
--- a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/config/MongoDBSourceConfig.java
+++ b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/config/MongoDBSourceConfig.java
@@ -33,6 +33,7 @@ public class MongoDBSourceConfig implements SourceConfig {
private static final long serialVersionUID = 1L;
+ private final String scheme;
private final String hosts;
@Nullable private final String username;
@Nullable private final String password;
@@ -49,6 +50,7 @@ public class MongoDBSourceConfig implements SourceConfig {
private final int splitSizeMB;
MongoDBSourceConfig(
+ String scheme,
String hosts,
@Nullable String username,
@Nullable String password,
@@ -63,13 +65,14 @@ public class MongoDBSourceConfig implements SourceConfig {
int heartbeatIntervalMillis,
int splitMetaGroupSize,
int splitSizeMB) {
+ this.scheme = checkNotNull(scheme);
this.hosts = checkNotNull(hosts);
this.username = username;
this.password = password;
this.databaseList = databaseList;
this.collectionList = collectionList;
this.connectionString =
- buildConnectionString(username, password, hosts, connectionOptions)
+ buildConnectionString(username, password, scheme, hosts, connectionOptions)
.getConnectionString();
this.batchSize = batchSize;
this.pollAwaitTimeMillis = pollAwaitTimeMillis;
@@ -81,6 +84,10 @@ public class MongoDBSourceConfig implements SourceConfig {
this.splitSizeMB = splitSizeMB;
}
+ public String getScheme() {
+ return scheme;
+ }
+
public String getHosts() {
return hosts;
}
@@ -166,6 +173,7 @@ public boolean equals(Object o) {
&& heartbeatIntervalMillis == that.heartbeatIntervalMillis
&& splitMetaGroupSize == that.splitMetaGroupSize
&& splitSizeMB == that.splitSizeMB
+ && Objects.equals(scheme, that.scheme)
&& Objects.equals(hosts, that.hosts)
&& Objects.equals(username, that.username)
&& Objects.equals(password, that.password)
@@ -177,6 +185,7 @@ public boolean equals(Object o) {
@Override
public int hashCode() {
return Objects.hash(
+ scheme,
hosts,
username,
password,
diff --git a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/config/MongoDBSourceConfigFactory.java b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/config/MongoDBSourceConfigFactory.java
index ec1cd088143..9ff96a6e09f 100644
--- a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/config/MongoDBSourceConfigFactory.java
+++ b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/config/MongoDBSourceConfigFactory.java
@@ -25,11 +25,14 @@
import java.util.List;
import static com.ververica.cdc.connectors.base.options.SourceOptions.CHUNK_META_GROUP_SIZE;
+import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.MONGODB_SCHEME;
+import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.MONGODB_SRV_SCHEME;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.BATCH_SIZE;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.HEARTBEAT_INTERVAL_MILLIS;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.POLL_AWAIT_TIME_MILLIS;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.POLL_MAX_BATCH_SIZE;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB;
+import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.SCHEME;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -39,6 +42,7 @@ public class MongoDBSourceConfigFactory implements Factory
private static final long serialVersionUID = 1L;
+ private String scheme = SCHEME.defaultValue();
private String hosts;
private String username;
private String password;
@@ -54,6 +58,17 @@ public class MongoDBSourceConfigFactory implements Factory
private Integer splitMetaGroupSize = CHUNK_META_GROUP_SIZE.defaultValue();
private Integer splitSizeMB = SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB.defaultValue();
+ /** The protocol connected to MongoDB. For example mongodb or mongodb+srv. */
+ public MongoDBSourceConfigFactory scheme(String scheme) {
+ checkArgument(
+ MONGODB_SCHEME.equals(scheme) || MONGODB_SRV_SCHEME.equals(scheme),
+ String.format(
+ "The scheme should either be %s or %s",
+ MONGODB_SCHEME, MONGODB_SRV_SCHEME));
+ this.scheme = scheme;
+ return this;
+ }
+
/** The comma-separated list of hostname and port pairs of mongodb servers. */
public MongoDBSourceConfigFactory hosts(String hosts) {
this.hosts = hosts;
@@ -196,6 +211,7 @@ public void validate() {
@Override
public MongoDBSourceConfig create(int subtaskId) {
return new MongoDBSourceConfig(
+ scheme,
hosts,
username,
password,
diff --git a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/config/MongoDBSourceOptions.java b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/config/MongoDBSourceOptions.java
index d1675d96c7e..9032a6f71d8 100644
--- a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/config/MongoDBSourceOptions.java
+++ b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/config/MongoDBSourceOptions.java
@@ -20,9 +20,19 @@
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
+import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.MONGODB_SCHEME;
+
/** Configurations for {@link com.ververica.cdc.connectors.mongodb.source.MongoDBSource}. */
public class MongoDBSourceOptions {
+ public static final ConfigOption SCHEME =
+ ConfigOptions.key("scheme")
+ .stringType()
+ .defaultValue(MONGODB_SCHEME)
+ .withDescription(
+ "The protocol connected to MongoDB. eg. mongodb or mongodb+srv. "
+ + "The +srv indicates to the client that the hostname that follows corresponds to a DNS SRV record. Defaults to mongodb.");
+
public static final ConfigOption HOSTS =
ConfigOptions.key("hosts")
.stringType()
diff --git a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/reader/fetch/MongoDBScanFetchTask.java b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/reader/fetch/MongoDBScanFetchTask.java
index 46a6d60942d..6a30ce96b02 100644
--- a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/reader/fetch/MongoDBScanFetchTask.java
+++ b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/reader/fetch/MongoDBScanFetchTask.java
@@ -130,6 +130,7 @@ public void execute(Context context) throws Exception {
SourceRecord snapshotRecord =
createSourceRecord(
createPartitionMap(
+ sourceConfig.getScheme(),
sourceConfig.getHosts(),
collectionId.catalog(),
collectionId.table()),
diff --git a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/reader/fetch/MongoDBStreamFetchTask.java b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/reader/fetch/MongoDBStreamFetchTask.java
index 1e20991fbd0..c4b818ed0d5 100644
--- a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/reader/fetch/MongoDBStreamFetchTask.java
+++ b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/reader/fetch/MongoDBStreamFetchTask.java
@@ -145,6 +145,7 @@ public void execute(Context context) throws Exception {
changeRecord =
createSourceRecord(
createPartitionMap(
+ sourceConfig.getScheme(),
sourceConfig.getHosts(),
namespace.getDatabaseName(),
namespace.getCollectionName()),
@@ -277,7 +278,7 @@ private HeartbeatManager openHeartbeatManagerIfNeeded(
changeStreamCursor,
sourceConfig.getHeartbeatIntervalMillis(),
HEARTBEAT_TOPIC_NAME,
- createHeartbeatPartitionMap(sourceConfig.getHosts()));
+ createHeartbeatPartitionMap(sourceConfig.getScheme(), sourceConfig.getHosts()));
}
return null;
}
diff --git a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/utils/MongoRecordUtils.java b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/utils/MongoRecordUtils.java
index 225467526ab..6c7ec7fbd3c 100644
--- a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/utils/MongoRecordUtils.java
+++ b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/utils/MongoRecordUtils.java
@@ -175,9 +175,9 @@ public static Map createSourceOffsetMap(
}
public static Map createPartitionMap(
- String hosts, String database, String collection) {
+ String scheme, String hosts, String database, String collection) {
StringBuilder builder = new StringBuilder();
- builder.append("mongodb://");
+ builder.append(String.format("%s://", scheme));
builder.append(hosts);
builder.append("/");
if (StringUtils.isNotEmpty(database)) {
@@ -190,9 +190,9 @@ public static Map createPartitionMap(
return singletonMap(NAMESPACE_FIELD, builder.toString());
}
- public static Map createHeartbeatPartitionMap(String hosts) {
+ public static Map createHeartbeatPartitionMap(String scheme, String hosts) {
StringBuilder builder = new StringBuilder();
- builder.append("mongodb://");
+ builder.append(String.format("%s://", scheme));
builder.append(hosts);
builder.append("/");
builder.append(HEARTBEAT_TOPIC_NAME);
diff --git a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/utils/MongoUtils.java b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/utils/MongoUtils.java
index 2f50fe80925..136bbf33658 100644
--- a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/utils/MongoUtils.java
+++ b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/utils/MongoUtils.java
@@ -56,7 +56,6 @@
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.DROPPED_FIELD;
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.ID_FIELD;
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.KEY_FIELD;
-import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.MONGODB_SCHEME;
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.NAMESPACE_FIELD;
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.UUID_FIELD;
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.encodeValue;
@@ -348,9 +347,10 @@ public static MongoClient clientFor(MongoDBSourceConfig sourceConfig) {
public static ConnectionString buildConnectionString(
@Nullable String username,
@Nullable String password,
+ String scheme,
String hosts,
@Nullable String connectionOptions) {
- StringBuilder sb = new StringBuilder(MONGODB_SCHEME).append("://");
+ StringBuilder sb = new StringBuilder(scheme).append("://");
if (StringUtils.isNotEmpty(username) && StringUtils.isNotEmpty(password)) {
sb.append(encodeValue(username)).append(":").append(encodeValue(password)).append("@");
diff --git a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableSource.java b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableSource.java
index 4be17dae5b2..cfce1b82276 100644
--- a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableSource.java
+++ b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableSource.java
@@ -62,6 +62,7 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
private static final Logger LOG = LoggerFactory.getLogger(MongoDBTableSource.class);
private final ResolvedSchema physicalSchema;
+ private final String scheme;
private final String hosts;
private final String connectionOptions;
private final String username;
@@ -91,6 +92,7 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad
public MongoDBTableSource(
ResolvedSchema physicalSchema,
+ String scheme,
String hosts,
@Nullable String username,
@Nullable String password,
@@ -108,6 +110,7 @@ public MongoDBTableSource(
@Nullable Integer splitMetaGroupSize,
@Nullable Integer splitSizeMB) {
this.physicalSchema = physicalSchema;
+ this.scheme = checkNotNull(scheme);
this.hosts = checkNotNull(hosts);
this.username = username;
this.password = password;
@@ -172,7 +175,10 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
if (enableParallelRead) {
MongoDBSourceBuilder builder =
- MongoDBSource.builder().hosts(hosts).deserializer(deserializer);
+ MongoDBSource.builder()
+ .scheme(scheme)
+ .hosts(hosts)
+ .deserializer(deserializer);
Optional.ofNullable(databaseList).ifPresent(builder::databaseList);
Optional.ofNullable(collectionList).ifPresent(builder::collectionList);
@@ -192,6 +198,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
} else {
com.ververica.cdc.connectors.mongodb.MongoDBSource.Builder builder =
com.ververica.cdc.connectors.mongodb.MongoDBSource.builder()
+ .scheme(scheme)
.hosts(hosts)
.deserializer(deserializer);
@@ -248,6 +255,7 @@ public DynamicTableSource copy() {
MongoDBTableSource source =
new MongoDBTableSource(
physicalSchema,
+ scheme,
hosts,
username,
password,
@@ -279,6 +287,7 @@ public boolean equals(Object o) {
}
MongoDBTableSource that = (MongoDBTableSource) o;
return Objects.equals(physicalSchema, that.physicalSchema)
+ && Objects.equals(scheme, that.scheme)
&& Objects.equals(hosts, that.hosts)
&& Objects.equals(username, that.username)
&& Objects.equals(password, that.password)
@@ -303,6 +312,7 @@ public boolean equals(Object o) {
public int hashCode() {
return Objects.hash(
physicalSchema,
+ scheme,
hosts,
username,
password,
diff --git a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableSourceFactory.java b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableSourceFactory.java
index 72f3d9dc9b6..fd48a200052 100644
--- a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableSourceFactory.java
+++ b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableSourceFactory.java
@@ -46,6 +46,7 @@
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.POLL_MAX_BATCH_SIZE;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED;
+import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.SCHEME;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.USERNAME;
import static com.ververica.cdc.debezium.utils.ResolvedSchemaUtils.getPhysicalSchema;
import static org.apache.flink.util.Preconditions.checkArgument;
@@ -65,6 +66,7 @@ public DynamicTableSource createDynamicTableSource(Context context) {
final ReadableConfig config = helper.getOptions();
+ String scheme = config.get(SCHEME);
String hosts = config.get(HOSTS);
String connectionOptions = config.getOptional(CONNECTION_OPTIONS).orElse(null);
@@ -103,6 +105,7 @@ public DynamicTableSource createDynamicTableSource(Context context) {
return new MongoDBTableSource(
physicalSchema,
+ scheme,
hosts,
username,
password,
@@ -142,6 +145,7 @@ public Set> requiredOptions() {
@Override
public Set> optionalOptions() {
Set> options = new HashSet<>();
+ options.add(SCHEME);
options.add(USERNAME);
options.add(PASSWORD);
options.add(CONNECTION_OPTIONS);
diff --git a/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/LegacyMongoDBSourceTest.java b/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/LegacyMongoDBSourceTest.java
index 556b71a6bad..df7226bb9a7 100644
--- a/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/LegacyMongoDBSourceTest.java
+++ b/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/LegacyMongoDBSourceTest.java
@@ -354,17 +354,17 @@ public void go() throws Exception {
public void testConnectionUri() {
String hosts = MONGODB_CONTAINER.getHostAndPort();
- ConnectionString case0 = buildConnectionString(null, null, hosts, null);
+ ConnectionString case0 = buildConnectionString(null, null, "mongodb", hosts, null);
assertEquals(String.format("mongodb://%s", hosts), case0.toString());
- ConnectionString case1 = buildConnectionString("", null, hosts, null);
+ ConnectionString case1 = buildConnectionString("", null, "mongodb", hosts, null);
assertEquals(String.format("mongodb://%s", hosts), case1.toString());
- ConnectionString case2 = buildConnectionString(null, "", hosts, null);
- assertEquals(String.format("mongodb://%s", hosts), case2.toString());
+ ConnectionString case2 = buildConnectionString(null, "", "mongodb-srv", hosts, null);
+ assertEquals(String.format("mongodb-srv://%s", hosts), case2.toString());
ConnectionString case3 =
- buildConnectionString(FLINK_USER, FLINK_USER_PASSWORD, hosts, null);
+ buildConnectionString(FLINK_USER, FLINK_USER_PASSWORD, "mongodb", hosts, null);
assertEquals(FLINK_USER, case3.getUsername());
assertEquals(FLINK_USER_PASSWORD, new String(case3.getPassword()));
}
diff --git a/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableFactoryTest.java b/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableFactoryTest.java
index 654ace41c9a..f7b84e193e5 100644
--- a/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableFactoryTest.java
+++ b/flink-connector-mongodb-cdc/src/test/java/com/ververica/cdc/connectors/mongodb/table/MongoDBTableFactoryTest.java
@@ -45,6 +45,7 @@
import java.util.Map;
import static com.ververica.cdc.connectors.base.options.SourceOptions.CHUNK_META_GROUP_SIZE;
+import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.MONGODB_SRV_SCHEME;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.BATCH_SIZE;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.COPY_EXISTING;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.HEARTBEAT_INTERVAL_MILLIS;
@@ -52,6 +53,7 @@
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.POLL_MAX_BATCH_SIZE;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE_MB;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED;
+import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.SCHEME;
import static com.ververica.cdc.connectors.utils.AssertUtils.assertProducedTypeOfSourceFunction;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -111,6 +113,7 @@ public void testCommonProperties() {
MongoDBTableSource expectedSource =
new MongoDBTableSource(
SCHEMA,
+ SCHEME.defaultValue(),
MY_HOSTS,
USER,
PASSWORD,
@@ -133,6 +136,7 @@ public void testCommonProperties() {
@Test
public void testOptionalProperties() {
Map options = getAllOptions();
+ options.put("scheme", MONGODB_SRV_SCHEME);
options.put("connection.options", "replicaSet=test&connectTimeoutMS=300000");
options.put("copy.existing", "false");
options.put("copy.existing.queue.size", "100");
@@ -148,6 +152,7 @@ public void testOptionalProperties() {
MongoDBTableSource expectedSource =
new MongoDBTableSource(
SCHEMA,
+ MONGODB_SRV_SCHEME,
MY_HOSTS,
USER,
PASSWORD,
@@ -182,6 +187,7 @@ public void testMetadataColumns() {
MongoDBTableSource expectedSource =
new MongoDBTableSource(
ResolvedSchemaUtils.getPhysicalSchema(SCHEMA_WITH_METADATA),
+ SCHEME.defaultValue(),
MY_HOSTS,
USER,
PASSWORD,