
[SPARK-29347][SQL] Add JSON serialization for external Rows #26013


Closed
wants to merge 4 commits into from

Conversation

hvanhovell
Contributor

What changes were proposed in this pull request?

This PR adds JSON serialization for Spark external Rows.

Why are the changes needed?

This is to be used for observable metrics, where the `StreamingQueryProgress` contains a map of observed metrics rows that needs to be serialized in some cases.

Does this PR introduce any user-facing change?

Yes, a user can call `toJson` on rows returned when collecting a DataFrame to the driver.

How was this patch tested?

Added a new test suite, `RowJsonSuite`, that covers this.

@hvanhovell hvanhovell added the SQL label Oct 3, 2019
@hvanhovell hvanhovell requested a review from cloud-fan October 3, 2019 14:53
@hvanhovell hvanhovell changed the title [SPARK-29347] This PR adds JSON serialization for Spark external Rows. [SPARK-29347] Adds JSON serialization for external Rows Oct 3, 2019
@hvanhovell hvanhovell changed the title [SPARK-29347] Adds JSON serialization for external Rows [SPARK-29347] Add JSON serialization for external Rows Oct 3, 2019
@dongjoon-hyun dongjoon-hyun changed the title [SPARK-29347] Add JSON serialization for external Rows [SPARK-29347][SQL] Add JSON serialization for external Rows Oct 3, 2019
@SparkQA

SparkQA commented Oct 3, 2019

Test build #111740 has finished for PR 26013 at commit 98d42e2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • s\"(class of $

@@ -501,4 +513,88 @@ trait Row extends Serializable {
private def getAnyValAs[T <: AnyVal](i: Int): T =
if (isNullAt(i)) throw new NullPointerException(s"Value at index $i is null")
else getAs[T](i)

/** The compact JSON representation of this row. */
def json: String = compact(jsonValue)
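As a rough illustration of what the new `json` method's recursive value conversion does, here is a small, Spark-free sketch. All names here are hypothetical simplifications: the real implementation pattern-matches on the row's schema `DataType`s and builds a json4s AST rather than raw strings.

```scala
// Hypothetical sketch: recursively render a Scala value as compact JSON.
// The real Row.json dispatches on DataType and produces a json4s JValue.
object RowJsonSketch {
  def toJsonString(value: Any): String = value match {
    case null         => "null"
    case b: Boolean   => b.toString
    case n: Int       => n.toString
    case n: Long      => n.toString
    case n: Double    => n.toString
    case s: String    => "\"" + s.replace("\"", "\\\"") + "\""
    case seq: Seq[_]  => seq.map(toJsonString).mkString("[", ",", "]")
    case m: Map[_, _] =>
      // String keys become JSON object fields.
      m.map { case (k, v) => "\"" + k.toString + "\":" + toJsonString(v) }
        .mkString("{", ",", "}")
    case other        => "\"" + other.toString + "\""
  }
}
```

For example, `RowJsonSketch.toJsonString(Seq(1, "a", Map("k" -> true)))` yields `[1,"a",{"k":true}]`.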
Member

@hvanhovell, how about reusing JacksonGenerator in our JSON datasource?

Member

There's a `pretty` option for `prettyJson` too.

Member

Ah, right, schema can be unknown ..

Contributor Author

Well, you still need the schema. The main reason for not using `JacksonGenerator` is that we would need to convert back to an internal row, and that is super slow.

Member

@HyukjinKwon HyukjinKwon Oct 4, 2019

Hm, this API already looks pretty slow, and I suspect it should not be called on a critical path ..?
If it is supposed to be used on a critical path, we might rather have to provide an API that builds a conversion function from a given schema (so that we avoid type dispatch for every row).

One rather minor concern is that the JSON representation for a row seems to differ from the JSON datasource's, e.g. https://github.com/apache/spark/pull/26013/files#r331463832 and https://github.com/apache/spark/pull/26013/files#diff-78ce4e47d137bbb0d4350ad732b48d5bR576-R578

and the code here duplicates a bit of that ..
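The "conversion function given a schema" idea mentioned above can be sketched in plain Scala. Everything here is hypothetical (a toy two-type schema, not Spark's API); the point is that the type dispatch happens once when the converter is built, so converting each row just applies pre-resolved functions.

```scala
// Hypothetical toy schema model, standing in for Spark's StructType/DataType.
sealed trait FieldType
case object IntField extends FieldType
case object StringField extends FieldType

object ConverterSketch {
  // Resolve one rendering function per field up front...
  def makeConverter(schema: Seq[(String, FieldType)]): Seq[Any] => String = {
    val fieldConverters: Seq[(String, Any => String)] = schema.map {
      case (name, IntField)    => (name, (v: Any) => v.toString)
      case (name, StringField) => (name, (v: Any) => "\"" + v + "\"")
    }
    // ...so per-row conversion does no type dispatch at all.
    (row: Seq[Any]) =>
      fieldConverters.zip(row).map { case ((name, render), v) =>
        "\"" + name + "\":" + render(v)
      }.mkString("{", ",", "}")
  }
}
```

A caller would build the converter once per schema and reuse it across rows, e.g. `val toJson = ConverterSketch.makeConverter(Seq("id" -> IntField, "name" -> StringField))`, then apply `toJson` to each row.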

Contributor Author

@hvanhovell hvanhovell Oct 7, 2019

So two things to consider here.

I want to use this in `StreamingQueryProgress`, right? All the JSON serialization there is based on the json4s AST and not on strings (which is what `JacksonGenerator` produces).

There is a difference between this being slow and what you are suggesting, the latter being crazy inefficient. Let's break that down:

  • Row to InternalRow conversion. You would need to create a converter per row because there is currently no way to safely cache one. You can use either ScalaReflection or RowEncoder here; the latter is particularly bad because it uses code generation (which takes on the order of milliseconds and is only weakly cached on the driver).
  • Setting up the JacksonGenerator; again, this is uncached, so we would set up the same thing for each tuple.
  • Generating the string.

Do you see my point here? Or shall I write a benchmark?

Member

@HyukjinKwon HyukjinKwon Oct 10, 2019

There's one API case where we dropped a performance improvement in Row, as an example (see #23271).

  @deprecated("This method is deprecated and will be removed in future versions.", "3.0.0")
  def merge(rows: Row*): Row = {
    // TODO: Improve the performance of this if used in performance critical part.
    new GenericRow(rows.flatMap(_.toSeq).toArray)
  }

Do you mind if I ask to add @Unstable or @Private to these new APIs instead, just in case of future improvements, with @since in the Scaladoc?

Row itself is marked as @Stable, so it might be better to explicitly note that this can be changed in the future. With this, LGTM.

Contributor Author

I will mark them as @Unstable. @Private is debatable, because this is not really meant as an internal-only API.

Member

@MaxGekk MaxGekk left a comment

to be used for observable metrics where the StreamingQueryProgress ...

Is this the only purpose of the new methods? If so, maybe those methods should be put in a separate utils object outside of the general Row? How many Spark users are interested in those functions?

@hvanhovell
Contributor Author

@MaxGekk while the immediate reason is observable metrics, there is surely another use case to be found here. I prefer not to hide these things away somewhere if we can also add them to the class itself.

iteratorToJsonArray(a.iterator, elementType)
case (s: Seq[_], ArrayType(elementType, _)) =>
iteratorToJsonArray(s.iterator, elementType)
case (m: Map[String @unchecked, _], MapType(StringType, valueType, _)) =>
Contributor

is it really worth having a special format for string-keyed maps?

Contributor Author

The reason is that it emits more readable JSON. This is similar to the way `StreamingQueryProgress` renders maps. I can revert this if you feel strongly about it.
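To make the trade-off being discussed concrete, here is a small Spark-free sketch of the two encodings (all names hypothetical; the real diff matches on `MapType(StringType, valueType, _)`): string-keyed maps render as a readable JSON object, while other key types would need a more verbose key/value-pair encoding.

```scala
// Hypothetical sketch of the two map encodings under discussion.
object MapJsonSketch {
  private def quote(s: String): String = "\"" + s + "\""

  // Render integral numbers as-is, everything else as a quoted string.
  private def renderValue(v: Any): String = v match {
    case n: Int  => n.toString
    case n: Long => n.toString
    case other   => quote(other.toString)
  }

  def mapToJson(m: Map[_, _]): String =
    if (m.keys.forall(_.isInstanceOf[String])) {
      // String keys: a plain JSON object, the more readable form.
      m.map { case (k, v) => quote(k.toString) + ":" + renderValue(v) }
        .mkString("{", ",", "}")
    } else {
      // Other key types: fall back to an array of key/value entries.
      m.map { case (k, v) =>
        "{" + quote("key") + ":" + renderValue(k) + "," +
          quote("value") + ":" + renderValue(v) + "}"
      }.mkString("[", ",", "]")
    }
}
```

For instance, `Map("a" -> 1)` renders as `{"a":1}`, whereas `Map(1 -> "x")` has to become `[{"key":1,"value":"x"}]`.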

Contributor

Do we need to convert the JSON string back to a Row? If we do, then I think it's better to keep the ser/de simple. If not, I'm fine with the code here.

Contributor Author

In its current form it is not really meant to be converted back.

Member

Could other primitive types like Int also be good for this format?

.add("c1", "string")
.add("c2", IntegerType)

private def testJson(name: String, value: Any, dt: DataType, expected: JValue): Unit = {
Contributor

nit: wrong indentation

@cloud-fan
Contributor

LGTM, we probably need to wait a few days until Jenkins is back online.

Member

@HyukjinKwon HyukjinKwon left a comment

@cloud-fan
Contributor

retest this please

@SparkQA

SparkQA commented Oct 14, 2019

Test build #112026 has finished for PR 26013 at commit abe9ffa.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Oct 14, 2019

Test build #112039 has finished for PR 26013 at commit 43c2d24.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@hvanhovell
Contributor Author

Merging to master

Member

@HyukjinKwon HyukjinKwon left a comment

Thanks @hvanhovell. LGTM too.


6 participants