-
Notifications
You must be signed in to change notification settings - Fork 707
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Clean up temporary files created by forceToDiskExecution #1621
Clean up temporary files created by forceToDiskExecution #1621
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is great! Thanks for attacking this problem.
val newFn = { (conf: Config, mode: Mode) => | ||
(fn(conf, mode), otherFn(conf, mode)) | ||
} | ||
WriteExecution(head, h :: t ::: tail, newFn) | ||
WriteExecution(head, h :: t ::: tail, newFn, tempFiles) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
don't you need to merge the sets from the left and right? Seems like we are losing this.tempFilesToCleanup
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah good catch, I do need that here.
Set(tempFile) | ||
} | ||
|
||
Execution.write(writeFn, readFn, filesToDeleteFn) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you test a .zip
to be sure we don't lose files there?
try { | ||
val path = new Path(file) | ||
if (fs.exists(path)) { | ||
fs.delete(path, true) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you comment what the boolean means at this call site?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
case hdfsMode: HadoopMode => FileSystem.get(hdfsMode.jobConf) | ||
} | ||
|
||
filesToCleanup foreach { file: String => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we prefer the .
style and use filesToCleanup.foreach {
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
f8a6cf2
to
53de7ff
Compare
Thanks for the feedback @oscar-stripe. I incorporated in the changes you suggested. I also noticed that the REPL expects that the temporary outputs from |
Hmm, I'm not sure I see why the CI build is failing. It's complaining about failing when compiling maple: I do see this in the log, maybe related?
|
} | ||
} catch { | ||
// If we fail in deleting a temp file, log the error but don't fail the run | ||
case e: Exception => LOG.info(s"Unable to delete temp file $file") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we pass e to the log statement? that way we get the message and stacktrace? Also should this be a warn?
@@ -284,6 +288,34 @@ object Execution { | |||
} | |||
|
|||
/** | |||
* This is a Thread used as a shutdown hook to clean up temporary files created by some Execution | |||
*/ | |||
case class TempFileCleanup(filesToCleanup: mutable.Set[String], mode: Mode) extends Thread { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
since we're just iterating over the filesToCleanup in this case class, maybe we can make this a more generic interface rather than mutable.Set? Iterable / Seq / Set?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
// The "true" parameter here indicates that we should recursively delete everything under the given path | ||
fs.delete(path, true) | ||
} | ||
} catch { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as this is part of a shutdown hook, should we add a catch all clause?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you mean catch { case e: Throwable =>
so we catch anything? Is that what you mean?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@oscar-stripe yeah. If we for whatever reason throw an caught exception in this code that will result in the shutdown code not being run completely as I understand it.
@@ -284,6 +288,34 @@ object Execution { | |||
} | |||
|
|||
/** | |||
* This is a Thread used as a shutdown hook to clean up temporary files created by some Execution | |||
*/ | |||
case class TempFileCleanup(filesToCleanup: mutable.Set[String], mode: Mode) extends Thread { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe also call out that if the job is aborted this will not cleanup stuff (if folks haven't read / know about shutdown hook semantics)
@@ -284,6 +288,34 @@ object Execution { | |||
} | |||
|
|||
/** | |||
* This is a Thread used as a shutdown hook to clean up temporary files created by some Execution | |||
*/ | |||
case class TempFileCleanup(filesToCleanup: mutable.Set[String], mode: Mode) extends Thread { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 to taking an immutable Iterable
here. Can we make the class private?
// The "true" parameter here indicates that we should recursively delete everything under the given path | ||
fs.delete(path, true) | ||
} | ||
} catch { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you mean catch { case e: Throwable =>
so we catch anything? Is that what you mean?
53de7ff
to
b34d3d4
Compare
I've incorporated the next round of feedback. I'm still looking into it, but I'm not able to reproduce the CI build failure. |
I restarted. Let's see if it is a CI issue. On Tue, Nov 15, 2016 at 8:17 AM Andrew Johnson notifications@github.com
|
This reverts commit b9a04b6.
@ajohnson-stripe changes look good to me. 👍 |
@@ -313,7 +347,7 @@ object Execution { | |||
new EvalCache { | |||
override protected[EvalCache] val messageQueue: LinkedBlockingQueue[EvalCache.FlowDefAction] = self.messageQueue | |||
override def start(): Unit = sys.error("Invalid to start child EvalCache") | |||
override def finished(): Unit = sys.error("Invalid to finish child EvalCache") | |||
override def finished(mode: Mode): Unit = sys.error("Invalid to finish child EvalCache") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
don't we need to override the addFilesToCleanup
to forward back to the parent list? I think any files added to the child will not be deleted, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, that does need to be added
@@ -305,15 +338,17 @@ object Execution { | |||
type Counters = Map[Long, ExecutionCounters] | |||
private[this] val cache = new FutureCache[(Config, Execution[Any]), (Any, Counters)] | |||
private[this] val toWriteCache = new FutureCache[(Config, ToWrite), Counters] | |||
protected[EvalCache] val filesToCleanup = mutable.Set[String]() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
actually, I think we should keep this private and override the add method. The reason is we forgot one more thing: this is not thread-safe, but we have so many threads in play, we should make this thread-safe.
I think we are modifying this value from several threads.
So, if we just forward the method, we can handle the locking a bit more cleanly inside that method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point, I'll switch that up.
} | ||
} catch { | ||
// If we fail in deleting a temp file, log the error but don't fail the run | ||
case e: Throwable => LOG.warn(s"Unable to delete temp file $file", e) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
style-wise I think case e => LOG.warn(s"Unable to delete temp file $file", e)
is the same (without the :Throwable
part)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is the same, but leaving out the : Throwable
generates a compiler warning requesting that you be explicit. I see a mix of both in the codebase now, but I'm happy to go with whatever's the preferred style.
* | ||
* If the job is aborted the shutdown hook may not run and the temporary files will not get cleaned up | ||
*/ | ||
private[scalding] case class TempFileCleanup(filesToCleanup: Iterable[String], mode: Mode) extends Thread { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should this extend Runnable instead? Does it have to extend Thread?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
addShutdownHook
will only take a Thread
unfortunately
👍 looks good to me. |
For throwable we should be explicit and avoid the warning in my view.
|
This adds functionality for
WriteExecution
to keep track of temporary files created and then delete them after execution has finished. Currently this is only used byforceToDiskExecution
, which previously would leave around temporary files every time it was used.This fixes #1615.