Skip to content

Add new API to propagate context and deprecate olds #714

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion pkg/cache/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ func (f *noopSettings) IsExcludedResource(_, _, _ string) bool {
// Settings caching customizations
type Settings struct {
// ResourceHealthOverride contains health assessment overrides
// Deprecated: use ResourceHealthOverrideContext insttead.
ResourceHealthOverride health.HealthOverride
// ResourceHealthOverrideContext contains health assessment overrides
ResourceHealthOverrideContext health.HealthOverrideContext
// ResourcesFilter holds filter that excludes resources
ResourcesFilter kube.ResourceFilter
}
Expand All @@ -54,7 +57,7 @@ func SetPopulateResourceInfoHandler(handler OnPopulateResourceInfoHandler) Updat
// SetSettings updates caching settings
func SetSettings(settings Settings) UpdateSettingsFunc {
return func(cache *clusterCache) {
cache.settings = Settings{settings.ResourceHealthOverride, settings.ResourcesFilter}
cache.settings = Settings{settings.ResourceHealthOverride, settings.ResourceHealthOverrideContext, settings.ResourcesFilter}
}
}

Expand Down
6 changes: 6 additions & 0 deletions pkg/diff/diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,15 @@ func (n *noopNormalizer) Normalize(_ *unstructured.Unstructured) error {
return nil
}

func (n *noopNormalizer) NormalizeContext(_ context.Context, _ *unstructured.Unstructured) error {
return nil
}

// Normalizer updates resource before comparing it
type Normalizer interface {
// Deprecated: use NormalizeContext instead
Normalize(un *unstructured.Unstructured) error
NormalizeContext(ctx context.Context, un *unstructured.Unstructured) error
}

// GetNoopNormalizer returns normalizer that does not apply any resource modifications
Expand Down
30 changes: 29 additions & 1 deletion pkg/health/health.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package health

import (
"context"

"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"

Expand Down Expand Up @@ -33,6 +35,13 @@ type HealthOverride interface {
GetResourceHealth(obj *unstructured.Unstructured) (*HealthStatus, error)
}

// Implements custom health assessment that overrides built-in assessment
type HealthOverrideContext interface {
GetResourceHealth(ctx context.Context, obj *unstructured.Unstructured) (*HealthStatus, error)
}

type HealthOverrideFuncContext func(ctx context.Context, obj *unstructured.Unstructured) (*HealthStatus, error)

// Holds health assessment results
type HealthStatus struct {
Status HealthStatusCode `json:"status,omitempty"`
Expand Down Expand Up @@ -66,6 +75,25 @@ func IsWorse(current, new HealthStatusCode) bool {

// GetResourceHealth returns the health of a k8s resource
func GetResourceHealth(obj *unstructured.Unstructured, healthOverride HealthOverride) (health *HealthStatus, err error) {
healthOverrideContext := func(_ context.Context, obj *unstructured.Unstructured) (*HealthStatus, error) {
return healthOverride.GetResourceHealth(obj)
}
return getResourceHealth(context.Background(), obj, healthOverrideContext)
}

// GetResourceHealth returns the health of a k8s resource
func GetResourceHealthContext(ctx context.Context, obj *unstructured.Unstructured, healthOverride HealthOverrideContext) (health *HealthStatus, err error) {
var healthOverrideFunc HealthOverrideFuncContext
if healthOverride != nil {
healthOverrideFunc = func(ctx context.Context, obj *unstructured.Unstructured) (*HealthStatus, error) {
return healthOverride.GetResourceHealth(ctx, obj)
}
}
return getResourceHealth(ctx, obj, healthOverrideFunc)
}

// GetResourceHealth returns the health of a k8s resource
func getResourceHealth(ctx context.Context, obj *unstructured.Unstructured, healthOverride HealthOverrideFuncContext) (health *HealthStatus, err error) {
if obj.GetDeletionTimestamp() != nil && !hook.HasHookFinalizer(obj) {
return &HealthStatus{
Status: HealthStatusProgressing,
Expand All @@ -74,7 +102,7 @@ func GetResourceHealth(obj *unstructured.Unstructured, healthOverride HealthOver
}

if healthOverride != nil {
health, err := healthOverride.GetResourceHealth(obj)
health, err := healthOverride(ctx, obj)
if err != nil {
health = &HealthStatus{
Status: HealthStatusUnknown,
Expand Down
3 changes: 2 additions & 1 deletion pkg/health/health_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ Package provides functionality that allows assessing the health state of a Kuber
package health

import (
"context"
"os"
"testing"

Expand All @@ -28,7 +29,7 @@ func getHealthStatus(t *testing.T, yamlPath string) *HealthStatus {
var obj unstructured.Unstructured
err = yaml.Unmarshal(yamlBytes, &obj)
require.NoError(t, err)
health, err := GetResourceHealth(&obj, nil)
health, err := GetResourceHealthContext(context.Background(), &obj, nil)
require.NoError(t, err)
return health
}
Expand Down
86 changes: 67 additions & 19 deletions pkg/sync/sync_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,16 @@ func (r *reconciledResource) key() kubeutil.ResourceKey {
type SyncContext interface {
// Terminate terminates sync operation. The method is asynchronous: it starts deletion is related K8S resources
// such as in-flight resource hooks, updates operation status, and exists without waiting for resource completion.
// Deprecated: use TerminateContext instead
Terminate()
// TerminateContext terminates sync operation. The method is asynchronous: it starts deletion is related K8S resources
// such as in-flight resource hooks, updates operation status, and exists without waiting for resource completion.
TerminateContext(ctx context.Context)
// Executes next synchronization step and updates operation status.
// Deprecated: use SyncContext instead
Sync()
// Executes next synchronization step and updates operation status.
SyncContext(ctx context.Context)
// Returns current sync operation state and information about resources synchronized so far.
GetState() (common.OperationPhase, string, []common.ResourceSyncResult)
}
Expand All @@ -75,12 +82,20 @@ func WithPermissionValidator(validator common.PermissionValidator) SyncOpt {
}

// WithHealthOverride sets specified health override
// Deprecated: use WithHealthOverrideContext instead
func WithHealthOverride(override health.HealthOverride) SyncOpt {
return func(ctx *syncContext) {
ctx.healthOverride = override
}
}

// WithHealthOverrideContext sets specified health override
func WithHealthOverrideContext(override health.HealthOverrideContext) SyncOpt {
return func(ctx *syncContext) {
ctx.healthOverrideContext = override
}
}

// WithInitialState sets sync operation initial state
func WithInitialState(phase common.OperationPhase, message string, results []common.ResourceSyncResult, startedAt metav1.Time) SyncOpt {
return func(ctx *syncContext) {
Expand Down Expand Up @@ -308,11 +323,17 @@ const (
)

// getOperationPhase returns a health status from a _live_ unstructured object
func (sc *syncContext) getOperationPhase(obj *unstructured.Unstructured) (common.OperationPhase, string, error) {
func (sc *syncContext) getOperationPhase(ctx context.Context, obj *unstructured.Unstructured) (common.OperationPhase, string, error) {
phase := common.OperationSucceeded
message := obj.GetName() + " created"

resHealth, err := health.GetResourceHealth(obj, sc.healthOverride)
var resHealth *health.HealthStatus
var err error
if sc.healthOverrideContext != nil {
resHealth, err = health.GetResourceHealthContext(ctx, obj, sc.healthOverrideContext)
} else if sc.healthOverride != nil {
resHealth, err = health.GetResourceHealth(obj, sc.healthOverride)
}
if err != nil {
return "", "", err
}
Expand All @@ -333,18 +354,19 @@ func (sc *syncContext) getOperationPhase(obj *unstructured.Unstructured) (common
}

type syncContext struct {
healthOverride health.HealthOverride
permissionValidator common.PermissionValidator
resources map[kubeutil.ResourceKey]reconciledResource
hooks []*unstructured.Unstructured
config *rest.Config
rawConfig *rest.Config
dynamicIf dynamic.Interface
disco discovery.DiscoveryInterface
extensionsclientset *clientset.Clientset
kubectl kubeutil.Kubectl
resourceOps kubeutil.ResourceOperations
namespace string
healthOverride health.HealthOverride
healthOverrideContext health.HealthOverrideContext
permissionValidator common.PermissionValidator
resources map[kubeutil.ResourceKey]reconciledResource
hooks []*unstructured.Unstructured
config *rest.Config
rawConfig *rest.Config
dynamicIf dynamic.Interface
disco discovery.DiscoveryInterface
extensionsclientset *clientset.Clientset
kubectl kubeutil.Kubectl
resourceOps kubeutil.ResourceOperations
namespace string

dryRun bool
skipDryRun bool
Expand Down Expand Up @@ -403,8 +425,19 @@ func (sc *syncContext) setRunningPhase(tasks []*syncTask, isPendingDeletion bool
}
}

// sync has performs the actual apply or hook based sync
// Sync has performs the actual apply or hook based sync
// Deprecated: use SyncContext instead
func (sc *syncContext) Sync() {
sc.SyncContext(context.Background())
}

// SyncContext has performs the actual apply or hook based sync
func (sc *syncContext) SyncContext(ctx context.Context) {
sc.sync(ctx)
}

// sync has performs the actual apply or hook based sync
func (sc *syncContext) sync(ctx context.Context) {
sc.log.WithValues("skipHooks", sc.skipHooks, "started", sc.started()).Info("Syncing")
tasks, ok := sc.getSyncTasks()
if !ok {
Expand Down Expand Up @@ -441,15 +474,21 @@ func (sc *syncContext) Sync() {
}) {
if task.isHook() {
// update the hook's result
operationState, message, err := sc.getOperationPhase(task.liveObj)
operationState, message, err := sc.getOperationPhase(ctx, task.liveObj)
if err != nil {
sc.setResourceResult(task, "", common.OperationError, fmt.Sprintf("failed to get resource health: %v", err))
} else {
sc.setResourceResult(task, "", operationState, message)
}
} else {
// this must be calculated on the live object
healthStatus, err := health.GetResourceHealth(task.liveObj, sc.healthOverride)
var healthStatus *health.HealthStatus
var err error
if sc.healthOverrideContext != nil {
healthStatus, err = health.GetResourceHealthContext(ctx, task.liveObj, sc.healthOverrideContext)
} else if sc.healthOverride != nil {
healthStatus, err = health.GetResourceHealth(task.liveObj, sc.healthOverride)
}
if err == nil {
sc.log.WithValues("task", task, "healthStatus", healthStatus).V(1).Info("attempting to update health of running task")
if healthStatus == nil {
Expand Down Expand Up @@ -1176,8 +1215,17 @@ func (sc *syncContext) hasCRDOfGroupKind(group string, kind string) bool {
return false
}

// terminate looks for any running jobs/workflow hooks and deletes the resource
// Deprecated: use TerminateContext instead
func (sc *syncContext) Terminate() {
sc.TerminateContext(context.Background())
}

func (sc *syncContext) TerminateContext(ctx context.Context) {
sc.terminate(ctx)
}

// terminate looks for any running jobs/workflow hooks and deletes the resource
func (sc *syncContext) terminate(ctx context.Context) {
terminateSuccessful := true
sc.log.V(1).Info("terminating")
tasks, _ := sc.getSyncTasks()
Expand All @@ -1190,7 +1238,7 @@ func (sc *syncContext) Terminate() {
terminateSuccessful = false
continue
}
phase, msg, err := sc.getOperationPhase(task.liveObj)
phase, msg, err := sc.getOperationPhase(ctx, task.liveObj)
if err != nil {
sc.setOperationPhase(common.OperationError, fmt.Sprintf("Failed to get hook health: %v", err))
return
Expand Down
31 changes: 26 additions & 5 deletions pkg/utils/kube/ctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ type CleanupFunc func()

type OnKubectlRunFunc func(command string) (CleanupFunc, error)

type OnKubectlRunFuncContext func(ctx context.Context, command string) (CleanupFunc, error)

type Kubectl interface {
ManageResources(config *rest.Config, openAPISchema openapi.Resources) (ResourceOperations, func(), error)
LoadOpenAPISchema(config *rest.Config) (openapi.Resources, *managedfields.GvkParser, error)
Expand All @@ -39,13 +41,16 @@ type Kubectl interface {
GetAPIResources(config *rest.Config, preferred bool, resourceFilter ResourceFilter) ([]APIResourceInfo, error)
GetServerVersion(config *rest.Config) (string, error)
NewDynamicClient(config *rest.Config) (dynamic.Interface, error)
// Deprecated: use SetOnKubectlRunContext instead.
SetOnKubectlRun(onKubectlRun OnKubectlRunFunc)
SetOnKubectlRunContext(onKubectlRun OnKubectlRunFuncContext)
}

type KubectlCmd struct {
Log logr.Logger
Tracer tracing.Tracer
OnKubectlRun OnKubectlRunFunc
Log logr.Logger
Tracer tracing.Tracer
OnKubectlRun OnKubectlRunFunc
OnKubectlRunContext OnKubectlRunFuncContext
}

type APIResourceInfo struct {
Expand Down Expand Up @@ -292,11 +297,23 @@ func (k *KubectlCmd) ManageResources(config *rest.Config, openAPISchema openapi.
openAPISchema: openAPISchema,
tracer: k.Tracer,
log: k.Log,
onKubectlRun: k.OnKubectlRun,
onKubectlRun: k.OnKubectlRunContext,
}, cleanup, nil
}

// Deprecated: use ManageServerSideDiffDryRunsContext instead.
func ManageServerSideDiffDryRuns(config *rest.Config, openAPISchema openapi.Resources, tracer tracing.Tracer, log logr.Logger, onKubectlRun OnKubectlRunFunc) (diff.KubeApplier, func(), error) {
onKubectlRunContext := func(_ context.Context, command string) (CleanupFunc, error) {
return onKubectlRun(command)
}
return manageServerSideDiffDryRunsContext(config, openAPISchema, tracer, log, onKubectlRunContext)
}

func ManageServerSideDiffDryRunsContext(config *rest.Config, openAPISchema openapi.Resources, tracer tracing.Tracer, log logr.Logger, onKubectlRunContext OnKubectlRunFuncContext) (diff.KubeApplier, func(), error) {
return manageServerSideDiffDryRunsContext(config, openAPISchema, tracer, log, onKubectlRunContext)
}

func manageServerSideDiffDryRunsContext(config *rest.Config, openAPISchema openapi.Resources, tracer tracing.Tracer, log logr.Logger, onKubectlRunContext OnKubectlRunFuncContext) (diff.KubeApplier, func(), error) {
f, err := os.CreateTemp(utils.TempDir, "")
if err != nil {
return nil, nil, fmt.Errorf("failed to generate temp file for kubeconfig: %w", err)
Expand All @@ -317,7 +334,7 @@ func ManageServerSideDiffDryRuns(config *rest.Config, openAPISchema openapi.Reso
openAPISchema: openAPISchema,
tracer: tracer,
log: log,
onKubectlRun: onKubectlRun,
onKubectlRun: onKubectlRunContext,
}, cleanup, nil
}

Expand Down Expand Up @@ -356,6 +373,10 @@ func (k *KubectlCmd) SetOnKubectlRun(onKubectlRun OnKubectlRunFunc) {
k.OnKubectlRun = onKubectlRun
}

func (k *KubectlCmd) SetOnKubectlRunContext(onKubectlRunContext OnKubectlRunFuncContext) {
k.OnKubectlRunContext = onKubectlRunContext
}

func RunAllAsync(count int, action func(i int) error) error {
g, ctx := errgroup.WithContext(context.Background())
loop:
Expand Down
3 changes: 3 additions & 0 deletions pkg/utils/kube/kubetest/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ func (k *MockKubectlCmd) LoadOpenAPISchema(_ *rest.Config) (openapi.Resources, *
func (k *MockKubectlCmd) SetOnKubectlRun(_ kube.OnKubectlRunFunc) {
}

func (k *MockKubectlCmd) SetOnKubectlRunContext(_ kube.OnKubectlRunFuncContext) {
}

func (k *MockKubectlCmd) ManageResources(_ *rest.Config, _ openapi.Resources) (kube.ResourceOperations, func(), error) {
return &MockResourceOps{}, func() {
}, nil
Expand Down
Loading
Loading