Skip to content

Commit

Permalink
[chore] Try fixing flaky SharedInstance e2e test (#10929)
Browse files Browse the repository at this point in the history
Fixes
#10927.

` go test status_test.go --count 100` passed.
  • Loading branch information
TylerHelmuth authored Aug 26, 2024
1 parent b33913b commit a80ce1a
Showing 1 changed file with 42 additions and 28 deletions.
70 changes: 42 additions & 28 deletions internal/e2e/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ package e2e
import (
"context"
"errors"
"fmt"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -116,31 +118,40 @@ func Test_ComponentStatusReporting_SharedInstance(t *testing.T) {
err = s.Shutdown(context.Background())
require.NoError(t, err)

assert.Equal(t, 5, len(eventsReceived))
require.Equal(t, 2, len(eventsReceived))

for instanceID, events := range eventsReceived {
if instanceID.ComponentID() == component.NewID(component.MustNewType("test")) {
for i, e := range events {
if i == 0 {
assert.Equal(t, componentstatus.StatusStarting, e.Status())
}
if i == 1 {
assert.Equal(t, componentstatus.StatusRecoverableError, e.Status())
}
if i == 2 {
assert.Equal(t, componentstatus.StatusOK, e.Status())
}
if i == 3 {
assert.Equal(t, componentstatus.StatusStopping, e.Status())
}
if i == 4 {
assert.Equal(t, componentstatus.StatusStopped, e.Status())
}
if i >= 5 {
assert.Fail(t, "received too many events")
}
pipelineIDs := ""
instanceID.AllPipelineIDs(func(id component.ID) bool {
pipelineIDs += id.String() + ","
return true
})

t.Logf("checking errors for %v - %v - %v", pipelineIDs, instanceID.Kind().String(), instanceID.ComponentID().String())

eventStr := ""
for i, e := range events {
eventStr += fmt.Sprintf("%v,", e.Status())
if i == 0 {
assert.Equal(t, componentstatus.StatusStarting, e.Status())
}
if i == 1 {
assert.Equal(t, componentstatus.StatusRecoverableError, e.Status())
}
if i == 2 {
assert.Equal(t, componentstatus.StatusOK, e.Status())
}
if i == 3 {
assert.Equal(t, componentstatus.StatusStopping, e.Status())
}
if i == 4 {
assert.Equal(t, componentstatus.StatusStopped, e.Status())
}
if i >= 5 {
assert.Fail(t, "received too many events")
}
}
t.Logf("events received: %v", eventStr)
}
}

Expand All @@ -156,12 +167,10 @@ func newReceiverFactory() receiver.Factory {
type testReceiver struct{}

func (t *testReceiver) Start(_ context.Context, host component.Host) error {
if statusReporter, ok := host.(componentstatus.Reporter); ok {
statusReporter.Report(componentstatus.NewRecoverableErrorEvent(errors.New("test recoverable error")))
go func() {
statusReporter.Report(componentstatus.NewEvent(componentstatus.StatusOK))
}()
}
componentstatus.ReportStatus(host, componentstatus.NewRecoverableErrorEvent(errors.New("test recoverable error")))
go func() {
componentstatus.ReportStatus(host, componentstatus.NewEvent(componentstatus.StatusOK))
}()
return nil
}

Expand Down Expand Up @@ -237,6 +246,7 @@ func createExtension(_ context.Context, _ extension.Settings, cfg component.Conf

type testExtension struct {
eventsReceived map[*componentstatus.InstanceID][]*componentstatus.Event
lock sync.Mutex
}

type extensionConfig struct {
Expand All @@ -262,7 +272,11 @@ func (t *testExtension) ComponentStatusChanged(
source *componentstatus.InstanceID,
event *componentstatus.Event,
) {
t.eventsReceived[source] = append(t.eventsReceived[source], event)
t.lock.Lock()
defer t.lock.Unlock()
if source.ComponentID() == component.NewID(component.MustNewType("test")) {
t.eventsReceived[source] = append(t.eventsReceived[source], event)
}
}

// NotifyConfig implements the extension.ConfigWatcher interface.
Expand Down

0 comments on commit a80ce1a

Please sign in to comment.