Skip to content

Make timeouts actually cancel the underlying processes #133

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Dec 7, 2021
17 changes: 9 additions & 8 deletions core/src/main/scala/com/whisk/docker/DockerCommandExecutor.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.whisk.docker

import scala.concurrent.duration.Duration
import scala.concurrent.{ExecutionContext, Future}

object PortProtocol extends Enumeration {
Expand Down Expand Up @@ -29,28 +30,28 @@ trait DockerCommandExecutor {

def host: String

def createContainer(spec: DockerContainer)(implicit ec: ExecutionContext): Future[String]
def createContainer(spec: DockerContainer)(implicit ec: ExecutionContext, timeout: Duration): Future[String]

def startContainer(id: String)(implicit ec: ExecutionContext): Future[Unit]
def startContainer(id: String)(implicit ec: ExecutionContext, timeout: Duration): Future[Unit]

def inspectContainer(id: String)(
implicit ec: ExecutionContext): Future[Option[InspectContainerResult]]
implicit ec: ExecutionContext, timeout: Duration): Future[Option[InspectContainerResult]]

def withLogStreamLines(id: String, withErr: Boolean)(f: String => Unit)(
implicit docker: DockerCommandExecutor,
ec: ExecutionContext
ec: ExecutionContext, timeout: Duration
): Unit

def withLogStreamLinesRequirement(id: String, withErr: Boolean)(f: String => Boolean)(
implicit docker: DockerCommandExecutor,
ec: ExecutionContext): Future[Unit]
ec: ExecutionContext, timeout: Duration): Future[Unit]

def listImages()(implicit ec: ExecutionContext): Future[Set[String]]
def listImages()(implicit ec: ExecutionContext, timeout: Duration): Future[Set[String]]

def pullImage(image: String)(implicit ec: ExecutionContext): Future[Unit]
def pullImage(image: String)(implicit ec: ExecutionContext, timeout: Duration): Future[Unit]

def remove(id: String, force: Boolean = true, removeVolumes: Boolean = true)(
implicit ec: ExecutionContext): Future[Unit]
implicit ec: ExecutionContext, timeout: Duration): Future[Unit]

def close(): Unit
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ class DockerContainerManager(containers: Seq[DockerContainer], executor: DockerC
dockerStatesMap(container).isReady()
}

