From 476b09cbbf2730927236de39b16067dd2cff8099 Mon Sep 17 00:00:00 2001 From: Jesse Suen Date: Mon, 16 Mar 2020 11:51:59 -0700 Subject: [PATCH] feat: improve api-server and controller performance (#3222) * group read comparison settings during app reconciliation * Reduce lock contention in clusterInfo::ensureSynced(). Add getRepoObj stats * Remove additional source of lock contention * Exclude the coordination.k8s.io/Lease resource Co-authored-by: Alexander Matyushentsev --- Gopkg.lock | 2 + Makefile | 2 +- cmd/argocd-application-controller/main.go | 2 +- cmd/argocd-repo-server/main.go | 2 +- cmd/argocd-server/commands/root.go | 2 +- controller/appcontroller.go | 17 +-- controller/cache/cache.go | 48 ++++---- controller/cache/cache_test.go | 4 +- controller/cache/cluster.go | 73 ++++++----- controller/cache/cluster_test.go | 2 +- controller/metrics/metrics.go | 6 +- controller/state.go | 73 +++++++---- hack/test.sh | 2 +- server/application/application.go | 140 +++++++++++++++------- server/application/application_test.go | 15 ++- server/server.go | 18 ++- util/settings/resources_filter.go | 14 ++- util/stats/stats.go | 104 ++++++---------- util/stats/stats_test.go | 31 +++++ 19 files changed, 335 insertions(+), 222 deletions(-) create mode 100644 util/stats/stats_test.go diff --git a/Gopkg.lock b/Gopkg.lock index 969d3160d8692..e944e62963867 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -72,6 +72,7 @@ "errors", "exec", "rand", + "stats", "time", ] pruneopts = "" @@ -1930,6 +1931,7 @@ "github.com/TomOnTime/utfutil", "github.com/argoproj/pkg/errors", "github.com/argoproj/pkg/exec", + "github.com/argoproj/pkg/stats", "github.com/argoproj/pkg/time", "github.com/casbin/casbin", "github.com/casbin/casbin/model", diff --git a/Makefile b/Makefile index 64146b68c2128..b2b796c4522d7 100644 --- a/Makefile +++ b/Makefile @@ -181,7 +181,7 @@ test: .PHONY: test-e2e test-e2e: # NO_PROXY ensures all tests don't go out through a proxy if one is configured on the test system - NO_PROXY=* ./hack/test.sh -timeout 15m ./test/e2e + NO_PROXY=* ./hack/test.sh -timeout 15m -v ./test/e2e .PHONY: start-e2e start-e2e: cli diff --git a/cmd/argocd-application-controller/main.go b/cmd/argocd-application-controller/main.go index fd57ce0880af3..3caed89131ad4 100644 --- a/cmd/argocd-application-controller/main.go +++ b/cmd/argocd-application-controller/main.go @@ -6,6 +6,7 @@ import ( "os" "time" + "github.com/argoproj/pkg/stats" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" "k8s.io/client-go/kubernetes" @@ -25,7 +26,6 @@ import ( "github.com/argoproj/argo-cd/util/cli" "github.com/argoproj/argo-cd/util/kube" "github.com/argoproj/argo-cd/util/settings" - "github.com/argoproj/argo-cd/util/stats" ) const ( diff --git a/cmd/argocd-repo-server/main.go b/cmd/argocd-repo-server/main.go index 75beb97df607a..9ade7f873d13b 100644 --- a/cmd/argocd-repo-server/main.go +++ b/cmd/argocd-repo-server/main.go @@ -7,6 +7,7 @@ import ( "os" "time" + "github.com/argoproj/pkg/stats" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" @@ -16,7 +17,6 @@ import ( reposervercache "github.com/argoproj/argo-cd/reposerver/cache" "github.com/argoproj/argo-cd/reposerver/metrics" "github.com/argoproj/argo-cd/util/cli" - "github.com/argoproj/argo-cd/util/stats" "github.com/argoproj/argo-cd/util/tls" ) diff --git a/cmd/argocd-server/commands/root.go b/cmd/argocd-server/commands/root.go index 68f320c8403ff..876e72f189756 100644 --- a/cmd/argocd-server/commands/root.go +++ b/cmd/argocd-server/commands/root.go @@ -4,6 +4,7 @@ import ( 
"context" "time" + "github.com/argoproj/pkg/stats" "github.com/spf13/cobra" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/clientcmd" @@ -15,7 +16,6 @@ import ( "github.com/argoproj/argo-cd/server" servercache "github.com/argoproj/argo-cd/server/cache" "github.com/argoproj/argo-cd/util/cli" - "github.com/argoproj/argo-cd/util/stats" "github.com/argoproj/argo-cd/util/tls" ) diff --git a/controller/appcontroller.go b/controller/appcontroller.go index c4cbc32ff4281..7d626911da365 100644 --- a/controller/appcontroller.go +++ b/controller/appcontroller.go @@ -821,22 +821,20 @@ func (ctrl *ApplicationController) processAppRefreshQueueItem() (processNext boo return } + app := origApp.DeepCopy() + logCtx := log.WithFields(log.Fields{"application": app.Name}) startTime := time.Now() defer func() { reconcileDuration := time.Since(startTime) ctrl.metricsServer.IncReconcile(origApp, reconcileDuration) - logCtx := log.WithFields(log.Fields{ - "application": origApp.Name, - "time_ms": reconcileDuration.Seconds() * 1e3, + logCtx.WithFields(log.Fields{ + "time_ms": reconcileDuration.Milliseconds(), "level": comparisonLevel, "dest-server": origApp.Spec.Destination.Server, "dest-namespace": origApp.Spec.Destination.Namespace, - }) - logCtx.Info("Reconciliation completed") + }).Info("Reconciliation completed") }() - app := origApp.DeepCopy() - logCtx := log.WithFields(log.Fields{"application": app.Name}) if comparisonLevel == ComparisonWithNothing { managedResources := make([]*appv1.ResourceDiff, 0) if err := ctrl.cache.GetAppManagedResources(app.Name, &managedResources); err != nil { @@ -888,6 +886,9 @@ func (ctrl *ApplicationController) processAppRefreshQueueItem() (processNext boo observedAt := metav1.Now() compareResult := ctrl.appStateManager.CompareAppState(app, project, revision, app.Spec.Source, refreshType == appv1.RefreshTypeHard, localManifests) + for k, v := range compareResult.timings { + logCtx = logCtx.WithField(k, v.Milliseconds()) + } ctrl.normalizeApplication(origApp, app) @@ -912,7 +913,7 @@ func (ctrl *ApplicationController) processAppRefreshQueueItem() (processNext boo ) } } else { - logCtx.Infof("Sync prevented by sync window") + logCtx.Info("Sync prevented by sync window") } if app.Status.ReconciledAt == nil || comparisonLevel == CompareWithLatest { diff --git a/controller/cache/cache.go b/controller/cache/cache.go index 2f43e37723924..1754e563367ca 100644 --- a/controller/cache/cache.go +++ b/controller/cache/cache.go @@ -70,7 +70,7 @@ func NewLiveStateCache( appInformer: appInformer, db: db, clusters: make(map[string]*clusterInfo), - lock: &sync.Mutex{}, + lock: &sync.RWMutex{}, onObjectUpdated: onObjectUpdated, kubectl: kubectl, settingsMgr: settingsMgr, @@ -82,7 +82,7 @@ func NewLiveStateCache( type liveStateCache struct { db db.ArgoDB clusters map[string]*clusterInfo - lock *sync.Mutex + lock *sync.RWMutex appInformer cache.SharedIndexInformer onObjectUpdated ObjectUpdatedHandler kubectl kube.Kubectl @@ -109,31 +109,35 @@ func (c *liveStateCache) loadCacheSettings() (*cacheSettings, error) { } func (c *liveStateCache) getCluster(server string) (*clusterInfo, error) { - c.lock.Lock() - defer c.lock.Unlock() + c.lock.RLock() info, ok := c.clusters[server] + c.lock.RUnlock() if !ok { + logCtx := log.WithField("server", server) + logCtx.Info("initializing cluster") cluster, err := c.db.GetCluster(context.Background(), server) if err != nil { return nil, err } info = &clusterInfo{ apisMeta: make(map[schema.GroupKind]*apiMeta), - lock: &sync.Mutex{}, + lock: &sync.RWMutex{}, 
 			nodes:            make(map[kube.ResourceKey]*node),
 			nsIndex:          make(map[string]map[kube.ResourceKey]*node),
 			onObjectUpdated:  c.onObjectUpdated,
 			kubectl:          c.kubectl,
 			cluster:          cluster,
 			syncTime:         nil,
-			log:              log.WithField("server", cluster.Server),
+			log:              logCtx,
 			cacheSettingsSrc: c.getCacheSettings,
 			onEventReceived: func(event watch.EventType, un *unstructured.Unstructured) {
-				c.metricsServer.IncClusterEventsCount(cluster.Server)
+				gvk := un.GroupVersionKind()
+				c.metricsServer.IncClusterEventsCount(cluster.Server, gvk.Group, gvk.Kind)
 			},
 		}
-
+		c.lock.Lock()
 		c.clusters[cluster.Server] = info
+		c.lock.Unlock()
 	}
 	return info, nil
 }
@@ -152,8 +156,8 @@ func (c *liveStateCache) getSyncedCluster(server string) (*clusterInfo, error) {
 
 func (c *liveStateCache) Invalidate() {
 	log.Info("invalidating live state cache")
-	c.lock.Lock()
-	defer c.lock.Unlock()
+	c.lock.RLock()
+	defer c.lock.RUnlock()
 	for _, clust := range c.clusters {
 		clust.invalidate()
 	}
@@ -210,8 +214,6 @@ func isClusterHasApps(apps []interface{}, cluster *appv1.Cluster) bool {
 }
 
 func (c *liveStateCache) getCacheSettings() *cacheSettings {
-	c.cacheSettingsLock.Lock()
-	defer c.cacheSettingsLock.Unlock()
 	return c.cacheSettings
 }
 
@@ -261,8 +263,9 @@ func (c *liveStateCache) Run(ctx context.Context) error {
 	util.RetryUntilSucceed(func() error {
 		clusterEventCallback := func(event *db.ClusterEvent) {
 			c.lock.Lock()
-			defer c.lock.Unlock()
-			if cluster, ok := c.clusters[event.Cluster.Server]; ok {
+			cluster, ok := c.clusters[event.Cluster.Server]
+			if ok {
+				defer c.lock.Unlock()
 				if event.Type == watch.Deleted {
 					cluster.invalidate()
 					delete(c.clusters, event.Cluster.Server)
@@ -270,11 +273,14 @@ func (c *liveStateCache) Run(ctx context.Context) error {
 					cluster.cluster = event.Cluster
 					cluster.invalidate()
 				}
-			} else if event.Type == watch.Added && isClusterHasApps(c.appInformer.GetStore().List(), event.Cluster) {
-				go func() {
-					// warm up cache for cluster with apps
-					_, _ = c.getSyncedCluster(event.Cluster.Server)
-				}()
+			} else {
+				c.lock.Unlock()
+				if event.Type == watch.Added && isClusterHasApps(c.appInformer.GetStore().List(), event.Cluster) {
+					go func() {
+						// warm up cache for cluster with apps
+						_, _ = c.getSyncedCluster(event.Cluster.Server)
+					}()
+				}
 			}
 		}
 
@@ -287,8 +293,8 @@ func (c *liveStateCache) Run(ctx context.Context) error {
 }
 
 func (c *liveStateCache) GetClustersInfo() []metrics.ClusterInfo {
-	c.lock.Lock()
-	defer c.lock.Unlock()
+	c.lock.RLock()
+	defer c.lock.RUnlock()
 	res := make([]metrics.ClusterInfo, 0)
 	for _, info := range c.clusters {
 		res = append(res, info.getClusterInfo())
diff --git a/controller/cache/cache_test.go b/controller/cache/cache_test.go
index 8cc38aa7565d6..a691929fe9f11 100644
--- a/controller/cache/cache_test.go
+++ b/controller/cache/cache_test.go
@@ -11,11 +11,11 @@ import (
 func TestGetServerVersion(t *testing.T) {
 	now := time.Now()
 	cache := &liveStateCache{
-		lock: &sync.Mutex{},
+		lock: &sync.RWMutex{},
 		clusters: map[string]*clusterInfo{
 			"http://localhost": {
 				syncTime:      &now,
-				lock:          &sync.Mutex{},
+				lock:          &sync.RWMutex{},
 				serverVersion: "123",
 			},
 		}}
diff --git a/controller/cache/cluster.go b/controller/cache/cluster.go
index 95da570d403dd..0679fa3d3bc78 100644
--- a/controller/cache/cluster.go
+++ b/controller/cache/cluster.go
@@ -9,19 +9,16 @@ import (
 	"sync"
 	"time"
 
-	"k8s.io/client-go/dynamic"
-
-	"k8s.io/apimachinery/pkg/types"
-
-	"github.com/argoproj/argo-cd/controller/metrics"
-
 	log "github.com/sirupsen/logrus"
 	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/dynamic" + "github.com/argoproj/argo-cd/controller/metrics" appv1 "github.com/argoproj/argo-cd/pkg/apis/application/v1alpha1" "github.com/argoproj/argo-cd/util" "github.com/argoproj/argo-cd/util/health" @@ -45,8 +42,11 @@ type clusterInfo struct { syncError error apisMeta map[schema.GroupKind]*apiMeta serverVersion string + // namespacedResources is a simple map which indicates a groupKind is namespaced + namespacedResources map[schema.GroupKind]bool - lock *sync.Mutex + // lock is a rw lock which protects the fields of clusterInfo + lock *sync.RWMutex nodes map[kube.ResourceKey]*node nsIndex map[string]map[kube.ResourceKey]*node @@ -176,16 +176,19 @@ func (c *clusterInfo) invalidate() { c.apisMeta[i].watchCancel() } c.apisMeta = nil + c.namespacedResources = nil + c.log.Warnf("invalidated cluster") } func (c *clusterInfo) synced() bool { - if c.syncTime == nil { + syncTime := c.syncTime + if syncTime == nil { return false } if c.syncError != nil { - return time.Now().Before(c.syncTime.Add(clusterRetryTimeout)) + return time.Now().Before(syncTime.Add(clusterRetryTimeout)) } - return time.Now().Before(c.syncTime.Add(clusterSyncTimeout)) + return time.Now().Before(syncTime.Add(clusterSyncTimeout)) } func (c *clusterInfo) stopWatching(gk schema.GroupKind, ns string) { @@ -195,7 +198,7 @@ func (c *clusterInfo) stopWatching(gk schema.GroupKind, ns string) { info.watchCancel() delete(c.apisMeta, gk) c.replaceResourceCache(gk, "", []unstructured.Unstructured{}, ns) - log.Warnf("Stop watching %s not found on %s.", gk, c.cluster.Server) + c.log.Warnf("Stop watching: %s not found", gk) } } @@ -211,9 +214,10 @@ func (c *clusterInfo) startMissingWatches() error { if err != nil { return err } - + namespacedResources := make(map[schema.GroupKind]bool) for i := range apis { api := apis[i] + namespacedResources[api.GroupKind] = api.Meta.Namespaced if _, ok := c.apisMeta[api.GroupKind]; !ok { ctx, cancel := context.WithCancel(context.Background()) info := &apiMeta{namespaced: api.Meta.Namespaced, watchCancel: cancel} @@ -228,10 +232,11 @@ func (c *clusterInfo) startMissingWatches() error { } } } + c.namespacedResources = namespacedResources return nil } -func runSynced(lock *sync.Mutex, action func() error) error { +func runSynced(lock sync.Locker, action func() error) error { lock.Lock() defer lock.Unlock() return action() @@ -265,15 +270,10 @@ func (c *clusterInfo) watchEvents(ctx context.Context, api kube.APIResourceInfo, c.stopWatching(api.GroupKind, ns) return nil } - - err = runSynced(c.lock, func() error { - if errors.IsGone(err) { - info.resourceVersion = "" - log.Warnf("Resource version of %s on %s is too old.", api.GroupKind, c.cluster.Server) - } - return err - }) - + if errors.IsGone(err) { + info.resourceVersion = "" + c.log.Warnf("Resource version of %s is too old", api.GroupKind) + } if err != nil { return err } @@ -304,7 +304,7 @@ func (c *clusterInfo) watchEvents(ctx context.Context, api kube.APIResourceInfo, } } if err != nil { - log.Warnf("Failed to start missing watch: %v", err) + c.log.Warnf("Failed to start missing watch: %v", err) } } else { return fmt.Errorf("Watch %s on %s has closed", api.GroupKind, c.cluster.Server) @@ -388,12 +388,17 @@ func (c *clusterInfo) sync() (err error) { } func (c *clusterInfo) ensureSynced() error { + // first check if cluster is synced *without lock* + if c.synced() { + return 
c.syncError + } c.lock.Lock() defer c.lock.Unlock() + // before doing any work, check once again now that we have the lock, to see if it got + // synced between the first check and now if c.synced() { return c.syncError } - err := c.sync() syncTime := time.Now() c.syncTime = &syncTime @@ -402,8 +407,8 @@ func (c *clusterInfo) ensureSynced() error { } func (c *clusterInfo) getNamespaceTopLevelResources(namespace string) map[kube.ResourceKey]appv1.ResourceNode { - c.lock.Lock() - defer c.lock.Unlock() + c.lock.RLock() + defer c.lock.RUnlock() nodes := make(map[kube.ResourceKey]appv1.ResourceNode) for _, node := range c.nsIndex[namespace] { if len(node.ownerRefs) == 0 { @@ -444,15 +449,17 @@ func (c *clusterInfo) iterateHierarchy(key kube.ResourceKey, action func(child a } func (c *clusterInfo) isNamespaced(gk schema.GroupKind) bool { - if api, ok := c.apisMeta[gk]; ok && !api.namespaced { - return false + // this is safe to access without a lock since we always replace the entire map instead of mutating keys + if isNamespaced, ok := c.namespacedResources[gk]; ok { + return isNamespaced } + log.Warnf("group/kind %s scope is unknown (known objects: %d). assuming namespaced object", gk, len(c.namespacedResources)) return true } func (c *clusterInfo) getManagedLiveObjs(a *appv1.Application, targetObjs []*unstructured.Unstructured, metricsServer *metrics.MetricsServer) (map[kube.ResourceKey]*unstructured.Unstructured, error) { - c.lock.Lock() - defer c.lock.Unlock() + c.lock.RLock() + defer c.lock.RUnlock() managedObjs := make(map[kube.ResourceKey]*unstructured.Unstructured) // iterate all objects in live state cache to find ones associated with app @@ -462,7 +469,7 @@ func (c *clusterInfo) getManagedLiveObjs(a *appv1.Application, targetObjs []*uns } } config := metrics.AddMetricsTransportWrapper(metricsServer, a, c.cluster.RESTConfig()) - // iterate target objects and identify ones that already exist in the cluster,\ + // iterate target objects and identify ones that already exist in the cluster, // but are simply missing our label lock := &sync.Mutex{} err := util.RunAllAsync(len(targetObjs), func(i int) error { @@ -586,8 +593,8 @@ var ( ) func (c *clusterInfo) getClusterInfo() metrics.ClusterInfo { - c.lock.Lock() - defer c.lock.Unlock() + c.lock.RLock() + defer c.lock.RUnlock() return metrics.ClusterInfo{ APIsCount: len(c.apisMeta), K8SVersion: c.serverVersion, diff --git a/controller/cache/cluster_test.go b/controller/cache/cluster_test.go index 894ea0970a9db..4b5fbba72b3e1 100644 --- a/controller/cache/cluster_test.go +++ b/controller/cache/cluster_test.go @@ -153,7 +153,7 @@ func newCluster(objs ...*unstructured.Unstructured) *clusterInfo { func newClusterExt(kubectl kube.Kubectl) *clusterInfo { return &clusterInfo{ - lock: &sync.Mutex{}, + lock: &sync.RWMutex{}, nodes: make(map[kube.ResourceKey]*node), onObjectUpdated: func(managedByApp map[string]bool, reference corev1.ObjectReference) {}, kubectl: kubectl, diff --git a/controller/metrics/metrics.go b/controller/metrics/metrics.go index 84c82105d0b37..7af193d723296 100644 --- a/controller/metrics/metrics.go +++ b/controller/metrics/metrics.go @@ -119,7 +119,7 @@ func NewMetricsServer(addr string, appLister applister.ApplicationLister, health clusterEventsCounter := prometheus.NewCounterVec(prometheus.CounterOpts{ Name: "argocd_cluster_events_total", Help: "Number of processes k8s resource events.", - }, descClusterDefaultLabels) + }, append(descClusterDefaultLabels, "group", "kind")) registry.MustRegister(clusterEventsCounter) return 
&MetricsServer{ @@ -164,8 +164,8 @@ func (m *MetricsServer) DecKubectlExecPending(command string) { } // IncClusterEventsCount increments the number of cluster events -func (m *MetricsServer) IncClusterEventsCount(server string) { - m.clusterEventsCounter.WithLabelValues(server).Inc() +func (m *MetricsServer) IncClusterEventsCount(server, group, kind string) { + m.clusterEventsCounter.WithLabelValues(server, group, kind).Inc() } // IncKubernetesRequest increments the kubernetes requests counter for an application diff --git a/controller/state.go b/controller/state.go index 29fd1031877d7..40352f1939306 100644 --- a/controller/state.go +++ b/controller/state.go @@ -31,6 +31,7 @@ import ( "github.com/argoproj/argo-cd/util/resource" "github.com/argoproj/argo-cd/util/resource/ignore" "github.com/argoproj/argo-cd/util/settings" + "github.com/argoproj/argo-cd/util/stats" ) type managedResource struct { @@ -71,6 +72,8 @@ type comparisonResult struct { hooks []*unstructured.Unstructured diffNormalizer diff.Normalizer appSourceType v1alpha1.ApplicationSourceType + // timings maps phases of comparison to the duration it took to complete (for statistical purposes) + timings map[string]time.Duration } func (cr *comparisonResult) targetObjs() []*unstructured.Unstructured { @@ -97,14 +100,17 @@ type appStateManager struct { } func (m *appStateManager) getRepoObjs(app *v1alpha1.Application, source v1alpha1.ApplicationSource, appLabelKey, revision string, noCache bool) ([]*unstructured.Unstructured, []*unstructured.Unstructured, *apiclient.ManifestResponse, error) { + ts := stats.NewTimingStats() helmRepos, err := m.db.ListHelmRepositories(context.Background()) if err != nil { return nil, nil, nil, err } + ts.AddCheckpoint("helm_ms") repo, err := m.db.GetRepository(context.Background(), source.RepoURL) if err != nil { return nil, nil, nil, err } + ts.AddCheckpoint("repo_ms") conn, repoClient, err := m.repoClientset.NewRepoServerClient() if err != nil { return nil, nil, nil, err @@ -119,7 +125,7 @@ func (m *appStateManager) getRepoObjs(app *v1alpha1.Application, source v1alpha1 if err != nil { return nil, nil, nil, err } - + ts.AddCheckpoint("plugins_ms") tools := make([]*appv1.ConfigManagementPlugin, len(plugins)) for i := range plugins { tools[i] = &plugins[i] @@ -129,10 +135,12 @@ func (m *appStateManager) getRepoObjs(app *v1alpha1.Application, source v1alpha1 if err != nil { return nil, nil, nil, err } + ts.AddCheckpoint("build_options_ms") serverVersion, err := m.liveStateCache.GetServerVersion(app.Spec.Destination.Server) if err != nil { return nil, nil, nil, err } + ts.AddCheckpoint("version_ms") manifestInfo, err := repoClient.GenerateManifest(context.Background(), &apiclient.ManifestRequest{ Repo: repo, Repos: helmRepos, @@ -151,10 +159,18 @@ func (m *appStateManager) getRepoObjs(app *v1alpha1.Application, source v1alpha1 if err != nil { return nil, nil, nil, err } + ts.AddCheckpoint("manifests_ms") targetObjs, hooks, err := unmarshalManifests(manifestInfo.Manifests) if err != nil { return nil, nil, nil, err } + ts.AddCheckpoint("unmarshal_ms") + logCtx := log.WithField("application", app.Name) + for k, v := range ts.Timings() { + logCtx = logCtx.WithField(k, v.Milliseconds()) + } + logCtx = logCtx.WithField("time_ms", time.Since(ts.StartTime).Milliseconds()) + logCtx.Info("getRepoObjs stats") return targetObjs, hooks, manifestInfo, nil } @@ -253,27 +269,33 @@ func dedupLiveResources(targetObjs []*unstructured.Unstructured, liveObjsByKey m } } -func (m *appStateManager) getComparisonSettings(app 
*appv1.Application) (string, map[string]v1alpha1.ResourceOverride, diff.Normalizer, error) { +func (m *appStateManager) getComparisonSettings(app *appv1.Application) (string, map[string]v1alpha1.ResourceOverride, diff.Normalizer, *settings.ResourcesFilter, error) { resourceOverrides, err := m.settingsMgr.GetResourceOverrides() if err != nil { - return "", nil, nil, err + return "", nil, nil, nil, err } appLabelKey, err := m.settingsMgr.GetAppInstanceLabelKey() if err != nil { - return "", nil, nil, err + return "", nil, nil, nil, err } diffNormalizer, err := argo.NewDiffNormalizer(app.Spec.IgnoreDifferences, resourceOverrides) if err != nil { - return "", nil, nil, err + return "", nil, nil, nil, err } - return appLabelKey, resourceOverrides, diffNormalizer, nil + resFilter, err := m.settingsMgr.GetResourcesFilter() + if err != nil { + return "", nil, nil, nil, err + } + return appLabelKey, resourceOverrides, diffNormalizer, resFilter, nil } // CompareAppState compares application git state to the live app state, using the specified // revision and supplied source. If revision or overrides are empty, then compares against // revision and overrides in the app spec. func (m *appStateManager) CompareAppState(app *v1alpha1.Application, project *appv1.AppProject, revision string, source v1alpha1.ApplicationSource, noCache bool, localManifests []string) *comparisonResult { - appLabelKey, resourceOverrides, diffNormalizer, err := m.getComparisonSettings(app) + ts := stats.NewTimingStats() + appLabelKey, resourceOverrides, diffNormalizer, resFilter, err := m.getComparisonSettings(app) + ts.AddCheckpoint("settings_ms") // return unknown comparison result if basic comparison settings cannot be loaded if err != nil { @@ -314,32 +336,27 @@ func (m *appStateManager) CompareAppState(app *v1alpha1.Application, project *ap } manifestInfo = nil } + ts.AddCheckpoint("git_ms") targetObjs, dedupConditions, err := DeduplicateTargetObjects(app.Spec.Destination.Server, app.Spec.Destination.Namespace, targetObjs, m.liveStateCache) if err != nil { conditions = append(conditions, v1alpha1.ApplicationCondition{Type: v1alpha1.ApplicationConditionComparisonError, Message: err.Error(), LastTransitionTime: &now}) } conditions = append(conditions, dedupConditions...) - - resFilter, err := m.settingsMgr.GetResourcesFilter() - if err != nil { - conditions = append(conditions, v1alpha1.ApplicationCondition{Type: v1alpha1.ApplicationConditionComparisonError, Message: err.Error(), LastTransitionTime: &now}) - } else { - for i := len(targetObjs) - 1; i >= 0; i-- { - targetObj := targetObjs[i] - gvk := targetObj.GroupVersionKind() - if resFilter.IsExcludedResource(gvk.Group, gvk.Kind, app.Spec.Destination.Server) { - targetObjs = append(targetObjs[:i], targetObjs[i+1:]...) - conditions = append(conditions, v1alpha1.ApplicationCondition{ - Type: v1alpha1.ApplicationConditionExcludedResourceWarning, - Message: fmt.Sprintf("Resource %s/%s %s is excluded in the settings", gvk.Group, gvk.Kind, targetObj.GetName()), - LastTransitionTime: &now, - }) - } + for i := len(targetObjs) - 1; i >= 0; i-- { + targetObj := targetObjs[i] + gvk := targetObj.GroupVersionKind() + if resFilter.IsExcludedResource(gvk.Group, gvk.Kind, app.Spec.Destination.Server) { + targetObjs = append(targetObjs[:i], targetObjs[i+1:]...) 
+ conditions = append(conditions, v1alpha1.ApplicationCondition{ + Type: v1alpha1.ApplicationConditionExcludedResourceWarning, + Message: fmt.Sprintf("Resource %s/%s %s is excluded in the settings", gvk.Group, gvk.Kind, targetObj.GetName()), + LastTransitionTime: &now, + }) } } + ts.AddCheckpoint("dedup_ms") - logCtx.Debugf("Generated config manifests") liveObjByKey, err := m.liveStateCache.GetManagedLiveObjs(app, targetObjs) if err != nil { liveObjByKey = make(map[kubeutil.ResourceKey]*unstructured.Unstructured) @@ -354,7 +371,6 @@ func (m *appStateManager) CompareAppState(app *v1alpha1.Application, project *ap } } - logCtx.Debugf("Retrieved lived manifests") for _, liveObj := range liveObjByKey { if liveObj != nil { appInstanceName := kubeutil.GetAppInstanceLabel(liveObj, appLabelKey) @@ -383,7 +399,8 @@ func (m *appStateManager) CompareAppState(app *v1alpha1.Application, project *ap managedLiveObj[i] = nil } } - logCtx.Debugf("built managed objects list") + ts.AddCheckpoint("live_ms") + // Everything remaining in liveObjByKey are "extra" resources that aren't tracked in git. // The following adds all the extras to the managedLiveObj list and backfills the targetObj // list with nils, so that the lists are of equal lengths for comparison purposes. @@ -399,6 +416,7 @@ func (m *appStateManager) CompareAppState(app *v1alpha1.Application, project *ap failedToLoadObjs = true conditions = append(conditions, v1alpha1.ApplicationCondition{Type: v1alpha1.ApplicationConditionComparisonError, Message: err.Error(), LastTransitionTime: &now}) } + ts.AddCheckpoint("diff_ms") syncCode := v1alpha1.SyncStatusCodeSynced managedResources := make([]managedResource, len(targetObjs)) @@ -488,6 +506,7 @@ func (m *appStateManager) CompareAppState(app *v1alpha1.Application, project *ap if manifestInfo != nil { syncStatus.Revision = manifestInfo.Revision } + ts.AddCheckpoint("sync_ms") healthStatus, err := health.SetApplicationHealth(resourceSummaries, GetLiveObjs(managedResources), resourceOverrides, func(obj *unstructured.Unstructured) bool { return !isSelfReferencedApp(app, kubeutil.GetObjectRef(obj)) @@ -514,6 +533,8 @@ func (m *appStateManager) CompareAppState(app *v1alpha1.Application, project *ap appv1.ApplicationConditionRepeatedResourceWarning: true, appv1.ApplicationConditionExcludedResourceWarning: true, }) + ts.AddCheckpoint("health_ms") + compRes.timings = ts.Timings() return &compRes } diff --git a/hack/test.sh b/hack/test.sh index c859342e648ea..a66c2a185f6c3 100755 --- a/hack/test.sh +++ b/hack/test.sh @@ -20,4 +20,4 @@ report() { trap 'report' EXIT -go test -v $* 2>&1 | tee $TEST_RESULTS/test.out +go test -failfast $* 2>&1 | tee $TEST_RESULTS/test.out diff --git a/server/application/application.go b/server/application/application.go index 6ff1b94b148b2..60c98ab42e826 100644 --- a/server/application/application.go +++ b/server/application/application.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "reflect" + "strconv" "strings" "time" @@ -20,6 +21,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" @@ -30,6 +32,7 @@ import ( "github.com/argoproj/argo-cd/pkg/apis/application/v1alpha1" appv1 "github.com/argoproj/argo-cd/pkg/apis/application/v1alpha1" appclientset "github.com/argoproj/argo-cd/pkg/client/clientset/versioned" + applisters 
"github.com/argoproj/argo-cd/pkg/client/listers/application/v1alpha1" "github.com/argoproj/argo-cd/reposerver/apiclient" servercache "github.com/argoproj/argo-cd/server/cache" "github.com/argoproj/argo-cd/server/rbacpolicy" @@ -52,6 +55,7 @@ type Server struct { ns string kubeclientset kubernetes.Interface appclientset appclientset.Interface + appLister applisters.ApplicationNamespaceLister repoClientset apiclient.Clientset kubectl kube.Kubectl db db.ArgoDB @@ -67,6 +71,7 @@ func NewServer( namespace string, kubeclientset kubernetes.Interface, appclientset appclientset.Interface, + appLister applisters.ApplicationNamespaceLister, repoClientset apiclient.Clientset, cache *servercache.Cache, kubectl kube.Kubectl, @@ -79,6 +84,7 @@ func NewServer( return &Server{ ns: namespace, appclientset: appclientset, + appLister: appLister, kubeclientset: kubeclientset, cache: cache, db: db, @@ -98,23 +104,25 @@ func appRBACName(app appv1.Application) string { // List returns list of applications func (s *Server) List(ctx context.Context, q *application.ApplicationQuery) (*appv1.ApplicationList, error) { - appList, err := s.appclientset.ArgoprojV1alpha1().Applications(s.ns).List(metav1.ListOptions{LabelSelector: q.Selector}) + labelsMap, err := labels.ConvertSelectorToLabelsMap(q.Selector) + if err != nil { + return nil, err + } + apps, err := s.appLister.List(labelsMap.AsSelector()) if err != nil { return nil, err } newItems := make([]appv1.Application, 0) - for _, a := range appList.Items { - if s.enf.Enforce(ctx.Value("claims"), rbacpolicy.ResourceApplications, rbacpolicy.ActionGet, appRBACName(a)) { - newItems = append(newItems, a) + for _, a := range apps { + if s.enf.Enforce(ctx.Value("claims"), rbacpolicy.ResourceApplications, rbacpolicy.ActionGet, appRBACName(*a)) { + newItems = append(newItems, *a) } } newItems = argoutil.FilterByProjects(newItems, q.Projects) - for i := range newItems { - app := newItems[i] - newItems[i] = app + appList := appv1.ApplicationList{ + Items: newItems, } - appList.Items = newItems - return appList, nil + return &appList, nil } // Create creates an application @@ -131,39 +139,44 @@ func (s *Server) Create(ctx context.Context, q *application.ApplicationCreateReq if err != nil { return nil, err } - out, err := s.appclientset.ArgoprojV1alpha1().Applications(s.ns).Create(&a) - if apierr.IsAlreadyExists(err) { - // act idempotent if existing spec matches new spec - existing, getErr := s.appclientset.ArgoprojV1alpha1().Applications(s.ns).Get(a.Name, metav1.GetOptions{}) - if getErr != nil { - return nil, status.Errorf(codes.Internal, "unable to check existing application details: %v", getErr) - } - if q.Upsert != nil && *q.Upsert { - if err := s.enf.EnforceErr(ctx.Value("claims"), rbacpolicy.ResourceApplications, rbacpolicy.ActionUpdate, appRBACName(a)); err != nil { - return nil, err - } - out, err = s.updateApp(existing, &a, ctx, true) - } else { - if !reflect.DeepEqual(existing.Spec, a.Spec) || - !reflect.DeepEqual(existing.Labels, a.Labels) || - !reflect.DeepEqual(existing.Annotations, a.Annotations) || - !reflect.DeepEqual(existing.Finalizers, a.Finalizers) { - - return nil, status.Errorf(codes.InvalidArgument, "existing application spec is different, use upsert flag to force update") - } - return existing, nil - } + created, err := s.appclientset.ArgoprojV1alpha1().Applications(s.ns).Create(&a) + if err == nil { + s.logAppEvent(created, ctx, argo.EventReasonResourceCreated, "created application") + s.waitSync(created) + return created, nil } + if 
!apierr.IsAlreadyExists(err) { + return nil, err + } + // act idempotent if existing spec matches new spec + existing, err := s.appLister.Get(a.Name) + if err != nil { + return nil, status.Errorf(codes.Internal, "unable to check existing application details: %v", err) + } + equalSpecs := reflect.DeepEqual(existing.Spec, a.Spec) && + reflect.DeepEqual(existing.Labels, a.Labels) && + reflect.DeepEqual(existing.Annotations, a.Annotations) && + reflect.DeepEqual(existing.Finalizers, a.Finalizers) - if err == nil { - s.logAppEvent(out, ctx, argo.EventReasonResourceCreated, "created application") + if equalSpecs { + return existing, nil + } + if q.Upsert == nil || !*q.Upsert { + return nil, status.Errorf(codes.InvalidArgument, "existing application spec is different, use upsert flag to force update") + } + if err := s.enf.EnforceErr(ctx.Value("claims"), rbacpolicy.ResourceApplications, rbacpolicy.ActionUpdate, appRBACName(a)); err != nil { + return nil, err } - return out, err + updated, err := s.updateApp(existing, &a, ctx, true) + if err != nil { + return nil, err + } + return updated, nil } // GetManifests returns application manifests func (s *Server) GetManifests(ctx context.Context, q *application.ApplicationManifestQuery) (*apiclient.ManifestResponse, error) { - a, err := s.appclientset.ArgoprojV1alpha1().Applications(s.ns).Get(*q.Name, metav1.GetOptions{}) + a, err := s.appLister.Get(*q.Name) if err != nil { return nil, err } @@ -251,8 +264,10 @@ func (s *Server) GetManifests(ctx context.Context, q *application.ApplicationMan // Get returns an application by name func (s *Server) Get(ctx context.Context, q *application.ApplicationQuery) (*appv1.Application, error) { - appIf := s.appclientset.ArgoprojV1alpha1().Applications(s.ns) - a, err := appIf.Get(*q.Name, metav1.GetOptions{}) + // We must use a client Get instead of an informer Get, because it's common to call Get immediately + // following a Watch (which is not yet powered by an informer), and the Get must reflect what was + // previously seen by the client. + a, err := s.appclientset.ArgoprojV1alpha1().Applications(s.ns).Get(*q.Name, metav1.GetOptions{}) if err != nil { return nil, err } @@ -264,6 +279,7 @@ func (s *Server) Get(ctx context.Context, q *application.ApplicationQuery) (*app if *q.Refresh == string(appv1.RefreshTypeHard) { refreshType = appv1.RefreshTypeHard } + appIf := s.appclientset.ArgoprojV1alpha1().Applications(s.ns) _, err = argoutil.RefreshApp(appIf, *q.Name, refreshType) if err != nil { return nil, err @@ -278,7 +294,7 @@ func (s *Server) Get(ctx context.Context, q *application.ApplicationQuery) (*app // ListResourceEvents returns a list of event resources func (s *Server) ListResourceEvents(ctx context.Context, q *application.ApplicationResourceEventsQuery) (*v1.EventList, error) { - a, err := s.appclientset.ArgoprojV1alpha1().Applications(s.ns).Get(*q.Name, metav1.GetOptions{}) + a, err := s.appLister.Get(*q.Name) if err != nil { return nil, err } @@ -354,6 +370,39 @@ func mergeStringMaps(items ...map[string]string) map[string]string { return res } +var informerSyncTimeout = 2 * time.Second + +// waitSync is a helper to wait until the application informer cache is synced after create/update. +// It waits until the app in the informer, has a resource version greater than the version in the +// supplied app, or after 2 seconds, whichever comes first. Returns true if synced. +// We use an informer cache for read operations (Get, List). 
Since the cache is only +// eventually consistent, it is possible that it doesn't reflect an application change immediately +// after a mutating API call (create/update). This function should be called after a creates & +// update to give a probable (but not guaranteed) chance of being up-to-date after the create/update. +func (s *Server) waitSync(app *appv1.Application) { + logCtx := log.WithField("application", app.Name) + deadline := time.Now().Add(informerSyncTimeout) + minVersion, err := strconv.Atoi(app.ResourceVersion) + if err != nil { + logCtx.Warnf("waitSync failed: could not parse resource version %s", app.ResourceVersion) + time.Sleep(50 * time.Millisecond) // sleep anyways + return + } + for { + if currApp, err := s.appLister.Get(app.Name); err == nil { + currVersion, err := strconv.Atoi(currApp.ResourceVersion) + if err == nil && currVersion >= minVersion { + return + } + } + if time.Now().After(deadline) { + break + } + time.Sleep(20 * time.Millisecond) + } + logCtx.Warnf("waitSync failed: timed out") +} + func (s *Server) updateApp(app *appv1.Application, newApp *appv1.Application, ctx context.Context, merge bool) (*appv1.Application, error) { for i := 0; i < 10; i++ { app.Spec = newApp.Spec @@ -370,6 +419,7 @@ func (s *Server) updateApp(app *appv1.Application, newApp *appv1.Application, ct res, err := s.appclientset.ArgoprojV1alpha1().Applications(s.ns).Update(app) if err == nil { s.logAppEvent(app, ctx, argo.EventReasonResourceUpdated, "updated application spec") + s.waitSync(res) return res, nil } if !apierr.IsConflict(err) { @@ -504,7 +554,6 @@ func (s *Server) Delete(ctx context.Context, q *application.ApplicationDeleteReq if err != nil && !apierr.IsNotFound(err) { return nil, err } - s.logAppEvent(a, ctx, argo.EventReasonResourceDeleted, "deleted application") return &application.ApplicationResponse{}, nil } @@ -688,7 +737,7 @@ func (s *Server) getAppResources(ctx context.Context, a *appv1.Application) (*ap } func (s *Server) getAppResource(ctx context.Context, action string, q *application.ApplicationResourceRequest) (*appv1.ResourceNode, *rest.Config, *appv1.Application, error) { - a, err := s.appclientset.ArgoprojV1alpha1().Applications(s.ns).Get(*q.Name, metav1.GetOptions{}) + a, err := s.appLister.Get(*q.Name) if err != nil { return nil, nil, nil, err } @@ -810,7 +859,7 @@ func (s *Server) DeleteResource(ctx context.Context, q *application.ApplicationR } func (s *Server) ResourceTree(ctx context.Context, q *application.ResourcesQuery) (*appv1.ApplicationTree, error) { - a, err := s.appclientset.ArgoprojV1alpha1().Applications(s.ns).Get(*q.ApplicationName, metav1.GetOptions{}) + a, err := s.appLister.Get(*q.ApplicationName) if err != nil { return nil, err } @@ -821,7 +870,7 @@ func (s *Server) ResourceTree(ctx context.Context, q *application.ResourcesQuery } func (s *Server) RevisionMetadata(ctx context.Context, q *application.RevisionMetadataQuery) (*v1alpha1.RevisionMetadata, error) { - a, err := s.appclientset.ArgoprojV1alpha1().Applications(s.ns).Get(q.GetName(), metav1.GetOptions{}) + a, err := s.appLister.Get(q.GetName()) if err != nil { return nil, err } @@ -848,7 +897,7 @@ func isMatchingResource(q *application.ResourcesQuery, key kube.ResourceKey) boo } func (s *Server) ManagedResources(ctx context.Context, q *application.ResourcesQuery) (*application.ManagedResourcesResponse, error) { - a, err := s.appclientset.ArgoprojV1alpha1().Applications(s.ns).Get(*q.ApplicationName, metav1.GetOptions{}) + a, err := s.appLister.Get(*q.ApplicationName) if err != nil 
{ return nil, err } @@ -1156,8 +1205,10 @@ func (s *Server) TerminateOperation(ctx context.Context, termOpReq *application. return nil, status.Errorf(codes.InvalidArgument, "Unable to terminate operation. No operation is in progress") } a.Status.OperationState.Phase = appv1.OperationTerminating - _, err = s.appclientset.ArgoprojV1alpha1().Applications(s.ns).Update(a) + updated, err := s.appclientset.ArgoprojV1alpha1().Applications(s.ns).Update(a) if err == nil { + s.waitSync(updated) + s.logAppEvent(a, ctx, argo.EventReasonResourceUpdated, "terminated running operation") return &application.OperationTerminateResponse{}, nil } if !apierr.IsConflict(err) { @@ -1169,7 +1220,6 @@ func (s *Server) TerminateOperation(ctx context.Context, termOpReq *application. if err != nil { return nil, err } - s.logAppEvent(a, ctx, argo.EventReasonResourceUpdated, "terminated running operation") } return nil, status.Errorf(codes.Internal, "Failed to terminate app. Too many conflicts") } diff --git a/server/application/application_test.go b/server/application/application_test.go index 794144d4a7497..e61b2b3d76421 100644 --- a/server/application/application_test.go +++ b/server/application/application_test.go @@ -18,6 +18,7 @@ import ( "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes/fake" kubetesting "k8s.io/client-go/testing" + k8scache "k8s.io/client-go/tools/cache" "github.com/argoproj/argo-cd/common" "github.com/argoproj/argo-cd/errors" @@ -92,8 +93,8 @@ func newTestAppServer(objects ...runtime.Object) *Server { "server.secretkey": []byte("test"), }, }) - db := db.NewDB(testNamespace, settings.NewSettingsManager(context.Background(), kubeclientset, testNamespace), kubeclientset) ctx := context.Background() + db := db.NewDB(testNamespace, settings.NewSettingsManager(ctx, kubeclientset, testNamespace), kubeclientset) _, err := db.CreateRepository(ctx, fakeRepo()) errors.CheckError(err) _, err = db.CreateCluster(ctx, fakeCluster()) @@ -148,12 +149,22 @@ func newTestAppServer(objects ...runtime.Object) *Server { enforcer.SetDefaultRole("role:admin") enforcer.SetClaimsEnforcerFunc(rbacpolicy.NewRBACPolicyEnforcer(enforcer, fakeProjLister).EnforceClaims) - settingsMgr := settings.NewSettingsManager(context.Background(), kubeclientset, testNamespace) + settingsMgr := settings.NewSettingsManager(ctx, kubeclientset, testNamespace) + + // populate the app informer with the fake objects + appInformer := factory.Argoproj().V1alpha1().Applications().Informer() + // TODO(jessesuen): probably should return cancel function so tests can stop background informer + //ctx, cancel := context.WithCancel(context.Background()) + go appInformer.Run(ctx.Done()) + if !k8scache.WaitForCacheSync(ctx.Done(), appInformer.HasSynced) { + panic("Timed out waiting forfff caches to sync") + } server := NewServer( testNamespace, kubeclientset, fakeAppsClientset, + factory.Argoproj().V1alpha1().Applications().Lister().Applications(testNamespace), mockRepoClient, nil, &kubetest.MockKubectlCmd{}, diff --git a/server/server.go b/server/server.go index c380165c52445..66c4d295beedb 100644 --- a/server/server.go +++ b/server/server.go @@ -15,9 +15,6 @@ import ( "strings" "time" - "gopkg.in/yaml.v2" - v1 "k8s.io/api/core/v1" - "github.com/dgrijalva/jwt-go" golang_proto "github.com/golang/protobuf/proto" grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" @@ -36,6 +33,8 @@ import ( "google.golang.org/grpc/metadata" "google.golang.org/grpc/reflection" "google.golang.org/grpc/status" + "gopkg.in/yaml.v2" + v1 "k8s.io/api/core/v1" 
apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" @@ -58,6 +57,7 @@ import ( "github.com/argoproj/argo-cd/pkg/apis/application/v1alpha1" appclientset "github.com/argoproj/argo-cd/pkg/client/clientset/versioned" appinformer "github.com/argoproj/argo-cd/pkg/client/informers/externalversions" + applisters "github.com/argoproj/argo-cd/pkg/client/listers/application/v1alpha1" repoapiclient "github.com/argoproj/argo-cd/reposerver/apiclient" "github.com/argoproj/argo-cd/server/account" "github.com/argoproj/argo-cd/server/application" @@ -128,6 +128,8 @@ type ArgoCDServer struct { enf *rbac.Enforcer projInformer cache.SharedIndexInformer policyEnforcer *rbacpolicy.RBACPolicyEnforcer + appInformer cache.SharedIndexInformer + appLister applisters.ApplicationNamespaceLister // stopCh is the channel which when closed, will shutdown the Argo CD server stopCh chan struct{} @@ -182,6 +184,9 @@ func NewServer(ctx context.Context, opts ArgoCDServerOpts) *ArgoCDServer { projInformer := factory.Argoproj().V1alpha1().AppProjects().Informer() projLister := factory.Argoproj().V1alpha1().AppProjects().Lister().AppProjects(opts.Namespace) + appInformer := factory.Argoproj().V1alpha1().Applications().Informer() + appLister := factory.Argoproj().V1alpha1().Applications().Lister().Applications(opts.Namespace) + enf := rbac.NewEnforcer(opts.KubeClientset, opts.Namespace, common.ArgoCDRBACConfigMapName, nil) enf.EnableEnforce(!opts.DisableAuth) err = enf.SetBuiltinPolicy(assets.BuiltinPolicyCSV) @@ -199,6 +204,8 @@ func NewServer(ctx context.Context, opts ArgoCDServerOpts) *ArgoCDServer { settingsMgr: settingsMgr, enf: enf, projInformer: projInformer, + appInformer: appInformer, + appLister: appLister, policyEnforcer: policyEnf, } } @@ -267,6 +274,7 @@ func (a *ArgoCDServer) Run(ctx context.Context, port int, metricsPort int) { common.GetVersion(), port, a.settings.URL, a.useTLS(), a.Namespace, a.settings.IsSSOConfigured()) go a.projInformer.Run(ctx.Done()) + go a.appInformer.Run(ctx.Done()) go func() { a.checkServeErr("grpcS", grpcS.Serve(grpcL)) }() go func() { a.checkServeErr("httpS", httpS.Serve(httpL)) }() if a.useTLS() { @@ -277,7 +285,7 @@ func (a *ArgoCDServer) Run(ctx context.Context, port int, metricsPort int) { go a.rbacPolicyLoader(ctx) go func() { a.checkServeErr("tcpm", tcpm.Serve()) }() go func() { a.checkServeErr("metrics", metricsServ.ListenAndServe()) }() - if !cache.WaitForCacheSync(ctx.Done(), a.projInformer.HasSynced) { + if !cache.WaitForCacheSync(ctx.Done(), a.projInformer.HasSynced, a.appInformer.HasSynced) { log.Fatal("Timed out waiting for project cache to sync") } @@ -464,7 +472,7 @@ func (a *ArgoCDServer) newGRPCServer() *grpc.Server { repoCredsService := repocreds.NewServer(a.RepoClientset, db, a.enf, a.settingsMgr) sessionService := session.NewServer(a.sessionMgr, a) projectLock := util.NewKeyLock() - applicationService := application.NewServer(a.Namespace, a.KubeClientset, a.AppClientset, a.RepoClientset, a.Cache, kubectl, db, a.enf, projectLock, a.settingsMgr) + applicationService := application.NewServer(a.Namespace, a.KubeClientset, a.AppClientset, a.appLister, a.RepoClientset, a.Cache, kubectl, db, a.enf, projectLock, a.settingsMgr) projectService := project.NewServer(a.Namespace, a.KubeClientset, a.AppClientset, a.enf, projectLock, a.sessionMgr) settingsService := settings.NewServer(a.settingsMgr, a.DisableAdmin) accountService := account.NewServer(a.sessionMgr, a.settingsMgr, a.enf) diff --git 
a/util/settings/resources_filter.go b/util/settings/resources_filter.go index e1083031945b7..bb311432e928e 100644 --- a/util/settings/resources_filter.go +++ b/util/settings/resources_filter.go @@ -1,5 +1,15 @@ package settings +// The core exclusion list are K8s resources that we assume will never be managed by operators, +// and are never child objects of managed resources that need to be presented in the resource tree. +// This list contains high volume and high churn metadata objects which we exclude for performance +// reasons, reducing connections and load to the K8s API servers of managed clusters. +var coreExcludedResources = []FilteredResource{ + {APIGroups: []string{"events.k8s.io", "metrics.k8s.io"}}, + {APIGroups: []string{""}, Kinds: []string{"Event", "Node"}}, + {APIGroups: []string{"coordination.k8s.io"}, Kinds: []string{"Lease"}}, +} + type ResourcesFilter struct { // ResourceExclusions holds the api groups, kinds per cluster to exclude from Argo CD's watch ResourceExclusions []FilteredResource @@ -8,10 +18,6 @@ type ResourcesFilter struct { } func (rf *ResourcesFilter) getExcludedResources() []FilteredResource { - coreExcludedResources := []FilteredResource{ - {APIGroups: []string{"events.k8s.io", "metrics.k8s.io"}}, - {APIGroups: []string{""}, Kinds: []string{"Event"}}, - } return append(coreExcludedResources, rf.ResourceExclusions...) } diff --git a/util/stats/stats.go b/util/stats/stats.go index ed98aebf36c1c..aa62f531589ce 100644 --- a/util/stats/stats.go +++ b/util/stats/stats.go @@ -1,82 +1,52 @@ package stats import ( - "os" - "os/signal" - "runtime" - "runtime/pprof" - "syscall" "time" - - log "github.com/sirupsen/logrus" ) -// StartStatsTicker starts a goroutine which dumps stats at a specified interval -func StartStatsTicker(d time.Duration) { - ticker := time.NewTicker(d) - go func() { - for { - <-ticker.C - LogStats() - } - }() +// mock out time.Now() for unit tests +var now = time.Now + +// TimingStats is a helper to breakdown the timing of an expensive function call +// Usage: +// ts := NewTimingStats() +// ts.AddCheckpoint("checkpoint-1") +// ... +// ts.AddCheckpoint("checkpoint-2") +// ... 
+// ts.AddCheckpoint("checkpoint-3") +// ts.Timings() +type TimingStats struct { + StartTime time.Time + + checkpoints []tsCheckpoint } -// RegisterStackDumper spawns a goroutine which dumps stack trace upon a SIGUSR1 -func RegisterStackDumper() { - go func() { - sigs := make(chan os.Signal, 1) - signal.Notify(sigs, syscall.SIGUSR1) - for { - <-sigs - LogStack() - } - }() +type tsCheckpoint struct { + name string + time time.Time } -// RegisterHeapDumper spawns a goroutine which dumps heap profile upon a SIGUSR2 -func RegisterHeapDumper(filePath string) { - go func() { - sigs := make(chan os.Signal, 1) - signal.Notify(sigs, syscall.SIGUSR2) - for { - <-sigs - runtime.GC() - if _, err := os.Stat(filePath); err == nil { - err = os.Remove(filePath) - if err != nil { - log.Warnf("could not delete heap profile file: %v", err) - return - } - } - f, err := os.Create(filePath) - if err != nil { - log.Warnf("could not create heap profile file: %v", err) - return - } - - if err := pprof.WriteHeapProfile(f); err != nil { - log.Warnf("could not write heap profile: %v", err) - return - } else { - log.Infof("dumped heap profile to %s", filePath) - } - } - }() +func NewTimingStats() *TimingStats { + return &TimingStats{ + StartTime: now(), + } } -// LogStats logs runtime statistics -func LogStats() { - var m runtime.MemStats - runtime.ReadMemStats(&m) - log.Infof("Alloc=%v TotalAlloc=%v Sys=%v NumGC=%v Goroutines=%d", - m.Alloc/1024, m.TotalAlloc/1024, m.Sys/1024, m.NumGC, runtime.NumGoroutine()) - +func (t *TimingStats) AddCheckpoint(name string) { + cp := tsCheckpoint{ + name: name, + time: now(), + } + t.checkpoints = append(t.checkpoints, cp) } -// LogStack will log the current stack -func LogStack() { - buf := make([]byte, 1<<20) - stacklen := runtime.Stack(buf, true) - log.Infof("*** goroutine dump...\n%s\n*** end\n", buf[:stacklen]) +func (t *TimingStats) Timings() map[string]time.Duration { + timings := make(map[string]time.Duration) + prev := t.StartTime + for _, cp := range t.checkpoints { + timings[cp.name] = cp.time.Sub(prev) + prev = cp.time + } + return timings } diff --git a/util/stats/stats_test.go b/util/stats/stats_test.go new file mode 100644 index 0000000000000..4e2b0785b1fbc --- /dev/null +++ b/util/stats/stats_test.go @@ -0,0 +1,31 @@ +package stats + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestTimingStats(t *testing.T) { + start := time.Now() + now = func() time.Time { + return start + } + defer func() { + now = time.Now + }() + ts := NewTimingStats() + now = func() time.Time { + return start.Add(100 * time.Millisecond) + } + ts.AddCheckpoint("checkpoint-1") + now = func() time.Time { + return start.Add(300 * time.Millisecond) + } + ts.AddCheckpoint("checkpoint-2") + timings := ts.Timings() + assert.Len(t, timings, 2) + assert.Equal(t, 100*time.Millisecond, timings["checkpoint-1"]) + assert.Equal(t, 200*time.Millisecond, timings["checkpoint-2"]) +}
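
Usage sketch for the TimingStats helper added in util/stats/stats.go: the controller threads each checkpoint into its structured log fields, as getRepoObjs and CompareAppState in controller/state.go do above. This is only an illustration; the application name, phase names, and sleeps below are placeholders, not part of the patch.

package main

import (
	"time"

	log "github.com/sirupsen/logrus"

	"github.com/argoproj/argo-cd/util/stats"
)

// reconcilePhases is a stand-in for a multi-phase call such as getRepoObjs or
// CompareAppState; the phases here are hypothetical.
func reconcilePhases() {
	ts := stats.NewTimingStats()

	time.Sleep(10 * time.Millisecond) // e.g. load comparison settings
	ts.AddCheckpoint("settings_ms")

	time.Sleep(25 * time.Millisecond) // e.g. generate manifests via the repo-server
	ts.AddCheckpoint("manifests_ms")

	// Attach each phase duration plus the total as structured log fields,
	// mirroring the logging convention used in controller/state.go.
	logCtx := log.WithField("application", "example-app")
	for k, v := range ts.Timings() {
		logCtx = logCtx.WithField(k, v.Milliseconds())
	}
	logCtx = logCtx.WithField("time_ms", time.Since(ts.StartTime).Milliseconds())
	logCtx.Info("reconcile stats")
}

func main() {
	reconcilePhases()
}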
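
The lock-contention changes in controller/cache rely on two ideas: serve read paths under an RWMutex read lock, and re-check state after taking the write lock so concurrent callers do not repeat expensive work (see getCluster and ensureSynced). The sketch below shows that pattern with hypothetical type and field names; note that the patch's own ensureSynced does its first check without any lock at all, whereas this sketch uses the more conservative read-locked fast path.

package main

import "sync"

// clusterCache is a simplified stand-in for liveStateCache/clusterInfo,
// used only to illustrate the locking pattern.
type clusterCache struct {
	lock   sync.RWMutex
	synced bool
	data   map[string]string
}

// get serves reads under a shared (read) lock so readers do not block each other.
func (c *clusterCache) get(key string) (string, bool) {
	c.lock.RLock()
	defer c.lock.RUnlock()
	v, ok := c.data[key]
	return v, ok
}

func (c *clusterCache) ensureSynced() error {
	// fast path: check without the write lock
	c.lock.RLock()
	synced := c.synced
	c.lock.RUnlock()
	if synced {
		return nil
	}

	c.lock.Lock()
	defer c.lock.Unlock()
	// re-check now that we hold the write lock; another goroutine may have
	// completed the sync between the first check and acquiring the lock
	if c.synced {
		return nil
	}
	c.data = map[string]string{"example": "value"} // the expensive sync stands in here
	c.synced = true
	return nil
}

func main() {
	c := &clusterCache{}
	_ = c.ensureSynced()
	_, _ = c.get("example")
}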
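
The metrics change gives argocd_cluster_events_total per-group/kind labels, so noisy resource types (such as coordination.k8s.io/Lease, now excluded by default in util/settings/resources_filter.go) show up in the counter breakdown. Below is a small client_golang sketch of the same idea; the standalone registry and explicit label slice are illustrative only, since the patch itself appends the labels to descClusterDefaultLabels.

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// Counter keyed by server plus the event's group and kind, analogous to
	// IncClusterEventsCount(server, group, kind) in controller/metrics/metrics.go.
	clusterEventsCounter := prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "argocd_cluster_events_total",
		Help: "Number of processed k8s resource events.",
	}, []string{"server", "group", "kind"})

	registry := prometheus.NewRegistry()
	registry.MustRegister(clusterEventsCounter)

	// Record one Lease event for a hypothetical cluster.
	clusterEventsCounter.WithLabelValues("https://kubernetes.default.svc", "coordination.k8s.io", "Lease").Inc()

	// Gather and print how many metric families were collected.
	mfs, _ := registry.Gather()
	fmt.Println(len(mfs))
}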