Skip to content

Commit 9cc8474

Browse files
committed
fix write path
1 parent cc1172e commit 9cc8474

File tree

2 files changed

+6
-14
lines changed

2 files changed

+6
-14
lines changed

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1722,7 +1722,8 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
17221722
case "parquet" =>
17231723
createTableDesc
17241724
.setOutputFormat("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat")
1725-
createTableDesc.setSerName("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")
1725+
createTableDesc
1726+
.setSerName("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")
17261727

17271728
case "rcfile" =>
17281729
createTableDesc.setOutputFormat("org.apache.hadoop.hive.ql.io.RCFileOutputFormat")

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/WriteToDirectory.scala

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -55,14 +55,14 @@ case class WriteToDirectory(
5555
val jobConfSer = new SerializableJobConf(jobConf)
5656
val targetPath = new Path(path)
5757

58-
val (tmpPath, destPath) = if (isLocal) {
58+
val writeToPath = if (isLocal) {
5959
val localFileSystem = FileSystem.getLocal(jobConf)
6060
val localPath = localFileSystem.makeQualified(targetPath)
6161
// remove old dir
6262
if (localFileSystem.exists(localPath)) {
6363
localFileSystem.delete(localPath, true)
6464
}
65-
(context.getExternalTmpPath(localPath), localPath)
65+
localPath
6666
} else {
6767
val qualifiedPath = FileUtils.makeQualified(targetPath, hiveContext.hiveconf)
6868
val dfs = qualifiedPath.getFileSystem(jobConf)
@@ -71,10 +71,10 @@ case class WriteToDirectory(
7171
} else {
7272
dfs.mkdirs(qualifiedPath.getParent)
7373
}
74-
(context.getExternalTmpPath(qualifiedPath), qualifiedPath)
74+
qualifiedPath
7575
}
7676

77-
val fileSinkConf = new FileSinkDesc(tmpPath.toString, desc, false)
77+
val fileSinkConf = new FileSinkDesc(writeToPath.toString, desc, false)
7878
val isCompressed = hiveContext.hiveconf.getBoolean(
7979
ConfVars.COMPRESSRESULT.varname, ConfVars.COMPRESSRESULT.defaultBoolVal)
8080

@@ -100,15 +100,6 @@ case class WriteToDirectory(
100100
jobConfSer,
101101
writerContainer)
102102

103-
val fs = tmpPath.getFileSystem(jobConf)
104-
105-
// move tmp file to dest dir
106-
if (isLocal) {
107-
fs.moveToLocalFile(tmpPath, destPath)
108-
} else if (!fs.rename(tmpPath, destPath)) {
109-
throw new IOException("Unable to write data to " + destPath)
110-
}
111-
112103
Seq.empty[Row]
113104
}
114105

0 commit comments

Comments
 (0)