diff --git a/kubernetes/manifests/cli.go b/kubernetes/manifests/cli.go new file mode 100644 index 0000000..0c78604 --- /dev/null +++ b/kubernetes/manifests/cli.go @@ -0,0 +1,76 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package manifests + +import ( + "context" + + "k8s.io/client-go/rest" +) + +// SyncWithLog applies the manifests to the cluster logging the results via logFunc. +func SyncWithLog(ctx context.Context, objects []Manifest, config *rest.Config, dryRun bool, logFunc func(string, ...any)) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + syncCh := make(chan SyncResult) + errCh := make(chan error, 1) + + go func() { + errCh <- Sync(ctx, objects, config, dryRun, syncCh) + }() + + logFunc("updating manifests") + + var updatedManifests []Manifest + +syncLoop: + for { + select { + case result := <-syncCh: + logFunc(" > processing manifest %s", result.Path) + + switch { + case result.Skipped: + logFunc(" < no changes") + case dryRun: + logFunc("%s", result.Diff) + logFunc(" < dry run, change skipped") + case !dryRun: + logFunc("%s", result.Diff) + logFunc(" < applied successfully") + + updatedManifests = append(updatedManifests, result.Object) + } + case err := <-errCh: + if err == nil { + break syncLoop + } + + return err + } + } + + if dryRun { + return nil + } + + logFunc("waiting for all manifests to be applied") + + rolloutCh := make(chan RolloutProgress) + + go func() { + errCh <- WaitForRollout(ctx, config, updatedManifests, rolloutCh) + }() + + for { + select { + case result := <-rolloutCh: + logFunc(" > waiting for %s", result.Path) + case err := <-errCh: + return err + } + } +} diff --git a/kubernetes/manifests/rollout.go b/kubernetes/manifests/rollout.go new file mode 100644 index 0000000..80b266b --- /dev/null +++ b/kubernetes/manifests/rollout.go @@ -0,0 +1,151 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +package manifests + +import ( + "context" + "time" + + "github.com/siderolabs/gen/channel" + "github.com/siderolabs/go-retry/retry" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/rest" + + "github.com/siderolabs/go-kubernetes/kubernetes" +) + +// RolloutProgress indicates the current manifest rollout progress. +type RolloutProgress struct { + Object Manifest + Path string +} + +// WaitForRollout waits for the manifest rollout to be complete. +func WaitForRollout(ctx context.Context, config *rest.Config, objects []Manifest, resultCh chan<- RolloutProgress) error { + var deployments, daemonsets []Manifest + + for _, object := range objects { + switch { + case object.GetKind() == "Deployment" && object.GroupVersionKind().Group == "apps": + deployments = append(deployments, object) + case object.GetKind() == "DaemonSet" && object.GroupVersionKind().Group == "apps": + daemonsets = append(daemonsets, object) + } + } + + clientset, err := kubernetes.NewForConfig(config) + if err != nil { + return err + } + + defer clientset.Close() //nolint:errcheck + + if err = waitForDeploymentsRollout(ctx, clientset, deployments, resultCh); err != nil { + return err + } + + if err = waitForDaemonSetsRollout(ctx, clientset, daemonsets, resultCh); err != nil { + return err + } + + return nil +} + +func waitForDeploymentsRollout(ctx context.Context, clientset *kubernetes.Client, deployments []Manifest, resultCh chan<- RolloutProgress) error { + for _, obj := range deployments { + obj := obj + + if !channel.SendWithContext(ctx, resultCh, + RolloutProgress{ + Object: obj, + Path: manifestPath(obj), + }) { + return ctx.Err() + } + + err := retry.Constant(3*time.Minute, retry.WithUnits(10*time.Second)).Retry(func() error { + deployment, err := clientset.AppsV1().Deployments(obj.GetNamespace()).Get(ctx, obj.GetName(), metav1.GetOptions{}) + if err != nil { + if kubernetes.IsRetryableError(err) { + return retry.ExpectedError(err) + } + + return err + } + + if deployment.Generation != deployment.Status.ObservedGeneration { + return retry.ExpectedErrorf("deployment %s generation %d != observed generation %d", deployment.Name, deployment.Generation, deployment.Status.ObservedGeneration) + } + + if deployment.Status.ReadyReplicas != deployment.Status.Replicas || deployment.Status.UpdatedReplicas != deployment.Status.Replicas { + return retry.ExpectedErrorf("deployment %s ready replicas %d != replicas %d", deployment.Name, deployment.Status.ReadyReplicas, deployment.Status.Replicas) + } + + return nil + }) + if err != nil { + return err + } + } + + return nil +} + +func waitForDaemonSetsRollout(ctx context.Context, clientset *kubernetes.Client, daemonSets []Manifest, resultCh chan<- RolloutProgress) error { + for _, obj := range daemonSets { + obj := obj + + if !channel.SendWithContext(ctx, resultCh, + RolloutProgress{ + Object: obj, + Path: manifestPath(obj), + }) { + return ctx.Err() + } + + err := retry.Constant(5*time.Minute, retry.WithUnits(10*time.Second)).Retry(func() error { + daemonSet, err := clientset.AppsV1().DaemonSets(obj.GetNamespace()).Get(ctx, obj.GetName(), metav1.GetOptions{}) + if err != nil { + if kubernetes.IsRetryableError(err) { + return retry.ExpectedError(err) + } + + return err + } + + if daemonSet.Generation != daemonSet.Status.ObservedGeneration { + return retry.ExpectedErrorf("expected observed generation for %s to be %d, got %d", + daemonSet.Name, daemonSet.Generation, daemonSet.Status.ObservedGeneration) + } + + if daemonSet.Status.UpdatedNumberScheduled != daemonSet.Status.DesiredNumberScheduled { + return retry.ExpectedErrorf("expected current number up-to-date for %s to be %d, got %d", + daemonSet.Name, daemonSet.Status.UpdatedNumberScheduled, daemonSet.Status.CurrentNumberScheduled) + } + + if daemonSet.Status.CurrentNumberScheduled != daemonSet.Status.DesiredNumberScheduled { + return retry.ExpectedErrorf("expected current number scheduled for %s to be %d, got %d", + daemonSet.Name, daemonSet.Status.DesiredNumberScheduled, daemonSet.Status.CurrentNumberScheduled) + } + + if daemonSet.Status.NumberAvailable != daemonSet.Status.DesiredNumberScheduled { + return retry.ExpectedErrorf("expected number available for %s to be %d, got %d", + daemonSet.Name, daemonSet.Status.DesiredNumberScheduled, daemonSet.Status.NumberAvailable) + } + + if daemonSet.Status.NumberReady != daemonSet.Status.DesiredNumberScheduled { + return retry.ExpectedErrorf("expected number ready for %s to be %d, got %d", + daemonSet.Name, daemonSet.Status.DesiredNumberScheduled, daemonSet.Status.NumberReady) + } + + return nil + }) + if err != nil { + return err + } + } + + return nil +} diff --git a/kubernetes/manifests/sync.go b/kubernetes/manifests/sync.go index ac450ff..0415208 100644 --- a/kubernetes/manifests/sync.go +++ b/kubernetes/manifests/sync.go @@ -13,6 +13,7 @@ import ( "github.com/hexops/gotextdiff" "github.com/hexops/gotextdiff/myers" "github.com/hexops/gotextdiff/span" + "github.com/siderolabs/gen/channel" "github.com/siderolabs/go-retry/retry" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" @@ -28,12 +29,24 @@ import ( "github.com/siderolabs/go-kubernetes/kubernetes" ) +// SyncResult describes the result of a single manifest sync. +type SyncResult struct { + Path string + Object Manifest + Diff string + Skipped bool +} + // Sync applies the manifests to the cluster providing the results. -func Sync(ctx context.Context, objects []Manifest, config *rest.Config, dryRun bool, logFunc func(string, ...any)) error { +func Sync(ctx context.Context, objects []Manifest, config *rest.Config, dryRun bool, resultCh chan<- SyncResult) error { dialer := kubernetes.NewDialer() config.Dial = dialer.DialContext - defer dialer.CloseAll() + defer func() { + dialer.CloseAll() + + config.Dial = nil + }() k8sClient, err := dynamic.NewForConfig(config) if err != nil { @@ -47,16 +60,9 @@ func Sync(ctx context.Context, objects []Manifest, config *rest.Config, dryRun b mapper := restmapper.NewDeferredDiscoveryRESTMapper(memory.NewMemCacheClient(dc)) - // list of deployments to wait for to become ready after update - var deployments []Manifest - - logFunc("updating manifests") - for _, obj := range objects { - logFunc(" > processing manifest %s %s", obj.GetKind(), obj.GetName()) - var ( - resp *unstructured.Unstructured + resp Manifest diff string skipped bool ) @@ -74,61 +80,13 @@ func Sync(ctx context.Context, objects []Manifest, config *rest.Config, dryRun b return err } - switch { - case dryRun: - var diffInfo string - if diff != "" { - diffInfo = fmt.Sprintf(", diff:\n%s", diff) - } - - logFunc(" < apply skipped in dry run%s", diffInfo) - - continue - case skipped: - logFunc(" < apply skipped: nothing to update") - - continue - } - - if resp.GetKind() == "Deployment" { - deployments = append(deployments, resp) - } - - logFunc(" < update applied, diff:\n%s", diff) - } - - if len(deployments) == 0 { - return nil - } - - config.Dial = nil - - clientset, err := kubernetes.NewForConfig(config) - if err != nil { - return err - } - - defer clientset.Close() //nolint:errcheck - - for _, obj := range deployments { - obj := obj - - err := retry.Constant(3*time.Minute, retry.WithUnits(10*time.Second)).Retry(func() error { - deployment, err := clientset.AppsV1().Deployments(obj.GetNamespace()).Get(ctx, obj.GetName(), metav1.GetOptions{}) - if err != nil { - return err - } - - if deployment.Status.ReadyReplicas != deployment.Status.Replicas || deployment.Status.UpdatedReplicas != deployment.Status.Replicas { - return retry.ExpectedErrorf("deployment %s ready replicas %d != replicas %d", deployment.Name, deployment.Status.ReadyReplicas, deployment.Status.Replicas) - } - - logFunc(" > updated %s", deployment.GetName()) - - return nil - }) - if err != nil { - return err + if !channel.SendWithContext(ctx, resultCh, SyncResult{ + Path: manifestPath(resp), + Object: resp, + Diff: diff, + Skipped: skipped, + }) { + return ctx.Err() } } @@ -172,18 +130,18 @@ func updateManifest( } exists = false - diff = "resource is going to be created" } switch { case dryRun: - return nil, diff, exists, nil + return obj, diff, diff == "", nil case !exists: resp, err = dr.Create(ctx, obj, metav1.CreateOptions{}) case diff != "": resp, err = dr.Update(ctx, obj, metav1.UpdateOptions{}) default: skipped = true + resp = obj } return resp, diff, skipped, err @@ -192,6 +150,15 @@ func updateManifest( func getResourceDiff(ctx context.Context, dr dynamic.ResourceInterface, obj Manifest) (string, error) { current, err := dr.Get(ctx, obj.GetName(), metav1.GetOptions{}) if err != nil { + if apierrors.IsNotFound(err) { + diff, diffErr := manifestDiff(nil, obj) + if diffErr != nil { + return "", diffErr + } + + return diff, err + } + return "", err } @@ -247,20 +214,55 @@ func getResourceDiff(ctx context.Context, dr dynamic.ResourceInterface, obj Mani ignoreKey("secrets") // injected by Kubernetes in ServiceAccount objects } - x, err := k8syaml.Marshal(current) - if err != nil { - return "", err + return manifestDiff(current, resp) +} + +func manifestPath(obj Manifest) string { + gv := obj.GetObjectKind().GroupVersionKind().Version + if obj.GetObjectKind().GroupVersionKind().Group != "" { + gv = obj.GetObjectKind().GroupVersionKind().Group + "/" + gv } - y, err := k8syaml.Marshal(resp) - if err != nil { - return "", err + name := obj.GetName() + + if obj.GetNamespace() != "" { + name = obj.GetNamespace() + "/" + name } - resourceID := fmt.Sprintf("%s/%s/%s", obj.GetObjectKind().GroupVersionKind(), obj.GetNamespace(), obj.GetName()) + return fmt.Sprintf("%s.%s/%s", gv, obj.GetObjectKind().GroupVersionKind().Kind, name) +} + +func manifestDiff(a, b Manifest) (string, error) { + var ( + ma, mb []byte + path string + err error + ) + + if a != nil { + path = manifestPath(a) + + ma, err = k8syaml.Marshal(a) + if err != nil { + return "", err + } + } + + if b != nil { + path = manifestPath(b) + + mb, err = k8syaml.Marshal(b) + if err != nil { + return "", err + } + } + + return computeDiff(path, string(ma), string(mb)), nil +} - edits := myers.ComputeEdits(span.URIFromPath(resourceID), string(x), string(y)) - diff := gotextdiff.ToUnified(resourceID, resourceID, string(x), edits) +func computeDiff(path string, a, b string) string { + edits := myers.ComputeEdits(span.URIFromPath(path), a, b) + diff := gotextdiff.ToUnified("a/"+path, "b/"+path, a, edits) - return fmt.Sprint(diff), nil + return fmt.Sprint(diff) }