@@ -24,7 +24,9 @@ import java.util.{HashMap => JHashMap}
 import org.apache.hadoop.hive.common.`type`.{HiveDecimal, HiveVarchar}
 import org.apache.hadoop.hive.metastore.MetaStoreUtils
 import org.apache.hadoop.hive.ql.Context
+import org.apache.hadoop.hive.ql.ErrorMsg
 import org.apache.hadoop.hive.ql.metadata.Hive
+import org.apache.hadoop.hive.ql.parse.SemanticException
 import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc}
 import org.apache.hadoop.hive.serde2.Serializer
 import org.apache.hadoop.hive.serde2.objectinspector._
@@ -40,6 +42,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.expressions.Row
 import org.apache.spark.sql.execution.{SparkPlan, UnaryNode}
 import org.apache.spark.sql.hive.{HiveContext, MetastoreRelation, SparkHiveHadoopWriter}
+import org.apache.hadoop.hive.conf.HiveConf

 /**
  * :: DeveloperApi ::
@@ -159,7 +162,7 @@ case class InsertIntoHiveTable(
     writer.commitJob()
   }

-  def getDynamicPartDir(tableInfo: TableDesc, row: Row, dynamicPartNum2: Int): String = {
+  def getDynamicPartDir(tableInfo: TableDesc, row: Row, dynamicPartNum2: Int, jobConf: JobConf): String = {
     dynamicPartNum2 match {
       case 0 => ""
       case i => {
@@ -169,18 +172,26 @@ case class InsertIntoHiveTable(
         var buf = new StringBuffer()
         if (partCols.length == dynamicPartNum2) {
           for (j <- 0 until partCols.length) {
-            buf.append("/").append(partCols(j)).append("=").append(row(j + row.length - colsNum))
+            buf.append("/").append(partCols(j)).append("=").append(handleNull(row(colsNum + j), jobConf))
           }
         } else {
           for (j <- 0 until dynamicPartNum2) {
-            buf.append("/").append(partCols(j + partCols.length - dynamicPartNum2)).append("=").append(row(j + colsNum))
+            buf.append("/").append(partCols(j + partCols.length - dynamicPartNum2)).append("=").append(handleNull(row(colsNum + j), jobConf))
           }
         }
         buf.toString
       }
     }
   }

+  def handleNull(obj: Any, jobConf: JobConf): String = {
+    if (obj == null || obj.toString.length == 0) {
+      jobConf.get("hive.exec.default.partition.name", "__HIVE_DEFAULT_PARTITION__")
+    } else {
+      obj.toString
+    }
+  }
+
   override def execute() = result

   /**
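
For reference, here is a minimal standalone sketch (the object name and helper signatures are illustrative, not part of the patch) of the path-building logic that getDynamicPartDir and handleNull implement: each dynamic partition column contributes a "/column=value" segment, and a null or empty value falls back to the default partition name configured by hive.exec.default.partition.name.

// Sketch only, not part of the patch: mirrors the getDynamicPartDir/handleNull logic above.
object DynamicPartDirSketch {
  // default value of hive.exec.default.partition.name
  val defaultPartName = "__HIVE_DEFAULT_PARTITION__"

  def handleNull(obj: Any): String =
    if (obj == null || obj.toString.isEmpty) defaultPartName else obj.toString

  // partCols: the dynamic partition column names; values: the trailing row values for them
  def dynamicPartDir(partCols: Seq[String], values: Seq[Any]): String =
    partCols.zip(values).map { case (col, v) => s"/$col=${handleNull(v)}" }.mkString

  def main(args: Array[String]): Unit = {
    // prints "/ds=2014-09-01/country=__HIVE_DEFAULT_PARTITION__"
    println(dynamicPartDir(Seq("ds", "country"), Seq("2014-09-01", null)))
  }
}
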
@@ -201,11 +212,38 @@ case class InsertIntoHiveTable(
     val tmpLocation = hiveContext.getExternalTmpFileURI(tableLocation)
     val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
     var dynamicPartNum = 0
+    var numStaPart = 0
     var dynamicPartPath = "";
     val partitionSpec = partition.map {
-      case (key, Some(value)) => key -> value
-      case (key, None) => { dynamicPartNum += 1; key -> "" } // Should not reach here right now.
+      case (key, Some(value)) => { numStaPart += 1; key -> value }
+      case (key, None) => { dynamicPartNum += 1; key -> "" }
     }
+    // ORC stores compression information in table properties. While, there are other formats
+    // (e.g. RCFile) that rely on hadoop configurations to store compression information.
+    val jobConf = new JobConf(sc.hiveconf)
+    val jobConfSer = new SerializableWritable(jobConf)
+    // check if the partition spec is valid
+    if (dynamicPartNum > 0) {
+      if (!sc.hiveconf.getBoolVar(HiveConf.ConfVars.DYNAMICPARTITIONING)) {
+        throw new SemanticException(
+          ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg())
+      }
+      if (numStaPart == 0 && sc.hiveconf.getVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE).equalsIgnoreCase("strict")) {
+        throw new SemanticException(ErrorMsg.DYNAMIC_PARTITION_STRICT_MODE.getMsg());
+      }
+      // check if static partition appear after dynamic partitions
+      for ((k, v) <- partitionSpec) {
+        if (partitionSpec(k) == "") {
+          if (numStaPart > 0) { // found a DP, but there exists ST as subpartition
+            throw new SemanticException(
+              ErrorMsg.PARTITION_DYN_STA_ORDER.getMsg());
+          }
+        } else {
+          numStaPart -= 1
+        }
+      }
+    }
+
     val rdd = childRdd.mapPartitions { iter =>
       val serializer = newSerializer(fileSinkConf.getTableInfo)
       val standardOI = ObjectInspectorUtils
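
The added validation enforces three Hive rules before any data is written: dynamic partitioning must be enabled, strict mode requires at least one static partition, and a static partition may not follow a dynamic one. Below is a hedged sketch of that ordering rule, using a Seq instead of the Map above so the ordering is explicit (function and value names are illustrative, not part of the patch).

// Sketch only: once a dynamic partition ("" value) appears, no static value may follow it.
def staticAfterDynamic(spec: Seq[(String, String)]): Boolean = {
  val firstDynamic = spec.indexWhere(_._2 == "")
  firstDynamic >= 0 && spec.drop(firstDynamic).exists(_._2 != "")
}

// PARTITION (ds='2014-09-01', country) is fine:
//   staticAfterDynamic(Seq("ds" -> "2014-09-01", "country" -> "")) == false
// PARTITION (ds, country='CN') is rejected with PARTITION_DYN_STA_ORDER:
//   staticAfterDynamic(Seq("ds" -> "", "country" -> "CN")) == true
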
@@ -221,7 +259,7 @@ case class InsertIntoHiveTable(
         var i = 0
         while (i < fieldOIs.length) {
           if (fieldOIs.length < row.length && row.length - fieldOIs.length == dynamicPartNum) {
-            dynamicPartPath = getDynamicPartDir(fileSinkConf.getTableInfo, row, dynamicPartNum)
+            dynamicPartPath = getDynamicPartDir(fileSinkConf.getTableInfo, row, dynamicPartNum, jobConfSer.value)
           }
           // Casts Strings to HiveVarchars when necessary.
           outputData(i) = wrap(row(i), fieldOIs(i))
@@ -232,10 +270,6 @@ case class InsertIntoHiveTable(
         }
       }

-    // ORC stores compression information in table properties. While, there are other formats
-    // (e.g. RCFile) that rely on hadoop configurations to store compression information.
-    val jobConf = new JobConf(sc.hiveconf)
-    val jobConfSer = new SerializableWritable(jobConf)
     if (dynamicPartNum > 0) {
       if (outputClass == null) {
         throw new SparkException("Output value class not set")
@@ -300,8 +334,6 @@ case class InsertIntoHiveTable(
         v.commitJob()
       }
       writerMap.clear()
-      // writer.commitJob()
-
     } else {
       saveAsHiveFile(
         rdd,
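
In the dynamic-partition branch, the job keeps one writer per computed partition directory in writerMap, commits each of them, and then clears the map. A generic sketch of that one-writer-per-partition pattern follows (the Writer trait and PartitionedSink class are illustrative placeholders, not Spark's SparkHiveHadoopWriter API):

// Sketch only: one writer per dynamic partition path, all committed at the end.
import scala.collection.mutable

trait Writer { def write(record: String): Unit; def commitJob(): Unit }

class PartitionedSink(newWriter: String => Writer) {
  private val writerMap = mutable.Map.empty[String, Writer]

  // Route each record to the writer for its partition directory, creating it on first use.
  def write(partPath: String, record: String): Unit =
    writerMap.getOrElseUpdate(partPath, newWriter(partPath)).write(record)

  // Commit every partition's output, then drop the writers.
  def commitAll(): Unit = {
    writerMap.values.foreach(_.commitJob())
    writerMap.clear()
  }
}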