Skip to content

Commit

Permalink
fix cloud event flaky unit tests by adding waitgroup to fakeclient
Browse files Browse the repository at this point in the history
This commit adds waitgroup to fakeclient to avoid that some goroutines are not done when we want to collect the events.
The tests are flaky because the cloud events are sent with goroutine
but we don't wait until all goroutines done to check the events. So it
is possible that some events are not collected. The waitGroup will count when each goroutine is created and decrease the count when the goroutine is done.
This change has no impact on current code.

Signed-off-by: Yongxuan Zhang yongxuanzhang@google.com
  • Loading branch information
Yongxuanzhang authored and tekton-robot committed Nov 17, 2022
1 parent b15a80e commit fff3fcd
Show file tree
Hide file tree
Showing 14 changed files with 262 additions and 153 deletions.
22 changes: 7 additions & 15 deletions pkg/reconciler/customrun/customrun_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"github.com/tektoncd/pipeline/pkg/reconciler/events/cloudevent"
ttesting "github.com/tektoncd/pipeline/pkg/reconciler/testing"
"github.com/tektoncd/pipeline/test"
eventstest "github.com/tektoncd/pipeline/test/events"
"github.com/tektoncd/pipeline/test/names"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -47,6 +46,7 @@ import (

func initializeCustomRunControllerAssets(t *testing.T, d test.Data) (test.Assets, func()) {
ctx, _ := ttesting.SetupFakeContext(t)
ctx = ttesting.SetupFakeCloudClientContext(ctx, d.ExpectedCloudEventCount)
ctx, cancel := context.WithCancel(ctx)
test.EnsureConfigurationConfigMapsExist(&d)
c, informers := test.SeedTestData(t, ctx, d)
Expand Down Expand Up @@ -158,8 +158,9 @@ func TestReconcile_CloudEvents(t *testing.T) {
customRuns := []*v1beta1.CustomRun{&customRun}

d := test.Data{
CustomRuns: customRuns,
ConfigMaps: cms,
CustomRuns: customRuns,
ConfigMaps: cms,
ExpectedCloudEventCount: len(tc.wantCloudEvents),
}
testAssets, cancel := getCustomRunController(t, d)
defer cancel()
Expand Down Expand Up @@ -187,19 +188,13 @@ func TestReconcile_CloudEvents(t *testing.T) {
}

ceClient := clients.CloudEvents.(cloudevent.FakeClient)
err = eventstest.CheckEventsUnordered(t, ceClient.Events, tc.name, tc.wantCloudEvents)
if err != nil {
t.Errorf(err.Error())
}
ceClient.CheckCloudEventsUnordered(t, tc.name, tc.wantCloudEvents)

// Try and reconcile again - expect no event
if err := c.Reconciler.Reconcile(testAssets.Ctx, getCustomRunName(customRun)); err != nil {
t.Fatal("Didn't expect an error, but got one.")
}
err = eventstest.CheckEventsUnordered(t, ceClient.Events, tc.name, []string{})
if err != nil {
t.Errorf(err.Error())
}
ceClient.CheckCloudEventsUnordered(t, tc.name, []string{})
})
}
}
Expand Down Expand Up @@ -296,10 +291,7 @@ func TestReconcile_CloudEvents_Disabled(t *testing.T) {
}

ceClient := clients.CloudEvents.(cloudevent.FakeClient)
err = eventstest.CheckEventsUnordered(t, ceClient.Events, tc.name, []string{})
if err != nil {
t.Errorf(err.Error())
}
ceClient.CheckCloudEventsUnordered(t, tc.name, []string{})
})
}
}
3 changes: 3 additions & 0 deletions pkg/reconciler/events/cloudevent/cloud_event_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,10 @@ func SendCloudEventWithRetries(ctx context.Context, object runtime.Object) error
_, isCustomRun := object.(*v1beta1.CustomRun)

wasIn := make(chan error)