def pullImages(): Future[Seq[String]] = {
def pullImages(timeout: Duration): Future[Seq[String]] = {
implicit val tout = timeout
executor.listImages().flatMap { images =>
val imagesToPull: Seq[String] = containers.map(_.image).filterNot { image =>
val cImage = if (image.contains(":")) image else image + ":latest"
Expand All @@ -40,10 +41,13 @@ class DockerContainerManager(containers: Seq[DockerContainer], executor: DockerC
containerStartTimeout: Duration): Future[Seq[(DockerContainerState, Boolean)]] = {
import DockerContainerManager._

implicit val timeout = containerStartTimeout

@tailrec
def initGraph(graph: ContainerDependencyGraph,
previousInits: Future[Seq[DockerContainerState]] = Future.successful(Seq.empty))
: Future[Seq[DockerContainerState]] = {

val initializedContainers = previousInits.flatMap { prev =>
Future.traverse(graph.containers.map(dockerStatesMap))(_.init()).map(prev ++ _)
}
Expand All @@ -54,6 +58,7 @@ class DockerContainerManager(containers: Seq[DockerContainer], executor: DockerC
val readyInits: Future[Seq[Future[Boolean]]] =
initializedContainers.map(_.map(state => state.isReady()))
val simplifiedReadyInits: Future[Seq[Boolean]] = readyInits.flatMap(Future.sequence(_))

Await.result(simplifiedReadyInits, containerStartTimeout)
initGraph(dependants, initializedContainers)
}
Expand All @@ -68,7 +73,8 @@ class DockerContainerManager(containers: Seq[DockerContainer], executor: DockerC
})
}

def stopRmAll(): Future[Unit] = {
def stopRmAll(timeout: Duration): Future[Unit] = {
implicit val tout = timeout
val future = Future.traverse(states)(_.remove(force = true, removeVolumes = true)).map(_ => ())
future.onComplete { _ =>
executor.close()
Expand Down
20 changes: 12 additions & 8 deletions core/src/main/scala/com/whisk/docker/DockerContainerState.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import java.util.concurrent.atomic.AtomicBoolean

import org.slf4j.LoggerFactory

import scala.concurrent.duration.Duration
import scala.concurrent.{ExecutionContext, Future, Promise}

class DockerContainerState(spec: DockerContainer) {
Expand Down Expand Up @@ -39,10 +40,11 @@ class DockerContainerState(spec: DockerContainer) {

def isReady(): Future[Boolean] = _isReady.future

def isRunning()(implicit docker: DockerCommandExecutor, ec: ExecutionContext): Future[Boolean] =
def isRunning()(implicit docker: DockerCommandExecutor, ec: ExecutionContext, timeout: Duration): Future[Boolean] = {
getRunningContainer().map(_.isDefined)
}

def init()(implicit docker: DockerCommandExecutor, ec: ExecutionContext): Future[this.type] = {
def init()(implicit docker: DockerCommandExecutor, ec: ExecutionContext, timeout: Duration): Future[this.type] = {
for {
s <- _id.init(docker.createContainer(spec))
_ <- docker.startContainer(s)
Expand All @@ -56,7 +58,8 @@ class DockerContainerState(spec: DockerContainer) {
}

private def runReadyCheck()(implicit docker: DockerCommandExecutor,
ec: ExecutionContext): Future[Boolean] =
ec: ExecutionContext, timeout: Duration): Future[Boolean] = {

_isReady.init(
(for {
r <- isRunning() if r
Expand All @@ -72,28 +75,29 @@ class DockerContainerState(spec: DockerContainer) {
Future.successful(false)
}
)
}

protected def getRunningContainer()(
implicit docker: DockerCommandExecutor,
ec: ExecutionContext): Future[Option[InspectContainerResult]] =
ec: ExecutionContext, timeout: Duration): Future[Option[InspectContainerResult]] =
id.flatMap(docker.inspectContainer)

def getName()(implicit docker: DockerCommandExecutor, ec: ExecutionContext): Future[String] =
def getName()(implicit docker: DockerCommandExecutor, ec: ExecutionContext, timeout: Duration): Future[String] =
getRunningContainer().flatMap {
case Some(res) => Future.successful(res.name)
case None => Future.failed(new RuntimeException(s"Container ${spec.image} is not running"))
}

def getIpAddresses()(implicit docker: DockerCommandExecutor,
ec: ExecutionContext): Future[Seq[String]] = getRunningContainer().flatMap {
ec: ExecutionContext, timeout: Duration): Future[Seq[String]] = getRunningContainer().flatMap {
case Some(res) => Future.successful(res.ipAddresses)
case None => Future.failed(new RuntimeException(s"Container ${spec.image} is not running"))
}

private val _ports = SinglePromise[Map[Int, Int]]

def getPorts()(implicit docker: DockerCommandExecutor,
ec: ExecutionContext): Future[Map[Int, Int]] = {
ec: ExecutionContext, timeout: Duration): Future[Map[Int, Int]] = {
def portsFuture: Future[Map[Int, Int]] = getRunningContainer().flatMap {
case None => Future.failed(new RuntimeException(s"Container ${spec.image} is not running"))
case Some(c) =>
Expand All @@ -108,6 +112,6 @@ class DockerContainerState(spec: DockerContainer) {

def remove(force: Boolean = true, removeVolumes: Boolean = true)(
implicit docker: DockerCommandExecutor,
ec: ExecutionContext): Future[Unit] =
ec: ExecutionContext, timeout: Duration): Future[Unit] =
id.flatMap(x => docker.remove(x, force, removeVolumes))
}
20 changes: 10 additions & 10 deletions core/src/main/scala/com/whisk/docker/DockerKit.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,28 +41,28 @@ trait DockerKit {
}

def startAllOrFail(): Unit = {
Await.result(containerManager.pullImages(), PullImagesTimeout)
val allRunning: Boolean = try {
Await.result(containerManager.pullImages(PullImagesTimeout), PullImagesTimeout)
val allRunning: Boolean = {
val future: Future[Boolean] =
containerManager.initReadyAll(StartContainersTimeout).map(_.map(_._2).forall(identity))
containerManager.initReadyAll(StartContainersTimeout).map(_.map(_._2).forall(identity)).recover {
case e: Exception =>
log.error("Exception during container initialization", e)
false
}
sys.addShutdownHook(
Await.ready(containerManager.stopRmAll(), StopContainersTimeout)
Await.ready(containerManager.stopRmAll(StopContainersTimeout), StopContainersTimeout)
)
Await.result(future, StartContainersTimeout)
} catch {
case e: Exception =>
log.error("Exception during container initialization", e)
false
}
if (!allRunning) {
Await.ready(containerManager.stopRmAll(), StopContainersTimeout)
Await.ready(containerManager.stopRmAll(StopContainersTimeout), StopContainersTimeout)
throw new RuntimeException("Cannot run all required containers")
}
}

def stopAllQuietly(): Unit = {
try {
Await.ready(containerManager.stopRmAll(), StopContainersTimeout)
Await.ready(containerManager.stopRmAll(StopContainersTimeout), StopContainersTimeout)
} catch {
case e: Throwable =>
log.error(e.getMessage, e)
Expand Down
20 changes: 10 additions & 10 deletions core/src/main/scala/com/whisk/docker/DockerReadyChecker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ package com.whisk.docker
import java.net.{HttpURLConnection, URL}
import java.util.{Timer, TimerTask}

import scala.concurrent.duration.FiniteDuration
import scala.concurrent.duration.{Duration, FiniteDuration}
import scala.concurrent.{ExecutionContext, Future, Promise, TimeoutException}

trait DockerReadyChecker {

def apply(container: DockerContainerState)(implicit docker: DockerCommandExecutor,
ec: ExecutionContext): Future[Boolean]
ec: ExecutionContext, timeout: Duration): Future[Boolean]

@deprecated("this method will be removed. Use DockerReadyChecker.And(a, b)", "0.9.6")
def and(other: DockerReadyChecker): DockerReadyChecker = {
Expand Down Expand Up @@ -81,7 +81,7 @@ object DockerReadyChecker {

case class And(r1: DockerReadyChecker, r2: DockerReadyChecker) extends DockerReadyChecker {
override def apply(container: DockerContainerState)(implicit docker: DockerCommandExecutor,
ec: ExecutionContext): Future[Boolean] = {
ec: ExecutionContext, timeout: Duration): Future[Boolean] = {
val aF = r1(container)
val bF = r2(container)
for {
Expand All @@ -93,7 +93,7 @@ object DockerReadyChecker {

case class Or(r1: DockerReadyChecker, r2: DockerReadyChecker) extends DockerReadyChecker {
override def apply(container: DockerContainerState)(implicit docker: DockerCommandExecutor,
ec: ExecutionContext): Future[Boolean] = {
ec: ExecutionContext, timeout: Duration): Future[Boolean] = {
val aF = r1(container)
val bF = r2(container)
val p = Promise[Boolean]()
Expand All @@ -111,7 +111,7 @@ object DockerReadyChecker {

object Always extends DockerReadyChecker {
override def apply(container: DockerContainerState)(implicit docker: DockerCommandExecutor,
ec: ExecutionContext): Future[Boolean] =
ec: ExecutionContext, timeout: Duration): Future[Boolean] =
Future.successful(true)
}

Expand All @@ -121,7 +121,7 @@ object DockerReadyChecker {
code: Int = 200)
extends DockerReadyChecker {
override def apply(container: DockerContainerState)(implicit docker: DockerCommandExecutor,
ec: ExecutionContext): Future[Boolean] = {
ec: ExecutionContext, timeout: Duration): Future[Boolean] = {
container.getPorts().map(_(port)).flatMap { p =>
val url = new URL("http", host.getOrElse(docker.host), p, path)
Future {
Expand All @@ -139,7 +139,7 @@ object DockerReadyChecker {

case class LogLineContains(str: String) extends DockerReadyChecker {
override def apply(container: DockerContainerState)(implicit docker: DockerCommandExecutor,
ec: ExecutionContext): Future[Boolean] = {
ec: ExecutionContext, timeout: Duration): Future[Boolean] = {
for {
id <- container.id
_ <- docker.withLogStreamLinesRequirement(id, withErr = true)(_.contains(str))
Expand All @@ -153,7 +153,7 @@ object DockerReadyChecker {
extends DockerReadyChecker {

override def apply(container: DockerContainerState)(implicit docker: DockerCommandExecutor,
ec: ExecutionContext): Future[Boolean] = {
ec: ExecutionContext, timeout: Duration): Future[Boolean] = {
RetryUtils.runWithin(underlying(container), duration).recover {
case _: TimeoutException =>
false
Expand All @@ -167,14 +167,14 @@ object DockerReadyChecker {
extends DockerReadyChecker {

override def apply(container: DockerContainerState)(implicit docker: DockerCommandExecutor,
ec: ExecutionContext): Future[Boolean] = {
ec: ExecutionContext, timeout: Duration): Future[Boolean] = {
RetryUtils.looped(underlying(container).filter(identity), attempts, delay)
}
}

case class F(f: DockerContainerState => Future[Boolean]) extends DockerReadyChecker {
override def apply(container: DockerContainerState)(implicit docker: DockerCommandExecutor,
ec: ExecutionContext): Future[Boolean] =
ec: ExecutionContext, timeout: Duration): Future[Boolean] =
f(container)
}

Expand Down
42 changes: 42 additions & 0 deletions core/src/main/scala/com/whisk/docker/package.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
package com.whisk

import java.util.TimerTask
import java.util.concurrent.{Callable, CancellationException, FutureTask}

import scala.concurrent.{ExecutionContext, Future, Promise, TimeoutException}
import scala.concurrent.duration.{Duration, FiniteDuration}
import scala.util.{Failure, Try}

/**
* General utility functions
*/
Expand All @@ -10,4 +17,39 @@ package object docker {
case Some(x) => f(content, x)
}
}

private[docker] object PerishableFuture {

def apply[T](body: => T)(implicit ec: ExecutionContext, timeout: Duration): Future[T] = timeout match {
case finiteTimeout: FiniteDuration =>
val promise = Promise[T]

val futureTask = new FutureTask[T](new Callable[T] {
override def call(): T = body
}) {
override def done(): Unit = promise.tryComplete {
Try(get()).recoverWith {
case _: CancellationException => Failure(new TimeoutException())
}
}
}

val reaperTask = new TimerTask {
override def run(): Unit = {
futureTask.cancel(true)
promise.tryFailure(new TimeoutException())
}
}

timer.schedule(reaperTask, finiteTimeout.toMillis)
ec.execute(futureTask)

promise.future

case _ => Future.apply(body)
}

private val timer = new java.util.Timer(true)

}
}
Loading