Added parse mode support when reading data from MongoDB. #119
Conversation
Adds the `mode` configuration, allowing for different parsing strategies when handling documents that don't match the expected schema during reads. The options are:

- `FAILFAST` (default): throw an exception when parsing a document that doesn't match the schema.
- `PERMISSIVE`: set any invalid fields to `null`. Combine with the `columnNameOfCorruptRecord` configuration if you want to store any invalid documents as an extended JSON string.
- `DROPMALFORMED`: ignore the whole document.

Adds the `columnNameOfCorruptRecord` configuration, which extends the `PERMISSIVE` mode. When configured, it saves the whole invalid document as extended JSON in that column, as long as it's defined in the schema. Inferred schemas will add the `columnNameOfCorruptRecord` column if set and the `mode` is `PERMISSIVE`.

Note: Names derive from the existing Spark JSON configurations, from which this feature takes inspiration.

SPARK-327
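For illustration, a minimal PySpark read using these options might look like the sketch below. It assumes the `mongodb` format and the short option keys named above (`mode`, `columnNameOfCorruptRecord`); the database and collection names are hypothetical, a read connection URI is assumed to be configured elsewhere, and the exact option keys may differ by connector version.

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, LongType

spark = SparkSession.builder.getOrCreate()

# The corrupt-record column must be part of the schema for it to be populated.
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", LongType(), True),
    StructField("_corrupt_record", StringType(), True),
])

df = (spark.read.format("mongodb")
    .option("database", "test")          # hypothetical database name
    .option("collection", "people")      # hypothetical collection name
    .option("mode", "PERMISSIVE")        # or FAILFAST (default) / DROPMALFORMED
    .option("columnNameOfCorruptRecord", "_corrupt_record")
    .schema(schema)
    .load())

# Documents that don't match the schema have their invalid fields set to null,
# with the whole document stored as extended JSON in `_corrupt_record`.
df.show(truncate=False)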
  @ParameterizedTest
- @ValueSource(strings = {"SINGLE", "MULTIPLE", "ALL"})
+ @ValueSource(strings = {"SINGLE", "MULTIPLE"})
This fixes a bug where it was checking the writes against the source db and not the sink db! As you cannot test reading from all collections and writing to that db at the same time, I opted to remove the ALL step. Added SPARK-429 to track.
  final Consumer<MongoConfig> setup,
  final Consumer<MongoConfig>... consumers) {
-   testStreamingQuery(writeFormat, mongoConfig, DEFAULT_SCHEMA, null, setup, consumers);
+   testStreamingQuery(writeFormat, mongoConfig, DEFAULT_SCHEMA, null, null, setup, consumers);
The changes below are test infrastructure related, e.g. adding stream listener support and/or simplifying the testing.
  MongoCollection<BsonDocument> collection = writeConfig.withClient(client -> client
      .getDatabase(writeConfig.getDatabaseName())
-     .getCollection(collectionName(), BsonDocument.class));
+     .getCollection(writeConfig.getCollectionName(), BsonDocument.class));
Use the write config to source the collection name to write to!!
}
options.put(
    ReadConfig.READ_PREFIX + ReadConfig.COLLECTION_NAME_CONFIG, collectionsConfigOptionValue);
options.put(
Sets the write config collection name.
if (!isPermissive) {
  throw e;
}
valueMap.put(field.name(), null);
Currently, if one out of N fields in a subdocument is corrupted, the entire subdocument is marked as null. I'm not entirely sure if this behavior aligns with Spark specifications, but it might be more reasonable to mark only the corrupted fields as such, rather than the whole subdocument. What are your thoughts?
Great question. I initially thought this wasn't defined and lived in a closed-source module. However, having searched the Spark open source code, it is native to their JSON importer. So I checked the behavior in PySpark and it appears it is recursive!
>>> from pyspark.sql.types import _parse_datatype_string
>>> schema = _parse_datatype_string('struct<a:string,b:struct<c:string,d:bigint>,_corrupt_record:string>')
>>>
>>> json = ['{"a":"ok","b":{"c":"c","d":10}}','{"a":"bad","b":{"c":"c","d":"bad"}}']
>>> stringdf = sc.parallelize(json)
>>> df = spark.read.option('mode', 'PERMISSIVE').schema(schema).json(stringdf)
>>> df.show()
+---+---------+--------------------+
| a| b| _corrupt_record|
+---+---------+--------------------+
| ok| {c, 10}| NULL|
|bad|{c, NULL}|{"a":"bad","b":{"...|
+---+---------+--------------------+
I will update :)
Done and it was simpler than expected to add 👍
Looks very neat!
  DROPMALFORMED;

  static ParseMode fromString(final String userParseMode) {
    try {
If `null` is provided through the `option()` method, it currently results in a `NullPointerException`. To make this more user-friendly and consistent, we could throw a `ConfigException`, which provides clearer feedback.
Suggested change:
-     try {
+     validateConfig(userParseMode, Objects::nonNull, () -> "The userParseMode can't be null");
+     try {
I ended up reverting as this method is internal and only called via the readConfig method, where there is a default value for the parse mode.
…g.java Co-authored-by: Viacheslav Babanin <frest0512@gmail.com>
…age from ReadConfig includes a default value
      map.put(k, convertBsonValue(createFieldPath(fieldName, k), dataType.valueType(), v)));

-     return JavaScala.asScala(map);
+     return scala.collection.immutable.Map$.MODULE$.from(JavaScala.asScala(map));
Found this bug when converting a row to JSON; Spark errored because the value was not the correct (immutable) Scala map type.
In PERMISSIVE mode, I see that corrupted fields are set to null, regardless of specifying them as non-nullable.
However, in the `convertToRow()` method, when a non-nullable field defined in the schema is missing in the document, we throw an exception: `com.mongodb.spark.sql.connector.exceptions.DataException: Missing field 'fieldName'`
if (hasField || field.nullable()) {
values.add(
hasField
? convertBsonValue(fullFieldPath, field.dataType(), bsonDocument.get(field.name()))
: null);
} else {
throw missingFieldException(fullFieldPath, bsonDocument);
}
Given the possibility that users might have unstructured data in DB, where documents may vary in field counts, should we consider setting absent fields expected by the schema as null when operating in PERMISSIVE mode? I'm concerned that throwing exceptions in these situations could create issues for some users.
In similar contexts, Spark's strategy with
- CSV files sets missing fields to null when a record has fewer tokens than the schema expects.
- Avro, in PERMISSIVE mode: 'Corrupt records are processed as null result. Therefore, the data schema is forced to be fully nullable, which might be different from the one user provided,' implying a schema that is forced to full nullability.
(Note: I have not tested the above scenarios from the Spark's documentation).
As another option, we might also consider introducing another mode that allows setting nulls on missing fields without triggering failures. What are your thoughts?
Done, now force nullability. This is in line with JSON parsing in Spark.
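For reference, the JSON behavior being mirrored can be sketched in the pyspark shell along the lines of the earlier example (hypothetical field names, untested here): a field declared non-nullable but missing from a record should come back as NULL under PERMISSIVE rather than failing.
>>> from pyspark.sql.types import StructType, StructField, StringType, LongType
>>> schema = StructType([
...     StructField("a", StringType(), False),
...     StructField("b", LongType(), False)])  # declared non-nullable
>>> json = ['{"a":"ok","b":10}', '{"a":"b is missing"}']
>>> df = spark.read.option('mode', 'PERMISSIVE').schema(schema).json(sc.parallelize(json))
>>> df.printSchema()  # inspect whether nullability has been relaxed on read
>>> df.show()         # the missing b should appear as NULL instead of erroring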
LGTM!