Skip to content

Commit

Permalink
✨ Source MongoDB Internal POC: Check Operation (airbytehq#28946)
Browse files Browse the repository at this point in the history
* Implement check operation

* Formatting
  • Loading branch information
jdpgrailsdev authored Aug 2, 2023
1 parent c32cc25 commit 407e834
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,17 @@

package io.airbyte.integrations.source.mongodb.internal;

import static io.airbyte.integrations.source.mongodb.internal.MongoConstants.AUTH_SOURCE_CONFIGURATION_KEY;
import static io.airbyte.integrations.source.mongodb.internal.MongoConstants.CONNECTION_STRING_CONFIGURATION_KEY;
import static io.airbyte.integrations.source.mongodb.internal.MongoConstants.PASSWORD_CONFIGURATION_KEY;
import static io.airbyte.integrations.source.mongodb.internal.MongoConstants.REPLICA_SET_CONFIGURATION_KEY;
import static io.airbyte.integrations.source.mongodb.internal.MongoConstants.USER_CONFIGURATION_KEY;

import com.fasterxml.jackson.databind.JsonNode;
import com.mongodb.ConnectionString;
import com.mongodb.MongoClientSettings;
import com.mongodb.MongoCredential;
import com.mongodb.MongoDriverInformation;
import com.mongodb.ReadPreference;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
Expand All @@ -24,24 +31,28 @@ public class MongoConnectionUtils {
* @return The configured {@link MongoClient}.
*/
public static MongoClient createMongoClient(final JsonNode config) {
final String authSource = config.get("auth_source").asText();
final String connectionString = config.get("connection_string").asText();
final String replicaSet = config.get("replica_set").asText();
final String authSource = config.get(AUTH_SOURCE_CONFIGURATION_KEY).asText();
final String connectionString = config.get(CONNECTION_STRING_CONFIGURATION_KEY).asText();
final String replicaSet = config.get(REPLICA_SET_CONFIGURATION_KEY).asText();

final ConnectionString mongoConnectionString = new ConnectionString(connectionString + "?replicaSet=" +
replicaSet + "&retryWrites=false&provider=airbyte&tls=true");

final MongoDriverInformation mongoDriverInformation = MongoDriverInformation.builder()
.driverName("Airbyte")
.build();

final MongoClientSettings.Builder mongoClientSettingsBuilder = MongoClientSettings.builder()
.applyConnectionString(mongoConnectionString)
.readPreference(ReadPreference.secondaryPreferred());

if (config.has("user") && config.has("password")) {
final String user = config.get("user").asText();
final String password = config.get("password").asText();
if (config.has(USER_CONFIGURATION_KEY) && config.has(PASSWORD_CONFIGURATION_KEY)) {
final String user = config.get(USER_CONFIGURATION_KEY).asText();
final String password = config.get(PASSWORD_CONFIGURATION_KEY).asText();
mongoClientSettingsBuilder.credential(MongoCredential.createCredential(user, authSource, password.toCharArray()));
}

return MongoClients.create(mongoClientSettingsBuilder.build());
return MongoClients.create(mongoClientSettingsBuilder.build(), mongoDriverInformation);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.source.mongodb.internal;

public class MongoConstants {

public static final String AUTH_SOURCE_CONFIGURATION_KEY = "auth_source";
public static final String CONNECTION_STRING_CONFIGURATION_KEY = "connection_string";
public static final String DATABASE_CONFIGURATION_KEY = "database";
public static final String PASSWORD_CONFIGURATION_KEY = "password";
public static final String REPLICA_SET_CONFIGURATION_KEY = "replica_set";
public static final String USER_CONFIGURATION_KEY = "user";

private MongoConstants() {}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

package io.airbyte.integrations.source.mongodb.internal;

import static io.airbyte.integrations.source.mongodb.internal.MongoConstants.DATABASE_CONFIGURATION_KEY;

import com.fasterxml.jackson.databind.JsonNode;
import com.mongodb.MongoCommandException;
import com.mongodb.MongoException;
Expand All @@ -12,6 +14,7 @@
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.connection.ClusterType;
import io.airbyte.commons.exceptions.ConnectionErrorException;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.integrations.BaseConnector;
Expand All @@ -36,7 +39,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MongoDbSource extends BaseConnector implements Source, AutoCloseable {
public class MongoDbSource extends BaseConnector implements Source {

private static final Logger LOGGER = LoggerFactory.getLogger(MongoDbSource.class);

Expand All @@ -54,8 +57,34 @@ public static void main(final String[] args) throws Exception {
}

@Override
public AirbyteConnectionStatus check(final JsonNode config) throws Exception {
return null;
public AirbyteConnectionStatus check(final JsonNode config) {
try (final MongoClient mongoClient = MongoConnectionUtils.createMongoClient(config)) {
final String databaseName = config.get(DATABASE_CONFIGURATION_KEY).asText();

/*
* Perform the authorized collections check before the cluster type check. The MongoDB Java driver
* needs to actually execute a command in order to fetch the cluster description. Querying for the
* authorized collections guarantees that the cluster description will be available to the driver.
*/
if (getAuthorizedCollections(mongoClient, databaseName).isEmpty()) {
return new AirbyteConnectionStatus()
.withMessage("Target MongoDB database does not contain any authorized collections.")
.withStatus(AirbyteConnectionStatus.Status.FAILED);
}
if (!ClusterType.REPLICA_SET.equals(mongoClient.getClusterDescription().getType())) {
return new AirbyteConnectionStatus()
.withMessage("Target MongoDB instance is not a replica set cluster.")
.withStatus(AirbyteConnectionStatus.Status.FAILED);
}
} catch (final Exception e) {
LOGGER.error("Unable to perform source check operation.", e);
return new AirbyteConnectionStatus()
.withMessage(e.getMessage())
.withStatus(AirbyteConnectionStatus.Status.FAILED);
}

LOGGER.info("The source passed the check operation test!");
return new AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.SUCCEEDED);
}

@Override
Expand All @@ -72,11 +101,6 @@ public AutoCloseableIterator<AirbyteMessage> read(final JsonNode config,
return null;
}

@Override
public void close() throws Exception {

}

private Set<String> getAuthorizedCollections(final MongoClient mongoClient, final String databaseName) {
/*
* db.runCommand ({listCollections: 1.0, authorizedCollections: true, nameOnly: true }) the command
Expand Down Expand Up @@ -108,9 +132,10 @@ private Set<String> getAuthorizedCollections(final MongoClient mongoClient, fina
private List<AirbyteStream> discoverInternal(final JsonNode config) {
final List<AirbyteStream> streams = new ArrayList<>();
try (final MongoClient mongoClient = MongoConnectionUtils.createMongoClient(config)) {
final Set<String> authorizedCollections = getAuthorizedCollections(mongoClient, config.get("database").asText());
final String databaseName = config.get(DATABASE_CONFIGURATION_KEY).asText();
final Set<String> authorizedCollections = getAuthorizedCollections(mongoClient, databaseName);
authorizedCollections.parallelStream().forEach(collectionName -> {
final List<Field> fields = getFields(mongoClient.getDatabase(config.get("database").asText()).getCollection(collectionName));
final List<Field> fields = getFields(mongoClient.getDatabase(databaseName).getCollection(collectionName));
streams.add(CatalogHelpers.createAirbyteStream(collectionName, "", fields));
});
return streams;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@

package io.airbyte.integrations.source.mongodb.internal;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static io.airbyte.integrations.source.mongodb.internal.MongoConstants.DATABASE_CONFIGURATION_KEY;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
Expand All @@ -19,8 +17,6 @@
import io.airbyte.integrations.standardtest.source.TestDestinationEnv;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaType;
import io.airbyte.protocol.models.v0.AirbyteCatalog;
import io.airbyte.protocol.models.v0.AirbyteStream;
import io.airbyte.protocol.models.v0.CatalogHelpers;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
Expand All @@ -32,11 +28,9 @@
import java.nio.file.Path;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import org.bson.BsonArray;
import org.bson.BsonString;
import org.bson.Document;
import org.junit.jupiter.api.Test;

public class MongoDbSourceAcceptanceTest extends SourceAcceptanceTest {

Expand All @@ -56,7 +50,7 @@ protected void setupEnvironment(final TestDestinationEnv testEnv) throws IOExcep
}

config = Jsons.deserialize(Files.readString(CREDENTIALS_PATH));
((ObjectNode) config).put("database", DATABASE_NAME);
((ObjectNode) config).put(DATABASE_CONFIGURATION_KEY, DATABASE_NAME);

mongoClient = MongoConnectionUtils.createMongoClient(config);

Expand Down Expand Up @@ -130,22 +124,4 @@ protected JsonNode getState() {
return Jsons.jsonNode(new HashMap<>());
}

@Test
void discoverCatalog() throws Exception {
final AirbyteCatalog catalog = new MongoDbSource().discover(config);
assertNotNull(catalog);

final Optional<AirbyteStream> stream = catalog.getStreams().stream().filter(n -> n.getName().equalsIgnoreCase(COLLECTION_NAME)).findFirst();
assertTrue(stream.isPresent());

final JsonNode schema = stream.get().getJsonSchema();
assertEquals("number", schema.get("properties").get("double_test").get("type").asText());
assertEquals("string", schema.get("properties").get("test").get("type").asText());
assertEquals("string", schema.get("properties").get("name").get("type").asText());
assertEquals("string", schema.get("properties").get("_id").get("type").asText());
assertEquals("string", schema.get("properties").get("id").get("type").asText());
assertEquals("object", schema.get("properties").get("object_test").get("type").asText());
assertEquals("number", schema.get("properties").get("int_test").get("type").asText());
}

}

0 comments on commit 407e834

Please sign in to comment.