[SPARK-29347][SQL] Add JSON serialization for external Rows #26013
Conversation
This is to be used for observable metrics where the `StreamingQueryProgress` contains a map of observed metrics rows which need to be serialized in some cases. Added a new test suite: `RowJsonSuite` that should test this.
Test build #111740 has finished for PR 26013 at commit
@@ -501,4 +513,88 @@ trait Row extends Serializable {
  private def getAnyValAs[T <: AnyVal](i: Int): T =
    if (isNullAt(i)) throw new NullPointerException(s"Value at index $i is null")
    else getAs[T](i)

  /** The compact JSON representation of this row. */
  def json: String = compact(jsonValue)
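For context, a minimal sketch of how the new method might be called, assuming a running SparkSession named `spark`; the column names and printed output are illustrative only, not taken from the PR:

import org.apache.spark.sql.Row

// Rows collected from a DataFrame carry their schema, which json needs.
val rows: Array[Row] = spark.range(3)
  .selectExpr("id", "concat('user_', id) AS name")
  .collect()

rows.foreach { row =>
  println(row.json)        // e.g. {"id":0,"name":"user_0"}
  println(row.prettyJson)  // same content, pretty-printed
}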
@hvanhovell, how about reusing JacksonGenerator in our JSON datasource?
There's a pretty option for prettyJson too.
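For reference, both variants render the same json4s AST; roughly, using the json4s-jackson methods Spark already depends on:

import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods.{compact, pretty, render}

val value = ("id" -> 1) ~ ("name" -> "a")
compact(render(value))  // {"id":1,"name":"a"}
pretty(render(value))   // the same JSON, spread over indented lines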
Ah, right, schema can be unknown ..
Well, you still need the schema. The main reason for not using JacksonGenerator is that we would need to convert back to an internal row, and this is super slow.
Hm, this API already looks pretty slow though, and I suspect it should not be called in a critical path ..?
If it's supposed to be used in a critical path, we might rather have to provide an API that makes a conversion function given a schema (so that we avoid type dispatch for every row).
One rather minor concern is that the JSON representation for a row seems different compared to the JSON datasource, e.g. https://github.com/apache/spark/pull/26013/files#r331463832 and https://github.com/apache/spark/pull/26013/files#diff-78ce4e47d137bbb0d4350ad732b48d5bR576-R578, and the code here duplicates it a bit ..
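A rough sketch of the kind of schema-bound API being suggested here; makeJsonConverter and everything inside it are hypothetical names, not part of the PR, and only a few DataTypes are handled for brevity:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.json4s._
import org.json4s.jackson.JsonMethods.compact

// Hypothetical: resolve the value -> JValue conversion per field up front,
// so the DataType dispatch happens once per schema instead of once per row.
def makeJsonConverter(schema: StructType): Row => String = {
  val fieldConverters: Array[Any => JValue] = schema.fields.map { f =>
    f.dataType match {
      case StringType  => (v: Any) => JString(v.asInstanceOf[String])
      case IntegerType => (v: Any) => JInt(BigInt(v.asInstanceOf[Int]))
      case LongType    => (v: Any) => JLong(v.asInstanceOf[Long])
      case DoubleType  => (v: Any) => JDouble(v.asInstanceOf[Double])
      case BooleanType => (v: Any) => JBool(v.asInstanceOf[Boolean])
      case _           => (v: Any) => JString(String.valueOf(v)) // fallback for brevity
    }
  }
  val names = schema.fieldNames

  (row: Row) => {
    val fields = names.indices.map { i =>
      names(i) -> (if (row.isNullAt(i)) JNull else fieldConverters(i)(row.get(i)))
    }
    compact(JObject(fields.toList))
  }
}

A caller could then build the converter once per schema and reuse it for every row it needs to serialize.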
So two things to consider here.
I want to use this in StreamingQueryProgress right? All the JSON serialization there is based on the json4s AST and not strings (which is what JacksonGenerator produces).
There is a difference between it being slow and what you are suggesting; the latter would be crazy inefficient. Let's break that down:
- Row to InternalRow conversion. You will need to create a converter per row because there is currently no way we can safely cache a converter. You can either use ScalaReflection or RowEncoder here; the latter is particularly bad because it uses code generation (which takes on the order of milliseconds and which is weakly cached on the driver).
- Setting up the JacksonGenerator; again, this is uncached and we need to set up the same thing for each tuple.
- Generating the string.
Do you see my point here? Or shall I write a benchmark?
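To make that cost concrete, the rejected path would look roughly like the sketch below (using CatalystTypeConverters in place of ScalaReflection/RowEncoder for brevity). These are internal Catalyst classes, several of them private[sql], so this only compiles inside Spark's own org.apache.spark.sql packages, and their constructors have shifted between Spark versions; treat it as an illustration of where the per-row work goes, not as user code:

import java.io.CharArrayWriter

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.json.{JacksonGenerator, JSONOptions}
import org.apache.spark.sql.types.StructType

def rowToJsonViaJackson(row: Row, schema: StructType): String = {
  // 1. Row -> InternalRow: the converter is rebuilt on every call because
  //    there is no safe place on Row to cache it.
  val toCatalyst = CatalystTypeConverters.createToCatalystConverter(schema)
  val internalRow = toCatalyst(row).asInstanceOf[InternalRow]

  // 2. JacksonGenerator setup: also repeated for every single tuple.
  val writer = new CharArrayWriter()
  val gen = new JacksonGenerator(schema, writer, new JSONOptions(Map.empty, "UTC"))

  // 3. Generate the string.
  gen.write(internalRow)
  gen.flush()
  writer.toString
}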
There's one API case where we dropped a performance improvement in Row, as an example (see #23271).
@deprecated("This method is deprecated and will be removed in future versions.", "3.0.0")
def merge(rows: Row*): Row = {
  // TODO: Improve the performance of this if used in performance critical part.
  new GenericRow(rows.flatMap(_.toSeq).toArray)
}
Do you mind if I ask to add @Unstable or @Private to these new APIs, just in case of future improvements, with @since in the Scaladoc? Row itself is marked as @Stable, so it might be better to explicitly note that this can be changed in the future. With this, LGTM.
to be used for observable metrics where the StreamingQueryProgress ...
Is it the only purpose of the new methods? If so, maybe those methods should be put into a separate utils object outside of the general Row? How many Spark users are interested in those functions?
@MaxGekk while the immediate reason is observable metrics, there surely is a different use case to be found here. I prefer not to hide these things somewhere if we can also add them to the class itself.
      iteratorToJsonArray(a.iterator, elementType)
    case (s: Seq[_], ArrayType(elementType, _)) =>
      iteratorToJsonArray(s.iterator, elementType)
    case (m: Map[String @unchecked, _], MapType(StringType, valueType, _)) =>
Is it really worth having a special format for string-keyed maps?
The reason would be that it emits more readable JSON. This is similar to the way StreamingQueryProgress renders maps. I can revert if you feel strongly about this.
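To illustrate the readability argument, a small json4s sketch of the two renderings; the fallback shape shown for non-string keys is an assumption about what a generic representation would look like, not a quote of the PR:

import org.json4s._
import org.json4s.jackson.JsonMethods.compact

// String-keyed map: keys can become JSON object fields directly.
val stringKeyed = Map("min" -> 1L, "max" -> 10L)
compact(JObject(stringKeyed.toList.map { case (k, v) => k -> JLong(v) }))
// roughly: {"min":1,"max":10}

// Non-string keys cannot be object fields, so a generic rendering has to fall
// back to something like an array of key/value entries.
val intKeyed = Map(1 -> "a", 2 -> "b")
compact(JArray(intKeyed.toList.map { case (k, v) =>
  JObject("key" -> JInt(BigInt(k)), "value" -> JString(v))
}))
// roughly: [{"key":1,"value":"a"},{"key":2,"value":"b"}]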
Do we need to convert the JSON string back to a Row? If we do, then I think it's better to keep the ser/de simple. If not, I'm fine with the code here.
In its current form it is not really meant to be converted back.
Can other primitive types like Int be good for this format too?
.add("c1", "string") | ||
.add("c2", IntegerType) | ||
|
||
private def testJson(name: String, value: Any, dt: DataType, expected: JValue): Unit = { |
nit: wrong indentation
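For readers without the diff expanded, a guess at how a helper with that signature might be written and used inside the suite; the body is an assumption about intent rather than the actual test code, and GenericRowWithSchema and jsonValue are internal names only visible from Spark's own sql packages:

import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.types._
import org.json4s._

private def testJson(name: String, value: Any, dt: DataType, expected: JValue): Unit = {
  test(name) {
    // Build a one-column row carrying its schema, serialize it, and compare
    // the resulting json4s AST against the expected value.
    val schema = new StructType().add("a", dt)
    val row = new GenericRowWithSchema(Array(value), schema)
    assert(row.jsonValue === JObject("a" -> expected))
  }
}

// e.g.
testJson("int column", 1, IntegerType, JInt(1))
testJson("string column", "spark", StringType, JString("spark"))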
LGTM, we probably need to wait a few days until Jenkins is back online.
retest this please
Test build #112026 has finished for PR 26013 at commit
Test build #112039 has finished for PR 26013 at commit
Merging to master
Thanks @hvanhovell. LGTM too.
What changes were proposed in this pull request?
This PR adds JSON serialization for Spark external Rows.
Why are the changes needed?
This is to be used for observable metrics where the StreamingQueryProgress contains a map of observed metrics rows which need to be serialized in some cases.

Does this PR introduce any user-facing change?
Yes, a user can call toJson on rows returned when collecting a DataFrame to the driver.

How was this patch tested?
Added a new test suite: RowJsonSuite that should test this.