Skip to content

Commit 54d3f6e

Browse files
AngersZhuuuuwangyum
authored andcommitted
[SPARK-28982][SQL] Implementation Spark's own GetTypeInfoOperation
### What changes were proposed in this pull request? Current Spark Thrift Server return TypeInfo includes 1. INTERVAL_YEAR_MONTH 2. INTERVAL_DAY_TIME 3. UNION 4. USER_DEFINED Spark doesn't support INTERVAL_YEAR_MONTH, INTERVAL_YEAR_MONTH, UNION and won't return USER)DEFINED type. This PR overwrite GetTypeInfoOperation with SparkGetTypeInfoOperation to exclude types which we don't need. In hive-1.2.1 Type class is `org.apache.hive.service.cli.Type` In hive-2.3.x Type class is `org.apache.hadoop.hive.serde2.thrift.Type` Use ThrifrserverShimUtils to fit version problem and exclude types we don't need ### Why are the changes needed? We should return type info of Spark's own type info ### Does this PR introduce any user-facing change? No ### How was this patch tested? Manuel test & Added UT Closes #25694 from AngersZhuuuu/SPARK-28982. Lead-authored-by: angerszhu <angers.zhu@gmail.com> Co-authored-by: AngersZhuuuu <angers.zhu@gmail.com> Signed-off-by: Yuming Wang <wgyumg@gmail.com>
1 parent 7309e02 commit 54d3f6e

File tree

7 files changed

+150
-2
lines changed

7 files changed

+150
-2
lines changed
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.hive.thriftserver
19+
20+
import java.util.UUID
21+
22+
import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType
23+
import org.apache.hive.service.cli.{HiveSQLException, OperationState}
24+
import org.apache.hive.service.cli.operation.GetTypeInfoOperation
25+
import org.apache.hive.service.cli.session.HiveSession
26+
27+
import org.apache.spark.internal.Logging
28+
import org.apache.spark.sql.SQLContext
29+
import org.apache.spark.util.{Utils => SparkUtils}
30+
31+
/**
32+
* Spark's own GetTypeInfoOperation
33+
*
34+
* @param sqlContext SQLContext to use
35+
* @param parentSession a HiveSession from SessionManager
36+
*/
37+
private[hive] class SparkGetTypeInfoOperation(
38+
sqlContext: SQLContext,
39+
parentSession: HiveSession)
40+
extends GetTypeInfoOperation(parentSession) with Logging {
41+
42+
private var statementId: String = _
43+
44+
override def close(): Unit = {
45+
super.close()
46+
HiveThriftServer2.listener.onOperationClosed(statementId)
47+
}
48+
49+
override def runInternal(): Unit = {
50+
statementId = UUID.randomUUID().toString
51+
val logMsg = "Listing type info"
52+
logInfo(s"$logMsg with $statementId")
53+
setState(OperationState.RUNNING)
54+
// Always use the latest class loader provided by executionHive's state.
55+
val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader
56+
Thread.currentThread().setContextClassLoader(executionHiveClassLoader)
57+
58+
if (isAuthV2Enabled) {
59+
authorizeMetaGets(HiveOperationType.GET_TYPEINFO, null)
60+
}
61+
62+
HiveThriftServer2.listener.onStatementStart(
63+
statementId,
64+
parentSession.getSessionHandle.getSessionId.toString,
65+
logMsg,
66+
statementId,
67+
parentSession.getUsername)
68+
69+
try {
70+
ThriftserverShimUtils.supportedType().foreach(typeInfo => {
71+
val rowData = Array[AnyRef](
72+
typeInfo.getName, // TYPE_NAME
73+
typeInfo.toJavaSQLType.asInstanceOf[AnyRef], // DATA_TYPE
74+
typeInfo.getMaxPrecision.asInstanceOf[AnyRef], // PRECISION
75+
typeInfo.getLiteralPrefix, // LITERAL_PREFIX
76+
typeInfo.getLiteralSuffix, // LITERAL_SUFFIX
77+
typeInfo.getCreateParams, // CREATE_PARAMS
78+
typeInfo.getNullable.asInstanceOf[AnyRef], // NULLABLE
79+
typeInfo.isCaseSensitive.asInstanceOf[AnyRef], // CASE_SENSITIVE
80+
typeInfo.getSearchable.asInstanceOf[AnyRef], // SEARCHABLE
81+
typeInfo.isUnsignedAttribute.asInstanceOf[AnyRef], // UNSIGNED_ATTRIBUTE
82+
typeInfo.isFixedPrecScale.asInstanceOf[AnyRef], // FIXED_PREC_SCALE
83+
typeInfo.isAutoIncrement.asInstanceOf[AnyRef], // AUTO_INCREMENT
84+
typeInfo.getLocalizedName, // LOCAL_TYPE_NAME
85+
typeInfo.getMinimumScale.asInstanceOf[AnyRef], // MINIMUM_SCALE
86+
typeInfo.getMaximumScale.asInstanceOf[AnyRef], // MAXIMUM_SCALE
87+
null, // SQL_DATA_TYPE, unused
88+
null, // SQL_DATETIME_SUB, unused
89+
typeInfo.getNumPrecRadix // NUM_PREC_RADIX
90+
)
91+
rowSet.addRow(rowData)
92+
})
93+
setState(OperationState.FINISHED)
94+
} catch {
95+
case e: HiveSQLException =>
96+
setState(OperationState.ERROR)
97+
HiveThriftServer2.listener.onStatementError(
98+
statementId, e.getMessage, SparkUtils.exceptionString(e))
99+
throw e
100+
}
101+
HiveThriftServer2.listener.onStatementFinish(statementId)
102+
}
103+
}

sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,4 +140,15 @@ private[thriftserver] class SparkSQLOperationManager()
140140
logDebug(s"Created GetFunctionsOperation with session=$parentSession.")
141141
operation
142142
}
143+
144+
override def newGetTypeInfoOperation(
145+
parentSession: HiveSession): GetTypeInfoOperation = synchronized {
146+
val sqlContext = sessionToContexts.get(parentSession.getSessionHandle)
147+
require(sqlContext != null, s"Session handle: ${parentSession.getSessionHandle} has not been" +
148+
" initialized or had already closed.")
149+
val operation = new SparkGetTypeInfoOperation(sqlContext, parentSession)
150+
handleToOperation.put(operation.getHandle, operation)
151+
logDebug(s"Created GetTypeInfoOperation with session=$parentSession.")
152+
operation
153+
}
143154
}

sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SparkMetadataOperationSuite.scala

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -231,4 +231,20 @@ class SparkMetadataOperationSuite extends HiveThriftJdbcTest {
231231
assert(!rs.next())
232232
}
233233
}
234+
235+
test("GetTypeInfo Thrift API") {
236+
def checkResult(rs: ResultSet, typeNames: Seq[String]): Unit = {
237+
for (i <- typeNames.indices) {
238+
assert(rs.next())
239+
assert(rs.getString("TYPE_NAME") === typeNames(i))
240+
}
241+
// Make sure there are no more elements
242+
assert(!rs.next())
243+
}
244+
245+
withJdbcStatement() { statement =>
246+
val metaData = statement.getConnection.getMetaData
247+
checkResult(metaData.getTypeInfo, ThriftserverShimUtils.supportedType().map(_.getName))
248+
}
249+
}
234250
}

