Address review comments
alfredkrohmer committed Mar 6, 2020
1 parent 1af529c commit c7459df
Showing 9 changed files with 204 additions and 489 deletions.
6 changes: 3 additions & 3 deletions docs/guide/ingress/pod-conditions.md
@@ -13,7 +13,7 @@ In order to avoid this situation, the AWS ALB ingress controller can set the bef

## Pod configuration

Add a readiness gate with `conditionType: target-health.alb.ingress.kubernetes.io/<ingress name>_<service name>_<service port>` to your pod.
Add a readiness gate with `conditionType: target-health.alb.ingress.k8s.aws/<ingress name>_<service name>_<service port>` to your pod.

Example:

@@ -63,7 +63,7 @@ spec:
app: nginx
spec:
readinessGates:
- conditionType: target-health.alb.ingress.kubernetes.io/nginx-ingress_nginx-service_80
- conditionType: target-health.alb.ingress.k8s.aws/nginx-ingress_nginx-service_80
containers:
- name: nginx
image: nginx
@@ -97,5 +97,5 @@ $ kubectl get pod nginx-test-5744b9ff84-7ftl9 -o yaml | grep -B5 'type: target-h
message: Target registration is in progress.
reason: Elb.RegistrationInProgress
status: "False"
type: target-health.alb.ingress.kubernetes.io/nginx-test_nginx-test_80
type: target-health.alb.ingress.k8s.aws/nginx-test_nginx-test_80
```
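The same condition can also be checked programmatically. Below is a minimal sketch using the client-go core/v1 types; the placeholder pod variable and the ingress/service/port names in the condition type are illustrative, and the snippet assumes a pod object already fetched from the API server:

```go
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
)

// hasTargetHealthCondition reports whether the controller has set the pod's
// target-health readiness-gate condition to "True".
func hasTargetHealthCondition(pod *corev1.Pod, conditionType corev1.PodConditionType) bool {
	for _, cond := range pod.Status.Conditions {
		if cond.Type == conditionType {
			return cond.Status == corev1.ConditionTrue
		}
	}
	return false
}

func main() {
	// Placeholder pod; in practice this would come from a client-go Get/List call.
	var pod corev1.Pod
	ready := hasTargetHealthCondition(&pod, "target-health.alb.ingress.k8s.aws/nginx-ingress_nginx-service_80")
	fmt.Println("target registered and healthy:", ready)
}
```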
110 changes: 55 additions & 55 deletions internal/alb/tg/targethealth.go
@@ -3,15 +3,12 @@ package tg
import (
"context"
"fmt"
"sync"
"time"

"github.com/aws/aws-sdk-go/service/elbv2"
"github.com/kubernetes-sigs/aws-alb-ingress-controller/internal/albctx"
"github.com/kubernetes-sigs/aws-alb-ingress-controller/internal/aws"
"github.com/kubernetes-sigs/aws-alb-ingress-controller/internal/ingress/annotations/healthcheck"
"github.com/kubernetes-sigs/aws-alb-ingress-controller/internal/ingress/annotations/parser"
"github.com/kubernetes-sigs/aws-alb-ingress-controller/internal/ingress/annotations/targetgroup"
"github.com/kubernetes-sigs/aws-alb-ingress-controller/internal/ingress/backend"
"github.com/kubernetes-sigs/aws-alb-ingress-controller/internal/ingress/controller/store"
"github.com/kubernetes-sigs/aws-alb-ingress-controller/internal/k8s"
@@ -30,14 +27,25 @@ type targetGroupWatch struct {
// backend is the ingress backend for the target group
backend *extensions.IngressBackend

// podsToReconcile is the list of pods whose conditions have not been reconciled yet
targetsToReconcile []*elbv2.TargetDescription

cancel context.CancelFunc
mux sync.Mutex
// channels used to communicate the reconciliation interval and the list of targets to reconcile into the goroutine
interval chan int64
targetsToReconcile chan []*elbv2.TargetDescription
cancel context.CancelFunc
}

type targetGroupWatches map[string]*targetGroupWatch

func newTargetGroupWatch(ctx context.Context, ingress *extensions.Ingress, backend *extensions.IngressBackend) (*targetGroupWatch, context.Context) {
ctx, cancel := context.WithCancel(ctx)
return &targetGroupWatch{
ingress: ingress,
backend: backend,
interval: make(chan int64),
targetsToReconcile: make(chan []*elbv2.TargetDescription),
cancel: cancel,
}, ctx
}

// TargetHealthController provides functionality to reconcile pod condition status from target health of targets in a target group
type TargetHealthController interface {
// Reconcile ensures the target group targets in AWS matches the targets configured in the ingress backend.
@@ -63,7 +71,6 @@ type targetHealthController struct {
endpointResolver backend.EndpointResolver
client client.Client
tgWatches targetGroupWatches
tgWatchesMux sync.Mutex
}

// SyncTargetsForReconciliation starts a go routine for reconciling pod condition statuses for the given targets in the background until they are healthy in the target group
@@ -78,48 +85,33 @@ func (c *targetHealthController) SyncTargetsForReconciliation(ctx context.Context
// create, update or remove targetGroupWatch for this target group;
// a targetGroupWatch exists as long as there are targets in the target group whose pod condition statuses need to be reconciled;
// while the targetGroupWatch exists, a go routine regularly monitors the target health of the targets in the target group and updates the pod condition status for the corresponding pods
c.tgWatchesMux.Lock()
defer c.tgWatchesMux.Unlock()

tgWatch, ok := c.tgWatches[t.TgArn]
if ok {
// targetGroupWatch for this target group already exists
if len(targetsToReconcile) == 0 {
tgWatch.cancel()
delete(c.tgWatches, t.TgArn)
} else {
tgWatch.mux.Lock()
tgWatch.ingress = t.Ingress
tgWatch.targetsToReconcile = targetsToReconcile
tgWatch.mux.Unlock()
return nil
}
} else {
if len(targetsToReconcile) == 0 {
return nil
}

// targetGroupWatch for this target group doesn't exist yet -> create it
ctx, cancel := context.WithCancel(ctx)
tgWatch = &targetGroupWatch{
ingress: t.Ingress,
backend: t.Backend,
targetsToReconcile: targetsToReconcile,
cancel: cancel,
}
tgWatch, ctx = newTargetGroupWatch(ctx, t.Ingress, t.Backend)
c.tgWatches[t.TgArn] = tgWatch

// start watching target health in target group and updating pod condition status
go c.reconcilePodConditionsLoop(ctx, t.TgArn, conditionType, tgWatch)
}
tgWatch.interval <- c.ingressTargetHealthReconciliationInterval(t.Backend.ServiceName, t.Ingress)
tgWatch.targetsToReconcile <- targetsToReconcile

return nil
}

// StopReconcilingPodConditionStatus stops a running go routine (if there is any) which was started to reconcile pod condition statuses in the background for a specific target group
func (c *targetHealthController) StopReconcilingPodConditionStatus(tgArn string) {
c.tgWatchesMux.Lock()
defer c.tgWatchesMux.Unlock()

tgWatch, ok := c.tgWatches[tgArn]
if ok {
tgWatch.cancel()
@@ -156,36 +148,47 @@ func (c *targetHealthController) reconcilePodConditionsLoop(ctx context.Context,
logger := albctx.GetLogger(ctx)
logger.Infof("Starting reconciliation of pod condition status for target group: %v", tgArn)

initial := true
for {
tgWatch.mux.Lock()
interval := c.ingressTargetHealthReconciliationInterval(initial, tgWatch.backend.ServiceName, tgWatch.ingress)
ingress := tgWatch.ingress
backend := tgWatch.backend
targetsToReconcile := append([]*elbv2.TargetDescription{}, tgWatch.targetsToReconcile...) // make copy
tgWatch.mux.Unlock()
var interval int64
var targetsToReconcile []*elbv2.TargetDescription

initial = false
for {
var reconcile <-chan time.Time
if len(targetsToReconcile) > 0 && interval > 0 { // only reconcile if we have at least one target
reconcile = time.After(time.Duration(interval) * time.Second)
}

select {
case <-time.After(time.Duration(interval) * time.Second):
if err := c.reconcilePodConditions(ctx, tgArn, conditionType, ingress, backend, targetsToReconcile); err != nil {
case interval = <-tgWatch.interval: // update interval
case targetsToReconcile = <-tgWatch.targetsToReconcile: // update targets

case <-reconcile:
notReadyTargets, err := c.reconcilePodConditions(ctx, tgArn, conditionType, tgWatch.ingress, tgWatch.backend, targetsToReconcile)
if err == nil {
targetsToReconcile = notReadyTargets
} else {
logger.Errorf("Failed to reconcile pod condition status: %v", err)
albctx.GetEventf(ctx)(api.EventTypeWarning, "ERROR", "Error reconciling pod condition status via target group %s: %s", tgArn, err.Error())
}

case <-ctx.Done():
logger.Infof("Stopping reconciliation of pod condition status for target group: %v", tgArn)
return
}
}

}

// For each given pod, checks for the health status of the corresponding target in the target group and adds/updates a pod condition that can be used for pod readiness gates.
func (c *targetHealthController) reconcilePodConditions(ctx context.Context, tgArn string, conditionType api.PodConditionType, ingress *extensions.Ingress, backend *extensions.IngressBackend, targetsToReconcile []*elbv2.TargetDescription) error {
in := &elbv2.DescribeTargetHealthInput{TargetGroupArn: aws.String(tgArn)}
func (c *targetHealthController) reconcilePodConditions(ctx context.Context, tgArn string, conditionType api.PodConditionType, ingress *extensions.Ingress, backend *extensions.IngressBackend, targetsToReconcile []*elbv2.TargetDescription) ([]*elbv2.TargetDescription, error) {
notReadyTargets := []*elbv2.TargetDescription{}

in := &elbv2.DescribeTargetHealthInput{
TargetGroupArn: aws.String(tgArn),
Targets: targetsToReconcile,
}
resp, err := c.cloud.DescribeTargetHealthWithContext(ctx, in)
if err != nil {
return err
return notReadyTargets, err
}
targetsHealth := map[string]*elbv2.TargetHealth{}
for _, desc := range resp.TargetHealthDescriptions {
@@ -194,18 +197,22 @@ func (c *targetHealthController) reconcilePodConditions(ctx context.Context, tgA

pods, err := c.endpointResolver.ReverseResolve(ingress, backend, targetsToReconcile)
if err != nil {
return err
return notReadyTargets, err
}

for _, pod := range pods {
for i, target := range targetsToReconcile {
pod := pods[i]
targetHealth, ok := targetsHealth[pod.Status.PodIP]
if ok && podHasReadinessGate(pod, conditionType) {
if aws.StringValue(targetHealth.State) != elbv2.TargetHealthStateEnumHealthy {
notReadyTargets = append(notReadyTargets, target)
}
if err := c.reconcilePodCondition(ctx, conditionType, pod, targetHealth, true); err != nil {
return err
return notReadyTargets, err
}
}
}
return nil
return notReadyTargets, nil
}

// Creates or updates the condition status for the given pod with the given target health.
@@ -278,30 +285,23 @@ func (c *targetHealthController) filterTargetsNeedingReconciliation(conditionTyp
return targetsToReconcile, nil
}

func (c *targetHealthController) ingressTargetHealthReconciliationInterval(initial bool, serviceName string, ingress *extensions.Ingress) int64 {
func (c *targetHealthController) ingressTargetHealthReconciliationInterval(serviceName string, ingress *extensions.Ingress) int64 {
ingressAnnos, err := c.store.GetIngressAnnotations(k8s.MetaNamespaceKey(ingress))
if err == nil {
serviceKey := types.NamespacedName{Namespace: ingress.Namespace, Name: serviceName}
serviceAnnos, err := c.store.GetServiceAnnotations(serviceKey.String(), ingressAnnos)
if err == nil {
if initial {
return *serviceAnnos.HealthCheck.IntervalSeconds * *serviceAnnos.TargetGroup.HealthyThresholdCount
}
return *serviceAnnos.HealthCheck.IntervalSeconds
}
}

if initial {
return healthcheck.DefaultIntervalSeconds * targetgroup.DefaultHealthyThresholdCount
}
return healthcheck.DefaultIntervalSeconds
}

// PodConditionTypeForIngressBackend returns the PodConditionType that is associated with the given ingress and backend
func podConditionTypeForIngressBackend(ingress *extensions.Ingress, backend *extensions.IngressBackend) api.PodConditionType {
return api.PodConditionType(fmt.Sprintf(
"target-health.%s/%s_%s_%s",
parser.AnnotationsPrefix,
"target-health.alb.ingress.k8s.aws/%s_%s_%s",
ingress.Name,
backend.ServiceName,
backend.ServicePort.String(),
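For reference, the change above replaces the mutex-guarded fields on targetGroupWatch with channels that feed the per-target-group reconciliation goroutine. A minimal, self-contained sketch of that select-loop pattern is given below; plain strings stand in for *elbv2.TargetDescription and the reconcile function is a stub, so this illustrates the structure rather than the controller's actual implementation:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// watch carries the channels the controller uses to hand new settings to the
// per-target-group goroutine (simplified stand-in for targetGroupWatch).
type watch struct {
	interval chan int64    // reconciliation interval in seconds
	targets  chan []string // targets still awaiting a healthy status
	cancel   context.CancelFunc
}

// loop mirrors the structure of reconcilePodConditionsLoop: the timer is only
// armed while there is something to reconcile, and each pass keeps only the
// targets that are still not healthy.
func loop(ctx context.Context, w *watch, reconcile func([]string) []string) {
	var interval int64
	var pending []string
	for {
		var tick <-chan time.Time
		if len(pending) > 0 && interval > 0 {
			tick = time.After(time.Duration(interval) * time.Second)
		}
		select {
		case interval = <-w.interval: // controller pushed a new interval
		case pending = <-w.targets: // controller pushed a new target list
		case <-tick:
			pending = reconcile(pending) // keep only the still-unhealthy targets
		case <-ctx.Done():
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	w := &watch{interval: make(chan int64), targets: make(chan []string), cancel: cancel}

	go loop(ctx, w, func(targets []string) []string {
		fmt.Println("reconciling", targets)
		return nil // pretend everything became healthy
	})

	// Settings flow into the goroutine over channels instead of shared,
	// mutex-protected fields, as in SyncTargetsForReconciliation above.
	w.interval <- 1
	w.targets <- []string{"10.0.0.1:80"}

	time.Sleep(2 * time.Second)
	w.cancel()
}
```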