diff --git a/changelogs/unreleased/1235-skriss b/changelogs/unreleased/1235-skriss new file mode 100644 index 0000000000..a2c55f75c6 --- /dev/null +++ b/changelogs/unreleased/1235-skriss @@ -0,0 +1 @@ +Fix concurrency bug in code ensuring restic repository exists diff --git a/pkg/restic/repository_ensurer.go b/pkg/restic/repository_ensurer.go index d0d38a35a0..0e4c03957e 100644 --- a/pkg/restic/repository_ensurer.go +++ b/pkg/restic/repository_ensurer.go @@ -35,18 +35,31 @@ import ( // repositoryEnsurer ensures that Velero restic repositories are created and ready. type repositoryEnsurer struct { + log logrus.FieldLogger repoLister velerov1listers.ResticRepositoryLister repoClient velerov1client.ResticRepositoriesGetter readyChansLock sync.Mutex readyChans map[string]chan *velerov1api.ResticRepository + + // repoLocksMu synchronizes reads/writes to the repoLocks map itself + // since maps are not threadsafe. + repoLocksMu sync.Mutex + repoLocks map[repoKey]*sync.Mutex +} + +type repoKey struct { + volumeNamespace string + backupLocation string } func newRepositoryEnsurer(repoInformer velerov1informers.ResticRepositoryInformer, repoClient velerov1client.ResticRepositoriesGetter, log logrus.FieldLogger) *repositoryEnsurer { r := &repositoryEnsurer{ + log: log, repoLister: repoInformer.Lister(), repoClient: repoClient, readyChans: make(map[string]chan *velerov1api.ResticRepository), + repoLocks: make(map[repoKey]*sync.Mutex), } repoInformer.Informer().AddEventHandler( @@ -67,7 +80,7 @@ func newRepositoryEnsurer(repoInformer velerov1informers.ResticRepositoryInforme } readyChan <- newObj - delete(r.readyChans, newObj.Name) + delete(r.readyChans, key) } }, }, @@ -84,6 +97,30 @@ func repoLabels(volumeNamespace, backupLocation string) labels.Set { } func (r *repositoryEnsurer) EnsureRepo(ctx context.Context, namespace, volumeNamespace, backupLocation string) (*velerov1api.ResticRepository, error) { + log := r.log.WithField("volumeNamespace", volumeNamespace).WithField("backupLocation", backupLocation) + + // It's only safe to have one instance of this method executing concurrently for a + // given volumeNamespace + backupLocation, so synchronize based on that. It's fine + // to run concurrently for *different* namespaces/locations. If you had 2 goroutines + // running this for the same inputs, both might find no ResticRepository exists, then + // both would create new ones for the same namespace/location. + // + // This issue could probably be avoided if we had a deterministic name for + // each restic repository, and we just tried to create it, checked for an + // AlreadyExists err, and then waited for it to be ready. However, there are + // already repositories in the wild with non-deterministic names (i.e. using + // GenerateName) which poses a backwards compatibility problem. + log.Debug("Acquiring lock") + + repoMu := r.repoLock(volumeNamespace, backupLocation) + repoMu.Lock() + defer func() { + repoMu.Unlock() + log.Debug("Released lock") + }() + + log.Debug("Acquired lock") + selector := labels.SelectorFromSet(repoLabels(volumeNamespace, backupLocation)) repos, err := r.repoLister.ResticRepositories(namespace).List(selector) @@ -97,11 +134,14 @@ func (r *repositoryEnsurer) EnsureRepo(ctx context.Context, namespace, volumeNam if repos[0].Status.Phase != velerov1api.ResticRepositoryPhaseReady { return nil, errors.New("restic repository is not ready") } + + log.Debug("Ready repository found") return repos[0], nil } - // no repo found: create one and wait for it to be ready + log.Debug("No repository found, creating one") + // no repo found: create one and wait for it to be ready repo := &velerov1api.ResticRepository{ ObjectMeta: metav1.ObjectMeta{ Namespace: namespace, @@ -137,3 +177,19 @@ func (r *repositoryEnsurer) getReadyChan(name string) chan *velerov1api.ResticRe r.readyChans[name] = make(chan *velerov1api.ResticRepository) return r.readyChans[name] } + +func (r *repositoryEnsurer) repoLock(volumeNamespace, backupLocation string) *sync.Mutex { + r.repoLocksMu.Lock() + defer r.repoLocksMu.Unlock() + + key := repoKey{ + volumeNamespace: volumeNamespace, + backupLocation: backupLocation, + } + + if r.repoLocks[key] == nil { + r.repoLocks[key] = new(sync.Mutex) + } + + return r.repoLocks[key] +}