sql/hive-thriftserver/v1.2.1/src/main/java/org/apache/hive/service/cli/operation/GetTypeInfoOperation.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ public class GetTypeInfoOperation extends MetadataOperation {
7373
.addPrimitiveColumn("NUM_PREC_RADIX", Type.INT_TYPE,
7474
"Usually 2 or 10");
7575

76-
private final RowSet rowSet;
76+
protected final RowSet rowSet;
7777

7878
protected GetTypeInfoOperation(HiveSession parentSession) {
7979
super(parentSession, OperationType.GET_TYPE_INFO);

sql/hive-thriftserver/v1.2.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/ThriftserverShimUtils.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import org.apache.commons.logging.LogFactory
2121
import org.apache.hadoop.hive.ql.exec.Utilities
2222
import org.apache.hadoop.hive.ql.session.SessionState
2323
import org.apache.hive.service.cli.{RowSet, RowSetFactory, TableSchema, Type}
24+
import org.apache.hive.service.cli.Type._
2425
import org.apache.hive.service.cli.thrift.TProtocolVersion._
2526

2627
/**
@@ -51,6 +52,14 @@ private[thriftserver] object ThriftserverShimUtils {
5152

5253
private[thriftserver] def toJavaSQLType(s: String): Int = Type.getType(s).toJavaSQLType
5354

55+
private[thriftserver] def supportedType(): Seq[Type] = {
56+
Seq(NULL_TYPE, BOOLEAN_TYPE, STRING_TYPE, BINARY_TYPE,
57+
TINYINT_TYPE, SMALLINT_TYPE, INT_TYPE, BIGINT_TYPE,
58+
FLOAT_TYPE, DOUBLE_TYPE, DECIMAL_TYPE,
59+
DATE_TYPE, TIMESTAMP_TYPE,
60+
ARRAY_TYPE, MAP_TYPE, STRUCT_TYPE)
61+
}
62+
5463
private[thriftserver] def addToClassPath(
5564
loader: ClassLoader,
5665
auxJars: Array[String]): ClassLoader = {

sql/hive-thriftserver/v2.3.5/src/main/java/org/apache/hive/service/cli/operation/GetTypeInfoOperation.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ public class GetTypeInfoOperation extends MetadataOperation {
7373
.addPrimitiveColumn("NUM_PREC_RADIX", Type.INT_TYPE,
7474
"Usually 2 or 10");
7575

76-
private final RowSet rowSet;
76+
protected final RowSet rowSet;
7777

7878
protected GetTypeInfoOperation(HiveSession parentSession) {
7979
super(parentSession, OperationType.GET_TYPE_INFO);

sql/hive-thriftserver/v2.3.5/src/main/scala/org/apache/spark/sql/hive/thriftserver/ThriftserverShimUtils.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import scala.collection.JavaConverters._
2424
import org.apache.hadoop.hive.ql.exec.AddToClassPathAction
2525
import org.apache.hadoop.hive.ql.session.SessionState
2626
import org.apache.hadoop.hive.serde2.thrift.Type
27+
import org.apache.hadoop.hive.serde2.thrift.Type._
2728
import org.apache.hive.service.cli.{RowSet, RowSetFactory, TableSchema}
2829
import org.apache.hive.service.rpc.thrift.TProtocolVersion._
2930
import org.slf4j.LoggerFactory
@@ -56,6 +57,14 @@ private[thriftserver] object ThriftserverShimUtils {
5657

5758
private[thriftserver] def toJavaSQLType(s: String): Int = Type.getType(s).toJavaSQLType
5859

60+
private[thriftserver] def supportedType(): Seq[Type] = {
61+
Seq(NULL_TYPE, BOOLEAN_TYPE, STRING_TYPE, BINARY_TYPE,
62+
TINYINT_TYPE, SMALLINT_TYPE, INT_TYPE, BIGINT_TYPE,
63+
FLOAT_TYPE, DOUBLE_TYPE, DECIMAL_TYPE,
64+
DATE_TYPE, TIMESTAMP_TYPE,
65+
ARRAY_TYPE, MAP_TYPE, STRUCT_TYPE)
66+
}
67+
5968
private[thriftserver] def addToClassPath(
6069
loader: ClassLoader,
6170
auxJars: Array[String]): ClassLoader = {

0 commit comments

Comments
 (0)