Skip to content

Commit

Permalink
feat(event/eventbus): add artificial delay option to simulate network…
Browse files Browse the repository at this point in the history
… latency
  • Loading branch information
bounoable committed Nov 6, 2023
1 parent 186872e commit 5cf3f58
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 16 deletions.
67 changes: 52 additions & 15 deletions event/eventbus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@ import (
"context"
"fmt"
"sync"
"time"

"github.com/modernice/goes/event"
)

type chanbus struct {
sync.RWMutex

artificialDelay time.Duration

events map[string]*eventSubscription
queue chan event.Event
done chan struct{}
Expand Down Expand Up @@ -38,22 +41,45 @@ type subscribeJob struct {
done chan struct{}
}

// New creates and returns a new event.Bus backed by a channel-based
// implementation. The returned event.Bus allows subscribing to events,
// publishing events, and managing event subscriptions.
func New() event.Bus {
// Option is a type of function that modifies the properties of a [*chanbus]
// during its initialization. It enables the configuration of various settings
// of the event bus, such as the artificial delay, before it starts operation.
// Options are applied in the order they are provided when creating a new event
// bus through the New function.
type Option func(*chanbus)

// WithArtificialDelay sets an artificial delay for the event bus. This delay is
// applied after each event is published to the bus, effectively slowing down
// the rate of event publishing. The delay duration is specified by the provided
// time.Duration value. The function returns an Option that can be used to
// configure a chanbus instance.
func WithArtificialDelay(delay time.Duration) func(*chanbus) {
return func(c *chanbus) {
c.artificialDelay = delay
}
}

// New creates a new instance of an event bus with the provided options. The
// returned event bus is safe for concurrent use and starts processing events
// immediately. The artificial delay parameter can be set to simulate network
// latency or other delays.
func New(opts ...Option) event.Bus {
bus := &chanbus{
events: make(map[string]*eventSubscription),
queue: make(chan event.Event),
done: make(chan struct{}),
artificialDelay: time.Millisecond,
events: make(map[string]*eventSubscription),
queue: make(chan event.Event),
done: make(chan struct{}),
}
for _, opt := range opts {
opt(bus)
}
go bus.work()
return bus
}

// Close terminates the channel-based event bus. If the bus is already closed,
// it does nothing. Otherwise, it signals to all operations that they should
// stop and cleans up any resources associated with the bus.
// Close signals the termination of the Chanbus. After Close is called, no more
// events can be processed by the Chanbus. Calling Close multiple times has no
// additional effect.
func (bus *chanbus) Close() {
select {
case <-bus.done:
Expand All @@ -62,9 +88,14 @@ func (bus *chanbus) Close() {
}
}

// Subscribe creates a subscription for the specified events and returns
// channels for receiving the events and errors. The subscription is
// automatically unsubscribed when the provided context is canceled.
// Subscribe is used to register interest in a set of events. It takes in a
// context and a variadic parameter of event names. For each event name
// provided, it creates a subscription that listens for these events. The
// function returns two channels: one for receiving the subscribed events and
// another for receiving any errors that may occur during the subscription
// process. If an error occurs while setting up any of the subscriptions, the
// function cancels all other subscriptions, closes the channels, and returns an
// error.
func (bus *chanbus) Subscribe(ctx context.Context, events ...string) (<-chan event.Event, <-chan error, error) {
ctx, unsubscribeAll := context.WithCancel(ctx)
go func() {
Expand All @@ -86,8 +117,11 @@ func (bus *chanbus) Subscribe(ctx context.Context, events ...string) (<-chan eve
return fanInEvents(ctx, rcpts), fanInErrors(ctx, rcpts), nil
}

// Publish sends the provided events to the event bus. It returns an error if
// the context is canceled before all events are published.
// Publish publishes the provided events to the event bus. It ensures each event
// is dispatched to all subscribed recipients. If an artificial delay is set, it
// pauses for that duration before dispatching the next event. The function
// returns an error if the context gets cancelled before all events are
// published.
func (bus *chanbus) Publish(ctx context.Context, events ...event.Event) error {
done := make(chan struct{})
go func() {
Expand All @@ -97,6 +131,9 @@ func (bus *chanbus) Publish(ctx context.Context, events ...event.Event) error {
case <-ctx.Done():
return
case bus.queue <- evt:
if bus.artificialDelay > 0 {
time.Sleep(bus.artificialDelay)
}
}
}
}()
Expand Down
4 changes: 4 additions & 0 deletions go.work.sum
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ cloud.google.com/go/compute v1.19.1/go.mod h1:6ylj3a05WF8leseCdIf77NK0g1ey+nj5IK
cloud.google.com/go/compute v1.19.3/go.mod h1:qxvISKp/gYnXkSAD1ppcSOveRAmzxicEv/JlizULFrI=
cloud.google.com/go/compute v1.20.1/go.mod h1:4tCnrn48xsqlwSAiLf1HXMQk8CONslYbdiEZc9FEIbM=
cloud.google.com/go/compute v1.21.0/go.mod h1:4tCnrn48xsqlwSAiLf1HXMQk8CONslYbdiEZc9FEIbM=
cloud.google.com/go/compute v1.23.0/go.mod h1:4tCnrn48xsqlwSAiLf1HXMQk8CONslYbdiEZc9FEIbM=
cloud.google.com/go/compute/metadata v0.2.0/go.mod h1:zFmK7XCadkQkj6TtorcaGlCW1hT1fIilQDwofLpJ20k=
cloud.google.com/go/compute/metadata v0.2.1/go.mod h1:jgHgmJd2RKBGzXqF5LR2EZMGxBkeanZ9wwa75XHJgOM=
cloud.google.com/go/compute/metadata v0.2.3 h1:mg4jlk7mCAj6xXp9UJ4fjI9VUI5rubuGBW5aJ7UnBMY=
Expand Down Expand Up @@ -794,6 +795,7 @@ golang.org/x/oauth2 v0.7.0 h1:qe6s0zUXlPX80/dITx3440hWZ7GwMwgDDyrSGTPJG/g=
golang.org/x/oauth2 v0.7.0/go.mod h1:hPLQkd9LyjfXTiRohC/41GhcFqxisoUQ99sCUOHO9x4=
golang.org/x/oauth2 v0.8.0/go.mod h1:yr7u4HXZRm1R1kBWqr/xKNqewf0plRYoB7sla+BCIXE=
golang.org/x/oauth2 v0.10.0/go.mod h1:kTpgurOux7LqtuxjuyZa4Gj2gdezIt/jQtGnNFfypQI=
golang.org/x/oauth2 v0.11.0/go.mod h1:LdF7O/8bLR/qWK9DrpXmbHLTouvRHK0SgJl0GmDBchk=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
Expand Down Expand Up @@ -992,11 +994,13 @@ google.golang.org/genproto v0.0.0-20230711160842-782d3b101e98/go.mod h1:S7mY02Oq
google.golang.org/genproto v0.0.0-20230803162519-f966b187b2e5 h1:L6iMMGrtzgHsWofoFcihmDEMYeDR9KN/ThbPWGrh++g=
google.golang.org/genproto v0.0.0-20230803162519-f966b187b2e5/go.mod h1:oH/ZOT02u4kWEp7oYBGYFFkCdKS/uYR9Z7+0/xuuFp8=
google.golang.org/genproto v0.0.0-20230822172742-b8732ec3820d h1:VBu5YqKPv6XiJ199exd8Br+Aetz+o08F+PLMnwJQHAY=
google.golang.org/genproto v0.0.0-20230822172742-b8732ec3820d/go.mod h1:yZTlhN0tQnXo3h00fuXNCxJdLdIdnVFVBaRJ5LWBbw4=
google.golang.org/genproto/googleapis/api v0.0.0-20230525234035-dd9d682886f9 h1:m8v1xLLLzMe1m5P+gCTF8nJB9epwZQUBERm20Oy1poQ=
google.golang.org/genproto/googleapis/api v0.0.0-20230525234035-dd9d682886f9/go.mod h1:vHYtlOoi6TsQ3Uk2yxR7NI5z8uoV+3pZtR4jmHIkRig=
google.golang.org/genproto/googleapis/api v0.0.0-20230530153820-e85fd2cbaebc/go.mod h1:vHYtlOoi6TsQ3Uk2yxR7NI5z8uoV+3pZtR4jmHIkRig=
google.golang.org/genproto/googleapis/api v0.0.0-20230706204954-ccb25ca9f130/go.mod h1:mPBs5jNgx2GuQGvFwUvVKqtn6HsUw9nP64BedgvqEsQ=
google.golang.org/genproto/googleapis/api v0.0.0-20230711160842-782d3b101e98/go.mod h1:rsr7RhLuwsDKL7RmgDDCUc6yaGr1iqceVb5Wv6f6YvQ=
google.golang.org/genproto/googleapis/api v0.0.0-20230822172742-b8732ec3820d/go.mod h1:KjSP20unUpOx5kyQUFa7k4OJg0qeJ7DEZflGDu2p6Bk=
google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19/go.mod h1:66JfowdXAEgad5O9NnYcsNPLCPZJD++2L9X0PCMODrA=
google.golang.org/genproto/googleapis/rpc v0.0.0-20230530153820-e85fd2cbaebc/go.mod h1:66JfowdXAEgad5O9NnYcsNPLCPZJD++2L9X0PCMODrA=
google.golang.org/genproto/googleapis/rpc v0.0.0-20230706204954-ccb25ca9f130/go.mod h1:8mL13HKkDa+IuJ8yruA3ci0q+0vsUz4m//+ottjwS5o=
Expand Down
51 changes: 51 additions & 0 deletions projection/lookup/lookup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,44 @@ func TestLookup_Reverse(t *testing.T) {
}
}

func TestLookup_removed(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

bus := eventbus.New()
store := eventstore.New()

l := lookup.New(store, bus, []string{"foo", "bar"})
errs, err := l.Run(ctx)
if err != nil {
t.Fatalf("Run() failed with %q", err)
}
go func() {
for range errs {
}
}()

aggregateID := uuid.New()

events := []event.Event{
event.New("foo", LookupEvent{Foo: "foo"}, event.Aggregate(aggregateID, "foo", 1)).Any(),
event.New("bar", LookupRemoveEvent{}, event.Aggregate(aggregateID, "foo", 2)).Any(),
}

if err := bus.Publish(ctx, events...); err != nil {
t.Fatalf("publish events: %v", err)
}

value, ok := l.Lookup(ctx, "foo", "foo", aggregateID)
if ok {
t.Fatalf("Lookup should return false; got true")
}

if value != nil {
t.Fatalf("Lookup should return nil; got %v", value)
}
}

// LookupEvent is a type used in testing the lookup package. It provides a Foo
// field and implements the ProvideLookup method of the lookup.Provider
// interface.
Expand All @@ -131,3 +169,16 @@ type LookupEvent struct {
func (e LookupEvent) ProvideLookup(p lookup.Provider) {
p.Provide("foo", e.Foo)
}

// LookupRemoveEvent is a type that, when used in the context of the lookup
// package, signals the removal of a lookup value from the provider it's
// attached to. It has no fields and its sole purpose is to implement the
// ProvideLookup method of the lookup.Provider interface in a way that instructs
// it to remove a specific value.
type LookupRemoveEvent struct{}

// ProvideLookup removes the provided lookup value from the provider for the
// LookupRemoveEvent instance.
func (LookupRemoveEvent) ProvideLookup(p lookup.Provider) {
p.Remove("foo")
}
2 changes: 1 addition & 1 deletion projection/schedule/continuous.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func DebounceCap(cap time.Duration) ContinuousOption {
// subscribes to events with the given eventNames to create projection Jobs
// for those events.
//
// Debounce events
// # Debounce events
//
// It may be desirable to debounce the creation of projection Jobs to avoid
// creating a Job on every event if Events are published within a short
Expand Down

0 comments on commit 5cf3f58

Please sign in to comment.