[SPARK-41993][SQL] Move RowEncoder to AgnosticEncoders #39517
Conversation
@cloud-fan can you take a look?
A note for the reviewers: I know the Catalyst tests pass. I have not run the other tests, so there might still be a few things to iron out.
@@ -306,7 +330,7 @@ object ScalaReflection extends ScalaReflection {
   * input object is located at ordinal 0 of a row, i.e., `BoundReference(0, _)`.
   */
  def serializerFor(enc: AgnosticEncoder[_]): Expression = {
TODO: check the generated code for boxed primitives; we might be doing double conversions there.
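For readers unfamiliar with the concern, a minimal, hypothetical sketch in plain Scala (not actual generated code) of the redundant round-trip the TODO refers to:

```scala
// Illustrative only: a serializer that unboxes a value and immediately
// re-boxes it performs two conversions where none are needed.
object BoxedRoundTrip extends App {
  val boxed: java.lang.Integer = Integer.valueOf(42)
  val primitive: Int = boxed.intValue()                       // conversion 1: unbox
  val reboxed: java.lang.Integer = Integer.valueOf(primitive) // conversion 2: re-box
  println(reboxed)
}
```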
        propagateNull = false,
        returnNullable = false)
      exprs.If(
        check,
We can widen this to arrays where the element is allowed to be null. In that case we do need to make sure the element type is sound.
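A minimal sketch of the check being discussed, in plain Scala rather than Catalyst expressions (the names here are illustrative):

```scala
// With nullable elements, null must be accepted, but every non-null
// element must still have the expected runtime type.
def elementsAreSound(input: Seq[Any], expected: Class[_], nullable: Boolean): Boolean =
  input.forall {
    case null => nullable
    case e    => expected.isInstance(e)
  }
```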
@@ -125,7 +125,7 @@ class RowEncoderSuite extends CodegenInterpretedPlanTest {
     new StructType()
       .add("mapOfIntAndString", MapType(IntegerType, StringType))
       .add("mapOfStringAndArray", MapType(StringType, arrayOfString))
       .add("mapOfArrayAndInt", MapType(arrayOfString, IntegerType))
`arrayOfString` doesn't work anymore inside map?
That is a mistake. My bad :)
        walkedTypePath)
    expressionWithNullSafety(deserializer, enc.nullable, walkedTypePath)
    enc match {
      case RowEncoder(fields) =>
What encoder do we create for the inner struct? How is it different from the root `RowEncoder`?
We create one with null checks. This one does not need them because we always return a Row (the top-level row always exists).
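To illustrate the asymmetry with plain data (no encoder machinery): the top-level `Row` always exists, but a nested struct field may be null, so only the inner encoder needs the null check:

```scala
import org.apache.spark.sql.Row

// The top-level Row is always present; the nested struct may be null.
val withInner    = Row("id-1", Row("some street", "some city"))
val withoutInner = Row("id-2", null)
```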
      elementNullable: Boolean,
      input: Expression,
      lenientSerialization: Boolean): Expression = {
    // Default serializer for Seq and generic Arrays. This does not work for primitive arrays.
Hmm, why is the name `createSerializerForMapObjects`?
I don't know. It was like that before this change.
-      element: AgnosticEncoder[E])
+      element: AgnosticEncoder[E],
+      containsNull: Boolean,
+      override val lenientSerialization: Boolean)
What does lenient mean for an `IterableEncoder`?
It means we allow a Seq, a generic Array, or a primitive array as input for serialization.
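A quick illustration of the three accepted input shapes (plain Scala values; the encoder itself is elided):

```scala
val asSeq: Seq[Int]              = Seq(1, 2, 3)   // a Seq
val asGenericArray: Array[Any]   = Array(1, 2, 3) // a generic Array
val asPrimitiveArray: Array[Int] = Array(1, 2, 3) // a primitive array
```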
Can we leave a code comment to mention it? It's not that obvious compared to `DateEncoder`.
Yeah, will do. TBH I was quite surprised by it.
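A possible shape for the requested comment (illustrative; not the wording that was ultimately committed):

```scala
/**
 * If `lenientSerialization` is true, the serializer accepts a `Seq`,
 * a generic `Array`, or a primitive array as input for this encoder.
 */
```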
    externalDataTypeFor(enc, lenientSerialization = false)
  }

  private[catalyst] def lenientExternalDataTypeFor(enc: AgnosticEncoder[_]): DataType =
-    assert(serializer.isInstanceOf[NewInstance])
-    assert(serializer.asInstanceOf[NewInstance]
-      .cls.isAssignableFrom(classOf[org.apache.spark.sql.catalyst.util.GenericArrayData]))
+    assert(serializer.isInstanceOf[MapObjects])
Is `MapObjects` better than `NewInstance` for creating `List[Int]`?
Only 2 minor comments, thanks, merging to master!
This PR makes `RowEncoder` produce an `AgnosticEncoder`. The expression generation for these encoders is moved to `ScalaReflection` (this will be moved out in a subsequent PR). The generated serializer and deserializer expressions will change slightly for both schema and type based encoders. They are not semantically different from the old expressions. Concretely, the following changes have been introduced:

- There is more type validation in maps/arrays/seqs for type based encoders. This should be a positive change, since it stops users from passing wrong data through erasure hacks.
- Array/Seq serialization is a bit more strict. In the old scenario it was possible to pass in sequences/arrays with the wrong type and/or nullability.

For the Spark Connect Scala Client we also want to be able to use `Row` based results.

No user-facing change. This is a refactoring; existing tests should be sufficient.

Closes apache#39517 from hvanhovell/SPARK-41993.

Authored-by: Herman van Hovell <herman@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@hvanhovell and @cloud-fan, unfortunately this breaks the Scala 2.13 Kafka SQL module with a `SparkRuntimeException` during encoding. I verified that the failures are gone after reverting.
Cause: org.apache.spark.SparkRuntimeException:
Error while encoding: java.lang.RuntimeException:
scala.collection.mutable.ArraySeq$ofRef is not a valid external type for schema of array<struct<key:string,value:binary>>
[info] *** 1 SUITE ABORTED ***
[info] *** 8 TESTS FAILED ***
[error] Failed tests:
[error] org.apache.spark.sql.kafka010.KafkaRelationSuiteV1
[error] org.apache.spark.sql.kafka010.KafkaMicroBatchV2SourceSuite
[error] org.apache.spark.sql.kafka010.KafkaMicroBatchV1SourceWithAdminSuite
[error] org.apache.spark.sql.kafka010.KafkaMicroBatchV1SourceSuite
[error] org.apache.spark.sql.kafka010.KafkaRelationSuiteWithAdminV1
[error] org.apache.spark.sql.kafka010.KafkaSinkBatchSuiteV1
[error] org.apache.spark.sql.kafka010.KafkaMicroBatchV2SourceWithAdminSuite
[error] Error during tests:
[error] org.apache.spark.sql.kafka010.KafkaContinuousSourceSuite
[error] (sql-kafka-0-10 / Test / test) sbt.TestsFailedException: Tests unsuccessful
[error] Total time: 2513 s (41:53), completed Jan 16, 2023, 10:07:56 PM
The failures are massive across multiple suites and look tricky. Let me revert this first because we have a branch cut scheduled today.
cc @HyukjinKwon, @xinrong-meng (3.4.0 release manager).
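For readers hitting the same error, a minimal sketch of how Scala 2.13 can hand the encoder a `scala.collection.mutable.ArraySeq$ofRef` where an immutable `Seq` is expected (this is an assumption about the failure shape, not the actual Kafka test code):

```scala
import scala.collection.mutable

// Wrapping an Array of a reference type in Scala 2.13 yields ArraySeq$ofRef,
// which the stricter validation in this PR rejects as an external type.
val records: Array[(String, Array[Byte])] = Array(("key", "value".getBytes))
val wrapped: mutable.ArraySeq[(String, Array[Byte])] = mutable.ArraySeq.make(records)
println(wrapped.getClass.getName) // scala.collection.mutable.ArraySeq$ofRef
```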
@dongjoon-hyun I am looking at it now. I am not sure "massive" is the qualification I would use; all of these are likely to be caused by the same thing. Feel free to revert if you have to.
Thank you, @hvanhovell.
What changes were proposed in this pull request?
This PR makes `RowEncoder` produce an `AgnosticEncoder`. The expression generation for these encoders is moved to `ScalaReflection` (this will be moved out in a subsequent PR). The generated serializer and deserializer expressions will change slightly for both schema and type based encoders. They are not semantically different from the old expressions. Concretely, the following changes have been introduced:
- There is more type validation in maps/arrays/seqs for type based encoders. This should be a positive change, since it stops users from passing wrong data through erasure hacks.
- Array/Seq serialization is a bit more strict. In the old scenario it was possible to pass in sequences/arrays with the wrong type and/or nullability.
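As a hedged illustration of the stricter validation (the values below are made up for the example): a value smuggled past erasure used to slip through, but the element type is now checked during serialization:

```scala
import org.apache.spark.sql.Row

// Seq[Any] hides the fact that one element is not an Int; serializing this
// Row against a schema containing array<int> should now fail validation
// instead of silently producing corrupt data.
val ints: Seq[Any] = Seq(1, 2, "three")
val row = Row(ints)
```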
Why are the changes needed?
For the Spark Connect Scala Client we also want to be able to use `Row` based results.
Does this PR introduce any user-facing change?
No
How was this patch tested?
This is a refactoring, existing tests should be sufficient.