[SPARK-30429][SQL] Optimize catalogString and usage in ValidateExternalType.errMsg to avoid OOM #27117

Closed
wants to merge 5 commits

Conversation

viirya
Member

@viirya viirya commented Jan 7, 2020

What changes were proposed in this pull request?

This patch proposes:

  1. Fix the OOM in WideSchemaBenchmark: make ValidateExternalType.errMsg a lazy val, i.e. do not initialize it in the constructor
  2. Truncate errMsg: replace catalogString with simpleString, which is truncated
  3. Optimize override def catalogString in StructType: generate the string more efficiently by using StringConcat

Why are the changes needed?

As reported in the JIRA ticket, WideSchemaBenchmark fails with an OOM like:

[error] Exception in thread "main" org.apache.spark.sql.catalyst.errors.package$TreeNodeException: makeCopy, tree: validateexternaltype(getexternalrowfield(input[0, org.apac
he.spark.sql.Row, true], 0, a), StructField(b,StructType(StructField(c,StructType(StructField(value_1,LongType,true), StructField(value_10,LongType,true), StructField(value_
100,LongType,true), StructField(value_1000,LongType,true), StructField(value_1001,LongType,true), StructField(value_1002,LongType,true), StructField(value_1003,LongType,true
), StructField(value_1004,LongType,true), StructField(value_1005,LongType,true), StructField(value_1006,LongType,true), StructField(value_1007,LongType,true), StructField(va
lue_1008,LongType,true), StructField(value_1009,LongType,true), StructField(value_101,LongType,true), StructField(value_1010,LongType,true), StructField(value_1011,LongType,
...
ue), StructField(value_99,LongType,true), StructField(value_990,LongType,true), StructField(value_991,LongType,true), StructField(value_992,LongType,true), StructField(value
_993,LongType,true), StructField(value_994,LongType,true), StructField(value_995,LongType,true), StructField(value_996,LongType,true), StructField(value_997,LongType,true), 
StructField(value_998,LongType,true), StructField(value_999,LongType,true)),true)) 
[error]         at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)                                                                                
[error]         at org.apache.spark.sql.catalyst.trees.TreeNode.makeCopy(TreeNode.scala:435)                                                                                 
[error]         at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:408)                                                                              
[error]         at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:327)                                                                              
[error]         at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:307)           
....
[error]         at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:404)                                                                   
[error]         at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:214)                                                                       
[error]         at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:374)                                                                              
[error]         at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:327)                                                                              
[error]         at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:307)                                                                              
[error]         at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$1(TreeNode.scala:307)                                                                   
[error]         at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:376)                                                                   
[error]         at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:214)                                                                       
[error]         at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:374)                                                                              
[error]         at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:327)                                                                              
[error]         at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:307)                                                                              
[error]         at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.<init>(ExpressionEncoder.scala:198)                                                              
[error]         at org.apache.spark.sql.catalyst.encoders.RowEncoder$.apply(RowEncoder.scala:71)                                                                             
[error]         at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:88)                                                                                                    
[error]         at org.apache.spark.sql.SparkSession.internalCreateDataFrame(SparkSession.scala:554)                                                                         
[error]         at org.apache.spark.sql.DataFrameReader.json(DataFrameReader.scala:476)                                                                                      
[error]         at org.apache.spark.sql.execution.benchmark.WideSchemaBenchmark$.$anonfun$wideShallowlyNestedStructFieldReadAndWrite$1(WideSchemaBenchmark.scala:126) 
...
[error] Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
[error]         at java.util.Arrays.copyOf(Arrays.java:3332)
[error]         at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
[error]         at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
[error]         at java.lang.StringBuilder.append(StringBuilder.java:136)
[error]         at scala.collection.mutable.StringBuilder.append(StringBuilder.scala:213)
[error]         at scala.collection.TraversableOnce.$anonfun$addString$1(TraversableOnce.scala:368)
[error]         at scala.collection.TraversableOnce$$Lambda$67/667447085.apply(Unknown Source)
[error]         at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
[error]         at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
[error]         at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
[error]         at scala.collection.TraversableOnce.addString(TraversableOnce.scala:362)
[error]         at scala.collection.TraversableOnce.addString$(TraversableOnce.scala:358)
[error]         at scala.collection.mutable.ArrayOps$ofRef.addString(ArrayOps.scala:198)
[error]         at scala.collection.TraversableOnce.mkString(TraversableOnce.scala:328)
[error]         at scala.collection.TraversableOnce.mkString$(TraversableOnce.scala:327)
[error]         at scala.collection.mutable.ArrayOps$ofRef.mkString(ArrayOps.scala:198)
[error]         at scala.collection.TraversableOnce.mkString(TraversableOnce.scala:330)
[error]         at scala.collection.TraversableOnce.mkString$(TraversableOnce.scala:330)
[error]         at scala.collection.mutable.ArrayOps$ofRef.mkString(ArrayOps.scala:198)
[error]         at org.apache.spark.sql.types.StructType.catalogString(StructType.scala:411)
[error]         at org.apache.spark.sql.catalyst.expressions.objects.ValidateExternalType.<init>(objects.scala:1695)
[error]         at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
[error]         at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
[error]         at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
[error]         at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
[error]         at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$makeCopy$7(TreeNode.scala:468)
[error]         at org.apache.spark.sql.catalyst.trees.TreeNode$$Lambda$934/387827651.apply(Unknown Source)
[error]         at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
[error]         at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$makeCopy$1(TreeNode.scala:467)
[error]         at org.apache.spark.sql.catalyst.trees.TreeNode$$Lambda$929/449240381.apply(Unknown Source)
[error]         at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
[error]         at org.apache.spark.sql.catalyst.trees.TreeNode.makeCopy(TreeNode.scala:435)

This started after commit cb5ea20, which refactored ExpressionEncoder.

The stacktrace shows that it fails during transformUp on objSerializer in ExpressionEncoder. In particular, it fails while initializing ValidateExternalType.errMsg, which interpolates the catalogString of the expected data type into a string. WideSchemaBenchmark uses a very deeply nested data type, so when we transform the serializer containing ValidateExternalType, we build a redundant, very large errMsg string. Because we are only transforming the expression and do not use the message yet, this work is wasted and consumes a lot of memory.

After making ValidateExternalType.errMsg a lazy val, WideSchemaBenchmark works.
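The difference can be sketched outside Spark. During a tree transform, nodes are copied, and a plain val in a case class body is re-evaluated on every copy, while a lazy val is only evaluated on first access. The classes and counters below are illustrative stand-ins, not Spark's actual code:

```scala
object LazyDemo {
  var eagerBuilds = 0
  var lazyBuilds = 0

  // Illustrative stand-ins for an expression node; not Spark's ValidateExternalType.
  case class EagerNode(tpe: String) {
    val errMsg: String = { eagerBuilds += 1; s"... is not a valid external type for schema of $tpe" }
  }
  case class LazyNode(tpe: String) {
    lazy val errMsg: String = { lazyBuilds += 1; s"... is not a valid external type for schema of $tpe" }
  }

  def main(args: Array[String]): Unit = {
    val e = EagerNode("struct<a:bigint>")
    e.copy()                  // tree transforms copy nodes; the eager val is rebuilt on each copy
    assert(eagerBuilds == 2)

    val l = LazyNode("struct<a:bigint>")
    val copied = l.copy()     // no message has been built yet
    assert(lazyBuilds == 0)
    copied.errMsg             // built only on first access
    assert(lazyBuilds == 1)
    println("ok")
  }
}
```

With deeply nested schemas, each eager rebuild materializes a huge string, which is what the benchmark ran out of memory on.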

Does this PR introduce any user-facing change?

No

How was this patch tested?

Manual test with WideSchemaBenchmark.

@viirya
Member Author

viirya commented Jan 7, 2020

Member

@MaxGekk MaxGekk left a comment

If we ever actually need to output the error message for a large expected.catalogString, it will still fail with an OOM, right? That is not good. To avoid this kind of issue, it would be nice to use StringConcat:

/**
* Concatenation of sequence of strings to final string with cheap append method
* and one memory allocation for the final string. Can also bound the final size of
* the string.
*/
class StringConcat(val maxLength: Int = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {

or truncate expected.catalogString somehow else.

@viirya Or OOM happens because we create a lot of ValidateExternalType.errMsg?
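For illustration, here is a minimal sketch of the bounded-concatenation idea behind StringConcat (the class below is a simplified stand-in, not Spark's actual implementation): appends are cheap, the accumulated length is capped, and the final string is built with a single allocation.

```scala
import scala.collection.mutable.ArrayBuffer

object ConcatDemo {
  // Simplified illustration of bounded concatenation; not Spark's actual StringConcat.
  class BoundedConcat(val maxLength: Int) {
    private val strings = ArrayBuffer.empty[String]
    private var length = 0

    // Cheap append: pieces are buffered, and anything past maxLength is dropped.
    def append(s: String): Unit = {
      if (s != null && length < maxLength) {
        val piece = if (s.length <= maxLength - length) s else s.substring(0, maxLength - length)
        strings += piece
        length += piece.length
      }
    }

    // One allocation for the final string, capped at maxLength characters.
    override def toString: String = {
      val sb = new java.lang.StringBuilder(length)
      strings.foreach(s => sb.append(s))
      sb.toString
    }
  }

  def main(args: Array[String]): Unit = {
    val concat = new BoundedConcat(maxLength = 10)
    Seq("struct<", "a:bigint", ",b:bigint", ">").foreach(concat.append)
    assert(concat.toString == "struct<a:b") // capped at 10 characters
    println("ok")
  }
}
```

Because the cap is enforced at append time, even a pathologically wide schema can never force an unbounded allocation for the message.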

@SparkQA

SparkQA commented Jan 7, 2020

Test build #116234 has finished for PR 27117 at commit 5fea579.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

+1 to truncate expected.catalogString

@viirya
Member Author

viirya commented Jan 7, 2020

> If we really need to output the error message for large expected.catalogString, it will fail with OOM, right? which is not good. To avoid such kind of issues, it would be nice to use StringConcat:
>
> /**
> * Concatenation of sequence of strings to final string with cheap append method
> * and one memory allocation for the final string. Can also bound the final size of
> * the string.
> */
> class StringConcat(val maxLength: Int = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
>
> or truncate expected.catalogString somehow else.
>
> @viirya Or OOM happens because we create a lot of ValidateExternalType.errMsg?

When we transform the serializer, we copy the expressions in it, which creates redundant ValidateExternalType.errMsg strings. So we currently initialize more errMsg strings than needed (for the transformed serializer). The OOM in WideSchemaBenchmark is due to this.

With the lazy val, we only initialize errMsg when ValidateExternalType is actually used. The nesting level currently used in WideSchemaBenchmark is then fine, but an even more deeply nested case could still cause an OOM if it needs more memory.

I agree with you and @cloud-fan: we should truncate expected.catalogString in ValidateExternalType to prevent that.
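For context, the truncation that change (2) relies on can be sketched like this; the helper name and exact output format below are illustrative, not Spark's actual simpleString:

```scala
object TruncateDemo {
  // Hypothetical helper mirroring the truncation idea behind simpleString;
  // the name and output format are illustrative, not Spark's.
  def truncatedStruct(fields: Seq[String], maxFields: Int): String = {
    val shown =
      if (fields.length > maxFields)
        fields.take(maxFields) :+ s"... ${fields.length - maxFields} more fields"
      else fields
    shown.mkString("struct<", ",", ">")
  }

  def main(args: Array[String]): Unit = {
    // A wide schema like the one in WideSchemaBenchmark stays bounded in the message.
    val wide = (1 to 1000).map(i => s"value_$i:bigint")
    assert(truncatedStruct(wide, 3) ==
      "struct<value_1:bigint,value_2:bigint,value_3:bigint,... 997 more fields>")
    println("ok")
  }
}
```

The message length then depends on the truncation limit, not on the schema width, so the error path itself can no longer blow up memory.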

@SparkQA

SparkQA commented Jan 7, 2020

Test build #116251 has finished for PR 27117 at commit 3a6d3c2.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 7, 2020

Test build #116253 has finished for PR 27117 at commit 25a1f63.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Member Author

viirya commented Jan 7, 2020

Calling .init on an empty array doesn't return an empty array but throws java.lang.UnsupportedOperationException...
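This pitfall can be reproduced directly. In the Scala 2.12 collections Spark used at the time, Array ops went through IndexedSeqOptimized, where `.init` on an empty collection throws; the List version below shows the same behavior:

```scala
import scala.util.Try

object InitDemo {
  def main(args: Array[String]): Unit = {
    // .init (all elements but the last) throws on an empty collection
    // instead of returning an empty one.
    val result = Try(List.empty[Int].init)
    assert(result.isFailure)
    assert(result.failed.get.isInstanceOf[UnsupportedOperationException])

    // dropRight(1) is a safe alternative: empty in, empty out.
    assert(List.empty[Int].dropRight(1).isEmpty)
    println("ok")
  }
}
```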

@dongjoon-hyun
Member

Thank you for reporting, @MaxGekk . And, thank you for quick fix, @viirya !

Member

@dongjoon-hyun dongjoon-hyun left a comment

In this PR, there are three different themes. Could you update the PR title and description accordingly?

  1. Using lazy: with lazy, the OOM at the benchmark is gone
  2. Replacing catalogString with simpleString: this looks like an independent improvement for errMsg.
  3. Optimizing override def catalogString: this is no longer used by errMsg due to (2), so it becomes independent in this PR.

If the PR title and description are correctly updated, we don't need to split this PR since the code is simple. Otherwise, we may want to split this into multiple PRs.

@viirya viirya changed the title [SPARK-30429][SQL] ValidateExternalType should not initiate errMsg in the constructor [SPARK-30429][SQL] Optimize catalogString and usage in ValidateExternalType.errMsg to avoid OOM Jan 7, 2020
@viirya
Member Author

viirya commented Jan 7, 2020

@dongjoon-hyun Thanks! I've updated the PR description and title.

Member

@dongjoon-hyun dongjoon-hyun left a comment

+1, LGTM. (Pending Jenkins).
Thank you for updating, @viirya .

@viirya
Member Author

viirya commented Jan 7, 2020

Thanks! @dongjoon-hyun @MaxGekk

@SparkQA

SparkQA commented Jan 7, 2020

Test build #116256 has finished for PR 27117 at commit 87eb2b6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 8, 2020

Test build #116258 has finished for PR 27117 at commit 5f25d79.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun
Member

Merged to master. Thank you all!

@viirya viirya deleted the SPARK-30429 branch December 27, 2023 18:38
5 participants