Skip to content

Commit f12f11e

Browse files
kiszk authored and davies committed
[SPARK-14138] [SQL] Fix generated SpecificColumnarIterator code can exceed JVM size limit for cached DataFrames
## What changes were proposed in this pull request?

This PR reduces the Java bytecode size of the methods in `SpecificColumnarIterator` by using two approaches:

1. Generate and call `getTYPEColumnAccessor()` for each type that is actually used, to instantiate the accessors.
2. Group a large number of method calls (more than 4000) into a method.

## How was this patch tested?

Added a new unit test to `InMemoryColumnarQuerySuite`.

Here is the generated code:

```java
/* 033 */   private org.apache.spark.sql.execution.columnar.CachedBatch batch = null;
/* 034 */
/* 035 */   private org.apache.spark.sql.execution.columnar.IntColumnAccessor accessor;
/* 036 */   private org.apache.spark.sql.execution.columnar.IntColumnAccessor accessor1;
/* 037 */
/* 038 */   public SpecificColumnarIterator() {
/* 039 */     this.nativeOrder = ByteOrder.nativeOrder();
/* 040 */     this.mutableRow = new MutableUnsafeRow(rowWriter);
/* 041 */   }
/* 042 */
/* 043 */   public void initialize(Iterator input, DataType[] columnTypes, int[] columnIndexes,
/* 044 */                          boolean columnNullables[]) {
/* 045 */     this.input = input;
/* 046 */     this.columnTypes = columnTypes;
/* 047 */     this.columnIndexes = columnIndexes;
/* 048 */   }
/* 049 */
/* 050 */
/* 051 */   private org.apache.spark.sql.execution.columnar.IntColumnAccessor getIntColumnAccessor(int idx) {
/* 052 */     byte[] buffer = batch.buffers()[columnIndexes[idx]];
/* 053 */     return new org.apache.spark.sql.execution.columnar.IntColumnAccessor(ByteBuffer.wrap(buffer).order(nativeOrder));
/* 054 */   }
/* 055 */
/* 056 */
/* 057 */
/* 058 */
/* 059 */
/* 060 */
/* 061 */   public boolean hasNext() {
/* 062 */     if (currentRow < numRowsInBatch) {
/* 063 */       return true;
/* 064 */     }
/* 065 */     if (!input.hasNext()) {
/* 066 */       return false;
/* 067 */     }
/* 068 */
/* 069 */     batch = (org.apache.spark.sql.execution.columnar.CachedBatch) input.next();
/* 070 */     currentRow = 0;
/* 071 */     numRowsInBatch = batch.numRows();
/* 072 */     accessor = getIntColumnAccessor(0);
/* 073 */     accessor1 = getIntColumnAccessor(1);
/* 074 */
/* 075 */     return hasNext();
/* 076 */   }
/* 077 */
/* 078 */   public InternalRow next() {
/* 079 */     currentRow += 1;
/* 080 */     bufferHolder.reset();
/* 081 */     rowWriter.zeroOutNullBytes();
/* 082 */     accessor.extractTo(mutableRow, 0);
/* 083 */     accessor1.extractTo(mutableRow, 1);
/* 084 */     unsafeRow.setTotalSize(bufferHolder.totalSize());
/* 085 */     return unsafeRow;
/* 086 */   }
```

(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes apache#11984 from kiszk/SPARK-14138.
1 parent 3cc3d85 commit f12f11e

File tree

2 files changed

+54
-6
lines changed

2 files changed

+54
-6
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala

Lines changed: 44 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.spark.sql.execution.columnar
1919

20+
import scala.collection.mutable
21+
2022
import org.apache.spark.Logging
2123
import org.apache.spark.sql.catalyst.InternalRow
2224
import org.apache.spark.sql.catalyst.expressions._
@@ -88,7 +90,7 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
8890
case array: ArrayType => classOf[ArrayColumnAccessor].getName
8991
case t: MapType => classOf[MapColumnAccessor].getName
9092
}
91-
ctx.addMutableState(accessorCls, accessorName, s"$accessorName = null;")
93+
ctx.addMutableState(accessorCls, accessorName, "")
9294

