Commit 1160457

viirya authored and dongjoon-hyun committed
[SPARK-30429][SQL] Optimize catalogString and usage in ValidateExternalType.errMsg to avoid OOM
### What changes were proposed in this pull request?

This patch proposes:

1. Fix the OOM in WideSchemaBenchmark: make `ValidateExternalType.errMsg` a lazy val, i.e. do not initialize it in the constructor.
2. Truncate `errMsg`: replace `catalogString` with `simpleString`, which is truncated.
3. Optimize `override def catalogString` in `StructType`: make `catalogString` generate its string more efficiently by using `StringConcat`.

### Why are the changes needed?

As reported in the JIRA, WideSchemaBenchmark fails with an OOM like:

```
[error] Exception in thread "main" org.apache.spark.sql.catalyst.errors.package$TreeNodeException: makeCopy, tree: validateexternaltype(getexternalrowfield(input[0, org.apache.spark.sql.Row, true], 0, a), StructField(b,StructType(StructField(c,StructType(StructField(value_1,LongType,true), StructField(value_10,LongType,true), StructField(value_100,LongType,true), StructField(value_1000,LongType,true), StructField(value_1001,LongType,true), StructField(value_1002,LongType,true), StructField(value_1003,LongType,true), StructField(value_1004,LongType,true), StructField(value_1005,LongType,true), StructField(value_1006,LongType,true), StructField(value_1007,LongType,true), StructField(value_1008,LongType,true), StructField(value_1009,LongType,true), StructField(value_101,LongType,true), StructField(value_1010,LongType,true), StructField(value_1011,LongType, ... ue), StructField(value_99,LongType,true), StructField(value_990,LongType,true), StructField(value_991,LongType,true), StructField(value_992,LongType,true), StructField(value_993,LongType,true), StructField(value_994,LongType,true), StructField(value_995,LongType,true), StructField(value_996,LongType,true), StructField(value_997,LongType,true), StructField(value_998,LongType,true), StructField(value_999,LongType,true)),true))
[error] at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
[error] at org.apache.spark.sql.catalyst.trees.TreeNode.makeCopy(TreeNode.scala:435)
[error] at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:408)
[error] at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:327)
[error] at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:307)
....
[error] at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:404)
[error] at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:214)
[error] at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:374)
[error] at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:327)
[error] at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:307)
[error] at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$1(TreeNode.scala:307)
[error] at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:376)
[error] at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:214)
[error] at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:374)
[error] at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:327)
[error] at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:307)
[error] at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.<init>(ExpressionEncoder.scala:198)
[error] at org.apache.spark.sql.catalyst.encoders.RowEncoder$.apply(RowEncoder.scala:71)
[error] at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:88)
[error] at org.apache.spark.sql.SparkSession.internalCreateDataFrame(SparkSession.scala:554)
[error] at org.apache.spark.sql.DataFrameReader.json(DataFrameReader.scala:476)
[error] at org.apache.spark.sql.execution.benchmark.WideSchemaBenchmark$.$anonfun$wideShallowlyNestedStructFieldReadAndWrite$1(WideSchemaBenchmark.scala:126)
...
[error] Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
[error] at java.util.Arrays.copyOf(Arrays.java:3332)
[error] at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
[error] at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
[error] at java.lang.StringBuilder.append(StringBuilder.java:136)
[error] at scala.collection.mutable.StringBuilder.append(StringBuilder.scala:213)
[error] at scala.collection.TraversableOnce.$anonfun$addString$1(TraversableOnce.scala:368)
[error] at scala.collection.TraversableOnce$$Lambda$67/667447085.apply(Unknown Source)
[error] at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
[error] at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
[error] at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
[error] at scala.collection.TraversableOnce.addString(TraversableOnce.scala:362)
[error] at scala.collection.TraversableOnce.addString$(TraversableOnce.scala:358)
[error] at scala.collection.mutable.ArrayOps$ofRef.addString(ArrayOps.scala:198)
[error] at scala.collection.TraversableOnce.mkString(TraversableOnce.scala:328)
[error] at scala.collection.TraversableOnce.mkString$(TraversableOnce.scala:327)
[error] at scala.collection.mutable.ArrayOps$ofRef.mkString(ArrayOps.scala:198)
[error] at scala.collection.TraversableOnce.mkString(TraversableOnce.scala:330)
[error] at scala.collection.TraversableOnce.mkString$(TraversableOnce.scala:330)
[error] at scala.collection.mutable.ArrayOps$ofRef.mkString(ArrayOps.scala:198)
[error] at org.apache.spark.sql.types.StructType.catalogString(StructType.scala:411)
[error] at org.apache.spark.sql.catalyst.expressions.objects.ValidateExternalType.<init>(objects.scala:1695)
[error] at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
[error] at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
[error] at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
[error] at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
[error] at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$makeCopy$7(TreeNode.scala:468)
[error] at org.apache.spark.sql.catalyst.trees.TreeNode$$Lambda$934/387827651.apply(Unknown Source)
[error] at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
[error] at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$makeCopy$1(TreeNode.scala:467)
[error] at org.apache.spark.sql.catalyst.trees.TreeNode$$Lambda$929/449240381.apply(Unknown Source)
[error] at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
[error] at org.apache.spark.sql.catalyst.trees.TreeNode.makeCopy(TreeNode.scala:435)
```

This started after commit cb5ea20, which refactored `ExpressionEncoder`. The stacktrace shows the failure happens during `transformUp` on `objSerializer` in `ExpressionEncoder`; in particular, it fails while initializing `ValidateExternalType.errMsg`, which interpolates the `catalogString` of the given `expected` data type into a string. WideSchemaBenchmark uses a very deeply nested data type, so every time the serializer containing `ValidateExternalType` is transformed, a redundant, huge `errMsg` string is rebuilt. Because the message is never read during the transform, that work is useless and wastes a lot of memory. After making `ValidateExternalType.errMsg` a lazy val, WideSchemaBenchmark works.

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

Manual test with WideSchemaBenchmark.

Closes #27117 from viirya/SPARK-30429.

Lead-authored-by: Liang-Chi Hsieh <liangchi@uber.com>
Co-authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
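To illustrate the core of the fix, here is a minimal, self-contained sketch; the `Validate` class and `expensiveTypeString` helper are hypothetical stand-ins for `ValidateExternalType` and `expected.catalogString`, not Spark's actual code. With an eager `val`, every copy of the node pays for building the message in its constructor; with `lazy val`, nothing is built until the message is first read, which for transformed-and-discarded nodes is never:

```scala
// Minimal sketch; Validate and expensiveTypeString are hypothetical stand-ins.
case class Validate(fieldCount: Int) {
  // Stand-in for expected.catalogString: O(fieldCount) string construction.
  private def expensiveTypeString: String =
    (1 to fieldCount).map(i => s"value_$i:bigint").mkString("struct<", ",", ">")

  // Old behavior (eager): built inside the constructor, so every copy of the
  // node re-allocates the whole string even if it is never used.
  // private val errMsg = s" is not a valid external type for schema of $expensiveTypeString"

  // The fix (lazy): construction is deferred until errMsg is first read.
  private lazy val errMsg = s" is not a valid external type for schema of $expensiveTypeString"

  def fail(value: Any): Nothing =
    throw new RuntimeException(value.getClass.getName + errMsg)
}

object LazyErrMsgDemo extends App {
  // Copying the node many times (as TreeNode.makeCopy does reflectively during
  // transformUp) no longer builds any error message:
  val copies = (1 to 1000).map(_ => Validate(fieldCount = 100000))
  println(s"created ${copies.size} nodes without materializing errMsg")
}
```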
1 parent 390e6bd commit 1160457

File tree

2 files changed: +13 −4 lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala

Lines changed: 1 addition & 1 deletion

```diff
@@ -1692,7 +1692,7 @@ case class ValidateExternalType(child: Expression, expected: DataType)
 
   override val dataType: DataType = RowEncoder.externalDataTypeForInput(expected)
 
-  private val errMsg = s" is not a valid external type for schema of ${expected.catalogString}"
+  private lazy val errMsg = s" is not a valid external type for schema of ${expected.simpleString}"
 
   private lazy val checkType: (Any) => Boolean = expected match {
     case _: DecimalType =>
```
sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala

Lines changed: 12 additions & 3 deletions

```diff
@@ -27,7 +27,7 @@ import org.apache.spark.SparkException
 import org.apache.spark.annotation.Stable
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, InterpretedOrdering}
 import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, LegacyTypeStringParser}
-import org.apache.spark.sql.catalyst.util.{quoteIdentifier, truncatedString}
+import org.apache.spark.sql.catalyst.util.{quoteIdentifier, truncatedString, StringUtils}
 import org.apache.spark.sql.internal.SQLConf
 
 /**
@@ -407,8 +407,17 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[Stru
 
   override def catalogString: String = {
     // in catalogString, we should not truncate
-    val fieldTypes = fields.map(field => s"${field.name}:${field.dataType.catalogString}")
-    s"struct<${fieldTypes.mkString(",")}>"
+    val stringConcat = new StringUtils.StringConcat()
+    val len = fields.length
+    stringConcat.append("struct<")
+    var i = 0
+    while (i < len) {
+      stringConcat.append(s"${fields(i).name}:${fields(i).dataType.catalogString}")
+      i += 1
+      if (i < len) stringConcat.append(",")
+    }
+    stringConcat.append(">")
+    stringConcat.toString
   }
 
   override def sql: String = {
```
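The rewritten `catalogString` appends field strings one at a time instead of first materializing an `Array[String]` via `fields.map` and then joining it with `mkString`, reducing peak allocations for very wide structs. Below is a rough stand-alone sketch of the same pattern using a plain `StringBuilder` in place of Spark's internal `StringUtils.StringConcat`; the `Field` class and `structString` function are illustrative only:

```scala
// Illustrative stand-ins; not Spark's classes.
final case class Field(name: String, typeName: String)

def structString(fields: Array[Field]): String = {
  val sb = new StringBuilder("struct<")
  var i = 0
  // Single pass, piece-by-piece appends: no intermediate Array[String]
  // and no separate join step, unlike fields.map(...).mkString(",").
  while (i < fields.length) {
    sb.append(fields(i).name).append(':').append(fields(i).typeName)
    i += 1
    if (i < fields.length) sb.append(',')
  }
  sb.append('>').toString
}

// structString(Array(Field("a", "bigint"), Field("b", "string")))
//   == "struct<a:bigint,b:string>"
```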
