@@ -87,17 +87,17 @@ case class InsertIntoHiveDirCommand(
87
87
88
88
val targetPath = new Path (storage.locationUri.get)
89
89
val qualifiedPath = FileUtils .makeQualified(targetPath, hadoopConf)
90
- val writeToPath =
90
+ val ( writeToPath : Path , fs : FileSystem ) =
91
91
if (isLocal) {
92
92
val localFileSystem = FileSystem .getLocal(jobConf)
93
- localFileSystem.makeQualified(targetPath)
93
+ ( localFileSystem.makeQualified(targetPath), localFileSystem )
94
94
} else {
95
- val dfs = qualifiedPath.getFileSystem(jobConf)
96
- if (! dfs.exists(qualifiedPath)) {
97
- dfs.mkdirs(qualifiedPath.getParent)
98
- }
99
- qualifiedPath
95
+ val dfs = qualifiedPath.getFileSystem(hadoopConf)
96
+ (qualifiedPath, dfs)
100
97
}
98
+ if (! fs.exists(writeToPath)) {
99
+ fs.mkdirs(writeToPath)
100
+ }
101
101
102
102
// The temporary path must be a HDFS path, not a local path.
103
103
val tmpPath = getExternalTmpPath(sparkSession, hadoopConf, qualifiedPath)
@@ -112,19 +112,19 @@ case class InsertIntoHiveDirCommand(
112
112
fileSinkConf = fileSinkConf,
113
113
outputLocation = tmpPath.toString)
114
114
115
- val fs = writeToPath.getFileSystem(hadoopConf)
116
115
if (overwrite && fs.exists(writeToPath)) {
117
116
fs.listStatus(writeToPath).foreach { existFile =>
118
117
if (Option (existFile.getPath) != createdTempDir) fs.delete(existFile.getPath, true )
119
118
}
120
119
}
121
120
122
- fs.listStatus(tmpPath).foreach {
121
+ val dfs = tmpPath.getFileSystem(hadoopConf)
122
+ dfs.listStatus(tmpPath).foreach {
123
123
tmpFile =>
124
124
if (isLocal) {
125
- fs .copyToLocalFile(tmpFile.getPath, writeToPath)
125
+ dfs .copyToLocalFile(tmpFile.getPath, writeToPath)
126
126
} else {
127
- fs .rename(tmpFile.getPath, writeToPath)
127
+ dfs .rename(tmpFile.getPath, writeToPath)
128
128
}
129
129
}
130
130
} catch {
0 commit comments