Commit a127387
[SPARK-33082][SQL] Remove hive-1.2 workaround code
### What changes were proposed in this pull request?

This PR removes old Hive-1.2 profile related workaround code.

### Why are the changes needed?

To simplify the code.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the CI.

Closes #29961 from dongjoon-hyun/SPARK-HIVE12.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
1 parent 3099fd9 commit a127387

22 files changed: +60 −226 lines changed

sql/core/src/test/scala/org/apache/spark/sql/execution/BaseScriptTransformationSuite.scala

Lines changed: 1 addition & 11 deletions

@@ -63,16 +63,6 @@ abstract class BaseScriptTransformationSuite extends SparkPlanTest with SQLTestU
     uncaughtExceptionHandler.cleanStatus()
   }
 
-  def isHive23OrSpark: Boolean
-
-  // In Hive 1.2, the string representation of a decimal omits trailing zeroes.
-  // But in Hive 2.3, it is always padded to 18 digits with trailing zeroes if necessary.
-  val decimalToString: Column => Column = if (isHive23OrSpark) {
-    c => c.cast("string")
-  } else {
-    c => c.cast("decimal(1, 0)").cast("string")
-  }
-
   def createScriptTransformationExec(
       input: Seq[Expression],
       script: String,
@@ -142,7 +132,7 @@ abstract class BaseScriptTransformationSuite extends SparkPlanTest with SQLTestU
        'a.cast("string"),
        'b.cast("string"),
        'c.cast("string"),
-       decimalToString('d),
+       'd.cast("string"),
        'e.cast("string")).collect())
   }
 }
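
Note (not part of this patch): the deleted decimalToString helper existed only because Hive 1.2 dropped trailing zeroes when rendering decimals as strings. A rough, self-contained sketch of the cast the test now always performs, with illustrative values:

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch: a decimal(1, 0) column cast straight to string. With the built-in
// Hive 2.3 execution there is no 1.2-style trailing-zero discrepancy to work around.
val spark = SparkSession.builder().master("local[1]").getOrCreate()
val df = spark.sql("SELECT CAST(1 AS DECIMAL(1, 0)) AS d")
df.selectExpr("CAST(d AS STRING) AS d_str").show()
```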

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala

Lines changed: 3 additions & 4 deletions

@@ -120,8 +120,7 @@ abstract class OrcSuite extends OrcTest with BeforeAndAfterAll {
     }
   }
 
-  protected def testSelectiveDictionaryEncoding(isSelective: Boolean,
-      isHive23: Boolean = false): Unit = {
+  protected def testSelectiveDictionaryEncoding(isSelective: Boolean, isHiveOrc: Boolean): Unit = {
     val tableName = "orcTable"
 
     withTempDir { dir =>
@@ -174,7 +173,7 @@ abstract class OrcSuite extends OrcTest with BeforeAndAfterAll {
         // Hive 0.11 and RLE v2 is introduced in Hive 0.12 ORC with more improvements.
         // For more details, see https://orc.apache.org/specification/
         assert(stripe.getColumns(1).getKind === DICTIONARY_V2)
-        if (isSelective || isHive23) {
+        if (isSelective || isHiveOrc) {
           assert(stripe.getColumns(2).getKind === DIRECT_V2)
         } else {
           assert(stripe.getColumns(2).getKind === DICTIONARY_V2)
@@ -581,7 +580,7 @@ class OrcSourceSuite extends OrcSuite with SharedSparkSession {
   }
 
   test("Enforce direct encoding column-wise selectively") {
-    testSelectiveDictionaryEncoding(isSelective = true)
+    testSelectiveDictionaryEncoding(isSelective = true, isHiveOrc = false)
   }
 
   test("SPARK-11412 read and merge orc schemas in parallel") {

sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala

Lines changed: 1 addition & 5 deletions

@@ -125,10 +125,6 @@ private[hive] class SparkGetTablesOperation(
         tableType,
         comment.getOrElse(""))
       // Since HIVE-7575(Hive 2.0.0), adds 5 additional columns to the ResultSet of GetTables.
-      if (HiveUtils.isHive23) {
-        rowSet.addRow(rowData ++ Array(null, null, null, null, null))
-      } else {
-        rowSet.addRow(rowData)
-      }
+      rowSet.addRow(rowData ++ Array(null, null, null, null, null))
     }
   }
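
Note: HIVE-7575 (Hive 2.0.0) widened the GetTables result to the full ten JDBC columns, so the five extra fields are now always appended as NULLs. A hedged sketch of what a JDBC client sees through the Thrift server (connection details assumed, not from this patch):

```scala
import java.sql.DriverManager

// The last five columns (TYPE_CAT, TYPE_SCHEM, TYPE_NAME, SELF_REFERENCING_COL_NAME,
// REF_GENERATION) come back as NULL, matching the row built above.
val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "user", "")
try {
  val rs = conn.getMetaData.getTables(null, "default", "%", Array("TABLE", "VIEW"))
  while (rs.next()) {
    println(s"${rs.getString("TABLE_SCHEM")}.${rs.getString("TABLE_NAME")}")
  }
} finally {
  conn.close()
}
```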

sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala

Lines changed: 2 additions & 9 deletions

@@ -113,17 +113,10 @@ private[hive] class SparkSQLCLIService(hiveServer: HiveServer2, sqlContext: SQLC
 
 private[thriftserver] trait ReflectedCompositeService { this: AbstractService =>
 
-  private val logInfo = (msg: String) => if (HiveUtils.isHive23) {
-    getAncestorField[Logger](this, 3, "LOG").info(msg)
-  } else {
-    getAncestorField[Log](this, 3, "LOG").info(msg)
-  }
+  private val logInfo = (msg: String) => getAncestorField[Logger](this, 3, "LOG").info(msg)
 
-  private val logError = (msg: String, e: Throwable) => if (HiveUtils.isHive23) {
+  private val logError = (msg: String, e: Throwable) =>
     getAncestorField[Logger](this, 3, "LOG").error(msg, e)
-  } else {
-    getAncestorField[Log](this, 3, "LOG").error(msg, e)
-  }
 
   def initCompositeService(hiveConf: HiveConf): Unit = {
     // Emulating `CompositeService.init(hiveConf)`
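
Note: getAncestorField reads a private LOG field declared a few superclasses above the service instance; with Hive 2.3 that field is always the Logger type the remaining branch expects, so the commons-logging Log branch goes away. A rough sketch of the reflection pattern (an illustrative helper, not Spark's ReflectionUtils):

```scala
// Illustrative helper only (not Spark's actual implementation): climb `levels`
// superclasses and read a private field from the object.
def ancestorField[T](obj: AnyRef, levels: Int, name: String): T = {
  var clazz: Class[_] = obj.getClass
  (1 to levels).foreach(_ => clazz = clazz.getSuperclass)
  val field = clazz.getDeclaredField(name)
  field.setAccessible(true)
  field.get(obj).asInstanceOf[T]
}
```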

sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala

Lines changed: 3 additions & 15 deletions

@@ -544,11 +544,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
         conf += resultSet.getString(1) -> resultSet.getString(2)
       }
 
-      if (HiveUtils.isHive23) {
-        assert(conf.get(HiveUtils.FAKE_HIVE_VERSION.key) === Some("2.3.7"))
-      } else {
-        assert(conf.get(HiveUtils.FAKE_HIVE_VERSION.key) === Some("1.2.1"))
-      }
+      assert(conf.get(HiveUtils.FAKE_HIVE_VERSION.key) === Some("2.3.7"))
     }
   }
 
@@ -561,11 +557,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
         conf += resultSet.getString(1) -> resultSet.getString(2)
       }
 
-      if (HiveUtils.isHive23) {
-        assert(conf.get(HiveUtils.FAKE_HIVE_VERSION.key) === Some("2.3.7"))
-      } else {
-        assert(conf.get(HiveUtils.FAKE_HIVE_VERSION.key) === Some("1.2.1"))
-      }
+      assert(conf.get(HiveUtils.FAKE_HIVE_VERSION.key) === Some("2.3.7"))
     }
   }
 
@@ -643,11 +635,7 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
       val sessionHandle = client.openSession(user, "")
      val sessionID = sessionHandle.getSessionId
 
-      if (HiveUtils.isHive23) {
-        assert(pipeoutFileList(sessionID).length == 2)
-      } else {
-        assert(pipeoutFileList(sessionID).length == 1)
-      }
+      assert(pipeoutFileList(sessionID).length == 2)
 
       client.closeSession(sessionHandle)
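
Note: these tests read the Hive version the server advertises under HiveUtils.FAKE_HIVE_VERSION ("spark.sql.hive.version"), which is now always the built-in 2.3 line. A hedged JDBC sketch of the same check (server address assumed):

```scala
import java.sql.DriverManager

val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "user", "")
try {
  // Expect something like: spark.sql.hive.version = 2.3.7
  val rs = conn.createStatement().executeQuery("SET spark.sql.hive.version")
  while (rs.next()) {
    println(s"${rs.getString(1)} = ${rs.getString(2)}")
  }
} finally {
  conn.close()
}
```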

sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkThriftServerProtocolVersionsSuite.scala

Lines changed: 2 additions & 6 deletions

@@ -356,12 +356,8 @@ class SparkThriftServerProtocolVersionsSuite extends HiveThriftJdbcTest {
       assert(metaData.getColumnName(1) === "NULL")
       assert(metaData.getColumnTypeName(1) === "void")
       assert(metaData.getColumnType(1) === java.sql.Types.NULL)
-      if (HiveUtils.isHive23) {
-        // For Hive 1.2 the o.a.h.j.JdbcColumn.typeStringToHiveType can not recognize `null` as
-        // type name.
-        assert(metaData.getPrecision(1) === 0)
-        assert(metaData.getScale(1) === 0)
-      }
+      assert(metaData.getPrecision(1) === 0)
+      assert(metaData.getScale(1) === 0)
     }
   }
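
Note: the guarded precision/scale checks existed because Hive 1.2's JdbcColumn.typeStringToHiveType could not parse `null` as a type name; with the 2.3 JDBC classes they can be asserted unconditionally. A hedged sketch of the metadata the test inspects (server address assumed):

```scala
import java.sql.DriverManager

val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "user", "")
try {
  val rs = conn.createStatement().executeQuery("SELECT NULL")
  val md = rs.getMetaData
  println(md.getColumnTypeName(1)) // void
  println(md.getPrecision(1))      // 0
  println(md.getScale(1))          // 0
} finally {
  conn.close()
}
```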

sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala

Lines changed: 1 addition & 4 deletions

@@ -1145,11 +1145,8 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
    * The set of tests that are believed to be working in catalyst. Tests not on includeList or
    * excludeList are implicitly marked as ignored.
    */
-  override def includeList: Seq[String] = if (HiveUtils.isHive23) {
+  override def includeList: Seq[String] =
     commonIncludeList ++ Seq(
       "decimal_1_1"
     )
-  } else {
-    commonIncludeList
-  }
 }

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala

Lines changed: 14 additions & 34 deletions

@@ -161,46 +161,26 @@ private[hive] object HiveShim {
   }
 
   def deserializePlan[UDFType](is: java.io.InputStream, clazz: Class[_]): UDFType = {
-    if (HiveUtils.isHive23) {
-      val borrowKryo = serUtilClass.getMethod("borrowKryo")
-      val kryo = borrowKryo.invoke(serUtilClass)
-      val deserializeObjectByKryo = findMethod(serUtilClass, deserializeMethodName,
-        kryo.getClass.getSuperclass, classOf[InputStream], classOf[Class[_]])
-      try {
-        deserializeObjectByKryo.invoke(null, kryo, is, clazz).asInstanceOf[UDFType]
-      } finally {
-        serUtilClass.getMethod("releaseKryo", kryo.getClass.getSuperclass).invoke(null, kryo)
-      }
-    } else {
-      val runtimeSerializationKryo = utilClass.getField("runtimeSerializationKryo")
-      val threadLocalValue = runtimeSerializationKryo.get(utilClass)
-      val getMethod = threadLocalValue.getClass.getMethod("get")
-      val kryo = getMethod.invoke(threadLocalValue)
-      val deserializeObjectByKryo = findMethod(utilClass, deserializeMethodName,
-        kryo.getClass, classOf[InputStream], classOf[Class[_]])
+    val borrowKryo = serUtilClass.getMethod("borrowKryo")
+    val kryo = borrowKryo.invoke(serUtilClass)
+    val deserializeObjectByKryo = findMethod(serUtilClass, deserializeMethodName,
+      kryo.getClass.getSuperclass, classOf[InputStream], classOf[Class[_]])
+    try {
       deserializeObjectByKryo.invoke(null, kryo, is, clazz).asInstanceOf[UDFType]
+    } finally {
+      serUtilClass.getMethod("releaseKryo", kryo.getClass.getSuperclass).invoke(null, kryo)
     }
   }
 
   def serializePlan(function: AnyRef, out: java.io.OutputStream): Unit = {
-    if (HiveUtils.isHive23) {
-      val borrowKryo = serUtilClass.getMethod("borrowKryo")
-      val kryo = borrowKryo.invoke(serUtilClass)
-      val serializeObjectByKryo = findMethod(serUtilClass, serializeMethodName,
-        kryo.getClass.getSuperclass, classOf[Object], classOf[OutputStream])
-      try {
-        serializeObjectByKryo.invoke(null, kryo, function, out)
-      } finally {
-        serUtilClass.getMethod("releaseKryo", kryo.getClass.getSuperclass).invoke(null, kryo)
-      }
-    } else {
-      val runtimeSerializationKryo = utilClass.getField("runtimeSerializationKryo")
-      val threadLocalValue = runtimeSerializationKryo.get(utilClass)
-      val getMethod = threadLocalValue.getClass.getMethod("get")
-      val kryo = getMethod.invoke(threadLocalValue)
-      val serializeObjectByKryo = findMethod(utilClass, serializeMethodName,
-        kryo.getClass, classOf[Object], classOf[OutputStream])
+    val borrowKryo = serUtilClass.getMethod("borrowKryo")
+    val kryo = borrowKryo.invoke(serUtilClass)
+    val serializeObjectByKryo = findMethod(serUtilClass, serializeMethodName,
+      kryo.getClass.getSuperclass, classOf[Object], classOf[OutputStream])
+    try {
       serializeObjectByKryo.invoke(null, kryo, function, out)
+    } finally {
+      serUtilClass.getMethod("releaseKryo", kryo.getClass.getSuperclass).invoke(null, kryo)
     }
   }
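
Note: the retained branch borrows a pooled Kryo instance from Hive 2.3's SerializationUtilities and releases it in a finally block. A rough sketch of the same borrow/use/release round trip, written against Kryo directly since the pooled helpers are only reachable through the reflection above:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}

// A plain Kryo instance stands in for the one Hive borrows from its pool.
val kryo = new Kryo()
kryo.setRegistrationRequired(false)

val bytes = new ByteArrayOutputStream()
val out = new Output(bytes)
try kryo.writeClassAndObject(out, Array(1, 2, 3)) finally out.close()

val in = new Input(new ByteArrayInputStream(bytes.toByteArray))
val restored = try kryo.readClassAndObject(in) finally in.close()
```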

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala

Lines changed: 1 addition & 2 deletions

@@ -56,10 +56,9 @@ private[spark] object HiveUtils extends Logging {
   }
 
   private val hiveVersion = HiveVersionInfo.getVersion
-  val isHive23: Boolean = hiveVersion.startsWith("2.3")
 
   /** The version of hive used internally by Spark SQL. */
-  val builtinHiveVersion: String = if (isHive23) hiveVersion else "1.2.1"
+  val builtinHiveVersion: String = hiveVersion
 
   val HIVE_METASTORE_VERSION = buildStaticConf("spark.sql.hive.metastore.version")
     .doc("Version of the Hive metastore. Available options are " +

sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala

Lines changed: 3 additions & 8 deletions

@@ -169,19 +169,15 @@ private[hive] class HiveClientImpl(
     // since HIVE-11878, and ADDJarCommand will add jars to clientLoader.classLoader.
     // For this reason we cannot load the jars added by ADDJarCommand because of class loader
     // got changed. We reset it to clientLoader.ClassLoader here.
-    if (HiveUtils.isHive23) {
-      state.getConf.setClassLoader(clientLoader.classLoader)
-    }
+    state.getConf.setClassLoader(clientLoader.classLoader)
     SessionState.start(state)
     state.out = new PrintStream(outputBuffer, true, UTF_8.name())
     state.err = new PrintStream(outputBuffer, true, UTF_8.name())
     state
   }
 
   /** Returns the configuration for the current session. */
-  def conf: HiveConf = if (!HiveUtils.isHive23) {
-    state.getConf
-  } else {
+  def conf: HiveConf = {
     val hiveConf = state.getConf
     // Hive changed the default of datanucleus.schema.autoCreateAll from true to false
     // and hive.metastore.schema.verification from false to true since Hive 2.0.
@@ -293,8 +289,7 @@ private[hive] class HiveClientImpl(
     val ret = try {
       f
     } catch {
-      case e: NoClassDefFoundError
-        if HiveUtils.isHive23 && e.getMessage.contains("org/apache/hadoop/hive/serde2/SerDe") =>
+      case e: NoClassDefFoundError if e.getMessage.contains("apache/hadoop/hive/serde2/SerDe") =>
        throw new ClassNotFoundException("The SerDe interface removed since Hive 2.3(HIVE-15167)." +
          " Please migrate your custom SerDes to Hive 2.3. See HIVE-15167 for more details.", e)
     } finally {
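
Note: as the comment in the diff says, Hive 2.0 flipped the defaults of datanucleus.schema.autoCreateAll and hive.metastore.schema.verification, so the now-unconditional conf path restores the old behavior for a local, Derby-backed metastore. A hedged sketch of those overrides on a HiveConf (not the exact code Spark runs):

```scala
import org.apache.hadoop.hive.conf.HiveConf

val hiveConf = new HiveConf()
// Pre-Hive-2.0 defaults that an embedded/local metastore expects.
hiveConf.setBoolean("datanucleus.schema.autoCreateAll", true)
hiveConf.setBoolean("hive.metastore.schema.verification", false)
```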

sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala

Lines changed: 5 additions & 13 deletions

@@ -350,19 +350,11 @@ private[hive] case class HiveUDAFFunction(
     }
 
     val clazz = Utils.classForName(classOf[SimpleGenericUDAFParameterInfo].getName)
-    if (HiveUtils.isHive23) {
-      val ctor = clazz.getDeclaredConstructor(
-        classOf[Array[ObjectInspector]], JBoolean.TYPE, JBoolean.TYPE, JBoolean.TYPE)
-      val args = Array[AnyRef](inputInspectors, JBoolean.FALSE, JBoolean.FALSE, JBoolean.FALSE)
-      val parameterInfo = ctor.newInstance(args: _*).asInstanceOf[SimpleGenericUDAFParameterInfo]
-      resolver.getEvaluator(parameterInfo)
-    } else {
-      val ctor = clazz.getDeclaredConstructor(
-        classOf[Array[ObjectInspector]], JBoolean.TYPE, JBoolean.TYPE)
-      val args = Array[AnyRef](inputInspectors, JBoolean.FALSE, JBoolean.FALSE)
-      val parameterInfo = ctor.newInstance(args: _*).asInstanceOf[SimpleGenericUDAFParameterInfo]
-      resolver.getEvaluator(parameterInfo)
-    }
+    val ctor = clazz.getDeclaredConstructor(
+      classOf[Array[ObjectInspector]], JBoolean.TYPE, JBoolean.TYPE, JBoolean.TYPE)
+    val args = Array[AnyRef](inputInspectors, JBoolean.FALSE, JBoolean.FALSE, JBoolean.FALSE)
+    val parameterInfo = ctor.newInstance(args: _*).asInstanceOf[SimpleGenericUDAFParameterInfo]
+    resolver.getEvaluator(parameterInfo)
   }
 
   private case class HiveEvaluator(
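
Note: the reflective call now always targets the four-argument constructor. A hedged, non-reflective sketch of the same call, assuming the Hive 2.3 signature is (params, isWindowing, distinct, allColumns); the parameter names are assumed from that API, not taken from this patch:

```scala
import org.apache.hadoop.hive.ql.udf.generic.SimpleGenericUDAFParameterInfo
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector

def parameterInfo(inputInspectors: Array[ObjectInspector]): SimpleGenericUDAFParameterInfo =
  new SimpleGenericUDAFParameterInfo(
    inputInspectors,
    /* isWindowing = */ false,
    /* distinct = */ false,
    /* allColumns = */ false)
```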

sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala

Lines changed: 1 addition & 15 deletions

@@ -71,21 +71,7 @@ private[orc] object OrcFilters extends Logging {
   }
 
   def createFilter(schema: StructType, filters: Array[Filter]): Option[SearchArgument] = {
-    if (HiveUtils.isHive23) {
-      DatasourceOrcFilters.createFilter(schema, filters).asInstanceOf[Option[SearchArgument]]
-    } else {
-      val dataTypeMap = schema.map(f => quoteIfNeeded(f.name) -> f.dataType).toMap
-      // TODO (SPARK-25557): ORC doesn't support nested predicate pushdown, so they are removed.
-      val newFilters = filters.filter(!_.containsNestedColumn)
-      // Combines all convertible filters using `And` to produce a single conjunction
-      val conjunctionOptional = buildTree(convertibleFilters(schema, dataTypeMap, newFilters))
-      conjunctionOptional.map { conjunction =>
-        // Then tries to build a single ORC `SearchArgument` for the conjunction predicate.
-        // The input predicate is fully convertible. There should not be any empty result in the
-        // following recursive method call `buildSearchArgument`.
-        buildSearchArgument(dataTypeMap, conjunction, newBuilder).build()
-      }
-    }
+    DatasourceOrcFilters.createFilter(schema, filters).asInstanceOf[Option[SearchArgument]]
   }
 
   def convertibleFilters(
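
Note: with the Hive 1.2 branch gone, the Hive ORC path always delegates to the built-in DatasourceOrcFilters to build SearchArguments. A hedged, end-to-end sketch of a filter this code can push down (paths and column names illustrative):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").getOrCreate()

spark.range(1000)
  .selectExpr("id", "id % 7 AS bucket")
  .write.mode("overwrite").orc("/tmp/orc_pushdown_demo")

// The PushedFilters entry in the plan shows what reached the ORC reader as a SearchArgument.
val pruned = spark.read.orc("/tmp/orc_pushdown_demo").filter("bucket = 3 AND id > 100")
pruned.explain()
```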

sql/hive/src/test/scala/org/apache/spark/sql/hive/ClasspathDependenciesSuite.scala

Lines changed: 6 additions & 19 deletions

@@ -57,20 +57,12 @@ class ClasspathDependenciesSuite extends SparkFunSuite {
     }
   }
 
-  test("shaded Protobuf") {
-    if (HiveUtils.isHive23) {
-      assertLoads("com.google.protobuf.ServiceException")
-    } else {
-      assertLoads("org.apache.hive.com.google.protobuf.ServiceException")
-    }
+  test("protobuf") {
+    assertLoads("com.google.protobuf.ServiceException")
   }
 
-  test("shaded Kryo") {
-    if (HiveUtils.isHive23) {
-      assertLoads("com.esotericsoftware.kryo.Kryo")
-    } else {
-      assertLoads("org.apache.hive.com.esotericsoftware.kryo.Kryo")
-    }
+  test("kryo") {
+    assertLoads("com.esotericsoftware.kryo.Kryo")
   }
 
   test("hive-common") {
@@ -89,12 +81,7 @@ class ClasspathDependenciesSuite extends SparkFunSuite {
   }
 
   test("parquet-hadoop-bundle") {
-    if (HiveUtils.isHive23) {
-      assertLoads("org.apache.parquet.hadoop.ParquetOutputFormat")
-      assertLoads("org.apache.parquet.hadoop.ParquetInputFormat")
-    } else {
-      assertLoads("parquet.hadoop.ParquetOutputFormat")
-      assertLoads("parquet.hadoop.ParquetInputFormat")
-    }
+    assertLoads("org.apache.parquet.hadoop.ParquetOutputFormat")
+    assertLoads("org.apache.parquet.hadoop.ParquetInputFormat")
   }
 }
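
Note: with only Hive 2.3 on the classpath, the suite simply asserts that the unshaded class names resolve. A rough sketch of what assertLoads boils down to (illustrative, not the suite's helper):

```scala
Seq(
  "com.google.protobuf.ServiceException",
  "com.esotericsoftware.kryo.Kryo",
  "org.apache.parquet.hadoop.ParquetOutputFormat"
).foreach { name =>
  // Throws ClassNotFoundException if the dependency is missing or still shaded.
  Class.forName(name, false, Thread.currentThread().getContextClassLoader)
}
```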

sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala

Lines changed: 4 additions & 14 deletions

@@ -206,13 +206,8 @@ class DataSourceWithHiveMetastoreCatalogSuite
         assert(columns.map(_.dataType) === Seq(DecimalType(10, 3), StringType))
 
         checkAnswer(table("t"), testDF)
-        if (HiveUtils.isHive23) {
-          assert(sparkSession.metadataHive.runSqlHive("SELECT * FROM t") ===
-            Seq("1.100\t1", "2.100\t2"))
-        } else {
-          assert(sparkSession.metadataHive.runSqlHive("SELECT * FROM t") ===
-            Seq("1.1\t1", "2.1\t2"))
-        }
+        assert(sparkSession.metadataHive.runSqlHive("SELECT * FROM t") ===
+          Seq("1.100\t1", "2.100\t2"))
       }
     }
 
@@ -244,13 +239,8 @@ class DataSourceWithHiveMetastoreCatalogSuite
         assert(columns.map(_.dataType) === Seq(DecimalType(10, 3), StringType))
 
         checkAnswer(table("t"), testDF)
-        if (HiveUtils.isHive23) {
-          assert(sparkSession.metadataHive.runSqlHive("SELECT * FROM t") ===
-            Seq("1.100\t1", "2.100\t2"))
-        } else {
-          assert(sparkSession.metadataHive.runSqlHive("SELECT * FROM t") ===
-            Seq("1.1\t1", "2.1\t2"))
-        }
+        assert(sparkSession.metadataHive.runSqlHive("SELECT * FROM t") ===
+          Seq("1.100\t1", "2.100\t2"))
       }
     }
   }

sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveShimSuite.scala

Lines changed: 2 additions & 10 deletions

@@ -35,18 +35,10 @@ class HiveShimSuite extends SparkFunSuite {
 
     // test when READ_COLUMN_NAMES_CONF_STR is empty
     HiveShim.appendReadColumns(conf, ids, names)
-    if (HiveUtils.isHive23) {
-      assert(names === ColumnProjectionUtils.getReadColumnNames(conf))
-    } else {
-      assert(names.asJava === ColumnProjectionUtils.getReadColumnNames(conf))
-    }
+    assert(names === ColumnProjectionUtils.getReadColumnNames(conf))
 
     // test when READ_COLUMN_NAMES_CONF_STR is non-empty
     HiveShim.appendReadColumns(conf, moreIds, moreNames)
-    if (HiveUtils.isHive23) {
-      assert((names ++ moreNames) === ColumnProjectionUtils.getReadColumnNames(conf))
-    } else {
-      assert((names ++ moreNames).asJava === ColumnProjectionUtils.getReadColumnNames(conf))
-    }
+    assert((names ++ moreNames) === ColumnProjectionUtils.getReadColumnNames(conf))
   }
 }
