Skip to content

feat: Add sync-dependencies #514

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions pkg/sync/common/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@ const (
AnnotationSyncOptions = "argocd.argoproj.io/sync-options"
// AnnotationSyncWave indicates which wave of the sync the resource or hook should be in
AnnotationSyncWave = "argocd.argoproj.io/sync-wave"
// AnnotationSyncDependencies lists the objects that this object depends on.
// The dependencies are specified as a comma-separated list of objects.
// The format of each object is <group>/<kind>/<namespace>/<name>
// The group and kind are optional. If not specified, they'll match any group or kind.
// The namespace is optional. If not specified, the namespace is assumed to be the same as the namespace of the object.
// The name is required.
AnnotationSyncDependencies = "argocd.argoproj.io/sync-dependencies"
// AnnotationKeyHook contains the hook type of a resource
AnnotationKeyHook = "argocd.argoproj.io/hook"
// AnnotationKeyHookDeletePolicy is the policy of deleting a hook
Expand Down
36 changes: 26 additions & 10 deletions pkg/sync/sync_context.go
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In syncTasks we need to check for cyclic dependencies.

Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,7 @@ func (sc *syncContext) Sync() {

sc.log.WithValues("tasks", tasks).V(1).Info("Filtering out non-pending tasks")
// remove tasks that are completed, we can assume that there are no running tasks
tasks = tasks.Filter(func(t *syncTask) bool { return t.pending() })
completedTasks, tasks := tasks.Split(func(t *syncTask) bool { return t.completed() })

if sc.applyOutOfSyncOnly {
tasks = sc.filterOutOfSyncTasks(tasks)
Expand All @@ -500,15 +500,22 @@ func (sc *syncContext) Sync() {
wave := tasks.wave()
finalWave := phase == tasks.lastPhase() && wave == tasks.lastWave()

// if it is the last phase/wave and the only remaining tasks are non-hooks, the we are successful
// EVEN if those objects subsequently degraded
// This handles the common case where neither hooks or waves are used and a sync equates to simply an (asynchronous) kubectl apply of manifests, which succeeds immediately.
remainingTasks := tasks.Filter(func(t *syncTask) bool { return t.phase != phase || wave != t.wave() || t.isHook() })
hasIncompleteDependency := func(t *syncTask) bool {
for _, dep := range t.dependencies() {
if !completedTasks.Any(func(t *syncTask) bool { return dep.match(t.obj()) }) {
return true
}
}
return false
}

tasks, waitingTasks := tasks.Split(func(t *syncTask) bool {
return t.phase == phase && t.wave() == wave && !hasIncompleteDependency(t)
})

sc.log.WithValues("phase", phase, "wave", wave, "tasks", tasks, "syncFailTasks", syncFailTasks).V(1).Info("Filtering tasks in correct phase and wave")
tasks = tasks.Filter(func(t *syncTask) bool { return t.phase == phase && t.wave() == wave })
sc.log.WithValues("phase", phase, "wave", wave, "tasks", tasks, "waitingTasks", waitingTasks, "syncFailTasks", syncFailTasks).V(1).Info("Filtering tasks in correct phase and wave")

sc.setOperationPhase(common.OperationRunning, "one or more tasks are running")
sc.setOperationPhase(common.OperationRunning, fmt.Sprintf("%d task(s) are running", len(tasks)))

sc.log.WithValues("tasks", tasks).V(1).Info("Wet-run")
runState := sc.runTasks(tasks, false)
Expand All @@ -529,12 +536,19 @@ func (sc *syncContext) Sync() {
sc.deleteHooks(hooksPendingDeletionFailed)
sc.setOperationFailed(syncFailTasks, syncFailedTasks, "one or more objects failed to apply")
case successful:
if remainingTasks.Len() == 0 {

// if it is the last phase/wave and the only remaining tasks are non-hooks, then the sync is complete
// EVEN if those objects subsequently degraded
// This handles the case where neither hooks or waves are used and a sync equates to an
// (asynchronous) kubectl apply of manifests, which succeeds immediately.
anyHooks := tasks.Any(func(task *syncTask) bool { return task.isHook() })

if len(waitingTasks) == 0 && !anyHooks {
// delete all completed hooks which have appropriate delete policy
sc.deleteHooks(hooksPendingDeletionSuccessful)
sc.setOperationPhase(common.OperationSucceeded, "successfully synced (all tasks run)")
} else {
sc.setRunningPhase(remainingTasks, false)
sc.setRunningPhase(tasks, false)
Copy link
Author

@alexec alexec Mar 19, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like a 2+ year old bug.

}
default:
sc.setRunningPhase(tasks.Filter(func(task *syncTask) bool {
Expand Down Expand Up @@ -772,6 +786,8 @@ func (sc *syncContext) getSyncTasks() (_ syncTasks, successful bool) {

tasks.Sort()

sc.log.WithValues("tasks", tasks).V(1).Info("tasks after sorting")

Comment on lines +789 to +790
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
sc.log.WithValues("tasks", tasks).V(1).Info("tasks after sorting")

// finally enrich tasks with the result
for _, task := range tasks {
result, ok := sc.syncRes[task.resultKey()]
Expand Down
1 change: 0 additions & 1 deletion pkg/sync/sync_context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1082,7 +1082,6 @@ func TestNamespaceAutoCreationForNonExistingNs(t *testing.T) {
syncStatus: synccommon.ResultCodeSyncFailed,
operationState: synccommon.OperationError,
message: "namespaceModifier error: some error",
waveOverride: nil,
}, tasks[0])
})
}
Expand Down
32 changes: 27 additions & 5 deletions pkg/sync/sync_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ package sync

import (
"fmt"

"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"strings"

"github.com/argoproj/gitops-engine/pkg/sync/common"
"github.com/argoproj/gitops-engine/pkg/sync/hook"
Expand All @@ -23,7 +23,6 @@ type syncTask struct {
syncStatus common.ResultCode
operationState common.OperationPhase
message string
waveOverride *int
}

func ternary(val bool, a, b string) string {
Expand Down Expand Up @@ -58,12 +57,35 @@ func (t *syncTask) obj() *unstructured.Unstructured {
}

// wave returns the sync wave of the task: the explicit waveOverride when one
// has been set, otherwise the wave that syncwaves.Wave computes for the
// task's object.
func (t *syncTask) wave() int {
// an override, when present, takes precedence over the object's own wave
if t.waveOverride != nil {
return *t.waveOverride
}
return syncwaves.Wave(t.obj())
}

// dependencies returns the dependencies declared on the task's object through
// the common.AnnotationSyncDependencies annotation.
//
// The annotation value is a comma-separated list. Each entry consists of one
// to four "/"-separated parts:
//
//	<name>
//	<namespace>/<name>
//	<kind>/<namespace>/<name>
//	<group>/<kind>/<namespace>/<name>
//
// Whitespace surrounding an entry is ignored, so "a, b" declares the same
// dependencies as "a,b". Empty entries and entries with more than four parts
// are skipped. The result is nil when no dependency is declared.
func (t *syncTask) dependencies() []taskDependency {
	var out []taskDependency
	for _, entry := range strings.Split(t.obj().GetAnnotations()[common.AnnotationSyncDependencies], ",") {
		// Without trimming, a human-friendly "a, b" would yield a dependency
		// named " b", which can never match a real object.
		entry = strings.TrimSpace(entry)
		if entry == "" {
			continue
		}
		parts := strings.Split(entry, "/")
		switch len(parts) {
		case 1:
			out = append(out, taskDependency{Name: parts[0]})
		case 2:
			out = append(out, taskDependency{Namespace: parts[0], Name: parts[1]})
		case 3:
			out = append(out, taskDependency{Kind: parts[0], Namespace: parts[1], Name: parts[2]})
		case 4:
			out = append(out, taskDependency{Group: parts[0], Kind: parts[1], Namespace: parts[2], Name: parts[3]})
		}
		// Entries with more than four parts match none of the supported forms
		// and are deliberately ignored.
	}
	return out
}

// isHook reports whether hook.IsHook classifies the task's object as a hook.
func (t *syncTask) isHook() bool {
	obj := t.obj()
	return hook.IsHook(obj)
}
Expand Down
35 changes: 35 additions & 0 deletions pkg/sync/sync_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,38 @@ func Test_syncTask_wave(t *testing.T) {
assert.Equal(t, 0, (&syncTask{targetObj: NewPod()}).wave())
assert.Equal(t, 1, (&syncTask{targetObj: Annotate(NewPod(), "argocd.argoproj.io/sync-wave", "1")}).wave())
}

// Test_syncTask_dependencies checks how the sync-dependencies annotation is
// parsed into taskDependency values, one case per number of "/" separators.
func Test_syncTask_dependencies(t1 *testing.T) {
	// annotated builds a task whose target pod carries the given annotation value.
	annotated := func(value string) syncTask {
		return syncTask{targetObj: Annotate(NewPod(), common.AnnotationSyncDependencies, value)}
	}

	cases := []struct {
		name string
		task syncTask
		want []taskDependency
	}{
		{
			name: "Empty",
			task: syncTask{targetObj: Unstructured(`{"kind": "Pod"}`)},
			want: []taskDependency{},
		},
		{
			name: "Name",
			task: annotated("foo"),
			want: []taskDependency{{Name: "foo"}},
		},
		{
			name: "Namespace",
			task: annotated("foo/"),
			want: []taskDependency{{Namespace: "foo"}},
		},
		{
			name: "Kind",
			task: annotated("Foo//"),
			want: []taskDependency{{Kind: "Foo"}},
		},
		{
			name: "Group",
			task: annotated("foo///"),
			want: []taskDependency{{Group: "foo"}},
		},
	}

	for _, tc := range cases {
		t1.Run(tc.name, func(t *testing.T) {
			got := tc.task.dependencies()
			assert.ElementsMatch(t, tc.want, got, "dependencies()")
		})
	}
}
140 changes: 60 additions & 80 deletions pkg/sync/sync_tasks.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
package sync

import (
"fmt"
"github.com/argoproj/gitops-engine/pkg/utils/kube"
"sort"
"strings"

"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"

"github.com/argoproj/gitops-engine/pkg/sync/common"
"github.com/argoproj/gitops-engine/pkg/utils/kube"
)

// kindOrder represents the correct order of Kubernetes resources within a manifest
Expand Down Expand Up @@ -77,29 +74,58 @@ func (s syncTasks) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}

// order is
// 1. phase
// 2. wave
// 3. kind
// 4. name
// Less returns true if task i should be sorted before task j
// The order is:
// 1. Namespaces
// 2. CRDs
// 3. Sync phase
// 4. Wave
// 5. Kind
// 6. Name
func (s syncTasks) Less(i, j int) bool {

tA := s[i]
tB := s[j]
l := s[i]
r := s[j]

a := l.obj()
b := r.obj()

// namespaces must come before objects that depend on them
if a.GetKind() == kube.NamespaceKind && a.GroupVersionKind().Group == "" && a.GetName() == b.GetNamespace() {
return true
}

// crds must come before objects that depend on them
if isCRDOfGroupKind(b.GroupVersionKind().Group, b.GetKind(), a) {
return true
}

// Order by dependency.
// We tolerate cycles in the dependency graph, but we will not detect them.
// We also tolerate missing dependencies, but we will not detect them.
deps := r.dependencies()
for len(deps) > 0 {
dep := s.taskFor(deps[0])
deps = deps[1:]
if dep == nil {
continue
}
if dep == l {
return true
}
deps = append(deps, dep.dependencies()...)
}

d := syncPhaseOrder[tA.phase] - syncPhaseOrder[tB.phase]
d := syncPhaseOrder[l.phase] - syncPhaseOrder[r.phase]
if d != 0 {
return d < 0
}

d = tA.wave() - tB.wave()
d = l.wave() - r.wave()
if d != 0 {
return d < 0
}

a := tA.obj()
b := tB.obj()

// we take advantage of the fact that if the kind is not in the kindOrder map,
// then it will return the default int value of zero, which is the highest value
d = kindOrder[a.GetKind()] - kindOrder[b.GetKind()]
Expand All @@ -112,69 +138,6 @@ func (s syncTasks) Less(i, j int) bool {

// Sort orders the tasks with sort.Sort and then runs two adjustDeps passes
// over the sorted slice: first for namespaces and the resources placed in
// them, then for CRDs and the objects of the group/kind they define.
func (s syncTasks) Sort() {
sort.Sort(s)
// make sure namespaces are created before resources referencing namespaces
s.adjustDeps(func(obj *unstructured.Unstructured) (string, bool) {
// dependency key: the name of a Namespace object in the core (empty) API group
return obj.GetName(), obj.GetKind() == kube.NamespaceKind && obj.GroupVersionKind().Group == ""
}, func(obj *unstructured.Unstructured) (string, bool) {
// reference key: the namespace of any object that has a non-empty namespace
return obj.GetNamespace(), obj.GetNamespace() != ""
})
// make sure CRDs are created before CRs
s.adjustDeps(func(obj *unstructured.Unstructured) (string, bool) {
// dependency key: "<spec.group>/<spec.names.kind>" of a CRD; a CRD with a
// missing or unreadable field yields no key
if kube.IsCRD(obj) {
crdGroup, ok, err := unstructured.NestedString(obj.Object, "spec", "group")
if err != nil || !ok {
return "", false
}
crdKind, ok, err := unstructured.NestedString(obj.Object, "spec", "names", "kind")
if err != nil || !ok {
return "", false
}
return fmt.Sprintf("%s/%s", crdGroup, crdKind), true
}
return "", false
}, func(obj *unstructured.Unstructured) (string, bool) {
// reference key: "<group>/<kind>" of the object itself, always reported
gk := obj.GroupVersionKind()
return fmt.Sprintf("%s/%s", gk.Group, gk.Kind), true
})
}

// adjustDeps reorders the tasks in place so that a task which is a dependency
// is moved in front of the first task that references it
// (e.g. a namespace must be synced before the resources that reside in it).
//
// isDep returns the dependency key of an object and true when the object is a
// dependency. doesRefDep returns the key an object refers to and true when it
// refers to one; it is only consulted when isDep reported false. Tasks without
// a targetObj are skipped. A dependency is moved only when a task referencing
// its key has already been seen earlier in the slice.
func (s syncTasks) adjustDeps(isDep func(obj *unstructured.Unstructured) (string, bool), doesRefDep func(obj *unstructured.Unstructured) (string, bool)) {
// maps each dependency key to the index of the first task that references it
firstIndexByDepKey := map[string]int{}

for i, t := range s {
if t.targetObj == nil {
continue
}

if depKey, ok := isDep(t.targetObj); ok {
// if the task is a dependency, insert it before the first task that references it
if index, ok := firstIndexByDepKey[depKey]; ok {
// the dependency takes over the wave and sync phase of the task that depends on it
wave := s[index].wave()
t.waveOverride = &wave
t.phase = s[index].phase

// shift the tasks in [index, i) one slot to the right to free up s[index]
for j := i; j > index; j-- {
s[j] = s[j-1]
}
s[index] = t
// every recorded index at or after the insertion point has moved by one
for ns, firstIndex := range firstIndexByDepKey {
if firstIndex >= index {
firstIndexByDepKey[ns] = firstIndex + 1
}
}
}
} else if depKey, ok := doesRefDep(t.targetObj); ok {
// if the task references a dependency, remember only its first occurrence
if _, ok := firstIndexByDepKey[depKey]; !ok {
firstIndexByDepKey[depKey] = i
}
}
}
}

func (s syncTasks) Filter(predicate func(task *syncTask) bool) (tasks syncTasks) {
Expand Down Expand Up @@ -244,6 +207,15 @@ func (s syncTasks) String() string {
return "[" + strings.Join(values, ", ") + "]"
}

// names collects name() of every task, in slice order. The result is nil for
// an empty task list.
func (s syncTasks) names() []string {
	var out []string
	for i := range s {
		out = append(out, s[i].name())
	}
	return out
}

func (s syncTasks) phase() common.SyncPhase {
if len(s) > 0 {
return s[0].phase
Expand Down Expand Up @@ -273,5 +245,13 @@ func (s syncTasks) lastWave() int {
}

// multiStep reports true when the tasks span more than one wave or phase
// (s.wave()/s.phase() differ from s.lastWave()/s.lastPhase()), or when at
// least one task declares dependencies.
func (s syncTasks) multiStep() bool {
	if s.wave() != s.lastWave() || s.phase() != s.lastPhase() {
		return true
	}
	// A single wave and phase can still need ordering when tasks depend on
	// each other. This check used to sit behind an earlier return statement
	// and could never run.
	return s.Any(func(task *syncTask) bool {
		return len(task.dependencies()) > 0
	})
}

// taskFor returns whatever s.Find yields for the predicate "dep matches the
// task's object".
func (s syncTasks) taskFor(dep taskDependency) *syncTask {
	matches := func(candidate *syncTask) bool {
		return dep.match(candidate.obj())
	}
	return s.Find(matches)
}
Loading