Conversation

rozza
Member

@rozza rozza commented Jun 12, 2024

Adds the `mode` configuration allowing for different parsing strategies when handling documents
that don't match the expected schema during reads.

The options are:

  - `FAILFAST` (default) throws an exception when parsing a document that doesn't match the schema.
  - `PERMISSIVE` sets any invalid fields to `null`.
     Combine with the `columnNameOfCorruptRecord` configuration if you want to store any invalid documents
     as an extended JSON string.
  - `DROPMALFORMED` ignores the whole document.

Adds the `columnNameOfCorruptRecord` configuration which extends the `PERMISSIVE` mode. When configured it
saves the whole invalid document as extended JSON in that column, as long as it's defined in the schema. Inferred
schemas will add the `columnNameOfCorruptRecord` column if it is set and the `mode` is `PERMISSIVE`.

Note: the names derive from the existing Spark JSON configurations, which inspired this feature.

SPARK-327
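
As a usage illustration (not code from this PR): a minimal read sketch with the new options, assuming the v10 `mongodb` source with its `connection.uri` / `database` / `collection` option names; the connection details, schema, and the `_corrupt_record` column name are placeholders.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.StructType;

public final class PermissiveReadSketch {
  public static void main(final String[] args) {
    SparkSession spark =
        SparkSession.builder().master("local[*]").appName("parse-mode-sketch").getOrCreate();

    // The corrupt record column must be part of the schema (or the schema must be
    // inferred in PERMISSIVE mode) for invalid documents to be stored as extended JSON.
    StructType schema =
        new StructType()
            .add("a", "string")
            .add("b", "bigint")
            .add("_corrupt_record", "string");

    Dataset<Row> df =
        spark
            .read()
            .format("mongodb")
            .option("connection.uri", "mongodb://localhost:27017") // placeholder
            .option("database", "test") // placeholder
            .option("collection", "coll") // placeholder
            .option("mode", "PERMISSIVE")
            .option("columnNameOfCorruptRecord", "_corrupt_record")
            .schema(schema)
            .load();

    df.show();
    spark.stop();
  }
}
```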

@ParameterizedTest
@ValueSource(strings = {"SINGLE", "MULTIPLE", "ALL"})
@ValueSource(strings = {"SINGLE", "MULTIPLE"})
Member Author

This fixes a bug where it was checking the writes against the source db and not the sink db! Because you cannot test reading from all collections and writing to that db at the same time, I opted to remove the ALL step. Added SPARK-429 to track.

final Consumer<MongoConfig> setup,
final Consumer<MongoConfig>... consumers) {
testStreamingQuery(writeFormat, mongoConfig, DEFAULT_SCHEMA, null, setup, consumers);
testStreamingQuery(writeFormat, mongoConfig, DEFAULT_SCHEMA, null, null, setup, consumers);
Member Author

The changes below are test infrastructure related, e.g. adding stream listener support and/or simplifying the testing.

MongoCollection<BsonDocument> collection = writeConfig.withClient(client -> client
.getDatabase(writeConfig.getDatabaseName())
.getCollection(collectionName(), BsonDocument.class));
.getCollection(writeConfig.getCollectionName(), BsonDocument.class));
Member Author

Use the write config to source the collection name to write to!!

}
options.put(
ReadConfig.READ_PREFIX + ReadConfig.COLLECTION_NAME_CONFIG, collectionsConfigOptionValue);
options.put(
Member Author

Sets the write config collection name.

@rozza rozza requested a review from vbabanin June 12, 2024 11:31
if (!isPermissive) {
throw e;
}
valueMap.put(field.name(), null);
Member

Currently, if one out of N fields in a subdocument is corrupted, the entire subdocument is marked as null. I'm not entirely sure if this behavior aligns with Spark specifications, but it might be more reasonable to mark only the corrupted fields as such, rather than the whole subdocument. What are your thoughts?

Member Author

Great question. I initially thought this behavior wasn't defined and lived in a closed-source module. However, having searched the Spark open-source code, it is native to their JSON importer. So I checked the behavior in PySpark and it appears it is recursive!

>>> from pyspark.sql.types import _parse_datatype_string
>>> schema = _parse_datatype_string('struct<a:string,b:struct<c:string,d:bigint>,_corrupt_record:string>')
>>>
>>> json = ['{"a":"ok","b":{"c":"c","d":10}}','{"a":"bad","b":{"c":"c","d":"bad"}}']
>>> stringdf = sc.parallelize(json)
>>> df = spark.read.option('mode', 'PERMISSIVE').schema(schema).json(stringdf)
>>> df.show()
+---+---------+--------------------+
|  a|        b|     _corrupt_record|
+---+---------+--------------------+
| ok|  {c, 10}|                NULL|
|bad|{c, NULL}|{"a":"bad","b":{"...|
+---+---------+--------------------+

I will update :)

Member Author

Done and it was simpler than expected to add 👍

Member

@vbabanin vbabanin Jun 28, 2024

Looks very neat!

DROPMALFORMED;

static ParseMode fromString(final String userParseMode) {
try {
Member

If null is provided through the option() method, it currently results in a NullPointerException. To make this more user-friendly and consistent, we could throw a ConfigException, which provides clearer feedback.

Suggested change
try {
validateConfig(userParseMode, Objects::nonNull, () -> "The userParseMode can't be null");
try {

Member Author

I ended up reverting as this method is internal and only called via the readConfig method, where there is a default value for the parse mode.
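
For context, a hypothetical standalone sketch of a `fromString` lookup like the one above; this is not the connector's actual code, which handles the default and error reporting (e.g. via its own ConfigException) differently.

```java
import java.util.Locale;

public final class ParseModeSketch {

  // Hypothetical mirror of the enum under review; the real connector code differs.
  enum ParseMode {
    FAILFAST,
    PERMISSIVE,
    DROPMALFORMED;

    static ParseMode fromString(final String userParseMode) {
      try {
        return ParseMode.valueOf(userParseMode.toUpperCase(Locale.ROOT));
      } catch (IllegalArgumentException | NullPointerException e) {
        // A null or unknown value surfaces as a single, descriptive exception.
        throw new IllegalArgumentException(
            "Unsupported mode: '" + userParseMode
                + "'. Expected one of: FAILFAST, PERMISSIVE, DROPMALFORMED",
            e);
      }
    }
  }

  public static void main(final String[] args) {
    System.out.println(ParseMode.fromString("permissive")); // PERMISSIVE
  }
}
```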

map.put(k, convertBsonValue(createFieldPath(fieldName, k), dataType.valueType(), v)));

return JavaScala.asScala(map);
return scala.collection.immutable.Map$.MODULE$.from(JavaScala.asScala(map));
Member Author

Found this bug when converting a row to JSON; Spark errored because the value was not the correct (immutable) Scala map type.
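
Roughly what the fix above addresses, shown as a standalone sketch using the standard Scala 2.13 `scala.jdk.javaapi.CollectionConverters` rather than the connector's `JavaScala` helper: `asScala` only wraps a `java.util.Map` as a mutable Scala map, so an explicit copy via `Map$.MODULE$.from(...)` is needed to get the immutable map type Spark expects.

```java
import java.util.HashMap;
import java.util.Map;

import scala.collection.immutable.Map$;
import scala.jdk.javaapi.CollectionConverters;

public final class ImmutableScalaMapSketch {
  public static void main(final String[] args) {
    Map<String, String> javaMap = new HashMap<>();
    javaMap.put("key", "value");

    // asScala returns a scala.collection.mutable.Map view over the Java map.
    scala.collection.mutable.Map<String, String> mutableView =
        CollectionConverters.asScala(javaMap);

    // Copy the view into an immutable Scala map, mirroring the Map$.MODULE$.from(...) call in the diff.
    scala.collection.immutable.Map<?, ?> immutableCopy =
        (scala.collection.immutable.Map<?, ?>) Map$.MODULE$.from(mutableView);

    System.out.println(immutableCopy);
  }
}
```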

@rozza rozza requested a review from vbabanin June 27, 2024 12:26
Member

@vbabanin vbabanin Jun 28, 2024

In PERMISSIVE mode, I see that corrupted fields are set to null, regardless of specifying them as non-nullable.

However, in the convertToRow() method, when a non-nullable field defined in the schema is missing from the document, we throw an exception: com.mongodb.spark.sql.connector.exceptions.DataException: Missing field 'fieldName'

   if (hasField || field.nullable()) {
        values.add(
            hasField
                ? convertBsonValue(fullFieldPath, field.dataType(), bsonDocument.get(field.name()))
                : null);
      } else {
        throw missingFieldException(fullFieldPath, bsonDocument);
      }  

Given the possibility that users might have unstructured data in the DB, where documents may vary in field counts, should we consider setting absent fields expected by the schema to null when operating in PERMISSIVE mode? I'm concerned that throwing exceptions in these situations could create issues for some users.

In similar contexts, Spark's strategy with

  • CSV files sets missing fields to null when a record has fewer tokens than the schema expects.
  • Avro, in PERMISSIVE mode: 'Corrupt records are processed as null result. Therefore, the data schema is forced to be fully nullable, which might be different from the one user provided,' implying a schema that is forced to full nullability.

(Note: I have not tested the above scenarios from Spark's documentation.)

As another option, we might also consider introducing another mode that allows setting nulls on missing fields without triggering failures. What are your thoughts?

Member Author

Done, it now forces nullability. This is in line with JSON parsing in Spark.
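
For illustration only (not the PR's implementation): "forcing nullability" can be thought of as rewriting the supplied schema so that every field, including nested struct fields and array/map values, is marked nullable before conversion. A rough sketch using Spark's public schema types; `asNullable` is a hypothetical helper name.

```java
import org.apache.spark.sql.types.ArrayType;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.MapType;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public final class NullableSchemaSketch {

  /** Returns a copy of the data type with every (nested) field marked as nullable. */
  public static DataType asNullable(final DataType dataType) {
    if (dataType instanceof StructType) {
      StructField[] original = ((StructType) dataType).fields();
      StructField[] nullable = new StructField[original.length];
      for (int i = 0; i < original.length; i++) {
        StructField field = original[i];
        nullable[i] =
            new StructField(field.name(), asNullable(field.dataType()), true, field.metadata());
      }
      return new StructType(nullable);
    } else if (dataType instanceof ArrayType) {
      // Array elements may also be null once parsing is permissive.
      return new ArrayType(asNullable(((ArrayType) dataType).elementType()), true);
    } else if (dataType instanceof MapType) {
      // Map values may also be null; keys keep their original type.
      MapType mapType = (MapType) dataType;
      return new MapType(mapType.keyType(), asNullable(mapType.valueType()), true);
    }
    return dataType;
  }
}
```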

@rozza rozza requested a review from vbabanin July 9, 2024 16:44
Member

@vbabanin vbabanin left a comment

LGTM!

@rozza rozza merged commit c4043ae into mongodb:main Jul 15, 2024
@rozza rozza deleted the SPARK-327 branch July 15, 2024 10:58