Skip to content

Commit

Permalink
[connector/failover] Fix flaky test (#32419)
Browse files Browse the repository at this point in the history
**Description:** <Describe what has changed.>
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->
Update test structure to fix flaky behavior, originally seemed to fail
every 1-2k runs from my local testing.

**Link to tracking Issue:** <Issue number if applicable>

resolves #32396

**Testing:** <Describe what testing was performed and which tests were
added.>
Added test method to directly trigger retry for testing, Rerun 10k+
times

**Documentation:** <Describe the documentation added.>

---------

Co-authored-by: Daniel Jaglowski <jaglows3@gmail.com>
  • Loading branch information
akats7 and djaglowski authored Apr 16, 2024
1 parent 97db72a commit ce92d2e
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 15 deletions.
27 changes: 27 additions & 0 deletions .chloggen/failover-flaky-test-fix.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: failoverconnector

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fix flaky test in pipeline selector component

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [32396]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
16 changes: 10 additions & 6 deletions connector/failoverconnector/internal/state/pipeline_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@ func (p *PipelineSelector) handlePipelineError(idx int) {
ctx, cancel := context.WithCancel(context.Background())
p.RS.InvokeCancel()
p.RS.UpdateCancelFunc(cancel)
p.enableRetry(ctx)
p.enableRetry(ctx, p.constants.RetryInterval, p.constants.RetryGap)
}

func (p *PipelineSelector) enableRetry(ctx context.Context) {
func (p *PipelineSelector) enableRetry(ctx context.Context, retryInterval time.Duration, retryGap time.Duration) {
go func() {
ticker := time.NewTicker(p.constants.RetryInterval)
ticker := time.NewTicker(retryInterval)
defer ticker.Stop()

var cancelFunc context.CancelFunc
Expand All @@ -51,7 +51,7 @@ func (p *PipelineSelector) enableRetry(ctx context.Context) {
if cancelFunc != nil {
cancelFunc()
}
cancelFunc = p.handleRetry(ctx)
cancelFunc = p.handleRetry(ctx, retryGap)
case <-ctx.Done():
return
}
Expand All @@ -61,9 +61,9 @@ func (p *PipelineSelector) enableRetry(ctx context.Context) {
}

// handleRetry is responsible for launching goroutine and returning cancelFunc
func (p *PipelineSelector) handleRetry(parentCtx context.Context) context.CancelFunc {
func (p *PipelineSelector) handleRetry(parentCtx context.Context, retryGap time.Duration) context.CancelFunc {
retryCtx, cancelFunc := context.WithCancel(parentCtx)
go p.retryHighPriorityPipelines(retryCtx, p.constants.RetryGap)
go p.retryHighPriorityPipelines(retryCtx, retryGap)
return cancelFunc
}

Expand Down Expand Up @@ -246,6 +246,10 @@ func (p *PipelineSelector) TestSetStableIndex(idx int32) {
p.stableIndex.Store(idx)
}

func (p *PipelineSelector) TestRetryPipelines(ctx context.Context, retryInterval time.Duration, retryGap time.Duration) {
p.enableRetry(ctx, retryInterval, retryGap)
}

func (p *PipelineSelector) SetRetryCountToMax(idx int) {
p.pipelineRetries[idx].Store(int32(p.constants.MaxRetries))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package state

import (
"context"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -53,26 +54,20 @@ func TestHandlePipelineError(t *testing.T) {
}

func TestCurrentPipelineWithRetry(t *testing.T) {
var wg sync.WaitGroup
done := make(chan struct{})
constants := PSConstants{
RetryInterval: 50 * time.Millisecond,
RetryGap: 10 * time.Millisecond,
MaxRetries: 1000,
}
pS := NewPipelineSelector(5, constants)

wg.Add(1)
go pS.ListenToChannels(done, &wg)
ctx, cancel := context.WithCancel(context.Background())
defer func() {
close(done)
wg.Wait()
cancel()
}()

_, ch := pS.SelectedPipeline()
ch <- false

pS.TestSetStableIndex(2)
pS.TestRetryPipelines(ctx, constants.RetryInterval, constants.RetryGap)

require.Eventually(t, func() bool {
idx, _ := pS.SelectedPipeline()
Expand Down

0 comments on commit ce92d2e

Please sign in to comment.