ceClient.addCount()
go func() {
defer ceClient.decreaseCount()
wasIn <- nil
logger.Debugf("Sending cloudevent of type %q", event.Type())
// In case of Run event, check cache if cloudevent is already sent
Expand Down
18 changes: 8 additions & 10 deletions pkg/reconciler/events/cloudevent/cloud_event_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ func TestSendCloudEvents(t *testing.T) {
successfulBehaviour := FakeClientBehaviour{
SendSuccessfully: true,
}
err := SendCloudEvents(tc.taskRun, newFakeClient(&successfulBehaviour), logger, testClock)
err := SendCloudEvents(tc.taskRun, newFakeClient(&successfulBehaviour, len(tc.wantTaskRun.Status.CloudEvents)), logger, testClock)
if err != nil {
t.Fatalf("Unexpected error sending cloud events: %v", err)
}
Expand Down Expand Up @@ -335,7 +335,7 @@ func TestSendCloudEventsErrors(t *testing.T) {
unsuccessfulBehaviour := FakeClientBehaviour{
SendSuccessfully: false,
}
err := SendCloudEvents(tc.taskRun, newFakeClient(&unsuccessfulBehaviour), logger, testClock)
err := SendCloudEvents(tc.taskRun, newFakeClient(&unsuccessfulBehaviour, len(tc.wantTaskRun.Status.CloudEvents)), logger, testClock)
if err == nil {
t.Fatalf("Unexpected success sending cloud events: %v", err)
}
Expand Down Expand Up @@ -614,14 +614,12 @@ func TestSendCloudEventWithRetries(t *testing.T) {
}}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ctx := setupFakeContext(t, tc.clientBehaviour, true)
ctx := setupFakeContext(t, tc.clientBehaviour, true, len(tc.wantCEvents))
if err := SendCloudEventWithRetries(ctx, tc.object); err != nil {
t.Fatalf("Unexpected error sending cloud events: %v", err)
}
ceClient := Get(ctx).(FakeClient)
if err := eventstest.CheckEventsUnordered(t, ceClient.Events, tc.name, tc.wantCEvents); err != nil {
t.Fatalf(err.Error())
}
ceClient.CheckCloudEventsUnordered(t, tc.name, tc.wantCEvents)
recorder := controller.GetEventRecorder(ctx).(*record.FakeRecorder)
if err := eventstest.CheckEventsOrdered(t, recorder.Events, tc.name, tc.wantEvents); err != nil {
t.Fatalf(err.Error())
Expand Down Expand Up @@ -656,7 +654,7 @@ func TestSendCloudEventWithRetriesInvalid(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
ctx := setupFakeContext(t, FakeClientBehaviour{
SendSuccessfully: true,
}, true)
}, true, 1)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
err := SendCloudEventWithRetries(ctx, tc.object)
Expand All @@ -669,7 +667,7 @@ func TestSendCloudEventWithRetriesInvalid(t *testing.T) {

func TestSendCloudEventWithRetriesNoClient(t *testing.T) {

ctx := setupFakeContext(t, FakeClientBehaviour{}, false)
ctx := setupFakeContext(t, FakeClientBehaviour{}, false, 0)
err := SendCloudEventWithRetries(ctx, &v1beta1.TaskRun{Status: v1beta1.TaskRunStatus{}})
if err == nil {
t.Fatalf("Expected an error sending cloud events with no client in the context, got none")
Expand All @@ -679,11 +677,11 @@ func TestSendCloudEventWithRetriesNoClient(t *testing.T) {
}
}

func setupFakeContext(t *testing.T, behaviour FakeClientBehaviour, withClient bool) context.Context {
func setupFakeContext(t *testing.T, behaviour FakeClientBehaviour, withClient bool, expectedEventCount int) context.Context {
var ctx context.Context
ctx, _ = rtesting.SetupFakeContext(t)
if withClient {
ctx = WithClient(ctx, &behaviour)
ctx = WithClient(ctx, &behaviour, expectedEventCount)
}
return ctx
}
11 changes: 9 additions & 2 deletions pkg/reconciler/events/cloudevent/cloudevent.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,15 @@ func (t TektonEventType) String() string {
return string(t)
}

// CEClient matches the `Client` interface from github.com/cloudevents/sdk-go/v2/cloudevents
type CEClient cloudevents.Client
// CEClient wraps the `Client` interface from github.com/cloudevents/sdk-go/v2/cloudevents
// and has methods to count the cloud events being sent, those methods are for testing purposes.
type CEClient interface {
cloudevents.Client
// addCount increments the count of events to be sent
addCount()
// decreaseCount decrements the count of events to be sent, indicating the event has been sent
decreaseCount()
}

// TektonCloudEventData type is used to marshal and unmarshal the payload of
// a Tekton cloud event. It can include a TaskRun or a PipelineRun
Expand Down
36 changes: 35 additions & 1 deletion pkg/reconciler/events/cloudevent/cloudeventclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ import (
"net/http"

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/client"
"github.com/cloudevents/sdk-go/v2/event"
"github.com/cloudevents/sdk-go/v2/protocol"
"k8s.io/client-go/rest"
"knative.dev/pkg/injection"
"knative.dev/pkg/logging"
Expand Down Expand Up @@ -59,7 +62,38 @@ func withCloudEventClient(ctx context.Context) context.Context {
logger.Panicf("Error creating the cloudevents client: %s", err)
}

return context.WithValue(ctx, ceKey{}, cloudEventClient)
celient := CloudClient{
client: cloudEventClient,
}
return context.WithValue(ctx, ceKey{}, celient)
}

// CloudClient is a wrapper of CloudEvents client and implements addCount and decreaseCount
type CloudClient struct {
client client.Client
}

// AddCount does nothing
func (c CloudClient) addCount() {
}

// DecreaseCount does nothing
func (c CloudClient) decreaseCount() {
}

// Send invokes call client.Send
func (c CloudClient) Send(ctx context.Context, event cloudevents.Event) protocol.Result {
return c.client.Send(ctx, event)
}

// Request invokes client.Request
func (c CloudClient) Request(ctx context.Context, event event.Event) (*cloudevents.Event, protocol.Result) {
return c.client.Request(ctx, event)
}

// StartReceiver invokes client.StartReceiver
func (c CloudClient) StartReceiver(ctx context.Context, fn interface{}) error {
return c.client.StartReceiver(ctx, fn)
}

// Get extracts the cloudEventClient client from the context.
Expand Down
83 changes: 72 additions & 11 deletions pkg/reconciler/events/cloudevent/cloudeventsfakeclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@ package cloudevent
import (
"context"
"fmt"
"regexp"
"sync"
"testing"

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/protocol"
)

const bufferSize = 100

// FakeClientBehaviour defines how the client will behave
type FakeClientBehaviour struct {
SendSuccessfully bool
Expand All @@ -37,13 +38,17 @@ type FakeClient struct {
behaviour *FakeClientBehaviour
// Modelled after k8s.io/client-go fake recorder
Events chan string
// waitGroup is used to block until all events have been sent
waitGroup *sync.WaitGroup
}

// newFakeClient is a FakeClient factory, it returns a client for the target
func newFakeClient(behaviour *FakeClientBehaviour) cloudevents.Client {
func newFakeClient(behaviour *FakeClientBehaviour, expectedEventCount int) CEClient {
return FakeClient{
behaviour: behaviour,
Events: make(chan string, bufferSize),
// set buffersize to length of want events to make sure no extra events are sent
Events: make(chan string, expectedEventCount),
waitGroup: &sync.WaitGroup{},
}
}

Expand All @@ -52,17 +57,24 @@ var _ cloudevents.Client = (*FakeClient)(nil)
// Send fakes the Send method from cloudevents.Client
func (c FakeClient) Send(ctx context.Context, event cloudevents.Event) protocol.Result {
if c.behaviour.SendSuccessfully {
c.Events <- fmt.Sprintf("%s", event.String())
return nil
// This is to prevent extra events are sent. We don't read events from channel before we call CheckCloudEventsUnordered
if len(c.Events) < cap(c.Events) {
c.Events <- fmt.Sprintf("%s", event.String())
return nil
}
return fmt.Errorf("Channel is full of size:%v, but extra event wants to be sent:%v", cap(c.Events), event)
}
return fmt.Errorf("Had to fail. Event ID: %s", event.ID())
}

// Request fakes the Request method from cloudevents.Client
func (c FakeClient) Request(ctx context.Context, event cloudevents.Event) (*cloudevents.Event, protocol.Result) {
if c.behaviour.SendSuccessfully {
c.Events <- fmt.Sprintf("%v", event.String())
return &event, nil
if len(c.Events) < cap(c.Events) {
c.Events <- fmt.Sprintf("%v", event.String())
return &event, nil
}
return nil, fmt.Errorf("Channel is full of size:%v, but extra event wants to be sent:%v", cap(c.Events), event)
}
return nil, fmt.Errorf("Had to fail. Event ID: %s", event.ID())
}
Expand All @@ -72,7 +84,56 @@ func (c FakeClient) StartReceiver(ctx context.Context, fn interface{}) error {
return nil
}

// WithClient adds to the context a fake client with the desired behaviour
func WithClient(ctx context.Context, behaviour *FakeClientBehaviour) context.Context {
return context.WithValue(ctx, ceKey{}, newFakeClient(behaviour))
// addCount can be used to add the count when each event is going to be sent
func (c FakeClient) addCount() {
c.waitGroup.Add(1)
}

// decreaseCount can be used to the decrease the count when each event is sent
func (c FakeClient) decreaseCount() {
c.waitGroup.Done()
}

// WithClient adds to the context a fake client with the desired behaviour and expectedEventCount
func WithClient(ctx context.Context, behaviour *FakeClientBehaviour, expectedEventCount int) context.Context {
return context.WithValue(ctx, ceKey{}, newFakeClient(behaviour, expectedEventCount))
}

// CheckCloudEventsUnordered checks that all events in wantEvents, and no others,
// were received via the given chan, in any order.
// Block until all events have been sent.
func (c *FakeClient) CheckCloudEventsUnordered(t *testing.T, testName string, wantEvents []string) {
t.Helper()
c.waitGroup.Wait()
expected := append([]string{}, wantEvents...)
channelEvents := len(c.Events)

// extra events are prevented in FakeClient's Send function.
// fewer events are detected because we collect all events from channel and compare with wantEvents
for eventCount := 0; eventCount < channelEvents; eventCount++ {
event := <-c.Events
if len(expected) == 0 {
t.Errorf("extra event received: %q", event)
}
found := false
for wantIdx, want := range expected {
matching, err := regexp.MatchString(want, event)
if err != nil {
t.Errorf("something went wrong matching an event: %s", err)
}
if matching {
found = true
// Remove event from list of those we expect to receive
expected[wantIdx] = expected[len(expected)-1]
expected = expected[:len(expected)-1]
break
}
}
if !found {
t.Errorf("unexpected event received: %q", event)
}
}
if len(expected) != 0 {
t.Errorf("%d events %#v are not received", len(expected), expected)
}
}
Loading

0 comments on commit fff3fcd

Please sign in to comment.