
Commit a31def1

liancheng authored and rxin committed
[SPARK-2263][SQL] Support inserting MAP<K, V> to Hive tables
JIRA issue: [SPARK-2263](https://issues.apache.org/jira/browse/SPARK-2263)

Map objects were not converted to Hive types before inserting into Hive tables.

Author: Cheng Lian <lian.cs.zju@gmail.com>

Closes #1205 from liancheng/spark-2263 and squashes the following commits:

c7a4373 [Cheng Lian] Addressed @concretevitamin's comment
784940b [Cheng Lian] SARPK-2263: support inserting MAP<K, V> to Hive tables

(cherry picked from commit 8fade89)
Signed-off-by: Reynold Xin <rxin@apache.org>
1 parent d3dbaf5 commit a31def1

File tree

3 files changed: +20 -6 lines changed


sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala

Lines changed: 8 additions & 0 deletions
@@ -19,6 +19,8 @@ package org.apache.spark.sql.hive.execution
 
 import scala.collection.JavaConversions._
 
+import java.util.{HashMap => JHashMap}
+
 import org.apache.hadoop.hive.common.`type`.{HiveDecimal, HiveVarchar}
 import org.apache.hadoop.hive.metastore.MetaStoreUtils
 import org.apache.hadoop.hive.ql.Context
@@ -88,6 +90,12 @@ case class InsertIntoHiveTable(
       val wrappedSeq = s.map(wrap(_, oi.getListElementObjectInspector))
       seqAsJavaList(wrappedSeq)
 
+    case (m: Map[_, _], oi: MapObjectInspector) =>
+      val keyOi = oi.getMapKeyObjectInspector
+      val valueOi = oi.getMapValueObjectInspector
+      val wrappedMap = m.map { case (key, value) => wrap(key, keyOi) -> wrap(value, valueOi) }
+      mapAsJavaMap(wrappedMap)
+
     case (obj, _) =>
       obj
   }
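For context, here is a minimal standalone sketch of the conversion pattern this hunk adds. It is not the actual Spark code: toHiveWritable is a hypothetical stand-in for the patch's wrap function, and the Hive ObjectInspector dispatch is omitted so the example compiles on its own (Scala 2.10-era JavaConversions, as used in the patch).

import scala.collection.JavaConversions.{mapAsJavaMap, seqAsJavaList}

object WrapSketch {
  // Hypothetical stand-in for Spark's wrap(): recursively converts Scala
  // collections into the java.util collections Hive's SerDe layer expects.
  def toHiveWritable(value: Any): Any = value match {
    // LIST<...> columns: Scala Seq -> java.util.List, wrapping each element
    case s: Seq[_] => seqAsJavaList(s.map(toHiveWritable))
    // MAP<K, V> columns: Scala Map -> java.util.Map, wrapping keys and values
    case m: Map[_, _] =>
      mapAsJavaMap(m.map { case (k, v) => toHiveWritable(k) -> toHiveWritable(v) })
    // Primitives and everything else pass through unchanged
    case other => other
  }

  def main(args: Array[String]): Unit = {
    val wrapped = toHiveWritable(Map(1 -> "foo", 2 -> "bar"))
    println(wrapped.getClass.getName) // prints a java.util.Map implementation
  }
}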

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala

Lines changed: 12 additions & 3 deletions
@@ -228,15 +228,15 @@ class HiveQuerySuite extends HiveComparisonTest {
     val fixture = List(("foo", 2), ("bar", 1), ("foo", 4), ("bar", 3))
       .zipWithIndex.map {case Pair(Pair(value, attr), key) => HavingRow(key, value, attr)}
     TestHive.sparkContext.parallelize(fixture).registerAsTable("having_test")
-    val results = 
+    val results =
       hql("SELECT value, max(attr) AS attr FROM having_test GROUP BY value HAVING attr > 3")
       .collect()
       .map(x => Pair(x.getString(0), x.getInt(1)))
 
     assert(results === Array(Pair("foo", 4)))
     TestHive.reset()
   }
-  
+
   test("SPARK-2180: HAVING with non-boolean clause raises no exceptions") {
     hql("select key, count(*) c from src group by key having c").collect()
   }
@@ -370,6 +370,16 @@ class HiveQuerySuite extends HiveComparisonTest {
     }
   }
 
+  test("SPARK-2263: Insert Map<K, V> values") {
+    hql("CREATE TABLE m(value MAP<INT, STRING>)")
+    hql("INSERT OVERWRITE TABLE m SELECT MAP(key, value) FROM src LIMIT 10")
+    hql("SELECT * FROM m").collect().zip(hql("SELECT * FROM src LIMIT 10").collect()).map {
+      case (Row(map: Map[Int, String]), Row(key: Int, value: String)) =>
+        assert(map.size === 1)
+        assert(map.head === (key, value))
+    }
+  }
+
   test("parse HQL set commands") {
     // Adapted from its SQL counterpart.
     val testKey = "spark.sql.key.usedfortestonly"
@@ -460,7 +470,6 @@ class HiveQuerySuite extends HiveComparisonTest {
 
   // Put tests that depend on specific Hive settings before these last two test,
   // since they modify /clear stuff.
-
 }
 
 // for SPARK-2180 test
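Outside the test harness, the same scenario can be driven directly from a HiveContext. The sketch below is illustrative only: it assumes a Spark 1.x build with Hive support, the hql API used throughout this suite, and a Hive table named src like the one the tests rely on; the table name m is arbitrary.

import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.HiveContext

object InsertMapExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "insert-map-example")
    val hiveContext = new HiveContext(sc)

    // A table with a MAP<INT, STRING> column, as in the new test above.
    hiveContext.hql("CREATE TABLE IF NOT EXISTS m (value MAP<INT, STRING>)")

    // Before this patch the Scala Map produced by MAP(key, value) reached
    // Hive's SerDe unconverted; the change wraps it into a java.util.Map first.
    hiveContext.hql("INSERT OVERWRITE TABLE m SELECT MAP(key, value) FROM src LIMIT 10")

    hiveContext.hql("SELECT * FROM m").collect().foreach(println)
    sc.stop()
  }
}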

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala

Lines changed: 0 additions & 3 deletions
@@ -122,6 +122,3 @@ class PairUdf extends GenericUDF {
 
   override def getDisplayString(p1: Array[String]): String = ""
 }
-
-
-
