[SPARK-43186][SQL][HIVE] Remove workaround for FileSinkDesc
### What changes were proposed in this pull request?

Remove `org.apache.spark.sql.hive.HiveShim.ShimFileSinkDesc`, which was used to work around the serialization issue of `org.apache.hadoop.hive.ql.plan.FileSinkDesc`.
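
For context, the removed shim follows the usual pattern for shipping an object with a non-serializable member: hold the directory as a plain `String` in a serializable wrapper and rebuild the `Path` only when converting back to the real `FileSinkDesc`. A minimal sketch of that pattern, condensed from the code removed in the diff below:

```scala
import scala.language.implicitConversions

import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc}

object HiveShim {
  // Serializable stand-in for FileSinkDesc: the directory is kept as a String,
  // because pre-Hadoop-3 Path does not implement java.io.Serializable.
  class ShimFileSinkDesc(
      var dir: String,
      var tableInfo: TableDesc,
      var compressed: Boolean) extends Serializable

  // Rebuild the real FileSinkDesc (and its Path) only at the point of use,
  // after the wrapper has already been deserialized.
  implicit def wrapperToFileSinkDesc(w: ShimFileSinkDesc): FileSinkDesc =
    new FileSinkDesc(new Path(w.dir), w.tableInfo, w.compressed)
}
```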

### Why are the changes needed?

[HIVE-6171](https://issues.apache.org/jira/browse/HIVE-6171) changed the type of `FileSinkDesc`'s `dirName` property from `String` to `Path`, but `Path` was not serializable until [HADOOP-13519](https://issues.apache.org/jira/browse/HADOOP-13519) (fixed in Hadoop 3.0.0).

Since SPARK-42452 removed support for Hadoop 2, this workaround can now be removed.
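
With `Path` serializable on Hadoop 3, callers can construct and ship Hive's own `FileSinkDesc` directly. Roughly, the affected call sites change along these lines (a sketch mirroring the diff below, not the exact code):

```scala
// Before: go through the serializable wrapper, carrying the path as a String.
// val fileSinkConf = new ShimFileSinkDesc(tmpPath.toString, tableDesc, false)

// After: Hadoop 3's Path is Serializable, so Hive's FileSinkDesc is used as-is.
val fileSinkConf = new FileSinkDesc(tmpPath, tableDesc, false)
```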

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Passed the existing GitHub Actions (GA) checks.

Closes #40848 from pan3793/SPARK-43186.

Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: Chao Sun <sunchao@apple.com>
pan3793 authored and sunchao committed Apr 19, 2023
1 parent 8db31aa commit 2097581
Showing 4 changed files with 5 additions and 58 deletions.
53 changes: 0 additions & 53 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
@@ -20,22 +20,18 @@ package org.apache.spark.sql.hive
import java.rmi.server.UID

import scala.collection.JavaConverters._
import scala.language.implicitConversions

import com.google.common.base.Objects
import org.apache.avro.Schema
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.ql.exec.SerializationUtilities
import org.apache.hadoop.hive.ql.exec.UDF
import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc}
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFMacro
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils
import org.apache.hadoop.hive.serde2.avro.{AvroGenericRecordWritable, AvroSerdeUtils}
import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector
import org.apache.hadoop.io.Writable

import org.apache.spark.internal.Logging
import org.apache.spark.sql.types.Decimal
import org.apache.spark.util.Utils

@@ -215,53 +211,4 @@ private[hive] object HiveShim {
}
}
}

  /*
   * Bug introduced in hive-0.13. FileSinkDesc is serializable, but its member path is not.
   * Fix it through wrapper.
   */
  implicit def wrapperToFileSinkDesc(w: ShimFileSinkDesc): FileSinkDesc = {
    val f = new FileSinkDesc(new Path(w.dir), w.tableInfo, w.compressed)
    f.setCompressCodec(w.compressCodec)
    f.setCompressType(w.compressType)
    f.setTableInfo(w.tableInfo)
    f.setDestTableId(w.destTableId)
    f
  }

  /*
   * Bug introduced in hive-0.13. FileSinkDesc is serializable, but its member path is not.
   * Fix it through wrapper.
   */
  private[hive] class ShimFileSinkDesc(
      var dir: String,
      var tableInfo: TableDesc,
      var compressed: Boolean)
    extends Serializable with Logging {
    var compressCodec: String = _
    var compressType: String = _
    var destTableId: Int = _

    def setCompressed(compressed: Boolean): Unit = {
      this.compressed = compressed
    }

    def getDirName(): String = dir

    def setDestTableId(destTableId: Int): Unit = {
      this.destTableId = destTableId
    }

    def setTableInfo(tableInfo: TableDesc): Unit = {
      this.tableInfo = tableInfo
    }

    def setCompressCodec(intermediateCompressorCodec: String): Unit = {
      compressCodec = intermediateCompressorCodec
    }

    def setCompressType(intermediateCompressType: String): Unit = {
      compressType = intermediateCompressType
    }
  }
}
2 changes: 1 addition & 1 deletion sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
@@ -22,6 +22,7 @@ import scala.collection.JavaConverters._
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.hive.ql.exec.Utilities
import org.apache.hadoop.hive.ql.io.{HiveFileFormatUtils, HiveOutputFormat}
import org.apache.hadoop.hive.ql.plan.FileSinkDesc
import org.apache.hadoop.hive.serde2.Serializer
import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorUtils, StructObjectInspector}
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
@@ -37,7 +38,6 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriter, OutputWriterFactory}
import org.apache.spark.sql.hive.{HiveInspectors, HiveTableUtil}
import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.SerializableJobConf
4 changes: 2 additions & 2 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.hive.execution

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.common.FileUtils
import org.apache.hadoop.hive.ql.plan.FileSinkDesc
import org.apache.hadoop.hive.ql.plan.TableDesc
import org.apache.hadoop.hive.serde.serdeConstants
import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
@@ -101,8 +102,7 @@ case class InsertIntoHiveDirCommand(
// The temporary path must be a HDFS path, not a local path.
val hiveTempPath = new HiveTempPath(sparkSession, hadoopConf, qualifiedPath)
val tmpPath = hiveTempPath.externalTempPath
val fileSinkConf = new org.apache.spark.sql.hive.HiveShim.ShimFileSinkDesc(
tmpPath.toString, tableDesc, false)
val fileSinkConf = new FileSinkDesc(tmpPath, tableDesc, false)
setupHadoopConfForCompression(fileSinkConf, hadoopConf, sparkSession)
hiveTempPath.createTmpPath()

4 changes: 2 additions & 2 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive.execution
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.plan.FileSinkDesc
import org.apache.hadoop.hive.ql.plan.TableDesc

import org.apache.spark.sql.{Row, SparkSession}
@@ -33,7 +34,6 @@ import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.command.CommandUtils
import org.apache.spark.sql.execution.datasources.{FileFormat, V1WriteCommand, V1WritesUtils}
import org.apache.spark.sql.hive.HiveExternalCatalog
import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
import org.apache.spark.sql.hive.client.HiveClientImpl
import org.apache.spark.sql.hive.client.hive._

@@ -309,7 +309,7 @@ object InsertIntoHiveTable extends V1WritesHiveUtils {
val hadoopConf = sparkSession.sessionState.newHadoopConf()
val tableLocation = hiveQlTable.getDataLocation
val hiveTempPath = new HiveTempPath(sparkSession, hadoopConf, tableLocation)
val fileSinkConf = new FileSinkDesc(hiveTempPath.externalTempPath.toString, tableDesc, false)
val fileSinkConf = new FileSinkDesc(hiveTempPath.externalTempPath, tableDesc, false)
setupHadoopConfForCompression(fileSinkConf, hadoopConf, sparkSession)
val fileFormat: FileFormat = new HiveFileFormat(fileSinkConf)
