@@ -18,6 +18,7 @@

import static com.mongodb.spark.sql.connector.config.MongoConfig.COMMENT_CONFIG;
import static com.mongodb.spark.sql.connector.interop.JavaScala.asJava;
import static java.lang.String.format;
import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
import static org.apache.spark.sql.types.DataTypes.createStructField;
@@ -26,6 +27,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertIterableEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assumptions.assumeTrue;

@@ -41,6 +43,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.spark.SparkException;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.DataFrameReader;
@@ -49,6 +52,7 @@
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.bson.BsonDocument;
@@ -653,6 +657,7 @@ void testLogsCommentsInProfilerLogs() {

List<BsonDocument> collectionData =
toBsonDocuments(spark.read().textFile(READ_RESOURCES_HOBBITS_JSON_PATH));
getCollection().drop();
getCollection().insertMany(collectionData);

ReadConfig readConfig = MongoConfig.readConfig(asJava(spark.initialSessionOptions()))
@@ -677,6 +682,70 @@
readConfig);
}

@Test
void testReadsWithParseMode() {
SparkSession spark = getOrCreateSparkSession();

List<BsonDocument> collectionData = asList(
BsonDocument.parse("{_id: 1, name: 'Ada Lovelace', address: {street: 'St James Square'}}"),
BsonDocument.parse(
"{_id: 2, name: 'Charles Babbage', address: {street: '5 Devonshire Street'}}"),
BsonDocument.parse(
"{_id: 3, name: 'Alan Turing', address: '{\"street\": \"Bletchley Hall\"}'}"),
BsonDocument.parse("{_id: 4, name: 'Timothy Berners-Lee', address: {}}"));
getCollection().insertMany(collectionData);

StructType schema = createStructType(asList(
createStructField("_id", DataTypes.IntegerType, false),
createStructField("name", DataTypes.StringType, true),
createStructField("address", DataType.fromDDL("street STRING"), true)));

DataFrameReader errorDFR = spark.read().format("mongodb").schema(schema);
assertThrows(SparkException.class, () -> errorDFR.load().toJSON().collect());
assertThrows(SparkException.class, () -> errorDFR
.option(ReadConfig.PARSE_MODE, ReadConfig.ParseMode.FAILFAST.name())
.load()
.toJSON()
.collect());

DataFrameReader dfr = spark.read().format("mongodb").schema(schema);
List<BsonDocument> expectedData =
collectionData.stream().map(BsonDocument::clone).collect(Collectors.toList());
expectedData.remove(2);
assertEquals(
expectedData,
toBsonDocuments(dfr.option(ReadConfig.PARSE_MODE, ReadConfig.ParseMode.DROPMALFORMED.name())
.load()
.toJSON()));

// Note toJSON uses the default ignoreNullFields = true
dfr = dfr.option(ReadConfig.PARSE_MODE, ReadConfig.ParseMode.PERMISSIVE.name());
expectedData = collectionData.stream().map(BsonDocument::clone).collect(Collectors.toList());
expectedData.remove(2);
expectedData.add(2, BsonDocument.parse("{_id: 3, name: 'Alan Turing'}"));
assertEquals(expectedData, toBsonDocuments(dfr.load().toJSON()));

// No corrupted field in the schema, so the option is ignored
String corruptedField = "_corrupted";
dfr = dfr.option(ReadConfig.COLUMN_NAME_OF_CORRUPT_RECORD, corruptedField);
assertEquals(expectedData, toBsonDocuments(dfr.load().toJSON()));

// With corrupted field
StructType schemaWithCorruptedField = schema.add(corruptedField, DataTypes.StringType);
dfr = dfr.schema(schemaWithCorruptedField);
expectedData = collectionData.stream().map(BsonDocument::clone).collect(Collectors.toList());

expectedData.remove(2);
expectedData.add(
2,
BsonDocument.parse(
format(
"{_id: 3, name: 'Alan Turing', _corrupted: '%s'}",
"{\"_id\": 3, \"name\": \"Alan Turing\", \"address\": \"{\\\\\"street\\\\\": \\\\\"Bletchley Hall\\\\\"}\"}")));

assertEquals(expectedData, toBsonDocuments(dfr.load().toJSON()));
}
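
As a follow-on usage note: once PERMISSIVE mode and a corrupt-record column are configured as in the test above, clean and corrupt rows can be split with standard Spark column filters. A minimal sketch, assuming a static import of org.apache.spark.sql.functions.col and the variables defined in the test:

Dataset<Row> df = spark
    .read()
    .format("mongodb")
    .schema(schemaWithCorruptedField)
    .option(ReadConfig.PARSE_MODE, ReadConfig.ParseMode.PERMISSIVE.name())
    .option(ReadConfig.COLUMN_NAME_OF_CORRUPT_RECORD, corruptedField)
    .load();

// Rows that matched the schema leave the corrupt-record column null.
Dataset<Row> clean = df.filter(col(corruptedField).isNull());
// Rows that failed to parse keep the original document as an extended JSON string.
Dataset<Row> corrupt = df.filter(col(corruptedField).isNotNull());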

private List<BsonDocument> toBsonDocuments(final Dataset<String> dataset) {
return dataset.toJavaRDD().map(BsonDocument::parse).collect();
}
@@ -161,6 +161,66 @@ public final class ReadConfig extends AbstractMongoConfig {

private static final boolean AGGREGATION_ALLOW_DISK_USE_DEFAULT = true;

/**
* Parse mode, using the existing Spark JSON parse mode configuration names.
*/
public enum ParseMode {
/**
* Fail if a data error occurs
*/
FAILFAST,
/**
* Ignore any data errors for a field and replace its value with null
*/
PERMISSIVE,
/**
* Drop any rows that contain a data error
*/
DROPMALFORMED;

static ParseMode fromString(final String userParseMode) {
try {

[Review comment — Member]
If null is provided through the option() method, it currently results in a NullPointerException. To make this more user-friendly and consistent, we could throw a ConfigException, which provides clearer feedback.

Suggested change:
    try {
becomes:
    validateConfig(userParseMode, Objects::nonNull, () -> "The userParseMode can't be null");
    try {


[Reply — Member Author]

I ended up reverting, as this method is internal and is only called via the readConfig method, where there is a default value for the parse mode.

return ParseMode.valueOf(userParseMode.toUpperCase());
} catch (IllegalArgumentException e) {
throw new ConfigException(format("'%s' is not a valid Parse mode", userParseMode));
}
}
}
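
For illustration, the expected fromString behaviour in a couple of hedged assertions (using the JUnit helpers already imported by the connector's tests; these assertions are a sketch, not part of this change):

// Lookup is case-insensitive because the input is upper-cased before valueOf().
assertEquals(ParseMode.PERMISSIVE, ParseMode.fromString("permissive"));
assertEquals(ParseMode.DROPMALFORMED, ParseMode.fromString("DropMalformed"));

// Unknown values surface as a ConfigException rather than an IllegalArgumentException.
assertThrows(ConfigException.class, () -> ParseMode.fromString("lenient"));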

/**
* Parsing strategy for handling corrupt records during reads.
*
* <ul>
* <li>{@code PERMISSIVE}: When a corrupted record is encountered, sets any malformed fields to null.
* Configure {@value COLUMN_NAME_OF_CORRUPT_RECORD} to store the whole record
* as an extended JSON string whenever a corrupt record is encountered.
* When inferring a schema, a {@value COLUMN_NAME_OF_CORRUPT_RECORD} field is implicitly added
* to the output schema if configured.</li>
* <li>{@code DROPMALFORMED}: ignores corrupted records entirely.</li>
* <li>{@code FAILFAST}: throws an exception when a corrupted record is encountered.</li>
* </ul>
*
* <p>Note: A "corrupt record" is any document that doesn't match the schema of the dataset.
*
* <p>Configuration: {@value}
*
* <p>Default: FAILFAST.
*/
public static final String PARSE_MODE = "mode";

private static final String PARSE_MODE_DEFAULT = ParseMode.FAILFAST.name();

/**
* Allows renaming the new field that holds the malformed record string created by PERMISSIVE mode.
*
* <p>Configuration: {@value}
*
* <p>Default: {@value COLUMN_NAME_OF_CORRUPT_RECORD_DEFAULT}.
*/
public static final String COLUMN_NAME_OF_CORRUPT_RECORD = "columnNameOfCorruptRecord";

private static final String COLUMN_NAME_OF_CORRUPT_RECORD_DEFAULT = EMPTY_STRING;
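
A minimal sketch of how these options resolve through the connector configuration (the option map and variable names are illustrative; only the keys and defaults above and the MongoConfig.readConfig factory used elsewhere in this change are assumed):

// Build a ReadConfig with an explicit parse mode and corrupt-record column.
Map<String, String> options = new HashMap<>();
options.put(ReadConfig.PARSE_MODE, ReadConfig.ParseMode.PERMISSIVE.name());
options.put(ReadConfig.COLUMN_NAME_OF_CORRUPT_RECORD, "_corrupt");
ReadConfig readConfig = MongoConfig.readConfig(options);

// PERMISSIVE honours the configured column; in any other mode the accessor
// further down in this file falls back to the empty-string default.
String corruptColumn = readConfig.getColumnNameOfCorruptRecord(); // "_corrupt"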

/**
* Publish Full Document only when streaming.
*
@@ -295,6 +355,7 @@ static StreamingStartupMode fromString(final String userStartupMode) {
private static final boolean OUTPUT_EXTENDED_JSON_DEFAULT = false;

private final List<BsonDocument> aggregationPipeline;
private final ParseMode parseMode;

/**
* Construct a new instance
@@ -304,6 +365,7 @@
ReadConfig(final Map<String, String> options) {
super(options, UsageMode.READ);
aggregationPipeline = generateAggregationPipeline();
parseMode = ParseMode.fromString(getOrDefault(PARSE_MODE, PARSE_MODE_DEFAULT));
}

@Override
@@ -381,6 +443,23 @@ public FullDocument getStreamFullDocument() {
}
}

/** @return true if malformed rows should be dropped */
public boolean dropMalformed() {
return parseMode == ParseMode.DROPMALFORMED;
}

/** @return true if parse errors should be handled permissively */
public boolean isPermissive() {
return parseMode == ParseMode.PERMISSIVE;
}

/** @return the configured column name for storing corrupt documents, or an empty string if not in permissive mode */
public String getColumnNameOfCorruptRecord() {
return isPermissive()
? getOrDefault(COLUMN_NAME_OF_CORRUPT_RECORD, COLUMN_NAME_OF_CORRUPT_RECORD_DEFAULT)
: COLUMN_NAME_OF_CORRUPT_RECORD_DEFAULT;
}
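
For context, a hypothetical sketch of how a caller such as the row converter might branch on these accessors when a document fails to match the schema (the actual BsonDocumentToRowConverter changes are not shown in this section, and toPermissiveRow is an invented placeholder):

// Hypothetical error-handling branch in a document-to-row conversion.
private Optional<InternalRow> handleMalformed(
    final ReadConfig readConfig, final BsonDocument document, final RuntimeException error) {
  if (readConfig.dropMalformed()) {
    // DROPMALFORMED: signal the caller to skip this document entirely.
    return Optional.empty();
  }
  if (readConfig.isPermissive()) {
    // PERMISSIVE: null out malformed fields and, if configured, keep the raw
    // document as extended JSON in the corrupt-record column.
    return Optional.of(toPermissiveRow(document, readConfig.getColumnNameOfCorruptRecord()));
  }
  // FAILFAST (the default): propagate the original error.
  throw error;
}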

/**
* Returns the initial start at operation time for a stream
*
@@ -43,8 +43,7 @@ final class MongoBatch implements Batch {
MongoBatch(final StructType schema, final ReadConfig readConfig) {
this.schema = schema;
this.readConfig = readConfig;
this.bsonDocumentToRowConverter =
new BsonDocumentToRowConverter(schema, readConfig.outputExtendedJson());
this.bsonDocumentToRowConverter = new BsonDocumentToRowConverter(schema, readConfig);
}

/** Returns a list of partitions that split the collection into parts */
@@ -22,6 +22,7 @@
import com.mongodb.spark.sql.connector.assertions.Assertions;
import com.mongodb.spark.sql.connector.config.ReadConfig;
import com.mongodb.spark.sql.connector.schema.BsonDocumentToRowConverter;
import java.util.Optional;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.read.PartitionReader;
import org.bson.BsonDocument;
@@ -65,15 +66,20 @@ class MongoBatchPartitionReader implements PartitionReader<InternalRow> {
bsonDocumentToRowConverter.getSchema());
}

/** Proceed to next record, returns false if there is no more records. */
/** Proceed to next record, returns false if there are no more records. */
@Override
public boolean next() {
Assertions.ensureState(() -> !closed, () -> "Cannot call next() on a closed PartitionReader.");
boolean hasNext = getCursor().hasNext();
if (hasNext) {
currentRow = bsonDocumentToRowConverter.toInternalRow(getCursor().next());
while (getCursor().hasNext()) {
BsonDocument next = getCursor().next();
// If there is a result, return it; otherwise try again (e.g. ReadConfig.dropMalformed())
Optional<InternalRow> internalRowOptional = bsonDocumentToRowConverter.toInternalRow(next);
if (internalRowOptional.isPresent()) {
currentRow = internalRowOptional.get();
return true;
}
}
return hasNext;
return false;
}

/** Return the current record. This method should return same value until `next` is called. */
@@ -31,6 +31,7 @@
import com.mongodb.spark.sql.connector.schema.BsonDocumentToRowConverter;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.read.streaming.ContinuousPartitionReader;
@@ -130,21 +131,34 @@ public void close() {
}
}

/**
* Used within a loop, as continuous streams are required to block until {@code ContinuousPartitionReader#next}
* returns a result.
*
* <p>Ensures the cursor resources are managed correctly in case of an error retrieving a result.
*
* @return true if a result is available, otherwise false
*/
private boolean tryNext() {
return withCursor(c -> {
return withCursor(cursor -> {
try {
if (c.hasNext()) {
BsonDocument next = c.next();
if (next.containsKey("_id") && next.isDocument("_id")) {
setLastOffset(next.getDocument("_id"));
}
currentRow = null;
BsonDocument next = cursor.tryNext();
setLastOffset(cursor.getResumeToken());
if (next != null) {
if (readConfig.streamPublishFullDocumentOnly()) {
next = next.getDocument(FULL_DOCUMENT, new BsonDocument());
}
currentRow = bsonDocumentToRowConverter.toInternalRow(next);
return true;

// If there is a result, return it; otherwise try again (e.g. ReadConfig.dropMalformed())
Optional<InternalRow> internalRowOptional =
bsonDocumentToRowConverter.toInternalRow(next);
if (internalRowOptional.isPresent()) {
currentRow = internalRowOptional.get();
return true;
}
}
setLastOffset(c.getResumeToken());
return false;
} catch (MongoException e) {
LOGGER.info("Trying to get more data from the change stream failed, releasing cursor.", e);
}
@@ -70,8 +70,7 @@ final class MongoContinuousStream implements ContinuousStream {
checkpointLocation,
MongoOffset.getInitialOffset(readConfig));
this.readConfig = readConfig;
this.bsonDocumentToRowConverter =
new BsonDocumentToRowConverter(schema, readConfig.outputExtendedJson());
this.bsonDocumentToRowConverter = new BsonDocumentToRowConverter(schema, readConfig);
}

@Override
@@ -31,6 +31,7 @@
import com.mongodb.spark.sql.connector.schema.BsonDocumentToRowConverter;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.read.PartitionReader;
import org.bson.BsonDocument;
@@ -98,28 +99,33 @@ public boolean next() {
Assertions.ensureState(() -> !closed, () -> "Cannot call next() on a closed PartitionReader.");

MongoChangeStreamCursor<BsonDocument> cursor = getCursor();
BsonDocument cursorNext;
BsonDocument next;

do {
try {
cursorNext = cursor.tryNext();
currentRow = null;
next = cursor.tryNext();
if (next != null) {
if (readConfig.streamPublishFullDocumentOnly()) {
next = next.getDocument(FULL_DOCUMENT, new BsonDocument());
}

// If there is a result, return it; otherwise try again (e.g. ReadConfig.dropMalformed())
Optional<InternalRow> internalRowOptional =
bsonDocumentToRowConverter.toInternalRow(next);
if (internalRowOptional.isPresent()) {
currentRow = internalRowOptional.get();
return true;
}
}
} catch (RuntimeException e) {
throw new MongoSparkException("Calling `cursor.tryNext()` errored.", e);
}
} while (cursorNext == null
&& cursor.getServerCursor() != null
} while (cursor.getServerCursor() != null
&& (cursor.getResumeToken() == null
|| getTimestamp(cursor.getResumeToken()).compareTo(partition.getEndOffsetTimestamp())
< 0));

boolean hasNext = cursorNext != null;
if (hasNext) {
if (readConfig.streamPublishFullDocumentOnly()) {
cursorNext = cursorNext.getDocument(FULL_DOCUMENT, new BsonDocument());
}
currentRow = bsonDocumentToRowConverter.toInternalRow(cursorNext);
}
return hasNext;
return false;
}

/** Return the current record. This method should return same value until `next` is called. */
@@ -73,8 +73,7 @@ final class MongoMicroBatchStream implements MicroBatchStream {
checkpointLocation,
MongoOffset.getInitialOffset(readConfig));
this.readConfig = readConfig;
this.bsonDocumentToRowConverter =
new BsonDocumentToRowConverter(schema, readConfig.outputExtendedJson());
this.bsonDocumentToRowConverter = new BsonDocumentToRowConverter(schema, readConfig);
}

@Override