
Commit 58b4e4f

nonglirxin authored and committed
[SPARK-11787][SPARK-11883][SQL][FOLLOW-UP] Cleanup for this patch.
This mainly moves SqlNewHadoopRDD to the sql package. There is some state that is shared between core and sql, and that state stays in core. This allows some other associated minor cleanup.

Author: Nong Li <nong@databricks.com>

Closes apache#9845 from nongli/spark-11787.
1 parent ed47b1e commit 58b4e4f
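
For orientation, a hedged sketch of the split described above: the thread-local state stays in core under org.apache.spark.rdd because core's HadoopRDD sets it and catalyst's InputFileName reads it, and neither module can depend on sql/core, where SqlNewHadoopRDD now lives. The reader side boils down to a single call (an empty string means no Hadoop split is being read on the current thread):

import org.apache.spark.rdd.SqlNewHadoopRDDState
import org.apache.spark.unsafe.types.UTF8String

// What InputFileName effectively evaluates per row (illustrative standalone snippet):
val currentFile: UTF8String = SqlNewHadoopRDDState.getInputFileName()
println(currentFile.toString)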

File tree

10 files changed: 175 additions & 80 deletions


core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala

Lines changed: 3 additions & 3 deletions
@@ -215,8 +215,8 @@ class HadoopRDD[K, V](
 
       // Sets the thread local variable for the file's name
       split.inputSplit.value match {
-        case fs: FileSplit => SqlNewHadoopRDD.setInputFileName(fs.getPath.toString)
-        case _ => SqlNewHadoopRDD.unsetInputFileName()
+        case fs: FileSplit => SqlNewHadoopRDDState.setInputFileName(fs.getPath.toString)
+        case _ => SqlNewHadoopRDDState.unsetInputFileName()
       }
 
       // Find a function that will return the FileSystem bytes read by this thread. Do this before
@@ -256,7 +256,7 @@ class HadoopRDD[K, V](
 
     override def close() {
       if (reader != null) {
-        SqlNewHadoopRDD.unsetInputFileName()
+        SqlNewHadoopRDDState.unsetInputFileName()
         // Close the reader and release it. Note: it's very important that we don't close the
         // reader more than once, since that exposes us to MAPREDUCE-5918 when running against
         // Hadoop 1.x and older Hadoop 2.x releases. That bug can lead to non-deterministic

core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDDState.scala (new file)

Lines changed: 41 additions & 0 deletions
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * State for SqlNewHadoopRDD objects. This is split this way because of the package splits.
+ * TODO: Move/Combine this with org.apache.spark.sql.datasources.SqlNewHadoopRDD
+ */
+private[spark] object SqlNewHadoopRDDState {
+  /**
+   * The thread variable for the name of the current file being read. This is used by
+   * the InputFileName function in Spark SQL.
+   */
+  private[this] val inputFileName: ThreadLocal[UTF8String] = new ThreadLocal[UTF8String] {
+    override protected def initialValue(): UTF8String = UTF8String.fromString("")
+  }
+
+  def getInputFileName(): UTF8String = inputFileName.get()
+
+  private[spark] def setInputFileName(file: String) = inputFileName.set(UTF8String.fromString(file))
+
+  private[spark] def unsetInputFileName(): Unit = inputFileName.remove()
+
+}
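
A minimal usage sketch of the new object, mirroring what HadoopRDD and SqlNewHadoopRDD do around each split. The setters are private[spark], so this pattern only applies inside Spark's own packages; the path below is illustrative.

import org.apache.spark.rdd.SqlNewHadoopRDDState

// Publish the split's file name for the duration of the read, then clear the
// thread-local so it does not leak into whatever runs next on this thread.
SqlNewHadoopRDDState.setInputFileName("hdfs://nn/warehouse/t/part-00000.parquet")
try {
  // ... iterate the split's records; InputFileName() now evaluates to this path ...
} finally {
  SqlNewHadoopRDDState.unsetInputFileName()
}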

sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java

Lines changed: 46 additions & 13 deletions
@@ -17,7 +17,11 @@
 
 package org.apache.spark.sql.catalyst.expressions;
 
-import java.io.*;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.io.OutputStream;
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.nio.ByteBuffer;
@@ -26,22 +30,50 @@
 import java.util.HashSet;
 import java.util.Set;
 
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.KryoSerializable;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-
-import org.apache.spark.sql.types.*;
+import org.apache.spark.sql.types.ArrayType;
+import org.apache.spark.sql.types.BinaryType;
+import org.apache.spark.sql.types.BooleanType;
+import org.apache.spark.sql.types.ByteType;
+import org.apache.spark.sql.types.CalendarIntervalType;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DateType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.DoubleType;
+import org.apache.spark.sql.types.FloatType;
+import org.apache.spark.sql.types.IntegerType;
+import org.apache.spark.sql.types.LongType;
+import org.apache.spark.sql.types.MapType;
+import org.apache.spark.sql.types.NullType;
+import org.apache.spark.sql.types.ShortType;
+import org.apache.spark.sql.types.StringType;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.types.TimestampType;
+import org.apache.spark.sql.types.UserDefinedType;
 import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.array.ByteArrayMethods;
 import org.apache.spark.unsafe.bitset.BitSetMethods;
 import org.apache.spark.unsafe.hash.Murmur3_x86_32;
 import org.apache.spark.unsafe.types.CalendarInterval;
 import org.apache.spark.unsafe.types.UTF8String;
 
-import static org.apache.spark.sql.types.DataTypes.*;
+import static org.apache.spark.sql.types.DataTypes.BooleanType;
+import static org.apache.spark.sql.types.DataTypes.ByteType;
+import static org.apache.spark.sql.types.DataTypes.DateType;
+import static org.apache.spark.sql.types.DataTypes.DoubleType;
+import static org.apache.spark.sql.types.DataTypes.FloatType;
+import static org.apache.spark.sql.types.DataTypes.IntegerType;
+import static org.apache.spark.sql.types.DataTypes.LongType;
+import static org.apache.spark.sql.types.DataTypes.NullType;
+import static org.apache.spark.sql.types.DataTypes.ShortType;
+import static org.apache.spark.sql.types.DataTypes.TimestampType;
 import static org.apache.spark.unsafe.Platform.BYTE_ARRAY_OFFSET;
 
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.KryoSerializable;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
 /**
  * An Unsafe implementation of Row which is backed by raw memory instead of Java objects.
  *
@@ -116,11 +148,6 @@ public static boolean isMutable(DataType dt) {
   /** The size of this row's backing data, in bytes) */
   private int sizeInBytes;
 
-  private void setNotNullAt(int i) {
-    assertIndexIsValid(i);
-    BitSetMethods.unset(baseObject, baseOffset, i);
-  }
-
   /** The width of the null tracking bit set, in bytes */
   private int bitSetWidthInBytes;
 
@@ -187,6 +214,12 @@ public void pointTo(byte[] buf, int sizeInBytes) {
     pointTo(buf, numFields, sizeInBytes);
   }
 
+
+  public void setNotNullAt(int i) {
+    assertIndexIsValid(i);
+    BitSetMethods.unset(baseObject, baseOffset, i);
+  }
+
   @Override
   public void setNullAt(int i) {
     assertIndexIsValid(i);
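
The previously private setNotNullAt is now public because the Parquet reader below writes variable-length values straight into the row's backing buffer, bypassing the typed setters (which clear the null bit themselves), and then has to clear the bit explicitly. A minimal sketch of the null-bit toggle, assuming the 1.6-era pointTo(byte[], numFields, sizeInBytes) signature and a hand-laid-out two-field row:

import org.apache.spark.sql.catalyst.expressions.UnsafeRow

val buf = new Array[Byte](8 + 2 * 8)   // one 64-bit word of null bits + two long fields
val row = new UnsafeRow()
row.pointTo(buf, 2, buf.length)        // (buffer, numFields, sizeInBytes)
row.setLong(0, 7L)                     // typed setters clear the null bit on their own
row.setNullAt(1)                       // field 1 now reads as null
row.setNotNullAt(1)                    // newly public: clear the bit without going through a setter
assert(!row.isNullAt(1))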

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InputFileName.scala

Lines changed: 3 additions & 3 deletions
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
-import org.apache.spark.rdd.SqlNewHadoopRDD
+import org.apache.spark.rdd.SqlNewHadoopRDDState
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.codegen.{GeneratedExpressionCode, CodeGenContext}
 import org.apache.spark.sql.types.{DataType, StringType}
@@ -37,13 +37,13 @@ case class InputFileName() extends LeafExpression with Nondeterministic {
   override protected def initInternal(): Unit = {}
 
   override protected def evalInternal(input: InternalRow): UTF8String = {
-    SqlNewHadoopRDD.getInputFileName()
+    SqlNewHadoopRDDState.getInputFileName()
   }
 
   override def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = {
     ev.isNull = "false"
     s"final ${ctx.javaType(dataType)} ${ev.value} = " +
-      "org.apache.spark.rdd.SqlNewHadoopRDD.getInputFileName();"
+      "org.apache.spark.rdd.SqlNewHadoopRDDState.getInputFileName();"
   }
 
 }
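
For context, a hedged end-to-end usage sketch: this expression backs the input_file_name() function in org.apache.spark.sql.functions, so a query over a multi-file table can report the source file of each row. The path and the id column are illustrative, and sqlContext is assumed to be an existing SQLContext.

import org.apache.spark.sql.functions.input_file_name

// Read a multi-file Parquet table and tag each row with the file it came from.
val df = sqlContext.read.parquet("/path/to/table")
df.select(input_file_name().as("source_file"), df("id")).show()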

sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java

Lines changed: 14 additions & 0 deletions
@@ -108,6 +108,19 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas
    */
   private static final int DEFAULT_VAR_LEN_SIZE = 32;
 
+  /**
+   * Tries to initialize the reader for this split. Returns true if this reader supports reading
+   * this split and false otherwise.
+   */
+  public boolean tryInitialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) {
+    try {
+      initialize(inputSplit, taskAttemptContext);
+      return true;
+    } catch (Exception e) {
+      return false;
+    }
+  }
+
   /**
    * Implementation of RecordReader API.
    */
@@ -326,6 +339,7 @@ private void decodeBinaryBatch(int col, int num) throws IOException {
         } else {
           rowWriters[n].write(col, bytes.array(), bytes.position(), len);
         }
+        rows[n].setNotNullAt(col);
       } else {
         rows[n].setNullAt(col);
       }
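
A hedged sketch of the intended calling pattern for tryInitialize, mirroring the compute() change in SqlNewHadoopRDD further down: try the specialized reader first and fall back to a caller-supplied generic RecordReader when the split is not supported (for example, complex types). chooseReader is a hypothetical helper, not part of this patch.

import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, TaskAttemptContext}
import org.apache.spark.sql.execution.datasources.parquet.UnsafeRowParquetRecordReader

def chooseReader[V](
    split: InputSplit,
    context: TaskAttemptContext,
    fallback: => RecordReader[Void, V]): RecordReader[Void, V] = {
  val fast = new UnsafeRowParquetRecordReader()
  if (fast.tryInitialize(split, context)) {
    // The unsafe-row reader accepted the split, so use it directly.
    fast.asInstanceOf[RecordReader[Void, V]]
  } else {
    // Initialization failed; release the reader and hand back the generic one.
    fast.close()
    fallback
  }
}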

sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala

Lines changed: 5 additions & 0 deletions
@@ -323,6 +323,11 @@ private[spark] object SQLConf {
       "option must be set in Hadoop Configuration. 2. This option overrides " +
       "\"spark.sql.sources.outputCommitterClass\".")
 
+  val PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED = booleanConf(
+    key = "spark.sql.parquet.enableUnsafeRowRecordReader",
+    defaultValue = Some(true),
+    doc = "Enables using the custom ParquetUnsafeRowRecordReader.")
+
   val ORC_FILTER_PUSHDOWN_ENABLED = booleanConf("spark.sql.orc.filterPushdown",
     defaultValue = Some(false),
     doc = "When true, enable filter pushdown for ORC files.")

core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala renamed to sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala

Lines changed: 21 additions & 39 deletions
@@ -20,6 +20,8 @@ package org.apache.spark.rdd
 import java.text.SimpleDateFormat
 import java.util.Date
 
+import scala.reflect.ClassTag
+
 import org.apache.hadoop.conf.{Configurable, Configuration}
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapreduce._
@@ -28,13 +30,12 @@ import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.executor.DataReadMethod
 import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
+import org.apache.spark.sql.{SQLConf, SQLContext}
+import org.apache.spark.sql.execution.datasources.parquet.UnsafeRowParquetRecordReader
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.unsafe.types.UTF8String
-import org.apache.spark.util.{Utils, SerializableConfiguration, ShutdownHookManager}
+import org.apache.spark.util.{SerializableConfiguration, ShutdownHookManager}
 import org.apache.spark.{Partition => SparkPartition, _}
 
-import scala.reflect.ClassTag
-
 
 private[spark] class SqlNewHadoopPartition(
     rddId: Int,
@@ -61,13 +62,13 @@ private[spark] class SqlNewHadoopPartition(
  * changes based on [[org.apache.spark.rdd.HadoopRDD]].
  */
 private[spark] class SqlNewHadoopRDD[V: ClassTag](
-    sc : SparkContext,
+    sqlContext: SQLContext,
     broadcastedConf: Broadcast[SerializableConfiguration],
     @transient private val initDriverSideJobFuncOpt: Option[Job => Unit],
     initLocalJobFuncOpt: Option[Job => Unit],
     inputFormatClass: Class[_ <: InputFormat[Void, V]],
     valueClass: Class[V])
-  extends RDD[V](sc, Nil)
+  extends RDD[V](sqlContext.sparkContext, Nil)
   with SparkHadoopMapReduceUtil
   with Logging {
 
@@ -99,7 +100,7 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag](
   // If true, enable using the custom RecordReader for parquet. This only works for
   // a subset of the types (no complex types).
   protected val enableUnsafeRowParquetReader: Boolean =
-    sc.conf.getBoolean("spark.parquet.enableUnsafeRowRecordReader", true)
+    sqlContext.getConf(SQLConf.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED.key).toBoolean
 
   override def getPartitions: Array[SparkPartition] = {
     val conf = getConf(isDriverSide = true)
@@ -120,8 +121,8 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag](
   }
 
   override def compute(
-    theSplit: SparkPartition,
-    context: TaskContext): Iterator[V] = {
+      theSplit: SparkPartition,
+      context: TaskContext): Iterator[V] = {
     val iter = new Iterator[V] {
       val split = theSplit.asInstanceOf[SqlNewHadoopPartition]
       logInfo("Input split: " + split.serializableHadoopSplit)
@@ -132,8 +133,8 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag](
 
       // Sets the thread local variable for the file's name
      split.serializableHadoopSplit.value match {
-        case fs: FileSplit => SqlNewHadoopRDD.setInputFileName(fs.getPath.toString)
-        case _ => SqlNewHadoopRDD.unsetInputFileName()
+        case fs: FileSplit => SqlNewHadoopRDDState.setInputFileName(fs.getPath.toString)
+        case _ => SqlNewHadoopRDDState.unsetInputFileName()
       }
 
       // Find a function that will return the FileSystem bytes read by this thread. Do this before
@@ -163,15 +164,13 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag](
        * TODO: plumb this through a different way?
        */
       if (enableUnsafeRowParquetReader &&
-        format.getClass.getName == "org.apache.parquet.hadoop.ParquetInputFormat") {
-        // TODO: move this class to sql.execution and remove this.
-        reader = Utils.classForName(
-          "org.apache.spark.sql.execution.datasources.parquet.UnsafeRowParquetRecordReader")
-          .newInstance().asInstanceOf[RecordReader[Void, V]]
-        try {
-          reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)
-        } catch {
-          case e: Exception => reader = null
+          format.getClass.getName == "org.apache.parquet.hadoop.ParquetInputFormat") {
+        val parquetReader: UnsafeRowParquetRecordReader = new UnsafeRowParquetRecordReader()
+        if (!parquetReader.tryInitialize(
+            split.serializableHadoopSplit.value, hadoopAttemptContext)) {
+          parquetReader.close()
+        } else {
+          reader = parquetReader.asInstanceOf[RecordReader[Void, V]]
         }
       }
 
@@ -217,7 +216,7 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag](
 
       private def close() {
        if (reader != null) {
-          SqlNewHadoopRDD.unsetInputFileName()
+          SqlNewHadoopRDDState.unsetInputFileName()
           // Close the reader and release it. Note: it's very important that we don't close the
           // reader more than once, since that exposes us to MAPREDUCE-5918 when running against
           // Hadoop 1.x and older Hadoop 2.x releases. That bug can lead to non-deterministic
@@ -235,7 +234,7 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag](
         if (bytesReadCallback.isDefined) {
           inputMetrics.updateBytesRead()
         } else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit] ||
-          split.serializableHadoopSplit.value.isInstanceOf[CombineFileSplit]) {
+            split.serializableHadoopSplit.value.isInstanceOf[CombineFileSplit]) {
           // If we can't get the bytes read from the FS stats, fall back to the split size,
           // which may be inaccurate.
           try {
@@ -276,23 +275,6 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag](
     }
     super.persist(storageLevel)
   }
-}
-
-private[spark] object SqlNewHadoopRDD {
-
-  /**
-   * The thread variable for the name of the current file being read. This is used by
-   * the InputFileName function in Spark SQL.
-   */
-  private[this] val inputFileName: ThreadLocal[UTF8String] = new ThreadLocal[UTF8String] {
-    override protected def initialValue(): UTF8String = UTF8String.fromString("")
-  }
-
-  def getInputFileName(): UTF8String = inputFileName.get()
-
-  private[spark] def setInputFileName(file: String) = inputFileName.set(UTF8String.fromString(file))
-
-  private[spark] def unsetInputFileName(): Unit = inputFileName.remove()
 
   /**
    * Analogous to [[org.apache.spark.rdd.MapPartitionsRDD]], but passes in an InputSplit to

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala

Lines changed: 1 addition & 1 deletion
@@ -319,7 +319,7 @@ private[sql] class ParquetRelation(
 
     Utils.withDummyCallSite(sqlContext.sparkContext) {
       new SqlNewHadoopRDD(
-        sc = sqlContext.sparkContext,
+        sqlContext = sqlContext,
         broadcastedConf = broadcastedConf,
         initDriverSideJobFuncOpt = Some(setInputPaths),
         initLocalJobFuncOpt = Some(initLocalJobFuncOpt),
