
Commit fae9eff

Refactors InsertIntoHiveTable to a Command
1 parent: 528e84c

1 file changed: +4 −9 lines

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala

Lines changed: 4 additions & 9 deletions
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.hive.execution
 
 import scala.collection.JavaConversions._
-import scala.collection.mutable
 
 import org.apache.hadoop.hive.common.`type`.{HiveDecimal, HiveVarchar}
 import org.apache.hadoop.hive.conf.HiveConf
@@ -31,14 +30,12 @@ import org.apache.hadoop.hive.serde2.Serializer
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
 import org.apache.hadoop.hive.serde2.objectinspector._
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.{JavaHiveDecimalObjectInspector, JavaHiveVarcharObjectInspector}
-import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf}
 
-import org.apache.spark.SparkContext._
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.expressions.Row
-import org.apache.spark.sql.execution.{SparkPlan, UnaryNode}
+import org.apache.spark.sql.execution.{Command, SparkPlan, UnaryNode}
 import org.apache.spark.sql.hive._
 import org.apache.spark.{SerializableWritable, SparkException, TaskContext}
 
@@ -52,7 +49,7 @@ case class InsertIntoHiveTable(
     child: SparkPlan,
     overwrite: Boolean)
     (@transient sc: HiveContext)
-  extends UnaryNode {
+  extends UnaryNode with Command {
 
   @transient lazy val outputClass = newSerializer(table.tableDesc).getSerializedClass
   @transient private lazy val hiveContext = new Context(sc.hiveconf)
@@ -172,16 +169,14 @@ case class InsertIntoHiveTable(
     }
   }
 
-  override def execute() = result
-
   /**
    * Inserts all the rows in the table into Hive. Row objects are properly serialized with the
    * `org.apache.hadoop.hive.serde2.SerDe` and the
    * `org.apache.hadoop.mapred.OutputFormat` provided by the table definition.
    *
    * Note: this is run once and then kept to avoid double insertions.
    */
-  private lazy val result: RDD[Row] = {
+  override protected[sql] lazy val sideEffectResult: Seq[Row] = {
     // Have to pass the TableDesc object to RDD.mapPartitions and then instantiate new serializer
     // instances within the closure, since Serializer is not serializable while TableDesc is.
     val tableDesc = table.tableDesc
@@ -293,6 +288,6 @@ case class InsertIntoHiveTable(
     // however for now we return an empty list to simplify compatibility checks with hive, which
     // does not return anything for insert operations.
    // TODO: implement hive compatibility as rules.
-    sc.sparkContext.makeRDD(Nil, 1)
+    Seq.empty[Row]
   }
 }
