-
Notifications
You must be signed in to change notification settings - Fork 3
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
refactor: make sync easier to consume without CLI
Refactor so that one could consume raw results and process the results the way it might be needed vs. simple result logging. Signed-off-by: Andrey Smirnov <andrey.smirnov@talos-systems.com>
- Loading branch information
Showing
3 changed files
with
306 additions
and
77 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,76 @@ | ||
// This Source Code Form is subject to the terms of the Mozilla Public | ||
// License, v. 2.0. If a copy of the MPL was not distributed with this | ||
// file, You can obtain one at http://mozilla.org/MPL/2.0/. | ||
|
||
package manifests | ||
|
||
import ( | ||
"context" | ||
|
||
"k8s.io/client-go/rest" | ||
) | ||
|
||
// SyncWithLog applies the manifests to the cluster logging the results via logFunc. | ||
func SyncWithLog(ctx context.Context, objects []Manifest, config *rest.Config, dryRun bool, logFunc func(string, ...any)) error { | ||
ctx, cancel := context.WithCancel(ctx) | ||
defer cancel() | ||
|
||
syncCh := make(chan SyncResult) | ||
errCh := make(chan error, 1) | ||
|
||
go func() { | ||
errCh <- Sync(ctx, objects, config, dryRun, syncCh) | ||
}() | ||
|
||
logFunc("updating manifests") | ||
|
||
var updatedManifests []Manifest | ||
|
||
syncLoop: | ||
for { | ||
select { | ||
case result := <-syncCh: | ||
logFunc(" > processing manifest %s", result.Path) | ||
|
||
switch { | ||
case result.Skipped: | ||
logFunc(" < no changes") | ||
case dryRun: | ||
logFunc("%s", result.Diff) | ||
logFunc(" < dry run, change skipped") | ||
case !dryRun: | ||
logFunc("%s", result.Diff) | ||
logFunc(" < applied successfully") | ||
|
||
updatedManifests = append(updatedManifests, result.Object) | ||
} | ||
case err := <-errCh: | ||
if err == nil { | ||
break syncLoop | ||
} | ||
|
||
return err | ||
} | ||
} | ||
|
||
if dryRun { | ||
return nil | ||
} | ||
|
||
logFunc("waiting for all manifests to be applied") | ||
|
||
rolloutCh := make(chan RolloutProgress) | ||
|
||
go func() { | ||
errCh <- WaitForRollout(ctx, config, updatedManifests, rolloutCh) | ||
}() | ||
|
||
for { | ||
select { | ||
case result := <-rolloutCh: | ||
logFunc(" > waiting for %s", result.Path) | ||
case err := <-errCh: | ||
return err | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,151 @@ | ||
// This Source Code Form is subject to the terms of the Mozilla Public | ||
// License, v. 2.0. If a copy of the MPL was not distributed with this | ||
// file, You can obtain one at http://mozilla.org/MPL/2.0/. | ||
|
||
package manifests | ||
|
||
import ( | ||
"context" | ||
"time" | ||
|
||
"github.com/siderolabs/gen/channel" | ||
"github.com/siderolabs/go-retry/retry" | ||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||
"k8s.io/client-go/rest" | ||
|
||
"github.com/siderolabs/go-kubernetes/kubernetes" | ||
) | ||
|
||
// RolloutProgress indicates the current manifest rollout progress. | ||
type RolloutProgress struct { | ||
Object Manifest | ||
Path string | ||
} | ||
|
||
// WaitForRollout waits for the manifest rollout to be complete. | ||
func WaitForRollout(ctx context.Context, config *rest.Config, objects []Manifest, resultCh chan<- RolloutProgress) error { | ||
var deployments, daemonsets []Manifest | ||
|
||
for _, object := range objects { | ||
switch { | ||
case object.GetKind() == "Deployment" && object.GroupVersionKind().Group == "apps": | ||
deployments = append(deployments, object) | ||
case object.GetKind() == "DaemonSet" && object.GroupVersionKind().Group == "apps": | ||
daemonsets = append(daemonsets, object) | ||
} | ||
} | ||
|
||
clientset, err := kubernetes.NewForConfig(config) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
defer clientset.Close() //nolint:errcheck | ||
|
||
if err = waitForDeploymentsRollout(ctx, clientset, deployments, resultCh); err != nil { | ||
return err | ||
} | ||
|
||
if err = waitForDaemonSetsRollout(ctx, clientset, daemonsets, resultCh); err != nil { | ||
return err | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func waitForDeploymentsRollout(ctx context.Context, clientset *kubernetes.Client, deployments []Manifest, resultCh chan<- RolloutProgress) error { | ||
for _, obj := range deployments { | ||
obj := obj | ||
|
||
if !channel.SendWithContext(ctx, resultCh, | ||
RolloutProgress{ | ||
Object: obj, | ||
Path: manifestPath(obj), | ||
}) { | ||
return ctx.Err() | ||
} | ||
|
||
err := retry.Constant(3*time.Minute, retry.WithUnits(10*time.Second)).Retry(func() error { | ||
deployment, err := clientset.AppsV1().Deployments(obj.GetNamespace()).Get(ctx, obj.GetName(), metav1.GetOptions{}) | ||
if err != nil { | ||
if kubernetes.IsRetryableError(err) { | ||
return retry.ExpectedError(err) | ||
} | ||
|
||
return err | ||
} | ||
|
||
if deployment.Generation != deployment.Status.ObservedGeneration { | ||
return retry.ExpectedErrorf("deployment %s generation %d != observed generation %d", deployment.Name, deployment.Generation, deployment.Status.ObservedGeneration) | ||
} | ||
|
||
if deployment.Status.ReadyReplicas != deployment.Status.Replicas || deployment.Status.UpdatedReplicas != deployment.Status.Replicas { | ||
return retry.ExpectedErrorf("deployment %s ready replicas %d != replicas %d", deployment.Name, deployment.Status.ReadyReplicas, deployment.Status.Replicas) | ||
} | ||
|
||
return nil | ||
}) | ||
if err != nil { | ||
return err | ||
} | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func waitForDaemonSetsRollout(ctx context.Context, clientset *kubernetes.Client, daemonSets []Manifest, resultCh chan<- RolloutProgress) error { | ||
for _, obj := range daemonSets { | ||
obj := obj | ||
|
||
if !channel.SendWithContext(ctx, resultCh, | ||
RolloutProgress{ | ||
Object: obj, | ||
Path: manifestPath(obj), | ||
}) { | ||
return ctx.Err() | ||
} | ||
|
||
err := retry.Constant(5*time.Minute, retry.WithUnits(10*time.Second)).Retry(func() error { | ||
daemonSet, err := clientset.AppsV1().DaemonSets(obj.GetNamespace()).Get(ctx, obj.GetName(), metav1.GetOptions{}) | ||
if err != nil { | ||
if kubernetes.IsRetryableError(err) { | ||
return retry.ExpectedError(err) | ||
} | ||
|
||
return err | ||
} | ||
|
||
if daemonSet.Generation != daemonSet.Status.ObservedGeneration { | ||
return retry.ExpectedErrorf("expected observed generation for %s to be %d, got %d", | ||
daemonSet.Name, daemonSet.Generation, daemonSet.Status.ObservedGeneration) | ||
} | ||
|
||
if daemonSet.Status.UpdatedNumberScheduled != daemonSet.Status.DesiredNumberScheduled { | ||
return retry.ExpectedErrorf("expected current number up-to-date for %s to be %d, got %d", | ||
daemonSet.Name, daemonSet.Status.UpdatedNumberScheduled, daemonSet.Status.CurrentNumberScheduled) | ||
} | ||
|
||
if daemonSet.Status.CurrentNumberScheduled != daemonSet.Status.DesiredNumberScheduled { | ||
return retry.ExpectedErrorf("expected current number scheduled for %s to be %d, got %d", | ||
daemonSet.Name, daemonSet.Status.DesiredNumberScheduled, daemonSet.Status.CurrentNumberScheduled) | ||
} | ||
|
||
if daemonSet.Status.NumberAvailable != daemonSet.Status.DesiredNumberScheduled { | ||
return retry.ExpectedErrorf("expected number available for %s to be %d, got %d", | ||
daemonSet.Name, daemonSet.Status.DesiredNumberScheduled, daemonSet.Status.NumberAvailable) | ||
} | ||
|
||
if daemonSet.Status.NumberReady != daemonSet.Status.DesiredNumberScheduled { | ||
return retry.ExpectedErrorf("expected number ready for %s to be %d, got %d", | ||
daemonSet.Name, daemonSet.Status.DesiredNumberScheduled, daemonSet.Status.NumberReady) | ||
} | ||
|
||
return nil | ||
}) | ||
if err != nil { | ||
return err | ||
} | ||
} | ||
|
||
return nil | ||
} |
Oops, something went wrong.