Skip to content

Commit

Permalink
chore: using domain-qualified finalizers (#6023)
Browse files Browse the repository at this point in the history
* chore: using domain-qualified finalizers

Signed-off-by: Roger Torrentsgenerós <rogert@spotify.com>

* test: adding test, fixing others

Signed-off-by: Roger Torrentsgenerós <rogert@spotify.com>

* chore: use flyte.org/finalizer instead

Signed-off-by: Roger Torrentsgenerós <rogert@spotify.com>

---------

Signed-off-by: Roger Torrentsgenerós <rogert@spotify.com>
  • Loading branch information
trutx authored Jan 4, 2025
1 parent fd9a378 commit 27c9edd
Show file tree
Hide file tree
Showing 13 changed files with 124 additions and 168 deletions.
4 changes: 4 additions & 0 deletions flyteadmin/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ require (
github.com/go-openapi/jsonpointer v0.19.6 // indirect
github.com/go-openapi/jsonreference v0.20.2 // indirect
github.com/go-openapi/swag v0.22.3 // indirect
github.com/go-redis/redis v6.15.7+incompatible // indirect
github.com/go-sql-driver/mysql v1.6.0 // indirect
github.com/go-test/deep v1.0.7 // indirect
github.com/goccy/go-json v0.10.2 // indirect
Expand Down Expand Up @@ -155,6 +156,7 @@ require (
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/ncw/swift v1.0.53 // indirect
github.com/nxadm/tail v1.4.11 // indirect
github.com/ory/go-acc v0.2.6 // indirect
github.com/ory/go-convenience v0.1.0 // indirect
github.com/ory/viper v1.7.5 // indirect
Expand Down Expand Up @@ -201,6 +203,7 @@ require (
golang.org/x/term v0.27.0 // indirect
golang.org/x/text v0.21.0 // indirect
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect
gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect
google.golang.org/appengine v1.6.8 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240123012728-ef4313101c80 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80 // indirect
Expand All @@ -214,6 +217,7 @@ require (
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/apiextensions-apiserver v0.28.4 // indirect
k8s.io/component-base v0.28.4 // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
sigs.k8s.io/yaml v1.3.0 // indirect
)
Expand Down
6 changes: 6 additions & 0 deletions flyteadmin/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,8 @@ github.com/go-openapi/validate v0.18.0/go.mod h1:Uh4HdOzKt19xGIGm1qHf/ofbX1YQ4Y+
github.com/go-openapi/validate v0.19.2/go.mod h1:1tRCw7m3jtI8eNWEEliiAqUIcBztB2KDnRCRMUi7GTA=
github.com/go-openapi/validate v0.19.3/go.mod h1:90Vh6jjkTn+OT1Eefm0ZixWNFjhtOH7vS9k0lo6zwJo=
github.com/go-openapi/validate v0.19.10/go.mod h1:RKEZTUWDkxKQxN2jDT7ZnZi2bhZlbNMAuKvKB+IaGx8=
github.com/go-redis/redis v6.15.7+incompatible h1:3skhDh95XQMpnqeqNftPkQD9jL9e5e36z/1SUm6dy1U=
github.com/go-redis/redis v6.15.7+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA=
github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
Expand Down Expand Up @@ -1013,6 +1015,8 @@ github.com/ncw/swift v1.0.53/go.mod h1:23YIA4yWVnGwv2dQlN4bB7egfYX6YLn0Yo/S6zZO/
github.com/nicksnyder/go-i18n v1.10.0/go.mod h1:HrK7VCrbOvQoUAQ7Vpy7i87N7JZZZ7R2xBGjv0j365Q=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/nu7hatch/gouuid v0.0.0-20131221200532-179d4d0c4d8d/go.mod h1:YUTz3bUH2ZwIWBy3CJBeOBEugqcmXREj14T+iG/4k4U=
github.com/nxadm/tail v1.4.11 h1:8feyoE3OzPrcshW5/MJ4sGESc5cqmGkGCWlco4l0bqY=
github.com/nxadm/tail v1.4.11/go.mod h1:OTaG3NK980DZzxbRq6lEuzgU+mug70nY11sMd4JXXHc=
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
github.com/oleiade/reflections v1.0.0/go.mod h1:RbATFBbKYkVdqmSFtx13Bb/tVhR0lgOBXunWTZKeL4w=
github.com/oleiade/reflections v1.0.1 h1:D1XO3LVEYroYskEsoSiGItp9RUxG6jWnCVvrqH0HHQM=
Expand All @@ -1024,6 +1028,7 @@ github.com/onsi/ginkgo v1.9.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+W
github.com/onsi/ginkgo v1.10.1/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.10.3/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU=
github.com/onsi/ginkgo/v2 v2.11.0 h1:WgqUCUt/lT6yXoQ8Wef0fsNn5cAuMK7+KT9UFRz2tcU=
github.com/onsi/ginkgo/v2 v2.11.0/go.mod h1:ZhrRA5XmEE3x3rhlzamx/JJvujdZoJ2uvgI7kR0iZvM=
github.com/onsi/gomega v1.4.1/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA=
Expand Down Expand Up @@ -1971,6 +1976,7 @@ gopkg.in/square/go-jose.v2 v2.2.2/go.mod h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76
gopkg.in/square/go-jose.v2 v2.5.2-0.20210529014059-a5c7eec3c614/go.mod h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76mk0e1AI=
gopkg.in/square/go-jose.v2 v2.6.0 h1:NGk74WTnPKBNUhNzQX7PYcTLUjoq7mzKk2OKbvwk2iI=
gopkg.in/square/go-jose.v2 v2.6.0/go.mod h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76mk0e1AI=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/validator.v2 v2.0.0-20180514200540-135c24b11c19/go.mod h1:o4V0GXN9/CAmCsvJ0oXYZvrZOe7syiDZSN1GWGZTGzc=
gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74=
Expand Down
4 changes: 3 additions & 1 deletion flyteadmin/pkg/workflowengine/impl/prepare_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ import (
"github.com/golang/protobuf/proto"
"google.golang.org/grpc/codes"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

"github.com/flyteorg/flyte/flyteadmin/pkg/errors"
"github.com/flyteorg/flyte/flyteadmin/pkg/workflowengine/interfaces"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
"github.com/flyteorg/flyte/flytepropeller/pkg/controller"
)

func addMapValues(overrides map[string]string, defaultValues map[string]string) map[string]string {
Expand Down Expand Up @@ -130,7 +132,7 @@ func PrepareFlyteWorkflow(data interfaces.ExecutionData, flyteWorkflow *v1alpha1
flyteWorkflow.AcceptedAt = &acceptAtWrapper

// Add finalizer
flyteWorkflow.Finalizers = append(flyteWorkflow.Finalizers, "flyte-finalizer")
_ = controllerutil.AddFinalizer(flyteWorkflow, controller.Finalizer)

// add permissions from auth and security context. Adding permissions from auth would be removed once all clients
// have migrated over to security context
Expand Down
3 changes: 2 additions & 1 deletion flyteadmin/pkg/workflowengine/impl/prepare_execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
"github.com/flyteorg/flyte/flytepropeller/pkg/controller"
)

const testRole = "role"
Expand Down Expand Up @@ -254,5 +255,5 @@ func TestPrepareFlyteWorkflow(t *testing.T) {
OutputLocationPrefix: "s3://bucket/key",
},
})
assert.Equal(t, flyteWorkflow.Finalizers, []string{"flyte-finalizer"})
assert.Equal(t, flyteWorkflow.Finalizers, []string{controller.Finalizer})
}
42 changes: 24 additions & 18 deletions flyteplugins/go/tasks/plugins/array/k8s/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8stypes "k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

"github.com/flyteorg/flyte/flyteplugins/go/tasks/errors"
pluginsCore "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core"
Expand All @@ -30,8 +31,11 @@ const (
ErrBuildPodTemplate stdErrors.ErrorCode = "POD_TEMPLATE_FAILED"
ErrReplaceCmdTemplate stdErrors.ErrorCode = "CMD_TEMPLATE_FAILED"
FlyteK8sArrayIndexVarName string = "FLYTE_K8S_ARRAY_INDEX"
finalizer string = "flyte/array"
JobIndexVarName string = "BATCH_JOB_ARRAY_INDEX_VAR_NAME"
finalizer string = "flyte.org/finalizer-array"
// Old non-domain-qualified finalizer for backwards compatibility
// This should eventually be removed
oldFinalizer string = "flyte/array"
JobIndexVarName string = "BATCH_JOB_ARRAY_INDEX_VAR_NAME"
)

var (
Expand Down Expand Up @@ -69,8 +73,7 @@ func addMetadata(stCtx SubTaskExecutionContext, cfg *Config, k8sPluginCfg *confi
}

if k8sPluginCfg.InjectFinalizer {
f := append(pod.GetFinalizers(), finalizer)
pod.SetFinalizers(f)
_ = controllerutil.AddFinalizer(pod, finalizer)
}

if len(cfg.DefaultScheduler) > 0 {
Expand Down Expand Up @@ -134,25 +137,28 @@ func abortSubtask(ctx context.Context, stCtx SubTaskExecutionContext, cfg *Confi
}

if err != nil && !isK8sObjectNotExists(err) {
logger.Warningf(ctx, "Failed to clear finalizers for Resource with name: %v/%v. Error: %v",
logger.Warningf(ctx, "Failed to clear finalizer for Resource with name: %v/%v. Error: %v",
resourceToFinalize.GetNamespace(), resourceToFinalize.GetName(), err)
return err
}

return nil
}

// clearFinalizers removes finalizers (if they exist) from the k8s resource
func clearFinalizers(ctx context.Context, o client.Object, kubeClient pluginsCore.KubeClient) error {
if len(o.GetFinalizers()) > 0 {
o.SetFinalizers([]string{})
// clearFinalizer removes the Flyte finalizer (if it exists) from the k8s resource
func clearFinalizer(ctx context.Context, o client.Object, kubeClient pluginsCore.KubeClient) error {
// Checking for the old finalizer too for backwards compatibility. This should eventually be removed
// Go does short-circuiting so we have to make sure both are removed
finalizerRemoved := controllerutil.RemoveFinalizer(o, finalizer)
oldFinalizerRemoved := controllerutil.RemoveFinalizer(o, oldFinalizer)
if finalizerRemoved || oldFinalizerRemoved {
err := kubeClient.GetClient().Update(ctx, o)
if err != nil && !isK8sObjectNotExists(err) {
logger.Warningf(ctx, "Failed to clear finalizers for Resource with name: %v/%v. Error: %v", o.GetNamespace(), o.GetName(), err)
logger.Warningf(ctx, "Failed to clear finalizer for Resource with name: %v/%v. Error: %v", o.GetNamespace(), o.GetName(), err)
return err
}
} else {
logger.Debugf(ctx, "Finalizers are already empty for Resource with name: %v/%v", o.GetNamespace(), o.GetName())
logger.Debugf(ctx, "Finalizer is already cleared for Resource with name: %v/%v", o.GetNamespace(), o.GetName())
}
return nil
}
Expand Down Expand Up @@ -211,7 +217,7 @@ func launchSubtask(ctx context.Context, stCtx SubTaskExecutionContext, cfg *Conf
}

// finalizeSubtask performs operations to complete the k8s pod defined by the SubTaskExecutionContext
// and Config. These may include removing finalizers and deleting the k8s resource.
// and Config. These may include removing finalizer and deleting the k8s resource.
func finalizeSubtask(ctx context.Context, stCtx SubTaskExecutionContext, cfg *Config, kubeClient pluginsCore.KubeClient) error {
errs := stdErrors.ErrorCollection{}
var pod *v1.Pod
Expand All @@ -231,10 +237,10 @@ func finalizeSubtask(ctx context.Context, stCtx SubTaskExecutionContext, cfg *Co
nsName = k8stypes.NamespacedName{Namespace: pod.GetNamespace(), Name: pod.GetName()}
}

// In InjectFinalizer is on, it means we may have added the finalizers when we launched this resource. Attempt to
// clear them to allow the object to be deleted/garbage collected. If InjectFinalizer was turned on (through config)
// In InjectFinalizer is on, it means we may have added the finalizer when we launched this resource. Attempt to
// clear it to allow the object to be deleted/garbage collected. If InjectFinalizer was turned on (through config)
// after the resource was created, we will not find any finalizers to clear and the object may have already been
// deleted at this point. Therefore, account for these cases and do not consider them errors.
// deleted at this point. Therefore, account for these cases and do not consider the errors.
if k8sPluginCfg.InjectFinalizer {
// Attempt to get resource from informer cache, if not found, retrieve it from API server.
if err := kubeClient.GetClient().Get(ctx, nsName, pod); err != nil {
Expand All @@ -250,7 +256,7 @@ func finalizeSubtask(ctx context.Context, stCtx SubTaskExecutionContext, cfg *Co
// This must happen after sending admin event. It's safe against partial failures because if the event failed, we will
// simply retry in the next round. If the event succeeded but this failed, we will try again the next round to send
// the same event (idempotent) and then come here again...
err := clearFinalizers(ctx, pod, kubeClient)
err := clearFinalizer(ctx, pod, kubeClient)
if err != nil {
errs.Append(err)
}
Expand Down Expand Up @@ -308,10 +314,10 @@ func getSubtaskPhaseInfo(ctx context.Context, stCtx SubTaskExecutionContext, cfg
return pluginsCore.PhaseInfoUndefined, err
}

if !phaseInfo.Phase().IsTerminal() && o.GetDeletionTimestamp() != nil {
if !phaseInfo.Phase().IsTerminal() && !o.GetDeletionTimestamp().IsZero() {
// If the object has been deleted, that is, it has a deletion timestamp, but is not in a terminal state, we should
// mark the task as a retryable failure. We've seen this happen when a kubelet disappears - all pods running on
// the node are marked with a deletionTimestamp, but our finalizers prevent the pod from being deleted.
// the node are marked with a deletionTimestamp, but our finalizer prevents the pod from being deleted.
// This can also happen when a user deletes a Pod directly.
failureReason := fmt.Sprintf("object [%s] terminated in the background, manually", nsName.String())
return pluginsCore.PhaseInfoSystemRetryableFailure("UnexpectedObjectDeletion", failureReason, nil), nil
Expand Down
5 changes: 5 additions & 0 deletions flytepropeller/pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ import (
)

const (
// Finalizer is the global and domain-qualified Flyte finalizer
Finalizer = "flyte.org/finalizer"
// OldFinalizer is the old non-domain-qualified finalizer, kept for backwards compatibility
// This should eventually be removed
OldFinalizer = "flyte-finalizer"
resourceLevelMonitorCycleDuration = 5 * time.Second
missing = "missing"
podDefaultNamespace = "flyte"
Expand Down
36 changes: 0 additions & 36 deletions flytepropeller/pkg/controller/finalizer.go

This file was deleted.

70 changes: 0 additions & 70 deletions flytepropeller/pkg/controller/finalizer_test.go

This file was deleted.

17 changes: 12 additions & 5 deletions flytepropeller/pkg/controller/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/otel/trace"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
Expand Down Expand Up @@ -103,7 +104,7 @@ func (p *Propeller) TryMutateWorkflow(ctx context.Context, originalW *v1alpha1.F
ctx = contextutils.WithResourceVersion(ctx, mutableW.GetResourceVersion())

maxRetries := uint32(p.cfg.MaxWorkflowRetries) // #nosec G115
if IsDeleted(mutableW) || (mutableW.Status.FailedAttempts > maxRetries) {
if !mutableW.GetDeletionTimestamp().IsZero() || mutableW.Status.FailedAttempts > maxRetries {
var err error
func() {
defer func() {
Expand All @@ -125,7 +126,7 @@ func (p *Propeller) TryMutateWorkflow(ctx context.Context, originalW *v1alpha1.F

if !mutableW.GetExecutionStatus().IsTerminated() {
var err error
SetFinalizerIfEmpty(mutableW, FinalizerKey)
_ = controllerutil.AddFinalizer(mutableW, Finalizer)
SetDefinitionVersionIfEmpty(mutableW, v1alpha1.LatestWorkflowDefinitionVersion)

func() {
Expand Down Expand Up @@ -210,7 +211,9 @@ func (p *Propeller) Handle(ctx context.Context, namespace, name string) error {
}

if w.GetExecutionStatus().IsTerminated() {
if HasCompletedLabel(w) && !HasFinalizer(w) {
// Checking for the old finalizer for backwards compatibility
// This should be eventually removed
if HasCompletedLabel(w) && !controllerutil.ContainsFinalizer(w, Finalizer) && !controllerutil.ContainsFinalizer(w, OldFinalizer) {
logger.Debugf(ctx, "Workflow is terminated.")
// This workflow had previously completed, let us ignore it
return nil
Expand Down Expand Up @@ -325,7 +328,9 @@ func (p *Propeller) streak(ctx context.Context, w *v1alpha1.FlyteWorkflow, wfClo
// If the end result is a terminated workflow, we remove the labels
// We add a completed label so that we can avoid polling for this workflow
SetCompletedLabel(mutatedWf, time.Now())
ResetFinalizers(mutatedWf)
_ = controllerutil.RemoveFinalizer(mutatedWf, Finalizer)
// Backwards compatibility. This should eventually be removed
_ = controllerutil.RemoveFinalizer(mutatedWf, OldFinalizer)
}
}

Expand Down Expand Up @@ -387,7 +392,9 @@ func (p *Propeller) streak(ctx context.Context, w *v1alpha1.FlyteWorkflow, wfClo
mutableW := w.DeepCopy()
// catch potential indefinite update loop
if mutatedWf.GetExecutionStatus().IsTerminated() {
ResetFinalizers(mutableW)
_ = controllerutil.RemoveFinalizer(mutableW, Finalizer)
// Backwards compatibility. This should eventually be removed
_ = controllerutil.RemoveFinalizer(mutableW, OldFinalizer)
SetDefinitionVersionIfEmpty(mutableW, v1alpha1.LatestWorkflowDefinitionVersion)
SetCompletedLabel(mutableW, time.Now())
msg := fmt.Sprintf("Workflow size has breached threshold. Finalized with status: %v", mutatedWf.GetExecutionStatus().GetPhase())
Expand Down
Loading

0 comments on commit 27c9edd

Please sign in to comment.