Skip to content
This repository was archived by the owner on May 25, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ case class Annotations(
cpu: Option[Double],
endpoints: Map[String, Endpoint],
managementEndpointName: Option[String],
remotingEndpointName: Option[String],
secrets: Seq[Secret],
annotations: Seq[Annotation] = Seq.empty,
privileged: Boolean,
Expand All @@ -45,6 +46,18 @@ case class Annotations(
modules: Set[String],
akkaClusterBootstrapSystemName: Option[String]) {

def headlessEndpoints: Map[String, Endpoint] =
endpoints filterKeys { (k: String) =>
(k.some === managementEndpointName) ||
(k.some === remotingEndpointName)
}

def publicEndpoints: Map[String, Endpoint] =
endpoints filterKeys { (k: String) =>
!((k.some === managementEndpointName) ||
(k.some === remotingEndpointName))
}

def applicationValidation(application: Option[String]): ValidationNel[String, Option[Seq[String]]] =
application match {
case None =>
Expand Down Expand Up @@ -118,6 +131,7 @@ object Annotations extends LazyLogging {
cpu = args.cpu.orElse(cpu(labels)),
endpoints = endpoints(selectArrayWithIndex(labels, ns("endpoints")), applicationVersion),
managementEndpointName = managementEndpointName(labels),
remotingEndpointName = remotingEndpointName(labels),
secrets = secrets(selectArray(labels, ns("secrets"))),
annotations = annotations(selectArray(labels, ns("annotations"))),
privileged = privileged(labels),
Expand Down Expand Up @@ -217,6 +231,10 @@ object Annotations extends LazyLogging {
labels
.get(ns("management-endpoint"))

private[annotations] def remotingEndpointName(labels: Map[String, String]): Option[String] =
labels
.get(ns("remoting-endpoint"))

private[annotations] def endpoints(endpoints: Seq[(Int, Map[String, String])], version: Option[String]): Map[String, Endpoint] =
endpoints.flatMap(v => endpoint(v._2, v._1, version)).toMap

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,20 @@ case object CanaryDeploymentType extends DeploymentType
case object BlueGreenDeploymentType extends DeploymentType
case object RollingDeploymentType extends DeploymentType

/**
* Represents the discovery method during Akka Boostrap on Kubernetes.
*/
sealed trait DiscoveryMethod
object DiscoveryMethod {
case object KubernetesApi extends DiscoveryMethod {
override def toString: String = "kubernetes-api"
}
case object AkkaDns extends DiscoveryMethod {
override def toString = "akka-dns"
}
def all = Seq(AkkaDns, KubernetesApi)
}

/**
* Represents the input argument for `generate-deployment` command.
*/
Expand All @@ -68,6 +82,7 @@ case class GenerateDeploymentArgs(
akkaClusterJoinExisting: Boolean = false,
akkaClusterSkipValidation: Boolean = false,
deploymentType: DeploymentType = CanaryDeploymentType,
discoveryMethod: DiscoveryMethod = DiscoveryMethod.AkkaDns,
dockerImages: Seq[String] = Seq.empty,
name: Option[String] = None,
version: Option[String] = None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@ object InputArgs {
throw new IllegalArgumentException(s"Invalid deployment type $v. Available: ${DeploymentType.All.mkString(", ")}")
}

implicit val discoveryMethodRead: scopt.Read[DiscoveryMethod] =
scopt.Read.reads {
case v if v.toLowerCase == DiscoveryMethod.AkkaDns.toString => DiscoveryMethod.AkkaDns
case v if v.toLowerCase == DiscoveryMethod.KubernetesApi.toString => DiscoveryMethod.KubernetesApi
case v =>
throw new IllegalArgumentException(s"Invalid discovery method $v. Available: ${DiscoveryMethod.all.mkString(", ")}")
}

implicit val logLevelsRead: scopt.Read[LogLevel] =
scopt.Read.reads {
case v if v.toLowerCase == "error" => LogLevel.ERROR
Expand Down Expand Up @@ -122,6 +130,11 @@ object InputArgs {
.optional()
.action(GenerateDeploymentArgs.set((t, args) => args.copy(deploymentType = t))),

opt[DiscoveryMethod]("discovery-method")
.text(s"Sets the discovery method. Default: ${DiscoveryMethod.AkkaDns}; Available: ${DiscoveryMethod.all.mkString(", ")}")
.optional()
.action(GenerateDeploymentArgs.set((t, args) => args.copy(discoveryMethod = t))),

opt[String]("env") /* note: this argument will apply for other targets */
.text("Sets an environment variable. Format: NAME=value")
.minOccurs(0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ object Deployment {
noOfReplicas: Int,
externalServices: Map[String, Seq[String]],
deploymentType: DeploymentType,
discoveryMethod: DiscoveryMethod,
jsonTransform: JsonTransform,
akkaClusterJoinExisting: Boolean): ValidationNel[String, Deployment] =

Expand All @@ -57,9 +58,17 @@ object Deployment {
val appName = serviceName(rawAppName)
val appNameVersion = serviceName(s"$appName$VersionSeparator$version")

val serviceResourceName =
deploymentType match {
case CanaryDeploymentType => appName
case BlueGreenDeploymentType => appNameVersion
case RollingDeploymentType => appName
}

val labels = Map(
"appName" -> appName,
"appNameVersion" -> appNameVersion) ++ annotations.akkaClusterBootstrapSystemName.fold(Map.empty[String, String])(system => Map("actorSystemName" -> system))
"app" -> appName,
"appNameVersion" -> appNameVersion) ++ annotations.akkaClusterBootstrapSystemName.fold(Map(
serviceNameLabel -> serviceResourceName))(system => Map(serviceNameLabel -> system))

val podTemplate =
PodTemplate.generate(
Expand All @@ -72,6 +81,7 @@ object Deployment {
RestartPolicy.Always, // The only valid RestartPolicy for Deployment
externalServices,
deploymentType,
discoveryMethod,
akkaClusterJoinExisting,
applicationArgs,
appName,
Expand All @@ -87,7 +97,7 @@ object Deployment {
(appNameVersion, Json("appNameVersion" -> appNameVersion.asJson))

case RollingDeploymentType =>
(appName, Json("appName" -> appName.asJson))
(appName, Json("app" -> appName.asJson))
}

Deployment(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ object Job {
noOfReplicas: Int,
externalServices: Map[String, Seq[String]],
deploymentType: DeploymentType,
discoveryMethod: DiscoveryMethod,
jsonTransform: JsonTransform,
akkaClusterJoinExisting: Boolean): ValidationNel[String, Job] =

Expand All @@ -56,10 +57,17 @@ object Job {
|@| restartPolicyValidation(restartPolicy)) { (applicationArgs, rawAppName, version, restartPolicy) =>
val appName = serviceName(rawAppName)
val appNameVersion = serviceName(s"$appName$VersionSeparator$version")
val serviceResourceName =
deploymentType match {
case CanaryDeploymentType => appName
case BlueGreenDeploymentType => appNameVersion
case RollingDeploymentType => appName
}

val labels = Map(
"appName" -> appName,
"appNameVersion" -> appNameVersion) ++ annotations.akkaClusterBootstrapSystemName.fold(Map.empty[String, String])(system => Map("actorSystemName" -> system))
"app" -> appName,
"appNameVersion" -> appNameVersion) ++ annotations.akkaClusterBootstrapSystemName.fold(Map(
serviceNameLabel -> serviceResourceName))(system => Map(serviceNameLabel -> system))

val podTemplate =
PodTemplate.generate(
Expand All @@ -72,6 +80,7 @@ object Job {
if (restartPolicy == RestartPolicy.Default) RestartPolicy.OnFailure else restartPolicy,
externalServices,
deploymentType,
discoveryMethod,
akkaClusterJoinExisting,
applicationArgs,
appName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,16 @@ object PodTemplate {
/**
* Generates pod environment variables specific for RP applications.
*/
def envs(annotations: Annotations, serviceResourceName: String, noOfReplicas: Int, externalServices: Map[String, Seq[String]], akkaClusterJoinExisting: Boolean): Map[String, EnvironmentVariable] =
def envs(annotations: Annotations, serviceResourceName: String, noOfReplicas: Int, externalServices: Map[String, Seq[String]], akkaClusterJoinExisting: Boolean, discoveryMethod: DiscoveryMethod): Map[String, EnvironmentVariable] =
mergeEnvs(
PodEnvs,
appNameEnvs(annotations.appName),
annotations.version.fold(Map.empty[String, EnvironmentVariable])(versionEnvs),
appTypeEnvs(annotations.appType, annotations.modules),
configEnvs(annotations.configResource),
akkaClusterEnvs(
annotations.appName,
discoveryMethod,
annotations.modules,
annotations.namespace,
serviceResourceName,
Expand All @@ -78,6 +80,8 @@ object PodTemplate {
}.toMap

private[kubernetes] def akkaClusterEnvs(
appName: Option[String],
discoveryMethod: DiscoveryMethod,
modules: Set[String],
namespace: Option[String],
serviceResourceName: String,
Expand All @@ -90,13 +94,29 @@ object PodTemplate {
else
Map(
"RP_JAVA_OPTS" -> LiteralEnvironmentVariable(
Seq(
s"-Dakka.management.cluster.bootstrap.contact-point-discovery.discovery-method=kubernetes-api",
s"-Dakka.management.cluster.bootstrap.contact-point-discovery.port-name=$managementEndpointName",
s"-Dakka.management.cluster.bootstrap.contact-point-discovery.effective-name=$serviceResourceName",
s"-Dakka.management.cluster.bootstrap.contact-point-discovery.required-contact-point-nr=$noOfReplicas",
akkaClusterBootstrapSystemName.fold("-Dakka.discovery.kubernetes-api.pod-label-selector=appName=%s")(systemName => s"-Dakka.discovery.kubernetes-api.pod-label-selector=actorSystemName=$systemName"),
s"${if (akkaClusterJoinExisting) "-Dakka.management.cluster.bootstrap.form-new-cluster=false" else ""}")
((discoveryMethod match {
case DiscoveryMethod.KubernetesApi =>
List(
s"-Dakka.management.cluster.bootstrap.contact-point-discovery.discovery-method=kubernetes-api",
s"-Dakka.management.cluster.bootstrap.contact-point-discovery.port-name=$managementEndpointName",
// https://github.com/akka/akka-management/blob/v0.20.0/cluster-bootstrap/src/main/resources/reference.conf
akkaClusterBootstrapSystemName match {
case Some(systemName) => s"-Dakka.management.cluster.bootstrap.contact-point-discovery.effective-name=$systemName"
case _ => s"-Dakka.management.cluster.bootstrap.contact-point-discovery.effective-name=$serviceResourceName"
},
"-Dakka.discovery.kubernetes-api.pod-label-selector=akka.lightbend.com/service-name=%s")
case DiscoveryMethod.AkkaDns =>
List(
s"-Dakka.management.cluster.bootstrap.contact-point-discovery.discovery-method=akka-dns",
s"-Dakka.management.cluster.bootstrap.contact-point-discovery.port-name=$managementEndpointName",
appName match {
case Some(name) => s"-Dakka.management.cluster.bootstrap.contact-point-discovery.service-name=$name-internal"
case _ => sys.error("appName was expected")
})
}) ++
List(
s"-Dakka.management.cluster.bootstrap.contact-point-discovery.required-contact-point-nr=$noOfReplicas",
s"${if (akkaClusterJoinExisting) "-Dakka.management.cluster.bootstrap.form-new-cluster=false" else ""}"))
.filter(_.nonEmpty)
.mkString(" ")),
"RP_DYN_JAVA_OPTS" -> LiteralEnvironmentVariable(
Expand Down Expand Up @@ -158,7 +178,7 @@ object PodTemplate {
* If the akkaClusterJoinExisting flag is provided, these labels are removed from the pod template so that
* it isn't used for bootstrap.
*/
private[kubernetes] val PodDiscoveryLabels = Set("appName", "actorSystemName")
private[kubernetes] val PodDiscoveryLabels = Set("app", "actorSystemName")

/**
* Represents possible values for imagePullPolicy field within the Kubernetes pod template.
Expand Down Expand Up @@ -274,6 +294,7 @@ object PodTemplate {
restartPolicy: RestartPolicy.Value,
externalServices: Map[String, Seq[String]],
deploymentType: DeploymentType,
discoveryMethod: DiscoveryMethod,
akkaClusterJoinExisting: Boolean,
applicationArgs: Option[Seq[String]],
appName: String,
Expand Down Expand Up @@ -368,7 +389,7 @@ object PodTemplate {
"imagePullPolicy" -> imagePullPolicy.asJson,
"env" -> RpEnvironmentVariables.mergeEnvs(
annotations.environmentVariables ++
RpEnvironmentVariables.envs(annotations, serviceResourceName, noOfReplicas, externalServices, akkaClusterJoinExisting)).asJson,
RpEnvironmentVariables.envs(annotations, serviceResourceName, noOfReplicas, externalServices, akkaClusterJoinExisting, discoveryMethod)).asJson,
"ports" -> annotations.endpoints.asJson,
"volumeMounts" -> secretNames
.map {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,43 +59,67 @@ object Service {
apiVersion: String,
clusterIp: Option[String],
deploymentType: DeploymentType,
discoveryMethod: DiscoveryMethod,
jsonTransform: JsonTransform,
loadBalancerIp: Option[String],
serviceType: Option[String]): ValidationNel[String, Option[Service]] =
serviceType: Option[String]): ValidationNel[String, List[Service]] =
(annotations.appNameValidation |@| annotations.versionValidation) { (rawAppName, version) =>
// FIXME there's a bit of code duplicate in Deployment
val appName = serviceName(rawAppName)
val internalAppname = appName + "-internal"
val appNameVersion = serviceName(s"$appName${PodTemplate.VersionSeparator}$version")

val selector =
deploymentType match {
case CanaryDeploymentType => Json("appName" -> appName.asJson)
case RollingDeploymentType => Json("appName" -> appName.asJson)
case CanaryDeploymentType => Json("app" -> appName.asJson)
case RollingDeploymentType => Json("app" -> appName.asJson)
case BlueGreenDeploymentType => Json("appNameVersion" -> appNameVersion.asJson)
}

if (annotations.endpoints.isEmpty)
None
else
Some(
Service(
appName,
Json(
"apiVersion" -> apiVersion.asJson,
"kind" -> "Service".asJson,
"metadata" -> Json(
"labels" -> Json(
"appName" -> appName.asJson),
"name" -> appName.asJson)
.deepmerge(
annotations.namespace.fold(jEmptyObject)(ns => Json("namespace" -> serviceName(ns).asJson))),
"spec" -> Json(
"ports" -> annotations.endpoints.asJson,
"selector" -> selector)
.deepmerge(clusterIp.fold(jEmptyObject)(cIp => Json("clusterIP" -> jString(cIp))))
.deepmerge(serviceType.fold(jEmptyObject)(svcType => Json("type" -> jString(svcType))))
.deepmerge(loadBalancerIp.fold(jEmptyObject)(lbIp => Json("loadBalancerIP" -> jString(lbIp))))),
jsonTransform))
def svc(endpoints: Map[String, Endpoint]) = Service(
appName,
Json(
"apiVersion" -> apiVersion.asJson,
"kind" -> "Service".asJson,
"metadata" -> Json(
"labels" -> Json(
"app" -> appName.asJson),
"name" -> appName.asJson)
.deepmerge(
annotations.namespace.fold(jEmptyObject)(ns => Json("namespace" -> serviceName(ns).asJson))),
"spec" -> Json(
"ports" -> endpoints.asJson,
"selector" -> selector)
.deepmerge(clusterIp.fold(jEmptyObject)(cIp => Json("clusterIP" -> jString(cIp))))
.deepmerge(serviceType.fold(jEmptyObject)(svcType => Json("type" -> jString(svcType))))
.deepmerge(loadBalancerIp.fold(jEmptyObject)(lbIp => Json("loadBalancerIP" -> jString(lbIp))))),
jsonTransform)

def headlessService(endpoints: Map[String, Endpoint]) = Service(
internalAppname,
Json(
"apiVersion" -> apiVersion.asJson,
"kind" -> "Service".asJson,
"metadata" -> Json(
"labels" -> Json(
"app" -> appName.asJson),
"annotations" -> Json(
"service.alpha.kubernetes.io/tolerate-unready-endpoints" -> jString("true")),
"name" -> internalAppname.asJson)
.deepmerge(
annotations.namespace.fold(jEmptyObject)(ns => Json("namespace" -> serviceName(ns).asJson))),
"spec" -> Json(
"ports" -> endpoints.asJson,
"selector" -> selector,
"clusterIP" -> jString("None"),
"publishNotReadyAddresses" -> jTrue)),
jsonTransform)

if (annotations.endpoints.isEmpty) List()
else if (discoveryMethod == DiscoveryMethod.AkkaDns) List(
headlessService(annotations.headlessEndpoints),
svc(annotations.publicEndpoints))
else List(svc(annotations.endpoints))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import slogging.LazyLogging
import Scalaz._

package object kubernetes extends LazyLogging {
private[reactivecli] val serviceNameLabel = "akka.lightbend.com/service-name"
private[reactivecli] val LivenessInitialDelaySeconds = 60
private[reactivecli] val StatusPeriodSeconds = 10

Expand Down Expand Up @@ -90,6 +91,7 @@ package object kubernetes extends LazyLogging {
kubernetesArgs.podControllerArgs.numberOfReplicas,
generateDeploymentArgs.externalServices,
generateDeploymentArgs.deploymentType,
generateDeploymentArgs.discoveryMethod,
kubernetesArgs.transformPodControllers.fold(JsonTransform.noop)(JsonTransform.jq),
generateDeploymentArgs.akkaClusterJoinExisting)

Expand All @@ -104,6 +106,7 @@ package object kubernetes extends LazyLogging {
kubernetesArgs.podControllerArgs.numberOfReplicas,
generateDeploymentArgs.externalServices,
generateDeploymentArgs.deploymentType,
generateDeploymentArgs.discoveryMethod,
kubernetesArgs.transformPodControllers.fold(JsonTransform.noop)(JsonTransform.jq),
generateDeploymentArgs.akkaClusterJoinExisting)
}
Expand All @@ -113,6 +116,7 @@ package object kubernetes extends LazyLogging {
serviceApiVersion,
kubernetesArgs.serviceArgs.clusterIp,
generateDeploymentArgs.deploymentType,
generateDeploymentArgs.discoveryMethod,
kubernetesArgs.transformServices.fold(JsonTransform.noop)(JsonTransform.jq),
kubernetesArgs.serviceArgs.loadBalancerIp,
kubernetesArgs.serviceArgs.serviceType)
Expand Down
Loading