Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 Prevent LeaderElector setup error from being swallowed #2876

Merged
merged 8 commits into from
Aug 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 29 additions & 22 deletions pkg/manager/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,16 @@ func (cm *controllerManager) Start(ctx context.Context) (err error) {
// Initialize the internal context.
cm.internalCtx, cm.internalCancel = context.WithCancel(ctx)

// Leader elector must be created before defer that contains engageStopProcedure function
// https://github.com/kubernetes-sigs/controller-runtime/issues/2873
var leaderElector *leaderelection.LeaderElector
if cm.resourceLock != nil {
leaderElector, err = cm.initLeaderElector()
if err != nil {
return fmt.Errorf("failed during initialization leader election process: %w", err)
}
}

// This chan indicates that stop is complete, in other words all runnables have returned or timeout on stop request
stopComplete := make(chan struct{})
defer close(stopComplete)
Expand Down Expand Up @@ -433,19 +443,22 @@ func (cm *controllerManager) Start(ctx context.Context) (err error) {
{
ctx, cancel := context.WithCancel(context.Background())
cm.leaderElectionCancel = cancel
go func() {
if cm.resourceLock != nil {
if err := cm.startLeaderElection(ctx); err != nil {
cm.errChan <- err
}
} else {
if leaderElector != nil {
// Start the leader elector process
go func() {
leaderElector.Run(ctx)
<-ctx.Done()
close(cm.leaderElectionStopped)
}()
} else {
go func() {
// Treat not having leader election enabled the same as being elected.
if err := cm.startLeaderElectionRunnables(); err != nil {
cm.errChan <- err
}
close(cm.elected)
}
}()
}()
}
}

ready = true
Expand Down Expand Up @@ -564,12 +577,8 @@ func (cm *controllerManager) engageStopProcedure(stopComplete <-chan struct{}) e
return nil
}

func (cm *controllerManager) startLeaderElectionRunnables() error {
return cm.runnables.LeaderElection.Start(cm.internalCtx)
}

func (cm *controllerManager) startLeaderElection(ctx context.Context) (err error) {
l, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
func (cm *controllerManager) initLeaderElector() (*leaderelection.LeaderElector, error) {
leaderElector, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
Lock: cm.resourceLock,
LeaseDuration: cm.leaseDuration,
RenewDeadline: cm.renewDeadline,
Expand Down Expand Up @@ -599,16 +608,14 @@ func (cm *controllerManager) startLeaderElection(ctx context.Context) (err error
Name: cm.leaderElectionID,
})
if err != nil {
return err
return nil, err
}

// Start the leader elector process
go func() {
l.Run(ctx)
<-ctx.Done()
close(cm.leaderElectionStopped)
}()
return nil
return leaderElector, nil
}

func (cm *controllerManager) startLeaderElectionRunnables() error {
return cm.runnables.LeaderElection.Start(cm.internalCtx)
}

func (cm *controllerManager) Elected() <-chan struct{} {
Expand Down
17 changes: 17 additions & 0 deletions pkg/manager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1165,6 +1165,23 @@ var _ = Describe("manger.Manager", func() {
cm.onStoppedLeading = func() {}
},
)

It("should return an error if leader election param incorrect", func() {
renewDeadline := time.Second * 20
m, err := New(cfg, Options{
LeaderElection: true,
LeaderElectionID: "controller-runtime",
LeaderElectionNamespace: "default",
newResourceLock: fakeleaderelection.NewResourceLock,
RenewDeadline: &renewDeadline,
})
Expect(err).NotTo(HaveOccurred())
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
err = m.Start(ctx)
Expect(err).To(HaveOccurred())
Expect(errors.Is(err, context.DeadlineExceeded)).NotTo(BeTrue())
})
})

Context("should start serving metrics", func() {
Expand Down