9395
val createCode = dt match {
9496
case t if ctx.isPrimitiveType(dt) =>
@@ -97,7 +99,7 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
9799
s"$accessorName = new $accessorCls(ByteBuffer.wrap(buffers[$index]).order(nativeOrder));"
98100
case other =>
99101
s"""$accessorName = new $accessorCls(ByteBuffer.wrap(buffers[$index]).order(nativeOrder),
100-
(${dt.getClass.getName}) columnTypes[$index]);"""
102+
(${dt.getClass.getName}) columnTypes[$index]);"""
101103
}
102104

103105
val extract = s"$accessorName.extractTo(mutableRow, $index);"
@@ -114,6 +116,42 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
114116
(createCode, extract + patch)
115117
}.unzip
116118

119+
/*
 * Limit every generated method to 200 statements:
 *   200 = 6000 bytes / 30 (up to 30 bytes of bytecode per call).
 * HotSpot does not JIT-compile a method whose bytecode is larger than 8000 bytes,
 * so each generated method should stay below that size.
 */
val numberOfStatementsThreshold = 200

/*
 * Splits `statements` into groups of at most `numberOfStatementsThreshold`, registers one
 * private Java method per group with `ctx` (named `<prefix>0`, `<prefix>1`, ...), and
 * returns the newline-separated Java calls to those methods.
 *
 * `grouped` returns an Iterator, and Iterator.map is lazy: the closure below only runs when
 * the iterator is consumed. The trailing `mkString` consumes it completely, so every helper
 * is registered through `ctx.addNewFunction` before the call list is returned. (Discarding
 * the mapped iterator, as a bare `.map { ... }` statement does, would register nothing.)
 */
def splitIntoFunctions(prefix: String, statements: Seq[String]): String =
  statements.grouped(numberOfStatementsThreshold).zipWithIndex.map { case (body, i) =>
    val funcName = s"$prefix$i"
    val funcCode = s"""
       |private void $funcName() {
       |  ${body.mkString("\n")}
       |}
     """.stripMargin
    ctx.addNewFunction(funcName, funcCode)
    s"$funcName();"
  }.mkString("\n")

// Code to paste into hasNext() (accessor creation) and next() (value extraction).
val (initializerAccessorCalls, extractorCalls) =
  if (initializeAccessors.length <= numberOfStatementsThreshold) {
    // Few enough columns: inline the statements directly.
    (initializeAccessors.mkString("\n"), extractors.mkString("\n"))
  } else {
    // Too many columns: move the statements into helper methods and emit only the calls.
    (splitIntoFunctions("accessors", initializeAccessors),
      splitIntoFunctions("extractors", extractors))
  }
154+
117155
val code = s"""
118156
import java.nio.ByteBuffer;
119157
import java.nio.ByteOrder;
@@ -149,8 +187,6 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
149187
this.nativeOrder = ByteOrder.nativeOrder();
150188
this.buffers = new byte[${columnTypes.length}][];
151189
this.mutableRow = new MutableUnsafeRow(rowWriter);
152-
153-
${initMutableStates(ctx)}
154190
}
155191

156192
public void initialize(Iterator input, DataType[] columnTypes, int[] columnIndexes) {
@@ -159,6 +195,8 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
159195
this.columnIndexes = columnIndexes;
160196
}
161197

198+
${declareAddedFunctions(ctx)}
199+
162200
public boolean hasNext() {
163201
if (currentRow < numRowsInBatch) {
164202
return true;
@@ -173,7 +211,7 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
173211
for (int i = 0; i < columnIndexes.length; i ++) {
174212
buffers[i] = batch.buffers()[columnIndexes[i]];
175213
}
176-
${initializeAccessors.mkString("\n")}
214+
${initializerAccessorCalls}
177215

178216
return hasNext();
179217
}
@@ -182,7 +220,7 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
182220
currentRow += 1;
183221
bufferHolder.reset();
184222
rowWriter.initialize(bufferHolder, $numFields);
185-
${extractors.mkString("\n")}
223+
${extractorCalls}
186224
unsafeRow.pointTo(bufferHolder.buffer, $numFields, bufferHolder.totalSize());
187225
return unsafeRow;
188226
}

sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,4 +219,14 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
219219
assert(data.count() === 10)
220220
assert(data.filter($"s" === "3").count() === 1)
221221
}
222+
223+
test("SPARK-14138: Generated SpecificColumnarIterator can exceed JVM size limit for cached DF") {
  // Generate the columnar iterator for two wide all-integer schemas (3999 and 10000 columns).
  // The test passes as long as code generation completes without throwing.
  Seq(3999, 10000).foreach { numColumns =>
    GenerateColumnAccessor.generate(List.fill(numColumns)(IntegerType))
  }
}
222232
}

0 commit comments

Comments
 (0)