Skip to content

Commit

Permalink
Exclude Marathon tasks in health-check grace period (linkerd#2098) (l…
Browse files Browse the repository at this point in the history
…inkerd#2099)

This fixes linkerd#2098. The Marathon namer has a `useHealthCheck` option that allows for excluding
tasks that Marathon claims to be unhealthy. However, if a Marathon app
has a health-check grace period defined, a freshly started task will show up as
running, but without any health-related status.
In this case the Marathon API returns a `TASK_RUNNING` status, but there aren't
any `healthCheckResults` present in the response. Obviously, Linkerd should not
route any request to such task and wait until it shows up as healthy.

My change adds a check if a Marathon app has any health checks defined
(a non-empty `healthChecks` attribute of the `app` record), and, if so,
expect the same number of `healthCheckResults` to be positive before
routing any request to such task.

I have added unit tests that include the `healthChecks` attribute in the
Marathon API response and verify that the Marathon namer behavior is as expected.

Signed-off-by: Tomasz Rogozik <trogozik@applause.com>
  • Loading branch information
rogoman authored and Dennis Adjei-Baah committed Aug 16, 2018
1 parent 3d08313 commit 2bed7fd
Show file tree
Hide file tree
Showing 2 changed files with 321 additions and 25 deletions.
65 changes: 44 additions & 21 deletions marathon/src/main/scala/io/buoyant/marathon/v2/Api.scala
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ object Api {
state: Option[String]
)

final case class HealthCheck()

final case class TaskIpAddress(
ipAddress: Option[String]
)
Expand All @@ -64,6 +66,7 @@ object Api {
final case class App(
id: Option[String],
ipAddress: Option[AppIpAddress],
healthChecks: Option[Seq[HealthCheck]],
tasks: Option[Seq[Task]]
)

Expand All @@ -75,6 +78,11 @@ object Api {
app: Option[App] = None
)

final case class TaskWithHealthCheckInfo(
task: Task,
numberOfHealthChecks: Int
)

object discoveryPort {
def unapply(addr: Option[AppIpAddress]): Option[Int] = {
addr.flatMap(_.discovery).flatMap(_.ports).flatMap(_.headOption).flatMap(_.number)
Expand All @@ -92,9 +100,9 @@ object Api {
}

object healthyHostPort {
def unapply(task: Task): Option[(String, Int)] = {
if (isHealthy(task)) {
hostPort.unapply(task)
def unapply(extendedTask: TaskWithHealthCheckInfo): Option[(String, Int)] = {
if (isHealthy(extendedTask)) {
hostPort.unapply(extendedTask.task)
} else {
None
}
Expand All @@ -108,15 +116,23 @@ object Api {
}

object healthyIpAddress {
def unapply(task: Task): Option[String] = {
if (isHealthy(task)) {
ipAddress.unapply(task)
def unapply(extendedTask: TaskWithHealthCheckInfo): Option[String] = {
if (isHealthy(extendedTask)) {
ipAddress.unapply(extendedTask.task)
} else {
None
}
}
}

object healthCheckCount {
def unapply(healthChecks: Option[Seq[HealthCheck]]): Option[Int] =
healthChecks match {
case Some(healthCheckSeq) => Some(healthCheckSeq.size)
case None => Some(0)
}
}

def apply(client: Client, uriPrefix: String, useHealthCheck: Boolean): Api =
new AppIdApi(client, s"$uriPrefix/$versionString", useHealthCheck)

Expand Down Expand Up @@ -146,33 +162,40 @@ object Api {
private[this] def toAppIds(appsRsp: AppsRsp): Api.AppIds = {
appsRsp.apps match {
case Some(apps) =>
apps.collect { case App(Some(id), _, _) => Path.read(id.toLowerCase) }.toSet
apps.collect { case App(Some(id), _, _, _) => Path.read(id.toLowerCase) }.toSet
case None => Set.empty
}
}

private[v2] def toAddresses(appRsp: AppRsp, useHealthCheck: Boolean): Set[Address] =
appRsp.app match {
case Some(App(_, discoveryPort(port), Some(tasks))) =>
tasks.collect {
case ipAddress(host) if !useHealthCheck => Address(host, port)
case healthyIpAddress(host) => Address(host, port)
}.toSet
case Some(App(_, _, Some(tasks))) =>
tasks.collect {
case hostPort(host, port) if !useHealthCheck => Address(host, port)
case healthyHostPort(host, port) => Address(host, port)
}.toSet
case Some(App(_, discoveryPort(port), healthCheckCount(count), Some(tasks))) =>
if (!useHealthCheck)
tasks.collect { case ipAddress(host) => Address(host, port) }.toSet
else
tasks.map(TaskWithHealthCheckInfo(_, count)).collect {
case healthyIpAddress(host) => Address(host, port)
}.toSet
case Some(App(_, _, healthCheckCount(count), Some(tasks))) =>
if (!useHealthCheck)
tasks.collect { case hostPort(host, port) => Address(host, port) }.toSet
else
tasks.map(TaskWithHealthCheckInfo(_, count)).collect {
case healthyHostPort(host, port) => Address(host, port)
}.toSet
case _ => Set.empty
}

private[this] def isHealthy(task: Task): Boolean =
task match {
case Task(_, _, _, _, Some(healthCheckResults), Some(state)) =>
private[this] def isHealthy(extendedTask: TaskWithHealthCheckInfo): Boolean = {
extendedTask.task match {
case Task(_, _, _, _, Some(healthCheckResults), Some(state)) if extendedTask.numberOfHealthChecks > 0 =>
state == taskRunning &&
healthCheckResults.forall(_ == HealthCheckResult(Some(true)))
healthCheckResults.count(_ == HealthCheckResult(Some(true))) >= extendedTask.numberOfHealthChecks
case Task(_, _, _, _, _, Some(state)) =>
state == taskRunning && extendedTask.numberOfHealthChecks == 0
case _ => false
}
}
}

private class AppIdApi(client: Api.Client, apiPrefix: String, useHealthCheck: Boolean)
Expand Down
Loading

0 comments on commit 2bed7fd

Please sign in to comment.