
[SPARK-41993][SQL] Move RowEncoder to AgnosticEncoders #39517


Closed
hvanhovell wants to merge 16 commits into master from hvanhovell/SPARK-41993

Conversation

hvanhovell
Contributor

What changes were proposed in this pull request?

This PR makes RowEncoder produce an AgnosticEncoder. The expression generation for these encoders is moved to ScalaReflection (this will be moved out in a subsequent PR).

The generated serializer and deserializer expressions will change slightly for both schema and type based encoders. These are not semantically different from the old expressions. Concretely, the following changes have been introduced:

  • There is more type validation in maps/arrays/seqs for type based encoders. This should be a positive change, since it prevents users from passing wrong data through erasure hacks.
  • Array/Seq serialization is a bit stricter. Previously it was possible to pass in sequences/arrays with the wrong element type and/or nullability.
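
A minimal, hypothetical sketch of what "RowEncoder produces an AgnosticEncoder" means in practice: the schema is first translated into a tree of encoder descriptors, and a separate component (ScalaReflection in this PR) turns those descriptors into serializer/deserializer expressions. The names below (MiniEncoder, fromDataType, etc.) are illustrative only, not the actual Spark API.

```scala
import org.apache.spark.sql.types._

// Illustrative descriptor tree; the real AgnosticEncoder hierarchy is richer.
sealed trait MiniEncoder { def dataType: DataType }
case class PrimitiveEnc(dataType: DataType) extends MiniEncoder
case class ArrayEnc(element: MiniEncoder, containsNull: Boolean) extends MiniEncoder {
  def dataType: DataType = ArrayType(element.dataType, containsNull)
}
case class MapEnc(key: MiniEncoder, value: MiniEncoder, valueContainsNull: Boolean) extends MiniEncoder {
  def dataType: DataType = MapType(key.dataType, value.dataType, valueContainsNull)
}
case class RowEnc(fields: Seq[(StructField, MiniEncoder)]) extends MiniEncoder {
  def dataType: DataType = StructType(fields.map(_._1))
}

// Recursively derive a descriptor tree from a schema; turning the tree into
// serializer/deserializer expressions is a separate step.
def fromDataType(dt: DataType): MiniEncoder = dt match {
  case s: StructType =>
    RowEnc(s.fields.toSeq.map(f => f -> fromDataType(f.dataType)))
  case ArrayType(element, containsNull) =>
    ArrayEnc(fromDataType(element), containsNull)
  case MapType(key, value, valueContainsNull) =>
    MapEnc(fromDataType(key), fromDataType(value), valueContainsNull)
  case other =>
    PrimitiveEnc(other)
}
```

Keeping the descriptor tree free of expression logic is what allows other components (such as the Spark Connect Scala client mentioned below) to reuse the same encoders.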

Why are the changes needed?

For the Spark Connect Scala Client we also want to be able to use Row based results.

Does this PR introduce any user-facing change?

No

How was this patch tested?

This is a refactoring, existing tests should be sufficient.

@hvanhovell hvanhovell requested a review from cloud-fan January 11, 2023 20:34
@github-actions github-actions bot added the SQL label Jan 11, 2023
@hvanhovell
Contributor Author

@cloud-fan can you take a look?

@hvanhovell
Contributor Author

A note for the reviewers. I know that Catalyst tests pass. I have not run other tests, so there might still be a few things to iron out.

@@ -306,7 +330,7 @@ object ScalaReflection extends ScalaReflection {
* input object is located at ordinal 0 of a row, i.e., `BoundReference(0, _)`.
*/
def serializerFor(enc: AgnosticEncoder[_]): Expression = {
Contributor Author

TODO check the generated code for boxed primitives. We might be doing double conversions there.

propagateNull = false,
returnNullable = false)
exprs.If(
check,
Contributor Author

We can widen this to arrays where the element is allowed to be null. In that case we do need to make sure the element type is sound.
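A hypothetical, non-catalyst sketch of the check being discussed (String is an assumed element type, and validateElements is made up for illustration): if the check is widened to arrays whose elements may be null, every non-null element still has to be validated against the expected element type.

```scala
// Hypothetical helper, not the generated catalyst expression tree.
def validateElements(values: Seq[Any], elementNullable: Boolean): Unit = values.foreach {
  case null if elementNullable => () // allowed once the check is widened to nullable elements
  case null =>
    throw new NullPointerException("null element in a non-nullable array")
  case _: String => ()               // element type is sound
  case other =>
    throw new RuntimeException(s"${other.getClass.getName} is not a valid element type")
}
```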

@@ -125,7 +125,7 @@ class RowEncoderSuite extends CodegenInterpretedPlanTest {
new StructType()
.add("mapOfIntAndString", MapType(IntegerType, StringType))
.add("mapOfStringAndArray", MapType(StringType, arrayOfString))
.add("mapOfArrayAndInt", MapType(arrayOfString, IntegerType))
Contributor

arrayOfString doesn't work anymore inside a map?

Contributor Author

That is a mistake. My bad :)

walkedTypePath)
expressionWithNullSafety(deserializer, enc.nullable, walkedTypePath)
enc match {
case RowEncoder(fields) =>
Contributor

what encoder do we create for the inner struct? how is it different from the root RowEncoder?

Contributor Author
@hvanhovell hvanhovell Jan 13, 2023

We create one with null checks. This one does not need them because we always return a Row (the top-level row always exists).
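
A hypothetical illustration of the difference, written as plain Scala over Row rather than the generated catalyst expressions (the helper names are made up): the top-level Row always exists, so its deserializer needs no null guard, whereas a nested struct value can be null and must be guarded.

```scala
import org.apache.spark.sql.Row

// Assumed helper that turns a struct row into some external value.
def deserializeStruct(row: Row): Map[String, Any] =
  (0 until row.length).map(i => s"col$i" -> row.get(i)).toMap

// Root encoder: the incoming row is never null, so deserialize directly.
def deserializeTopLevel(row: Row): Map[String, Any] =
  deserializeStruct(row)

// Nested encoder: the struct field may be null, so check before deserializing.
def deserializeNested(parent: Row, ordinal: Int): Map[String, Any] =
  if (parent.isNullAt(ordinal)) null
  else deserializeStruct(parent.getStruct(ordinal))
```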

elementNullable: Boolean,
input: Expression,
lenientSerialization: Boolean): Expression = {
// Default serializer for Seq and generic Arrays. This does not work for primitive arrays.
Contributor

hmm, why is the name createSerializerForMapObjects?

Contributor Author

I don't know. It was like that before this.

element: AgnosticEncoder[E])
element: AgnosticEncoder[E],
containsNull: Boolean,
override val lenientSerialization: Boolean)
Contributor

what does lenient mean for an IterableEncoder?

Contributor Author

It means we allow a Seq, a generic Array, or a primitive array as input for serialization.

Contributor

Can we leave a code comment to mention it? It's not that obvious compared to DateEncoder.

Contributor Author

yeah will do. TBH I was quite surprised by it.
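
To make the lenient behaviour concrete, a rough sketch (the normalize helper is hypothetical, not Spark code): with lenient serialization the same collection field accepts a Seq, a generic object array, or a primitive array as input.

```scala
// Hypothetical helper; illustrates the three accepted input shapes.
def normalize(input: Any): Seq[Any] = input match {
  case s: Seq[_]   => s        // Seq input, e.g. List(1, 2, 3)
  case a: Array[_] => a.toSeq  // generic Array[AnyRef] or a primitive array such as Array[Int]
  case other =>
    throw new IllegalArgumentException(s"Unsupported input type: ${other.getClass.getName}")
}

normalize(Seq("a", "b"))   // Seq
normalize(Array("a", "b")) // generic (object) array
normalize(Array(1, 2, 3))  // primitive Int array
```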

externalDataTypeFor(enc, lenientSerialization = false)
}

private[catalyst] def lenientExternalDataTypeFor(enc: AgnosticEncoder[_]): DataType =
Contributor

Suggested change
private[catalyst] def lenientExternalDataTypeFor(enc: AgnosticEncoder[_]): DataType =
private[catalyst] def lenientExternalDataTypeFor(enc: AgnosticEncoder[_]): DataType =

assert(serializer.isInstanceOf[NewInstance])
assert(serializer.asInstanceOf[NewInstance]
.cls.isAssignableFrom(classOf[org.apache.spark.sql.catalyst.util.GenericArrayData]))
assert(serializer.isInstanceOf[MapObjects])
Contributor

is MapObjects better than NewInstance for creating List[Int]?

@cloud-fan
Contributor

Only 2 minor comments, thanks, merging to master!

@cloud-fan cloud-fan closed this in 2d4be52 Jan 16, 2023
cloud-fan pushed a commit to cloud-fan/spark that referenced this pull request Jan 16, 2023

Closes apache#39517 from hvanhovell/SPARK-41993.

Authored-by: Herman van Hovell <herman@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Member

@dongjoon-hyun dongjoon-hyun left a comment


@hvanhovell and @cloud-fan.

Unfortunately, this breaks the Scala 2.13 Kafka SQL module with a SparkRuntimeException during encoding. I verified that the failures are gone after reverting.

Cause: org.apache.spark.SparkRuntimeException:
Error while encoding: java.lang.RuntimeException:
scala.collection.mutable.ArraySeq$ofRef is not a valid external type for schema of array<struct<key:string,value:binary>>
     [info] *** 1 SUITE ABORTED ***
     [info] *** 8 TESTS FAILED ***
     [error] Failed tests:
     [error] 	org.apache.spark.sql.kafka010.KafkaRelationSuiteV1
     [error] 	org.apache.spark.sql.kafka010.KafkaMicroBatchV2SourceSuite
     [error] 	org.apache.spark.sql.kafka010.KafkaMicroBatchV1SourceWithAdminSuite
     [error] 	org.apache.spark.sql.kafka010.KafkaMicroBatchV1SourceSuite
     [error] 	org.apache.spark.sql.kafka010.KafkaRelationSuiteWithAdminV1
     [error] 	org.apache.spark.sql.kafka010.KafkaSinkBatchSuiteV1
     [error] 	org.apache.spark.sql.kafka010.KafkaMicroBatchV2SourceWithAdminSuite
     [error] Error during tests:
     [error] 	org.apache.spark.sql.kafka010.KafkaContinuousSourceSuite
     [error] (sql-kafka-0-10 / Test / test) sbt.TestsFailedException: Tests unsuccessful
     [error] Total time: 2513 s (41:53), completed Jan 16, 2023, 10:07:56 PM

The failures are massive across multiple suites and look tricky. Let me revert this first because we have a branch cut scheduled today.

cc @HyukjinKwon, @xinrong-meng (3.4.0 release manager).
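
For context, a short sketch of where this class typically comes from on Scala 2.13 (whether this matches the exact Kafka code path is an assumption): wrapping an Array as a scala.collection.Seq goes through Predef.wrapRefArray, which yields a mutable ArraySeq, and a stricter external-type check for array<...> columns can reject it.

```scala
// Scala 2.13: implicit wrapping of an object array as scala.collection.Seq
// produces a mutable ArraySeq (it was WrappedArray on 2.12).
val records = Array(("key1", "value1".getBytes("UTF-8")))
val wrapped: scala.collection.Seq[(String, Array[Byte])] = records
println(wrapped.getClass.getName) // scala.collection.mutable.ArraySeq$ofRef on 2.13
```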

@hvanhovell
Contributor Author

@dongjoon-hyun I am looking at it now. I am not sure "massive" is the qualification I would use; all of these are likely to be caused by the same thing. Feel free to revert if you have to.

@dongjoon-hyun
Member

Thank you, @hvanhovell .
