[SPARK-33082][SQL] Remove hive-1.2 workaround code #29961

Closed · wants to merge 2 commits
Changes from all commits:

@@ -63,16 +63,6 @@ abstract class BaseScriptTransformationSuite extends SparkPlanTest with SQLTestU
     uncaughtExceptionHandler.cleanStatus()
   }
 
-  def isHive23OrSpark: Boolean
-
-  // In Hive 1.2, the string representation of a decimal omits trailing zeroes.
-  // But in Hive 2.3, it is always padded to 18 digits with trailing zeroes if necessary.
-  val decimalToString: Column => Column = if (isHive23OrSpark) {
-    c => c.cast("string")
-  } else {
-    c => c.cast("decimal(1, 0)").cast("string")
-  }
-
   def createScriptTransformationExec(
       input: Seq[Expression],
       script: String,
@@ -142,7 +132,7 @@ abstract class BaseScriptTransformationSuite extends SparkPlanTest with SQLTestU
         'a.cast("string"),
         'b.cast("string"),
         'c.cast("string"),
-        decimalToString('d),
+        'd.cast("string"),
         'e.cast("string")).collect())
     }
   }

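The deleted `decimalToString` helper existed only to mask Hive 1.2's decimal formatting. A minimal standalone sketch (not part of this PR; session setup is illustrative) of the behavior the tests now rely on, namely that the built-in Hive 2.3 execution pads a decimal's string form out to its declared scale:

```scala
import org.apache.spark.sql.SparkSession

// Sketch: a decimal-to-string cast keeps the trailing zeroes implied by the
// declared scale, so no per-Hive-version helper is needed any more.
object DecimalToStringDemo extends App {
  val spark = SparkSession.builder().master("local[1]").getOrCreate()
  spark.sql("SELECT CAST(CAST(1 AS DECIMAL(38, 18)) AS STRING) AS s")
    .show(truncate = false) // prints 1.000000000000000000, not "1"
  spark.stop()
}
```
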
@@ -120,8 +120,7 @@ abstract class OrcSuite extends OrcTest with BeforeAndAfterAll {
     }
   }
 
-  protected def testSelectiveDictionaryEncoding(isSelective: Boolean,
-      isHive23: Boolean = false): Unit = {
+  protected def testSelectiveDictionaryEncoding(isSelective: Boolean, isHiveOrc: Boolean): Unit = {
     val tableName = "orcTable"
 
     withTempDir { dir =>
@@ -174,7 +173,7 @@
           // Hive 0.11 and RLE v2 is introduced in Hive 0.12 ORC with more improvements.
           // For more details, see https://orc.apache.org/specification/
           assert(stripe.getColumns(1).getKind === DICTIONARY_V2)
-          if (isSelective || isHive23) {
+          if (isSelective || isHiveOrc) {
             assert(stripe.getColumns(2).getKind === DIRECT_V2)
           } else {
             assert(stripe.getColumns(2).getKind === DICTIONARY_V2)
@@ -581,7 +580,7 @@ class OrcSourceSuite extends OrcSuite with SharedSparkSession {
   }
 
   test("Enforce direct encoding column-wise selectively") {
-    testSelectiveDictionaryEncoding(isSelective = true)
+    testSelectiveDictionaryEncoding(isSelective = true, isHiveOrc = false)
   }
 
   test("SPARK-11412 read and merge orc schemas in parallel") {

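For context on what `testSelectiveDictionaryEncoding` exercises: Apache ORC lets a writer keep dictionary encoding in general but force direct encoding for named columns. A hedged sketch (table and column names are invented; the two property names are taken from the Apache ORC configuration docs, treat them as an assumption here):

```scala
// Sketch: force DIRECT_V2 for one column while leaving the others eligible
// for DICTIONARY_V2, which is the split the stripe assertions above check.
spark.sql(
  """CREATE TABLE orc_encoding_demo (id STRING, uri STRING)
    |STORED AS ORC
    |TBLPROPERTIES (
    |  'orc.dictionary.key.threshold' = '1.0',  -- keep dictionaries viable
    |  'orc.column.encoding.direct' = 'uri'     -- but force direct encoding here
    |)""".stripMargin)
```
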
@@ -125,10 +125,6 @@ private[hive] class SparkGetTablesOperation(
         tableType,
         comment.getOrElse(""))
       // Since HIVE-7575(Hive 2.0.0), adds 5 additional columns to the ResultSet of GetTables.
-      if (HiveUtils.isHive23) {
-        rowSet.addRow(rowData ++ Array(null, null, null, null, null))
-      } else {
-        rowSet.addRow(rowData)
-      }
+      rowSet.addRow(rowData ++ Array(null, null, null, null, null))
     }
   }

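HIVE-7575 widened the GetTables result from five columns to the full ten defined by `java.sql.DatabaseMetaData.getTables` (the additions are TYPE_CAT, TYPE_SCHEM, TYPE_NAME, SELF_REFERENCING_COL_NAME, REF_GENERATION), which is why the operation now always appends five nulls. A client-side sketch (connection URL illustrative):

```scala
import java.sql.DriverManager

// Sketch: read the widened GetTables result over JDBC. The last five columns
// are the HIVE-7575 additions and come back as null from Spark.
val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default")
try {
  val rs = conn.getMetaData.getTables(null, "default", "%", null)
  while (rs.next()) {
    println(s"${rs.getString("TABLE_NAME")}: type=${rs.getString("TABLE_TYPE")}, " +
      s"typeCat=${rs.getString("TYPE_CAT")}") // TYPE_CAT is one of the null-padded columns
  }
} finally {
  conn.close()
}
```
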
@@ -113,17 +113,10 @@ private[hive] class SparkSQLCLIService(hiveServer: HiveServer2, sqlContext: SQLC
 
 private[thriftserver] trait ReflectedCompositeService { this: AbstractService =>
 
-  private val logInfo = (msg: String) => if (HiveUtils.isHive23) {
-    getAncestorField[Logger](this, 3, "LOG").info(msg)
-  } else {
-    getAncestorField[Log](this, 3, "LOG").info(msg)
-  }
+  private val logInfo = (msg: String) => getAncestorField[Logger](this, 3, "LOG").info(msg)
 
-  private val logError = (msg: String, e: Throwable) => if (HiveUtils.isHive23) {
+  private val logError = (msg: String, e: Throwable) =>
     getAncestorField[Logger](this, 3, "LOG").error(msg, e)
-  } else {
-    getAncestorField[Log](this, 3, "LOG").error(msg, e)
-  }
 
   def initCompositeService(hiveConf: HiveConf): Unit = {
     // Emulating `CompositeService.init(hiveConf)`

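`getAncestorField` reaches into the `LOG` field declared a few superclasses up the Hive service hierarchy; with Hive 1.2 gone it always resolves to a `Logger` rather than commons-logging's `Log`. A self-contained sketch of the pattern (`ancestorField` below is an illustration, not Spark's actual helper):

```scala
// Sketch: walk `levels` superclasses up from obj's runtime class, then read a
// (possibly private) field reflectively. This is the trick
// ReflectedCompositeService uses to borrow its parent's LOG.
def ancestorField[T](obj: AnyRef, levels: Int, name: String): T = {
  val clazz = (1 to levels).foldLeft[Class[_]](obj.getClass)((c, _) => c.getSuperclass)
  val field = clazz.getDeclaredField(name)
  field.setAccessible(true)
  field.get(obj).asInstanceOf[T]
}
```
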
@@ -544,11 +544,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
         conf += resultSet.getString(1) -> resultSet.getString(2)
       }
 
-      if (HiveUtils.isHive23) {
-        assert(conf.get(HiveUtils.FAKE_HIVE_VERSION.key) === Some("2.3.7"))
-      } else {
-        assert(conf.get(HiveUtils.FAKE_HIVE_VERSION.key) === Some("1.2.1"))
-      }
+      assert(conf.get(HiveUtils.FAKE_HIVE_VERSION.key) === Some("2.3.7"))
     }
   }
 
@@ -561,11 +557,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
         conf += resultSet.getString(1) -> resultSet.getString(2)
       }
 
-      if (HiveUtils.isHive23) {
-        assert(conf.get(HiveUtils.FAKE_HIVE_VERSION.key) === Some("2.3.7"))
-      } else {
-        assert(conf.get(HiveUtils.FAKE_HIVE_VERSION.key) === Some("1.2.1"))
-      }
+      assert(conf.get(HiveUtils.FAKE_HIVE_VERSION.key) === Some("2.3.7"))
     }
   }
 
@@ -643,11 +635,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
       val sessionHandle = client.openSession(user, "")
       val sessionID = sessionHandle.getSessionId
 
-      if (HiveUtils.isHive23) {
-        assert(pipeoutFileList(sessionID).length == 2)
-      } else {
-        assert(pipeoutFileList(sessionID).length == 1)
-      }
+      assert(pipeoutFileList(sessionID).length == 2)
 
      client.closeSession(sessionHandle)
 

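The `2.3.7` literal these asserts pin down is what a JDBC client sees when it asks the Thrift server for the advertised Hive version. A sketch (URL illustrative; the value tracks whichever Hive release Spark bundles):

```scala
import java.sql.DriverManager

// Sketch: query the advertised Hive version over JDBC, as the test does.
val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default")
try {
  val rs = conn.createStatement().executeQuery("SET spark.sql.hive.version")
  while (rs.next()) {
    println(s"${rs.getString(1)} = ${rs.getString(2)}") // spark.sql.hive.version = 2.3.7
  }
} finally {
  conn.close()
}
```
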
@@ -356,12 +356,8 @@ class SparkThriftServerProtocolVersionsSuite extends HiveThriftJdbcTest {
       assert(metaData.getColumnName(1) === "NULL")
       assert(metaData.getColumnTypeName(1) === "void")
       assert(metaData.getColumnType(1) === java.sql.Types.NULL)
-      if (HiveUtils.isHive23) {
-        // For Hive 1.2 the o.a.h.j.JdbcColumn.typeStringToHiveType can not recognize `null` as
-        // type name.
-        assert(metaData.getPrecision(1) === 0)
-        assert(metaData.getScale(1) === 0)
-      }
+      assert(metaData.getPrecision(1) === 0)
+      assert(metaData.getScale(1) === 0)
     }
   }
 

@@ -1145,11 +1145,8 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
    * The set of tests that are believed to be working in catalyst. Tests not on includeList or
    * excludeList are implicitly marked as ignored.
    */
-  override def includeList: Seq[String] = if (HiveUtils.isHive23) {
+  override def includeList: Seq[String] =
     commonIncludeList ++ Seq(
       "decimal_1_1"
     )
-  } else {
-    commonIncludeList
-  }
 }

48 changes: 14 additions & 34 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala

@@ -161,46 +161,26 @@ private[hive] object HiveShim {
   }
 
   def deserializePlan[UDFType](is: java.io.InputStream, clazz: Class[_]): UDFType = {
-    if (HiveUtils.isHive23) {
-      val borrowKryo = serUtilClass.getMethod("borrowKryo")
-      val kryo = borrowKryo.invoke(serUtilClass)
-      val deserializeObjectByKryo = findMethod(serUtilClass, deserializeMethodName,
-        kryo.getClass.getSuperclass, classOf[InputStream], classOf[Class[_]])
-      try {
-        deserializeObjectByKryo.invoke(null, kryo, is, clazz).asInstanceOf[UDFType]
-      } finally {
-        serUtilClass.getMethod("releaseKryo", kryo.getClass.getSuperclass).invoke(null, kryo)
-      }
-    } else {
-      val runtimeSerializationKryo = utilClass.getField("runtimeSerializationKryo")
-      val threadLocalValue = runtimeSerializationKryo.get(utilClass)
-      val getMethod = threadLocalValue.getClass.getMethod("get")
-      val kryo = getMethod.invoke(threadLocalValue)
-      val deserializeObjectByKryo = findMethod(utilClass, deserializeMethodName,
-        kryo.getClass, classOf[InputStream], classOf[Class[_]])
+    val borrowKryo = serUtilClass.getMethod("borrowKryo")
+    val kryo = borrowKryo.invoke(serUtilClass)
+    val deserializeObjectByKryo = findMethod(serUtilClass, deserializeMethodName,
+      kryo.getClass.getSuperclass, classOf[InputStream], classOf[Class[_]])
+    try {
+      deserializeObjectByKryo.invoke(null, kryo, is, clazz).asInstanceOf[UDFType]
+    } finally {
+      serUtilClass.getMethod("releaseKryo", kryo.getClass.getSuperclass).invoke(null, kryo)
+    }
   }
 
   def serializePlan(function: AnyRef, out: java.io.OutputStream): Unit = {
-    if (HiveUtils.isHive23) {
-      val borrowKryo = serUtilClass.getMethod("borrowKryo")
-      val kryo = borrowKryo.invoke(serUtilClass)
-      val serializeObjectByKryo = findMethod(serUtilClass, serializeMethodName,
-        kryo.getClass.getSuperclass, classOf[Object], classOf[OutputStream])
-      try {
-        serializeObjectByKryo.invoke(null, kryo, function, out)
-      } finally {
-        serUtilClass.getMethod("releaseKryo", kryo.getClass.getSuperclass).invoke(null, kryo)
-      }
-    } else {
-      val runtimeSerializationKryo = utilClass.getField("runtimeSerializationKryo")
-      val threadLocalValue = runtimeSerializationKryo.get(utilClass)
-      val getMethod = threadLocalValue.getClass.getMethod("get")
-      val kryo = getMethod.invoke(threadLocalValue)
-      val serializeObjectByKryo = findMethod(utilClass, serializeMethodName,
-        kryo.getClass, classOf[Object], classOf[OutputStream])
+    val borrowKryo = serUtilClass.getMethod("borrowKryo")
+    val kryo = borrowKryo.invoke(serUtilClass)
+    val serializeObjectByKryo = findMethod(serUtilClass, serializeMethodName,
+      kryo.getClass.getSuperclass, classOf[Object], classOf[OutputStream])
+    try {
+      serializeObjectByKryo.invoke(null, kryo, function, out)
+    } finally {
+      serUtilClass.getMethod("releaseKryo", kryo.getClass.getSuperclass).invoke(null, kryo)
+    }
   }
 

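Both methods now follow Hive 2.3's pooled-Kryo discipline: borrow an instance, use it, release it in a `finally`. Reflection is still needed because the target (de)serialize methods are not public (hence the `findMethod` plus `setAccessible` above); `kryo.getClass.getSuperclass` recovers the declared `Kryo` parameter type from the pooled subclass instance. A condensed sketch of the shape:

```scala
// Sketch of the borrow/use/release pattern enforced above (class and method
// names per the diff; the actual (de)serialize call still goes through a
// setAccessible java.lang.reflect.Method).
val serUtilClass = Class.forName("org.apache.hadoop.hive.ql.exec.SerializationUtilities")
val kryo = serUtilClass.getMethod("borrowKryo").invoke(null) // take a pooled Kryo
try {
  // ... invoke the resolved (de)serializeObjectByKryo Method here ...
} finally {
  // Always return the instance to the pool, even if serialization throws.
  serUtilClass.getMethod("releaseKryo", kryo.getClass.getSuperclass).invoke(null, kryo)
}
```
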
@@ -56,10 +56,9 @@ private[spark] object HiveUtils extends Logging {
   }
 
   private val hiveVersion = HiveVersionInfo.getVersion
-  val isHive23: Boolean = hiveVersion.startsWith("2.3")
 
   /** The version of hive used internally by Spark SQL. */
-  val builtinHiveVersion: String = if (isHive23) hiveVersion else "1.2.1"
+  val builtinHiveVersion: String = hiveVersion
 
   val HIVE_METASTORE_VERSION = buildStaticConf("spark.sql.hive.metastore.version")
     .doc("Version of the Hive metastore. Available options are " +

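`builtinHiveVersion` now simply reports the bundled Hive's `HiveVersionInfo.getVersion`. Talking to an older metastore remains possible, but through the metastore-client settings rather than a forked execution Hive. A configuration sketch (values illustrative):

```scala
import org.apache.spark.sql.SparkSession

// Sketch: execution Hive is always the built-in 2.3.x, while the metastore
// client can still be pinned to an older version and resolved separately.
val spark = SparkSession.builder()
  .config("spark.sql.hive.metastore.version", "1.2.1") // metastore protocol only
  .config("spark.sql.hive.metastore.jars", "maven")    // fetch matching client jars
  .enableHiveSupport()
  .getOrCreate()
```
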
@@ -169,19 +169,15 @@ private[hive] class HiveClientImpl(
     // since HIVE-11878, and ADDJarCommand will add jars to clientLoader.classLoader.
     // For this reason we cannot load the jars added by ADDJarCommand because of class loader
     // got changed. We reset it to clientLoader.ClassLoader here.
-    if (HiveUtils.isHive23) {
-      state.getConf.setClassLoader(clientLoader.classLoader)
-    }
+    state.getConf.setClassLoader(clientLoader.classLoader)
     SessionState.start(state)
     state.out = new PrintStream(outputBuffer, true, UTF_8.name())
     state.err = new PrintStream(outputBuffer, true, UTF_8.name())
     state
   }
 
   /** Returns the configuration for the current session. */
-  def conf: HiveConf = if (!HiveUtils.isHive23) {
-    state.getConf
-  } else {
+  def conf: HiveConf = {
     val hiveConf = state.getConf
     // Hive changed the default of datanucleus.schema.autoCreateAll from true to false
     // and hive.metastore.schema.verification from false to true since Hive 2.0.
@@ -293,8 +289,7 @@
     val ret = try {
       f
     } catch {
-      case e: NoClassDefFoundError
-        if HiveUtils.isHive23 && e.getMessage.contains("org/apache/hadoop/hive/serde2/SerDe") =>
+      case e: NoClassDefFoundError if e.getMessage.contains("apache/hadoop/hive/serde2/SerDe") =>
         throw new ClassNotFoundException("The SerDe interface removed since Hive 2.3(HIVE-15167)." +
           " Please migrate your custom SerDes to Hive 2.3. See HIVE-15167 for more details.", e)
     } finally {

18 changes: 5 additions & 13 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala

@@ -350,19 +350,11 @@ private[hive] case class HiveUDAFFunction(
     }
 
     val clazz = Utils.classForName(classOf[SimpleGenericUDAFParameterInfo].getName)
-    if (HiveUtils.isHive23) {
-      val ctor = clazz.getDeclaredConstructor(
-        classOf[Array[ObjectInspector]], JBoolean.TYPE, JBoolean.TYPE, JBoolean.TYPE)
-      val args = Array[AnyRef](inputInspectors, JBoolean.FALSE, JBoolean.FALSE, JBoolean.FALSE)
-      val parameterInfo = ctor.newInstance(args: _*).asInstanceOf[SimpleGenericUDAFParameterInfo]
-      resolver.getEvaluator(parameterInfo)
-    } else {
-      val ctor = clazz.getDeclaredConstructor(
-        classOf[Array[ObjectInspector]], JBoolean.TYPE, JBoolean.TYPE)
-      val args = Array[AnyRef](inputInspectors, JBoolean.FALSE, JBoolean.FALSE)
-      val parameterInfo = ctor.newInstance(args: _*).asInstanceOf[SimpleGenericUDAFParameterInfo]
-      resolver.getEvaluator(parameterInfo)
-    }
+    val ctor = clazz.getDeclaredConstructor(
+      classOf[Array[ObjectInspector]], JBoolean.TYPE, JBoolean.TYPE, JBoolean.TYPE)
+    val args = Array[AnyRef](inputInspectors, JBoolean.FALSE, JBoolean.FALSE, JBoolean.FALSE)
+    val parameterInfo = ctor.newInstance(args: _*).asInstanceOf[SimpleGenericUDAFParameterInfo]
+    resolver.getEvaluator(parameterInfo)
   }
 
   private case class HiveEvaluator(

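With only one constructor shape left, the reflection above is arguably vestigial and could become a direct call. A sketch (the flag names are my reading of Hive 2.3's `SimpleGenericUDAFParameterInfo` constructor and should be treated as an assumption):

```scala
// Sketch: direct equivalent of the reflective construction above. The three
// JBoolean.FALSE args map onto the constructor's boolean flags (assumed
// order: isWindowing, distinct, allColumns).
val parameterInfo = new SimpleGenericUDAFParameterInfo(inputInspectors, false, false, false)
val evaluator = resolver.getEvaluator(parameterInfo)
```
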
@@ -71,21 +71,7 @@ private[orc] object OrcFilters extends Logging {
   }
 
   def createFilter(schema: StructType, filters: Array[Filter]): Option[SearchArgument] = {
-    if (HiveUtils.isHive23) {
-      DatasourceOrcFilters.createFilter(schema, filters).asInstanceOf[Option[SearchArgument]]
-    } else {
-      val dataTypeMap = schema.map(f => quoteIfNeeded(f.name) -> f.dataType).toMap
-      // TODO (SPARK-25557): ORC doesn't support nested predicate pushdown, so they are removed.
-      val newFilters = filters.filter(!_.containsNestedColumn)
-      // Combines all convertible filters using `And` to produce a single conjunction
-      val conjunctionOptional = buildTree(convertibleFilters(schema, dataTypeMap, newFilters))
-      conjunctionOptional.map { conjunction =>
-        // Then tries to build a single ORC `SearchArgument` for the conjunction predicate.
-        // The input predicate is fully convertible. There should not be any empty result in the
-        // following recursive method call `buildSearchArgument`.
-        buildSearchArgument(dataTypeMap, conjunction, newBuilder).build()
-      }
-    }
+    DatasourceOrcFilters.createFilter(schema, filters).asInstanceOf[Option[SearchArgument]]
   }
 
   def convertibleFilters(

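`createFilter` is now a thin delegation to the shared `DatasourceOrcFilters` implementation. A usage sketch from inside the package (schema and filters invented; whether the predicate is convertible is decided by the delegate):

```scala
import org.apache.spark.sql.sources.{Filter, GreaterThan, IsNotNull}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// Sketch: build one conjunctive ORC SearchArgument from data source filters.
val schema = StructType(Seq(StructField("a", IntegerType)))
val sarg = OrcFilters.createFilter(schema, Array[Filter](IsNotNull("a"), GreaterThan("a", 1)))
sarg.foreach(s => println(s)) // pushed down only if every filter is convertible
```
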
@@ -57,20 +57,12 @@ class ClasspathDependenciesSuite extends SparkFunSuite {
     }
   }
 
-  test("shaded Protobuf") {
-    if (HiveUtils.isHive23) {
-      assertLoads("com.google.protobuf.ServiceException")
-    } else {
-      assertLoads("org.apache.hive.com.google.protobuf.ServiceException")
-    }
+  test("protobuf") {
+    assertLoads("com.google.protobuf.ServiceException")
   }
 
-  test("shaded Kryo") {
-    if (HiveUtils.isHive23) {
-      assertLoads("com.esotericsoftware.kryo.Kryo")
-    } else {
-      assertLoads("org.apache.hive.com.esotericsoftware.kryo.Kryo")
-    }
+  test("kryo") {
+    assertLoads("com.esotericsoftware.kryo.Kryo")
   }
 
   test("hive-common") {
@@ -89,12 +81,7 @@
   }
 
   test("parquet-hadoop-bundle") {
-    if (HiveUtils.isHive23) {
-      assertLoads("org.apache.parquet.hadoop.ParquetOutputFormat")
-      assertLoads("org.apache.parquet.hadoop.ParquetInputFormat")
-    } else {
-      assertLoads("parquet.hadoop.ParquetOutputFormat")
-      assertLoads("parquet.hadoop.ParquetInputFormat")
-    }
+    assertLoads("org.apache.parquet.hadoop.ParquetOutputFormat")
+    assertLoads("org.apache.parquet.hadoop.ParquetInputFormat")
   }
 }

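These tests now assert the unshaded coordinates, since the Hive 2.3 artifacts Spark builds against no longer relocate Protobuf and Kryo under an `org.apache.hive.` prefix the way the hive-1.2 fork did. A sketch of what an `assertLoads`-style check boils down to (the suite's real helper may differ):

```scala
// Sketch: a classpath-presence check that fails if the class cannot be
// resolved. initialize = false avoids running static initializers just to probe.
def assertLoads(className: String): Unit =
  Class.forName(className, false, getClass.getClassLoader)
```
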
@@ -206,13 +206,8 @@ class DataSourceWithHiveMetastoreCatalogSuite
         assert(columns.map(_.dataType) === Seq(DecimalType(10, 3), StringType))
 
         checkAnswer(table("t"), testDF)
-        if (HiveUtils.isHive23) {
-          assert(sparkSession.metadataHive.runSqlHive("SELECT * FROM t") ===
-            Seq("1.100\t1", "2.100\t2"))
-        } else {
-          assert(sparkSession.metadataHive.runSqlHive("SELECT * FROM t") ===
-            Seq("1.1\t1", "2.1\t2"))
-        }
+        assert(sparkSession.metadataHive.runSqlHive("SELECT * FROM t") ===
+          Seq("1.100\t1", "2.100\t2"))
       }
     }
 
@@ -244,13 +239,8 @@
         assert(columns.map(_.dataType) === Seq(DecimalType(10, 3), StringType))
 
         checkAnswer(table("t"), testDF)
-        if (HiveUtils.isHive23) {
-          assert(sparkSession.metadataHive.runSqlHive("SELECT * FROM t") ===
-            Seq("1.100\t1", "2.100\t2"))
-        } else {
-          assert(sparkSession.metadataHive.runSqlHive("SELECT * FROM t") ===
-            Seq("1.1\t1", "2.1\t2"))
-        }
+        assert(sparkSession.metadataHive.runSqlHive("SELECT * FROM t") ===
+          Seq("1.100\t1", "2.100\t2"))
       }
     }
   }

@@ -35,18 +35,10 @@ class HiveShimSuite extends SparkFunSuite {
 
     // test when READ_COLUMN_NAMES_CONF_STR is empty
     HiveShim.appendReadColumns(conf, ids, names)
-    if (HiveUtils.isHive23) {
-      assert(names === ColumnProjectionUtils.getReadColumnNames(conf))
-    } else {
-      assert(names.asJava === ColumnProjectionUtils.getReadColumnNames(conf))
-    }
+    assert(names === ColumnProjectionUtils.getReadColumnNames(conf))
 
     // test when READ_COLUMN_NAMES_CONF_STR is non-empty
     HiveShim.appendReadColumns(conf, moreIds, moreNames)
-    if (HiveUtils.isHive23) {
-      assert((names ++ moreNames) === ColumnProjectionUtils.getReadColumnNames(conf))
-    } else {
-      assert((names ++ moreNames).asJava === ColumnProjectionUtils.getReadColumnNames(conf))
-    }
+    assert((names ++ moreNames) === ColumnProjectionUtils.getReadColumnNames(conf))
   }
 }