Skip to content

Commit

Permalink
✨ Source Mongodb POC: Ignore data type for discovered field uniqueness (
Browse files Browse the repository at this point in the history
airbytehq#29168)

* Ignore data type for discovered field uniqueness

* Add unit test

* Formatting
  • Loading branch information
jdpgrailsdev authored Aug 8, 2023
1 parent b277cd3 commit 848fb74
Show file tree
Hide file tree
Showing 5 changed files with 185 additions and 39 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.source.mongodb.internal;

import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaType;
import java.util.Objects;

/**
* Custom implementation of {@link Field} that only uses the name of the field for equality. This is
* to support MongoDB's unstructured documents which may contain more than one document with the
* same field name, but different data type.
*/
public class MongoField extends Field {

public MongoField(String name, JsonSchemaType type) {
super(name, type);
}

public boolean equals(Object o) {
if (this == o) {
return true;
} else if (o != null && this.getClass() == o.getClass()) {
final MongoField field = (MongoField) o;
return this.getName().equals(field.getName());
} else {
return false;
}
}

public int hashCode() {
return Objects.hash(new Object[] {this.getName()});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

package io.airbyte.integrations.source.mongodb.internal;

import static io.airbyte.integrations.source.mongodb.internal.MongoCatalogHelper.DEFAULT_CURSOR_FIELD;

import com.mongodb.MongoCommandException;
import com.mongodb.MongoException;
import com.mongodb.MongoSecurityException;
Expand All @@ -27,8 +29,6 @@
import org.bson.Document;
import org.bson.conversions.Bson;

import static io.airbyte.integrations.source.mongodb.internal.MongoCatalogHelper.DEFAULT_CURSOR_FIELD;

public class MongoUtil {

/**
Expand Down Expand Up @@ -89,21 +89,18 @@ public static Set<String> getAuthorizedCollections(final MongoClient mongoClient
* database.
*/
public static List<AirbyteStream> getAirbyteStreams(final MongoClient mongoClient, final String databaseName) {
final List<AirbyteStream> streams = new ArrayList<>();
final Set<String> authorizedCollections = getAuthorizedCollections(mongoClient, databaseName);
authorizedCollections.parallelStream().forEach(collectionName -> {
return authorizedCollections.parallelStream().map(collectionName -> {
/*
* Fetch the keys/types from the first N documents and the last N documents from the collection.
* This is an attempt to "survey" the documents in the collection for variance in the schema keys.
*/
final Set<Field> fields1 = getFieldsInCollection(mongoClient.getDatabase(databaseName).getCollection(collectionName), Optional.empty());
final Set<Field> fields2 =
getFieldsInCollection(mongoClient.getDatabase(databaseName).getCollection(collectionName), Optional.of(DEFAULT_CURSOR_FIELD));
fields1.addAll(fields2);

streams.add(createAirbyteStream(collectionName, databaseName, new ArrayList<>(fields1)));
});
return streams;
final Set<Field> discoveredFields = new HashSet<>();
final MongoCollection<Document> mongoCollection = mongoClient.getDatabase(databaseName).getCollection(collectionName);
discoveredFields.addAll(getFieldsInCollection(mongoCollection, Optional.empty()));
discoveredFields.addAll(getFieldsInCollection(mongoCollection, Optional.of(DEFAULT_CURSOR_FIELD)));
return createAirbyteStream(collectionName, databaseName, new ArrayList<>(discoveredFields));
}).collect(Collectors.toList());
}

private static AirbyteStream createAirbyteStream(final String collectionName, final String databaseName, final List<Field> fields) {
Expand Down Expand Up @@ -136,7 +133,7 @@ private static Set<Field> getFieldsInCollection(final MongoCollection collection
while (cursor.hasNext()) {
final Map<String, String> fields = ((List<Map<String, String>>) cursor.next().get("fields")).get(0);
discoveredFields.addAll(fields.entrySet().stream()
.map(e -> new Field(e.getKey(), convertToSchemaType(e.getValue())))
.map(e -> new MongoField(e.getKey(), convertToSchemaType(e.getValue())))
.collect(Collectors.toSet()));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,10 @@
package io.airbyte.integrations.source.mongodb.internal;

import static io.airbyte.integrations.source.mongodb.internal.MongoCatalogHelper.DEFAULT_CURSOR_FIELD;
import static io.airbyte.integrations.source.mongodb.internal.MongoCatalogHelper.SUPPORTED_SYNC_MODES;
import static io.airbyte.integrations.source.mongodb.internal.MongoConstants.DATABASE_CONFIGURATION_KEY;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.Lists;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoCollection;
import io.airbyte.commons.json.Jsons;
Expand All @@ -20,7 +18,6 @@
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaType;
import io.airbyte.protocol.models.v0.AirbyteStream;
import io.airbyte.protocol.models.v0.CatalogHelpers;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.v0.ConnectorSpecification;
Expand All @@ -31,8 +28,6 @@
import java.nio.file.Path;
import java.util.HashMap;
import java.util.List;
import java.util.stream.Collectors;

import org.bson.BsonArray;
import org.bson.BsonString;
import org.bson.Document;
Expand Down Expand Up @@ -102,26 +97,23 @@ protected JsonNode getConfig() {
@Override
protected ConfiguredAirbyteCatalog getConfiguredCatalog() {
final List<Field> fields = List.of(
Field.of(DEFAULT_CURSOR_FIELD, JsonSchemaType.STRING),
Field.of("id", JsonSchemaType.STRING),
Field.of("name", JsonSchemaType.STRING),
Field.of("test", JsonSchemaType.STRING),
Field.of("test_array", JsonSchemaType.ARRAY),
Field.of("empty_test", JsonSchemaType.STRING),
Field.of("double_test", JsonSchemaType.NUMBER),
Field.of("int_test", JsonSchemaType.NUMBER),
Field.of("object_test", JsonSchemaType.OBJECT)
);
Field.of(DEFAULT_CURSOR_FIELD, JsonSchemaType.STRING),
Field.of("id", JsonSchemaType.STRING),
Field.of("name", JsonSchemaType.STRING),
Field.of("test", JsonSchemaType.STRING),
Field.of("test_array", JsonSchemaType.ARRAY),
Field.of("empty_test", JsonSchemaType.STRING),
Field.of("double_test", JsonSchemaType.NUMBER),
Field.of("int_test", JsonSchemaType.NUMBER),
Field.of("object_test", JsonSchemaType.OBJECT));
final List<AirbyteStream> airbyteStreams = List.of(
MongoCatalogHelper.buildAirbyteStream(COLLECTION_NAME, DATABASE_NAME, fields),
MongoCatalogHelper.buildAirbyteStream(COLLECTION_NAME, DATABASE_NAME, fields));
MongoCatalogHelper.buildAirbyteStream(COLLECTION_NAME, DATABASE_NAME, fields),
MongoCatalogHelper.buildAirbyteStream(COLLECTION_NAME, DATABASE_NAME, fields));

return new ConfiguredAirbyteCatalog().withStreams(
List.of(
convertToConfiguredAirbyteStream(airbyteStreams.get(0), SyncMode.INCREMENTAL),
convertToConfiguredAirbyteStream(airbyteStreams.get(1), SyncMode.FULL_REFRESH)
)
);
List.of(
convertToConfiguredAirbyteStream(airbyteStreams.get(0), SyncMode.INCREMENTAL),
convertToConfiguredAirbyteStream(airbyteStreams.get(1), SyncMode.FULL_REFRESH)));
}

@Override
Expand All @@ -131,9 +123,10 @@ protected JsonNode getState() {

private ConfiguredAirbyteStream convertToConfiguredAirbyteStream(final AirbyteStream airbyteStream, final SyncMode syncMode) {
return new ConfiguredAirbyteStream()
.withSyncMode(syncMode)
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withCursorField(List.of(DEFAULT_CURSOR_FIELD))
.withStream(airbyteStream);
.withSyncMode(syncMode)
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withCursorField(List.of(DEFAULT_CURSOR_FIELD))
.withStream(airbyteStream);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.source.mongodb.internal;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import com.fasterxml.jackson.core.type.TypeReference;
import com.mongodb.client.AggregateIterable;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.protocol.models.JsonSchemaType;
import io.airbyte.protocol.models.v0.AirbyteStream;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.bson.Document;
import org.junit.jupiter.api.Test;

public class MongoUtilTest {

@Test
void testGetAirbyteStreams() throws IOException {
final AggregateIterable<Document> aggregateIterable = mock(AggregateIterable.class);
final MongoCursor<Document> cursor = mock(MongoCursor.class);
final String databaseName = "database";
final Document authorizedCollectionsResponse = Document.parse(MoreResources.readResource("authorized_collections_response.json"));
final MongoClient mongoClient = mock(MongoClient.class);
final MongoCollection mongoCollection = mock(MongoCollection.class);
final MongoDatabase mongoDatabase = mock(MongoDatabase.class);
final List<Map<String, Object>> schemaDiscoveryJsonResponses =
Jsons.deserialize(MoreResources.readResource("schema_discovery_response.json"), new TypeReference<>() {});
final List<Document> schemaDiscoveryResponses = schemaDiscoveryJsonResponses.stream().map(s -> new Document(s)).collect(Collectors.toList());

when(cursor.hasNext()).thenReturn(true, true, false);
when(cursor.next()).thenReturn(schemaDiscoveryResponses.get(0), schemaDiscoveryResponses.get(1));
when(aggregateIterable.cursor()).thenReturn(cursor);
when(mongoCollection.aggregate(any())).thenReturn(aggregateIterable);
when(mongoDatabase.getCollection(any())).thenReturn(mongoCollection);
when(mongoDatabase.runCommand(any())).thenReturn(authorizedCollectionsResponse);
when(mongoClient.getDatabase(databaseName)).thenReturn(mongoDatabase);

final List<AirbyteStream> streams = MongoUtil.getAirbyteStreams(mongoClient, databaseName);
assertNotNull(streams);
assertEquals(1, streams.size());
assertEquals(11, streams.get(0).getJsonSchema().get("properties").size());
}

@Test
void testGetAirbyteStreamsDifferentDataTypes() throws IOException {
final AggregateIterable<Document> aggregateIterable = mock(AggregateIterable.class);
final MongoCursor<Document> cursor = mock(MongoCursor.class);
final String databaseName = "database";
final Document authorizedCollectionsResponse = Document.parse(MoreResources.readResource("authorized_collections_response.json"));
final MongoClient mongoClient = mock(MongoClient.class);
final MongoCollection mongoCollection = mock(MongoCollection.class);
final MongoDatabase mongoDatabase = mock(MongoDatabase.class);
final List<Map<String, Object>> schemaDiscoveryJsonResponses =
Jsons.deserialize(MoreResources.readResource("schema_discovery_response_different_datatypes.json"), new TypeReference<>() {});
final List<Document> schemaDiscoveryResponses = schemaDiscoveryJsonResponses.stream().map(s -> new Document(s)).collect(Collectors.toList());

when(cursor.hasNext()).thenReturn(true, true, false);
when(cursor.next()).thenReturn(schemaDiscoveryResponses.get(0), schemaDiscoveryResponses.get(1));
when(aggregateIterable.cursor()).thenReturn(cursor);
when(mongoCollection.aggregate(any())).thenReturn(aggregateIterable);
when(mongoDatabase.getCollection(any())).thenReturn(mongoCollection);
when(mongoDatabase.runCommand(any())).thenReturn(authorizedCollectionsResponse);
when(mongoClient.getDatabase(databaseName)).thenReturn(mongoDatabase);

final List<AirbyteStream> streams = MongoUtil.getAirbyteStreams(mongoClient, databaseName);
assertNotNull(streams);
assertEquals(1, streams.size());
assertEquals(11, streams.get(0).getJsonSchema().get("properties").size());
assertEquals(JsonSchemaType.NUMBER.getJsonSchemaTypeMap().get("type"),
streams.get(0).getJsonSchema().get("properties").get("total").get("type").asText());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
[
{
"_id" : null,
"fields" : [
{
"_id" : "string",
"name" : "string",
"last_updated" : "date",
"total" : "int",
"price" : "decimal",
"items" : "array",
"owners" : "object"
}
]
},
{
"_id" : null,
"fields" : [
{
"_id" : "string",
"name" : "string",
"last_updated" : "date",
"total" : "string",
"price" : "decimal",
"items" : "array",
"owners" : "object",
"other" : "string"
}
]
}
]

0 comments on commit 848fb74

Please sign in to comment.