Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Server API refactor #11

Merged
merged 16 commits into from
Jul 22, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Remove Message.Topic
  • Loading branch information
tmaxmax committed Jul 17, 2023
commit 15a68a099d8508dfd94a89a1f21d58fff8d5d2e9
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ This file tracks changes to this project. It follows the [Keep a Changelog forma
### Removed

- `Message.ExpiresAt` is no more.
- `Message.Topic` is no more. See the changes to `Server`, `Provider` and `ReplayProvider` for handling topics – you can now publish a message to multiple topics at once.
- `NewValidReplayProvider` is no more.
- `NewFiniteReplayProvider` is no more.

Expand All @@ -18,7 +19,11 @@ This file tracks changes to this project. It follows the [Keep a Changelog forma
### Changed

- `ReplayProvider.Put` takes a simple `*Message` and returns a `*Message`, instead of changing the `*Message` to which the `**Message` parameter points.
It also takes a slice of topics, given that the `Message` doesn't hold the topic itself anymore.
- Because `Message.ExpiresAt` is removed, the `ValidReplayProvider` sets the expiry itself.
- `Server.Publish` now takes a list of topics.
- `Provider.Publish` now takes a non-empty slice of topics.
- `ReplayProvider.Put` now takes a non-empty slice of topics.

## [0.5.2] - 2023-07-12

Expand Down
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ A provider is an implementation of the publish-subscribe messaging system:

```go
type Provider interface {
// Publish a message to all subscribers. A message contains the event and some additional information - read further and see the documentation.
Publish(msg *Message) error
// Publish a message to all subscribers of the given topics.
Publish(msg *Message, topics []string) error
// Add a new subscriber that is unsubscribed when the context is done.
Subscribe(ctx context.Context, sub Subscription) error
// Cleanup all resources and stop publishing messages or accepting subscriptions.
Expand Down Expand Up @@ -113,7 +113,7 @@ type ReplayProvider interface {
// If the provider automatically adds IDs aswell,
// the returned message will also have the ID set,
// otherwise the input value is returned.
Put(msg *Message) *Message
Put(msg *Message, topics []string) *Message
// Replay valid events to a subscriber.
Replay(sub Subscription)
}
Expand Down
54 changes: 0 additions & 54 deletions example_message_marshal_test.go

This file was deleted.

43 changes: 33 additions & 10 deletions joe.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type ReplayProvider interface {
// can be replayed. If an error occurs internally when putting the new message
// and retrying the operation would block for too long, it can be aborted.
// The errors aren't returned as the server providers won't be able to handle them in a useful manner.
Put(message *Message) *Message
Put(message *Message, topics []string) *Message
// Replay sends to a new subscriber all the valid events received by the provider
// since the event with the listener's ID. If the ID the listener provides
// is invalid, the provider should not replay any events.
Expand Down Expand Up @@ -80,6 +80,11 @@ type (
done subscriber
Subscription
}

messageWithTopics struct {
message *Message
topics []string
}
)

// Joe is a basic server provider that synchronously executes operations by queueing them in channels.
Expand All @@ -94,7 +99,7 @@ type (
// He serves simple use-cases well, as he's light on resources, and does not require any external
// services. Also, he is the default provider for Servers.
type Joe struct {
message chan *Message
message chan messageWithTopics
subscription chan subscription
unsubscription chan subscriber
done chan struct{}
Expand Down Expand Up @@ -131,7 +136,7 @@ func NewJoe(configuration ...JoeConfig) *Joe {
gc, stopGCTicker := ticker(config.ReplayGCInterval)

j := &Joe{
message: make(chan *Message),
message: make(chan messageWithTopics),
subscription: make(chan subscription),
unsubscription: make(chan subscriber),
done: make(chan struct{}),
Expand Down Expand Up @@ -175,11 +180,20 @@ func (j *Joe) Subscribe(ctx context.Context, sub Subscription) error {
}

// Publish tells Joe to send the given message to the subscribers.
func (j *Joe) Publish(msg *Message) error {
// When a message is published to multiple topics, Joe makes sure to
// not send the Message multiple times to clients that are subscribed
// to more than one topic that receive the given Message. Every client
// receives each unique message once, regardless of how many topics it
// is subscribed to or to how many topics the message is published.
func (j *Joe) Publish(msg *Message, topics []string) error {
if len(topics) == 0 {
return ErrNoTopic
}

// Waiting on done ensures Publish doesn't block the caller goroutine
// when Joe is stopped and implements the required Provider behavior.
select {
case j.message <- msg:
case j.message <- messageWithTopics{message: msg, topics: topics}:
return nil
case <-j.done:
return ErrProviderClosed
Expand Down Expand Up @@ -228,11 +242,20 @@ func (j *Joe) start() {
for {
select {
case msg := <-j.message:
msg = j.replay.Put(msg)

for done, cb := range j.topics[msg.Topic] {
if !cb(msg) {
j.removeSubscriber(done)
toDispatch := j.replay.Put(msg.message, msg.topics)
seen := map[subscriber]struct{}{}

for _, topic := range msg.topics {
for done, cb := range j.topics[topic] {
if _, ok := seen[done]; ok {
continue
}

if cb(toDispatch) {
seen[done] = struct{}{}
} else {
j.removeSubscriber(done)
}
}
}
case sub := <-j.subscription:
Expand Down
27 changes: 14 additions & 13 deletions joe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type mockReplayProvider struct {
callsReplay int
}

func (m *mockReplayProvider) Put(msg *sse.Message) *sse.Message {
func (m *mockReplayProvider) Put(msg *sse.Message, _ []string) *sse.Message {
m.callsPut++
return msg
}
Expand All @@ -34,10 +34,10 @@ func (m *mockReplayProvider) GC() error {

var _ sse.ReplayProvider = (*mockReplayProvider)(nil)

func msg(tb testing.TB, data, id, topic string) *sse.Message {
func msg(tb testing.TB, data, id string) *sse.Message {
tb.Helper()

e := &sse.Message{Topic: topic}
e := &sse.Message{}
e.AppendData(data)
if id != "" {
e.ID = sse.ID(id)
Expand Down Expand Up @@ -84,7 +84,8 @@ func TestJoe_Stop(t *testing.T) {
require.NoError(t, j.Stop())
require.ErrorIs(t, j.Stop(), sse.ErrProviderClosed)
require.ErrorIs(t, j.Subscribe(context.Background(), sse.Subscription{}), sse.ErrProviderClosed)
require.ErrorIs(t, j.Publish(nil), sse.ErrProviderClosed)
require.ErrorIs(t, j.Publish(nil, nil), sse.ErrNoTopic)
require.ErrorIs(t, j.Publish(nil, []string{sse.DefaultTopic}), sse.ErrProviderClosed)
require.Zero(t, rp.callsPut)
require.Zero(t, rp.callsReplay)
require.Zero(t, rp.callsGC)
Expand Down Expand Up @@ -157,9 +158,9 @@ func TestJoe_SubscribePublish(t *testing.T) {

sub := subscribe(t, j, ctx)
<-ctx.waitingOnDone
require.NoError(t, j.Publish(msg(t, "hello", "", sse.DefaultTopic)))
require.NoError(t, j.Publish(msg(t, "hello", ""), []string{sse.DefaultTopic}))
cancel()
require.NoError(t, j.Publish(msg(t, "world", "", sse.DefaultTopic)))
require.NoError(t, j.Publish(msg(t, "world", ""), []string{sse.DefaultTopic}))
msgs := <-sub
require.Equal(t, "data: hello\n\n", msgs[0].String())

Expand Down Expand Up @@ -188,8 +189,8 @@ func TestJoe_Subscribe_multipleTopics(t *testing.T) {
sub := subscribe(t, j, ctx, sse.DefaultTopic, "another topic")
<-ctx.waitingOnDone

_ = j.Publish(msg(t, "hello", "", sse.DefaultTopic))
_ = j.Publish(msg(t, "world", "", "another topic"))
_ = j.Publish(msg(t, "hello", ""), []string{sse.DefaultTopic, "another topic"})
_ = j.Publish(msg(t, "world", ""), []string{"another topic"})

_ = j.Stop()

Expand All @@ -211,8 +212,8 @@ func TestJoe_errors(t *testing.T) {
})
defer j.Stop() //nolint:errcheck // irrelevant

_ = j.Publish(msg(t, "hello", "0", sse.DefaultTopic))
_ = j.Publish(msg(t, "hello", "1", sse.DefaultTopic))
_ = j.Publish(msg(t, "hello", "0"), []string{sse.DefaultTopic})
_ = j.Publish(msg(t, "hello", "1"), []string{sse.DefaultTopic})

var called int
cb := func(_ *sse.Message) bool {
Expand All @@ -227,7 +228,7 @@ func TestJoe_errors(t *testing.T) {
})
require.NoError(t, err, "error not received from replay")

_ = j.Publish(msg(t, "world", "2", sse.DefaultTopic))
_ = j.Publish(msg(t, "world", "2"), []string{sse.DefaultTopic})

require.Equal(t, 1, called, "callback was called after subscribe returned")

Expand All @@ -241,8 +242,8 @@ func TestJoe_errors(t *testing.T) {

<-ctx.waitingOnDone

_ = j.Publish(msg(t, "", "3", sse.DefaultTopic))
_ = j.Publish(msg(t, "", "4", sse.DefaultTopic))
_ = j.Publish(msg(t, "", "3"), []string{sse.DefaultTopic})
_ = j.Publish(msg(t, "", "4"), []string{sse.DefaultTopic})
}()

err = j.Subscribe(ctx, sse.Subscription{Callback: cb, Topics: []string{sse.DefaultTopic}})
Expand Down
20 changes: 2 additions & 18 deletions message.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,20 +53,8 @@ func (c *chunk) WriteTo(w io.Writer) (int64, error) {
return int64(n + m), err
}

// Message is the representation of a single message sent from the server to its clients.
//
// A Message is made of a single event, which is sent to each client, and other metadata
// about the message itself: its topic.
//
// Topics are used to filter which events reach which clients. If a client subscribes to
// a certain set of topics, but a message's topic is not part of that set, then the underlying
// event of the message does not reach the client.
//
// The Topic field is used only on the server and is not sent to the client.
// It is not part of the protocol.
// Message is the representation of an event sent from the server to its clients.
type Message struct {
Topic string

chunks []chunk

ID EventID
Expand Down Expand Up @@ -328,11 +316,7 @@ func (e *Message) reset() {
// UnmarshalText extracts the first event found in the given byte slice into the
// receiver. The input is expected to be a wire format event, as defined by the spec.
// Therefore, previous fields present on the Message will be overwritten
// (i.e. event, ID, comments, data, retry), but the Topic will be kept as is,
// as it is not an event field.
//
// A method for marshalling and unmarshalling Messages together with their Topic
// can be seen in the top-level example MessageCustomJSONMarshal.
// (i.e. event, ID, comments, data, retry).
//
// Unmarshaling ignores fields with invalid names. If no valid fields are found,
// an error is returned. For a field to be valid it must end in a newline - if the last
Expand Down
Loading