Skip to content

Commit

Permalink
refactor: make sync easier to consume without CLI
Browse files Browse the repository at this point in the history
Refactor so that one could consume raw results and process the results
the way it might be needed vs. simple result logging.

Signed-off-by: Andrey Smirnov <andrey.smirnov@talos-systems.com>
  • Loading branch information
smira committed Mar 2, 2023
1 parent 570819b commit fe473c0
Show file tree
Hide file tree
Showing 3 changed files with 306 additions and 77 deletions.
76 changes: 76 additions & 0 deletions kubernetes/manifests/cli.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

package manifests

import (
"context"

"k8s.io/client-go/rest"
)

// SyncWithLog applies the manifests to the cluster logging the results via logFunc.
func SyncWithLog(ctx context.Context, objects []Manifest, config *rest.Config, dryRun bool, logFunc func(string, ...any)) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

syncCh := make(chan SyncResult)
errCh := make(chan error, 1)

go func() {
errCh <- Sync(ctx, objects, config, dryRun, syncCh)
}()

logFunc("updating manifests")

var updatedManifests []Manifest

syncLoop:
for {
select {
case result := <-syncCh:
logFunc(" > processing manifest %s", result.Path)

switch {
case result.Skipped:
logFunc(" < no changes")
case dryRun:
logFunc("%s", result.Diff)
logFunc(" < dry run, change skipped")
case !dryRun:
logFunc("%s", result.Diff)
logFunc(" < applied successfully")

updatedManifests = append(updatedManifests, result.Object)
}
case err := <-errCh:
if err == nil {
break syncLoop
}

return err
}
}

if dryRun {
return nil
}

logFunc("waiting for all manifests to be applied")

rolloutCh := make(chan RolloutProgress)

go func() {
errCh <- WaitForRollout(ctx, config, updatedManifests, rolloutCh)
}()

for {
select {
case result := <-rolloutCh:
logFunc(" > waiting for %s", result.Path)
case err := <-errCh:
return err
}
}
}
151 changes: 151 additions & 0 deletions kubernetes/manifests/rollout.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

package manifests

import (
"context"
"time"

"github.com/siderolabs/gen/channel"
"github.com/siderolabs/go-retry/retry"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/rest"

"github.com/siderolabs/go-kubernetes/kubernetes"
)

// RolloutProgress indicates the current manifest rollout progress.
type RolloutProgress struct {
Object Manifest
Path string
}

// WaitForRollout waits for the manifest rollout to be complete.
func WaitForRollout(ctx context.Context, config *rest.Config, objects []Manifest, resultCh chan<- RolloutProgress) error {
var deployments, daemonsets []Manifest

for _, object := range objects {
switch {
case object.GetKind() == "Deployment" && object.GroupVersionKind().Group == "apps":
deployments = append(deployments, object)
case object.GetKind() == "DaemonSet" && object.GroupVersionKind().Group == "apps":
daemonsets = append(daemonsets, object)
}
}

clientset, err := kubernetes.NewForConfig(config)
if err != nil {
return err
}

defer clientset.Close() //nolint:errcheck

if err = waitForDeploymentsRollout(ctx, clientset, deployments, resultCh); err != nil {
return err
}

if err = waitForDaemonSetsRollout(ctx, clientset, daemonsets, resultCh); err != nil {
return err
}

return nil
}

func waitForDeploymentsRollout(ctx context.Context, clientset *kubernetes.Client, deployments []Manifest, resultCh chan<- RolloutProgress) error {
for _, obj := range deployments {
obj := obj

if !channel.SendWithContext(ctx, resultCh,
RolloutProgress{
Object: obj,
Path: manifestPath(obj),
}) {
return ctx.Err()
}

err := retry.Constant(3*time.Minute, retry.WithUnits(10*time.Second)).Retry(func() error {
deployment, err := clientset.AppsV1().Deployments(obj.GetNamespace()).Get(ctx, obj.GetName(), metav1.GetOptions{})
if err != nil {
if kubernetes.IsRetryableError(err) {
return retry.ExpectedError(err)
}

return err
}

if deployment.Generation != deployment.Status.ObservedGeneration {
return retry.ExpectedErrorf("deployment %s generation %d != observed generation %d", deployment.Name, deployment.Generation, deployment.Status.ObservedGeneration)
}

if deployment.Status.ReadyReplicas != deployment.Status.Replicas || deployment.Status.UpdatedReplicas != deployment.Status.Replicas {
return retry.ExpectedErrorf("deployment %s ready replicas %d != replicas %d", deployment.Name, deployment.Status.ReadyReplicas, deployment.Status.Replicas)
}

return nil
})
if err != nil {
return err
}
}

return nil
}

func waitForDaemonSetsRollout(ctx context.Context, clientset *kubernetes.Client, daemonSets []Manifest, resultCh chan<- RolloutProgress) error {
for _, obj := range daemonSets {
obj := obj

if !channel.SendWithContext(ctx, resultCh,
RolloutProgress{
Object: obj,
Path: manifestPath(obj),
}) {
return ctx.Err()
}

err := retry.Constant(5*time.Minute, retry.WithUnits(10*time.Second)).Retry(func() error {
daemonSet, err := clientset.AppsV1().DaemonSets(obj.GetNamespace()).Get(ctx, obj.GetName(), metav1.GetOptions{})
if err != nil {
if kubernetes.IsRetryableError(err) {
return retry.ExpectedError(err)
}

return err
}

if daemonSet.Generation != daemonSet.Status.ObservedGeneration {
return retry.ExpectedErrorf("expected observed generation for %s to be %d, got %d",
daemonSet.Name, daemonSet.Generation, daemonSet.Status.ObservedGeneration)
}

if daemonSet.Status.UpdatedNumberScheduled != daemonSet.Status.DesiredNumberScheduled {
return retry.ExpectedErrorf("expected current number up-to-date for %s to be %d, got %d",
daemonSet.Name, daemonSet.Status.UpdatedNumberScheduled, daemonSet.Status.CurrentNumberScheduled)
}

if daemonSet.Status.CurrentNumberScheduled != daemonSet.Status.DesiredNumberScheduled {
return retry.ExpectedErrorf("expected current number scheduled for %s to be %d, got %d",
daemonSet.Name, daemonSet.Status.DesiredNumberScheduled, daemonSet.Status.CurrentNumberScheduled)
}

if daemonSet.Status.NumberAvailable != daemonSet.Status.DesiredNumberScheduled {
return retry.ExpectedErrorf("expected number available for %s to be %d, got %d",
daemonSet.Name, daemonSet.Status.DesiredNumberScheduled, daemonSet.Status.NumberAvailable)
}

if daemonSet.Status.NumberReady != daemonSet.Status.DesiredNumberScheduled {
return retry.ExpectedErrorf("expected number ready for %s to be %d, got %d",
daemonSet.Name, daemonSet.Status.DesiredNumberScheduled, daemonSet.Status.NumberReady)
}

return nil
})
if err != nil {
return err
}
}

return nil
}
Loading

0 comments on commit fe473c0

Please sign in to comment.