Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1654,12 +1654,6 @@ func (o *Operator) ensureInstallPlan(logger *logrus.Entry, namespace string, gen
return nil, nil
}

// Check if any existing installplans are creating the same resources
installPlans, err := o.listInstallPlans(namespace)
if err != nil {
return nil, err
}

// There are multiple(2) worker threads process the namespaceQueue.
// Both worker can work at the same time when 2 separate updates are made for the namespace.
// The following sequence causes 2 installplans are created for a subscription
Expand All @@ -1680,6 +1674,13 @@ func (o *Operator) ensureInstallPlan(logger *logrus.Entry, namespace string, gen
o.muInstallPlan.Lock()
defer o.muInstallPlan.Unlock()

// Check if any existing installplans are creating the same resources
// This must be done inside the lock to prevent TOCTOU race condition
installPlans, err := o.listInstallPlans(namespace)
if err != nil {
return nil, err
}

for _, installPlan := range installPlans {
if installPlan.Spec.Generation == gen {
return reference.GetReference(installPlan)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"os"
"reflect"
"strings"
"sync"
"testing"
"testing/quick"
"time"
Expand Down Expand Up @@ -2559,3 +2560,107 @@ func hasExpectedCondition(ip *v1alpha1.InstallPlan, expectedCondition v1alpha1.I
}
return false
}

// TestEnsureInstallPlanConcurrency tests that concurrent calls to ensureInstallPlan
// do not create duplicate InstallPlans for the same subscription.
// This test verifies the fix for a TOCTOU race condition where multiple worker threads
// could create duplicate InstallPlans if they both check for existing plans before either
// has created one.
func TestEnsureInstallPlanConcurrency(t *testing.T) {
namespace := "test-ns"
gen := 1
numGoroutines := 10

ctx, cancel := context.WithCancel(context.TODO())
defer cancel()

// Create a fake operator
op, err := NewFakeOperator(ctx, namespace, []string{namespace})
require.NoError(t, err)

// Create a test subscription
sub := &v1alpha1.Subscription{
ObjectMeta: metav1.ObjectMeta{
Name: "test-sub",
Namespace: namespace,
UID: types.UID("test-uid"),
},
Spec: &v1alpha1.SubscriptionSpec{
Package: "test-package",
},
}

// Create test steps for the InstallPlan
steps := []*v1alpha1.Step{
{
Resolving: "test-csv",
Resource: v1alpha1.StepResource{
CatalogSource: "test-catalog",
CatalogSourceNamespace: namespace,
Group: "operators.coreos.com",
Version: "v1alpha1",
Kind: "ClusterServiceVersion",
Name: "test-csv",
Manifest: toManifest(t, csv("test-csv", namespace, nil, nil)),
},
Status: v1alpha1.StepStatusUnknown,
},
}

// Use WaitGroup to synchronize goroutines
var wg sync.WaitGroup
// Use a channel to collect results
results := make(chan *corev1.ObjectReference, numGoroutines)
// Use a sync.Once-like mechanism to start all goroutines at roughly the same time
startBarrier := make(chan struct{})

logger := logrus.NewEntry(logrus.New())

// Launch multiple goroutines that will call ensureInstallPlan concurrently
for i := 0; i < numGoroutines; i++ {
wg.Add(1)
go func() {
defer wg.Done()
// Wait for the start signal
<-startBarrier

// Call ensureInstallPlan
ref, err := op.ensureInstallPlan(logger, namespace, gen, []*v1alpha1.Subscription{sub}, v1alpha1.ApprovalAutomatic, steps, nil)
require.NoError(t, err)
results <- ref
}()
}

// Start all goroutines
close(startBarrier)

// Wait for all goroutines to complete
wg.Wait()
close(results)

// Collect all results
var refs []*corev1.ObjectReference
for ref := range results {
refs = append(refs, ref)
}

// Verify we got the expected number of results
require.Len(t, refs, numGoroutines, "should have received results from all goroutines")

// Verify all refs point to the same InstallPlan
firstRef := refs[0]
for i, ref := range refs {
require.Equal(t, firstRef.Name, ref.Name, "goroutine %d returned different InstallPlan name", i)
require.Equal(t, firstRef.Namespace, ref.Namespace, "goroutine %d returned different InstallPlan namespace", i)
require.Equal(t, firstRef.UID, ref.UID, "goroutine %d returned different InstallPlan UID", i)
}

// Verify only one InstallPlan was created in the cluster
ipList, err := op.client.OperatorsV1alpha1().InstallPlans(namespace).List(ctx, metav1.ListOptions{})
require.NoError(t, err)
require.Len(t, ipList.Items, 1, "exactly one InstallPlan should have been created")

// Verify the created InstallPlan has the correct generation
createdIP := &ipList.Items[0]
require.Equal(t, gen, createdIP.Spec.Generation, "InstallPlan should have the correct generation")
}
Original file line number Diff line number Diff line change
Expand Up @@ -5853,6 +5853,12 @@ func RequireObjectsInCache(t *testing.T, lister operatorlister.OperatorLister, n
fetched, err = lister.RbacV1().RoleBindingLister().RoleBindings(namespace).Get(o.GetName())
case *v1alpha1.ClusterServiceVersion:
fetched, err = lister.OperatorsV1alpha1().ClusterServiceVersionLister().ClusterServiceVersions(namespace).Get(o.GetName())
if err != nil {
if apierrors.IsNotFound(err) {
return err
}
return errors.Join(err, fmt.Errorf("namespace: %v, error: %v", namespace, err))
}
// We don't care about finalizers
object.(*v1alpha1.ClusterServiceVersion).Finalizers = nil
fetched.(*v1alpha1.ClusterServiceVersion).Finalizers = nil
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.