
Commit e41bd1a

dsergeevmapr authored and ekrivokonmapr committed
28339 bug fixed (apache#128)
1 parent d872b5e commit e41bd1a

File tree: 2 files changed (+10 -2 lines changed)


sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala

Lines changed: 6 additions & 0 deletions
@@ -241,6 +241,12 @@ object FileFormatWriter extends Logging {
     try {
       Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
         // Execute the task to write rows out and commit the task.
+        val summary = writeTask.execute(iterator)
+        writeTask.releaseResources()
+        val waitingTimeForInit =
+          SparkEnv.get.conf.getLong("spark.mapr.commitDelay", defaultValue = 0)
+        Thread.sleep(waitingTimeForInit)
+        WriteTaskResult(committer.commitTask(taskAttemptContext), summary)
         while (iterator.hasNext) {
           dataWriter.write(iterator.next())
         }
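
Note on the added lines: they read an optional spark.mapr.commitDelay setting (milliseconds, defaulting to 0) from the Spark configuration and sleep for that long before committing the write task. The sketch below shows how a user might turn the delay on when building a session; it assumes this patched build, and the 500 ms value, app name, and output path are illustrative choices, not anything specified by this commit.

    import org.apache.spark.sql.SparkSession

    // Sketch: enable the commit delay introduced by this patch.
    // 500 ms is an arbitrary example value; the patch defaults to 0 (no delay).
    val spark = SparkSession.builder()
      .appName("commit-delay-example")   // hypothetical app name
      .master("local[*]")
      .config("spark.mapr.commitDelay", "500")
      .getOrCreate()

    // With the patch applied, each write task sleeps for the configured delay
    // before committer.commitTask(...) is invoked.
    spark.range(10).write.mode("overwrite").parquet("/tmp/commit-delay-demo")

    spark.stop()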

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala

Lines changed: 4 additions & 2 deletions
@@ -221,8 +221,10 @@ case class InsertIntoHadoopFsRelationCommand(
       // first clear the path determined by the static partition keys (e.g. /table/foo=1)
       val staticPrefixPath = qualifiedOutputPath.suffix(staticPartitionPrefix)
       if (fs.exists(staticPrefixPath) && !committer.deleteWithJob(fs, staticPrefixPath, true)) {
-        throw new IOException(s"Unable to clear output " +
-          s"directory $staticPrefixPath prior to writing to it")
+        if (fs.exists(staticPrefixPath)) {
+          throw new IOException(s"Unable to clear output " +
+            s"directory $staticPrefixPath prior to writing to it")
+        }
       }
       // now clear all custom partition locations (e.g. /custom/dir/where/foo=2/bar=4)
       for ((spec, customLoc) <- customPartitionLocations) {
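
Note on this change: previously the command threw whenever committer.deleteWithJob returned false; with the patch it re-checks fs.exists and only fails if the output path is still present, so a committer that reports false while the path has in fact been cleared no longer aborts the insert. A minimal sketch of that "re-check after the delete attempt" pattern is shown below; the clearOutputPath helper, the plain fs.delete call standing in for committer.deleteWithJob, and the example path are assumptions for illustration only.

    import java.io.IOException
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    // Sketch of the pattern adopted by the patch: a delete call returning false
    // is only fatal if the path actually still exists afterwards.
    def clearOutputPath(fs: FileSystem, path: Path): Unit = {
      if (fs.exists(path) && !fs.delete(path, /* recursive = */ true)) {
        // Re-check instead of failing immediately on the "false" return value.
        if (fs.exists(path)) {
          throw new IOException(
            s"Unable to clear output directory $path prior to writing to it")
        }
      }
    }

    // Example usage (local filesystem, hypothetical path):
    val fs = FileSystem.get(new Configuration())
    clearOutputPath(fs, new Path("/tmp/table/foo=1"))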
