Skip to content

Commit

Permalink
Clean up temporary files created by forceToDiskExecution
Browse files Browse the repository at this point in the history
  • Loading branch information
ajohnson-stripe committed Nov 15, 2016
1 parent f9ba9b8 commit b34d3d4
Show file tree
Hide file tree
Showing 3 changed files with 185 additions and 24 deletions.
63 changes: 55 additions & 8 deletions scalding-core/src/main/scala/com/twitter/scalding/Execution.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ import scala.concurrent.{ Await, Future, ExecutionContext => ConcurrentExecution
import scala.util.{ Failure, Success, Try }
import scala.util.control.NonFatal
import cascading.flow.{ FlowDef, Flow }
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{ FileSystem, Path }
import org.slf4j.LoggerFactory

import scala.collection.mutable
import scala.runtime.ScalaRunTime

Expand Down Expand Up @@ -148,7 +152,7 @@ sealed trait Execution[+T] extends java.io.Serializable { self: Product =>
// get on Trampoline
val result = runStats(confWithId, mode, ec)(cec).get.map(_._1)
// When the final future in complete we stop the submit thread
result.onComplete { _ => ec.finished() }
result.onComplete { _ => ec.finished(mode) }
// wait till the end to start the thread in case the above throws
ec.start()
result
Expand Down Expand Up @@ -283,6 +287,35 @@ object Execution {
a.zip(b).map { case (ta, tb) => Monoid.plus(ta, tb) }
}

/**
* This is a Thread used as a shutdown hook to clean up temporary files created by some Execution
*
* If the job is aborted the shutdown hook may not run and the temporary files will not get cleaned up
*/
private[scalding] case class TempFileCleanup(filesToCleanup: Iterable[String], mode: Mode) extends Thread {
val LOG = LoggerFactory.getLogger(this.getClass)

override def run(): Unit = {
val fs = mode match {
case localMode: CascadingLocal => FileSystem.getLocal(new Configuration)
case hdfsMode: HadoopMode => FileSystem.get(hdfsMode.jobConf)
}

filesToCleanup.foreach { file: String =>
try {
val path = new Path(file)
if (fs.exists(path)) {
// The "true" parameter here indicates that we should recursively delete everything under the given path
fs.delete(path, true)
}
} catch {
// If we fail in deleting a temp file, log the error but don't fail the run
case e: Throwable => LOG.warn(s"Unable to delete temp file $file", e)
}
}
}
}

/**
* This is a mutable state that is kept internal to an execution
* as it is evaluating.
Expand All @@ -305,6 +338,7 @@ object Execution {
type Counters = Map[Long, ExecutionCounters]
private[this] val cache = new FutureCache[(Config, Execution[Any]), (Any, Counters)]
private[this] val toWriteCache = new FutureCache[(Config, ToWrite), Counters]
private[this] val filesToCleanup = mutable.Set[String]()

// This method with return a 'clean' cache, that shares
// the underlying thread and message queue of the parent evalCache
Expand All @@ -313,7 +347,7 @@ object Execution {
new EvalCache {
override protected[EvalCache] val messageQueue: LinkedBlockingQueue[EvalCache.FlowDefAction] = self.messageQueue
override def start(): Unit = sys.error("Invalid to start child EvalCache")
override def finished(): Unit = sys.error("Invalid to finish child EvalCache")
override def finished(mode: Mode): Unit = sys.error("Invalid to finish child EvalCache")
}
}

Expand Down Expand Up @@ -378,7 +412,12 @@ object Execution {
/*
* This is called after we are done submitting all jobs
*/
def finished(): Unit = messageQueue.put(Stop)
def finished(mode: Mode): Unit = {
messageQueue.put(Stop)
if (filesToCleanup.nonEmpty) {
Runtime.getRuntime.addShutdownHook(TempFileCleanup(filesToCleanup, mode))
}
}

def getOrLock(cfg: Config, write: ToWrite): Either[Promise[Counters], Future[Counters]] =
toWriteCache.getOrPromise((cfg, write))
Expand All @@ -392,6 +431,8 @@ object Execution {
def getOrElseInsert[T](cfg: Config, ex: Execution[T],
res: => Future[(T, Counters)]): Future[(T, Counters)] =
getOrElseInsertWithFeedback(cfg, ex, res)._2

def addFilesToCleanup(files: TraversableOnce[String]): Unit = filesToCleanup ++= files
}

private case class FutureConst[T](get: ConcurrentExecutionContext => Future[T]) extends Execution[T] {
Expand Down Expand Up @@ -630,7 +671,7 @@ object Execution {
* are based on on this one. By keeping the Pipe and the Sink, can inspect the Execution
* DAG and optimize it later (a goal, but not done yet).
*/
private case class WriteExecution[T](head: ToWrite, tail: List[ToWrite], fn: (Config, Mode) => T) extends Execution[T] {
private case class WriteExecution[T](head: ToWrite, tail: List[ToWrite], fn: (Config, Mode) => T, tempFilesToCleanup: (Config, Mode) => Set[String] = (_, _) => Set()) extends Execution[T] {

/**
* Apply a pure function to the result. This may not
Expand Down Expand Up @@ -671,6 +712,7 @@ object Execution {
// Anything not already ran we run as part of a single flow def, using their combined counters for the others
protected def runStats(conf: Config, mode: Mode, cache: EvalCache)(implicit cec: ConcurrentExecutionContext) = {
Trampoline(cache.getOrElseInsert(conf, this, {
cache.addFilesToCleanup(tempFilesToCleanup(conf, mode))
val cacheLookup: List[(ToWrite, Either[Promise[Map[Long, ExecutionCounters]], Future[Map[Long, ExecutionCounters]]])] =
(head :: tail).map{ tw => (tw, cache.getOrLock(conf, tw)) }
val (weDoOperation, someoneElseDoesOperation) = unwrapListEither(cacheLookup)
Expand Down Expand Up @@ -713,11 +755,15 @@ object Execution {
*/
override def zip[U](that: Execution[U]): Execution[(T, U)] =
that match {
case WriteExecution(h, t, otherFn) =>
case WriteExecution(h, t, otherFn, tempFiles) =>
val newFn = { (conf: Config, mode: Mode) =>
(fn(conf, mode), otherFn(conf, mode))
}
WriteExecution(head, h :: t ::: tail, newFn)
val newTempFilesFn = { (conf: Config, mode: Mode) =>
tempFilesToCleanup(conf, mode) ++ tempFiles(conf, mode)
}

WriteExecution(head, h :: t ::: tail, newFn, newTempFilesFn)
case o => Zipped(this, that)
}

Expand Down Expand Up @@ -798,11 +844,12 @@ object Execution {
* Here we allow both the targets to write and the sources to be generated from the config and mode.
* This allows us to merge things looking for the config and mode without using flatmap.
*/
private[scalding] def write[T, U](fn: (Config, Mode) => (TypedPipe[T], TypedSink[T]), generatorFn: (Config, Mode) => U): Execution[U] =
private[scalding] def write[T, U](fn: (Config, Mode) => (TypedPipe[T], TypedSink[T]), generatorFn: (Config, Mode) => U,
tempFilesToCleanup: (Config, Mode) => Set[String] = (_, _) => Set()): Execution[U] =
WriteExecution(PreparedWrite({ (cfg: Config, m: Mode) =>
val r = fn(cfg, m)
SimpleWrite(r._1, r._2)
}), Nil, generatorFn)
}), Nil, generatorFn, tempFilesToCleanup)

/**
* Convenience method to get the Args
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ limitations under the License.
package com.twitter.scalding.typed

import java.io.{ OutputStream, InputStream, Serializable }
import java.util.Random
import java.util.{ Random, UUID }

import cascading.flow.FlowDef
import cascading.pipe.{ Each, Pipe }
Expand Down Expand Up @@ -503,14 +503,18 @@ trait TypedPipe[+T] extends Serializable {
val cachedRandomUUID = java.util.UUID.randomUUID
lazy val inMemoryDest = new MemorySink[T]

def hadoopTypedSource(conf: Config): TypedSource[T] with TypedSink[T] = {
// come up with unique temporary filename, use the config here
// TODO: refactor into TemporarySequenceFile class
def temporaryPath(conf: Config, uuid: UUID): String = {
val tmpDir = conf.get("hadoop.tmp.dir")
.orElse(conf.get("cascading.tmp.dir"))
.getOrElse("/tmp")

val tmpSeq = tmpDir + "/scalding/snapshot-" + cachedRandomUUID + ".seq"
tmpDir + "/scalding/snapshot-" + uuid + ".seq"
}

def hadoopTypedSource(conf: Config): TypedSource[T] with TypedSink[T] = {
// come up with unique temporary filename, use the config here
// TODO: refactor into TemporarySequenceFile class
val tmpSeq = temporaryPath(conf, cachedRandomUUID)
source.TypedSequenceFile[T](tmpSeq)

}
Expand All @@ -532,7 +536,16 @@ trait TypedPipe[+T] extends Serializable {
}
}

Execution.write(writeFn, readFn)
val filesToDeleteFn = { (conf: Config, mode: Mode) =>
mode match {
case _: CascadingLocal => // Local or Test mode
Set[String]()
case _: HadoopMode =>
Set(temporaryPath(conf, cachedRandomUUID))
}
}

Execution.write(writeFn, readFn, filesToDeleteFn)
}

/**
Expand Down
121 changes: 111 additions & 10 deletions scalding-core/src/test/scala/com/twitter/scalding/ExecutionTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,20 @@ limitations under the License.
package com.twitter.scalding

import com.twitter.algebird.monad.Reader

import com.twitter.scalding.serialization.macros.impl.ordered_serialization.runtime_helpers.MacroEqualityOrderedSerialization
import com.twitter.scalding.serialization.OrderedSerialization
import java.nio.file.{FileSystems, Files, Path}
import java.util

import java.nio.file.FileSystems
import java.nio.file.Path

import org.scalatest.{ Matchers, WordSpec }
import org.scalatest.{Matchers, WordSpec}

import scala.collection.JavaConverters._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{ Await, Future, ExecutionContext => ConcurrentExecutionContext, Promise }
import scala.concurrent.{Await, Future, Promise, ExecutionContext => ConcurrentExecutionContext}
import scala.util.Random
import scala.util.{ Try, Success, Failure }

import scala.util.{Failure, Success, Try}
import ExecutionContext._
import com.twitter.scalding.Execution.TempFileCleanup

object ExecutionTestJobs {
def wordCount(in: String, out: String) =
Expand All @@ -57,10 +56,24 @@ object ExecutionTestJobs {

(source.mapValues(_ * 2) ++ (source.mapValues(_ * 3))).toIterableExecution
}

def writeExecutionWithTempFile(tempFile: String, testData: List[String]): Execution[List[String]] = {
val writeFn = { (conf: Config, mode: Mode) =>
(TypedPipe.from(testData), TypedTsv[String](tempFile))
}
val readFn = { (conf: Config, mode: Mode) =>
testData
}

val filesToDeleteFn = { (conf: Config, mode: Mode) =>
Set(tempFile)
}

Execution.write(writeFn, readFn, filesToDeleteFn)
}
}

class WordCountEc(args: Args) extends ExecutionJob[Unit](args) {
def execution = ExecutionTestJobs.wordCount(args("input"), args("output"))
abstract class TestExecutionJob[+T](args: Args) extends ExecutionJob[T](args) {
// In tests, classloader issues with sbt mean we should not
// really use threads, so we run immediately
override def concurrentExecutionContext = new scala.concurrent.ExecutionContext {
Expand All @@ -69,6 +82,22 @@ class WordCountEc(args: Args) extends ExecutionJob[Unit](args) {
}
}

class WordCountEc(args: Args) extends TestExecutionJob[Unit](args) {
def execution = ExecutionTestJobs.wordCount(args("input"), args("output"))
}

class ExecutionWithTempFiles(args: Args, tempFile: String, testData: List[String]) extends TestExecutionJob[List[String]](args) {
override def execution = ExecutionTestJobs.writeExecutionWithTempFile(tempFile, testData)
}

class ZippedExecutionWithTempFiles(args: Args, tempFileOne: String, tempFileTwo: String, testDataOne: List[String], testDataTwo: List[String]) extends TestExecutionJob[(List[String], List[String])](args) {
override def execution = {
val executionOne = ExecutionTestJobs.writeExecutionWithTempFile(tempFileOne, testDataOne)
val executionTwo = ExecutionTestJobs.writeExecutionWithTempFile(tempFileTwo, testDataTwo)
executionOne.zip(executionTwo)
}
}

case class MyCustomType(s: String)

class ExecutionTest extends WordSpec with Matchers {
Expand All @@ -86,6 +115,18 @@ class ExecutionTest extends WordSpec with Matchers {
}
}

def getShutdownHooks: Seq[Thread] = {
// The list of attached shutdown hooks are not accessible normally, so we must use reflection to get them
val clazz = Class.forName("java.lang.ApplicationShutdownHooks")
val hooksField = clazz.getDeclaredField("hooks")
hooksField.setAccessible(true)
hooksField.get(null).asInstanceOf[util.IdentityHashMap[Thread, Thread]].asScala.keys.toSeq
}

def isTempFileCleanupHook(hook: Thread): Boolean = {
classOf[TempFileCleanup].isAssignableFrom(hook.getClass)
}

"An Execution" should {
"run" in {
ExecutionTestJobs.wordCount2(TypedPipe.from(List("a b b c c c", "d d d d")))
Expand Down Expand Up @@ -251,6 +292,66 @@ class ExecutionTest extends WordSpec with Matchers {
}
}
"Executions" should {
"shutdown hook should clean up temporary files" in {
val tempFileOne = Files.createTempDirectory("scalding-execution-test")
val tempFileTwo = Files.createTempDirectory("scalding-execution-test")
val mode = Test(Map())

Files.exists(tempFileOne) should be(true)
Files.exists(tempFileTwo) should be(true)

val cleanupThread = TempFileCleanup(Seq(tempFileOne.toFile.getAbsolutePath, tempFileTwo.toFile.getAbsolutePath), mode)
cleanupThread.run()

Files.exists(tempFileOne) should be(false)
Files.exists(tempFileTwo) should be(false)
}

"clean up temporary files on exit" in {
val tempFile = Files.createTempDirectory("scalding-execution-test").toFile.getAbsolutePath
val testData = List("a", "b", "c")
getShutdownHooks.foreach { hook: Thread =>
isTempFileCleanupHook(hook) should be(false)
}

ExecutionTestJobs.writeExecutionWithTempFile(tempFile, testData).shouldSucceed()

// This is hacky, but there's a small chance that the new cleanup hook isn't registered by the time we get here
// A small sleep like this appears to be sufficient to ensure we can see it
Thread.sleep(1000)
val cleanupHook = getShutdownHooks.find(isTempFileCleanupHook)
cleanupHook shouldBe defined

cleanupHook.get.asInstanceOf[TempFileCleanup].filesToCleanup should contain theSameElementsAs Set(tempFile)
cleanupHook.get.run()
// Remove the hook so it doesn't show up in the list of shutdown hooks for other tests
Runtime.getRuntime.removeShutdownHook(cleanupHook.get)
}

"clean up temporary files on exit with a zip" in {
val tempFileOne = Files.createTempDirectory("scalding-execution-test").toFile.getAbsolutePath
val tempFileTwo = Files.createTempDirectory("scalding-execution-test").toFile.getAbsolutePath
val testDataOne = List("a", "b", "c")
val testDataTwo = List("x", "y", "z")
getShutdownHooks.foreach { hook: Thread =>
isTempFileCleanupHook(hook) should be(false)
}

ExecutionTestJobs.writeExecutionWithTempFile(tempFileOne, testDataOne)
.zip(ExecutionTestJobs.writeExecutionWithTempFile(tempFileTwo, testDataTwo)).shouldSucceed()

// This is hacky, but there's a small chance that the new cleanup hook isn't registered by the time we get here
// A small sleep like this appears to be sufficient to ensure we can see it
Thread.sleep(1000)
val cleanupHook = getShutdownHooks.find(isTempFileCleanupHook)
cleanupHook shouldBe defined

cleanupHook.get.asInstanceOf[TempFileCleanup].filesToCleanup should contain theSameElementsAs Set(tempFileOne, tempFileTwo)
cleanupHook.get.run()
// Remove the hook so it doesn't show up in the list of shutdown hooks for other tests
Runtime.getRuntime.removeShutdownHook(cleanupHook.get)
}

"evaluate once per run" in {
var first = 0
var second = 0
Expand Down

0 comments on commit b34d3d4

Please sign in to comment.