Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Server API refactor #11

Merged
merged 16 commits into from
Jul 22, 2023
Merged
Prev Previous commit
Next Next commit
Add panics to ReplayProvider
  • Loading branch information
tmaxmax committed Jul 22, 2023
commit 05ba1da996e6ae3a3221367969abd7a83303b3bd
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ This version brings a number of refactors to the server-side tooling the library
### Changed

- `ReplayProvider.Put` takes a simple `*Message` and returns a `*Message`, instead of changing the `*Message` to which the `**Message` parameter points.
It also takes a slice of topics, given that the `Message` doesn't hold the topic itself anymore.
It also takes a slice of topics, given that the `Message` doesn't hold the topic itself anymore. If the Message cannot be put, the method must now panic – see documentation for info.
- Because `Message.ExpiresAt` is removed, the `ValidReplayProvider` sets the expiry itself.
- `Server.Publish` now takes a list of topics.
- `Provider.Publish` now takes a non-empty slice of topics.
Expand Down
8 changes: 5 additions & 3 deletions joe.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@ import (
// If not specified otherwise, the errors returned are implementation-specific.
type ReplayProvider interface {
// Put adds a new event to the replay buffer. The Message that is returned may not have the
// same address, if the replay provider automatically sets IDs. It may also be nil if the
// message couldn't be queued – for example, the provider expects the message to have an ID
// but it didn't.
// same address, if the replay provider automatically sets IDs.
//
// Put panics if the message couldn't be queued – if no topics are provided, or
// a message without an ID is put into a ReplayProvider which does not
// automatically set IDs.
//
// The Put operation may be executed by the replay provider in another goroutine only if
// it can ensure that any Replay operation called after the Put goroutine is started
Expand Down
42 changes: 36 additions & 6 deletions replay_buffer.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package sse

import (
"errors"
"strconv"
"strings"
)

// A buffer is the underlying storage for a provider. Its methods are used by the provider to implement
Expand Down Expand Up @@ -34,19 +36,38 @@ func (b *bufferBase) front() *messageWithTopics {
return &b.buf[0]
}

func (b *bufferBase) queue(message *Message, topics []string) *Message {
if len(topics) == 0 {
panic(errors.New("go-sse: no topics provided for Message.\n" + formatMessagePanicString(message)))
}

b.buf = append(b.buf, messageWithTopics{message: message, topics: topics})

return message
}

type bufferNoID struct {
lastRemovedID EventID
bufferBase
}

func (b *bufferNoID) queue(message *Message, topics []string) *Message {
if !message.ID.IsSet() {
return nil
// We could maybe return this as an error and change the ReplayProvider
// interface to return the error. The issue with that is the following:
// even if we return this message as an error, providers can't handle it
// in any meaningful manner – for example, Joe has no way to report
// a replay.Put error, as that's not run on the main goroutine.
// A panic seems fitting, as putting a message without an ID when using
// a provider that doesn't add IDs is breaking the API contract – that is,
// the provider expects a message with an ID. It seems to be an irrecoverable
// error which should be caught in development.
panicString := "go-sse: a Message without an ID was given to a provider that doesn't set IDs automatically.\n" + formatMessagePanicString(message)

panic(errors.New(panicString))
}

b.buf = append(b.buf, messageWithTopics{message: message, topics: topics})

return message
return b.bufferBase.queue(message, topics)
}

func (b *bufferNoID) dequeue() {
Expand Down Expand Up @@ -87,9 +108,8 @@ func (b *bufferAutoID) queue(message *Message, topics []string) *Message {
message = message.Clone()
message.ID = ID(strconv.FormatInt(b.upcomingID, autoIDBase))
b.upcomingID++
b.buf = append(b.buf, messageWithTopics{message: message, topics: topics})

return message
return b.bufferBase.queue(message, topics)
}

func (b *bufferAutoID) dequeue() {
Expand All @@ -116,3 +136,13 @@ func getBuffer(autoIDs bool, capacity int) buffer {
}
return &bufferNoID{bufferBase: base}
}

func formatMessagePanicString(m *Message) string {
ret := "The message is the following:\n"
for _, line := range strings.SplitAfter(m.String(), "\n") {
if strings.TrimSpace(line) != "" {
ret += "│ " + line
}
}
return ret + "└─■"
}
15 changes: 15 additions & 0 deletions replay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,21 @@ func TestFiniteReplayProvider(t *testing.T) {

require.NoError(t, p.Replay(sse.Subscription{}), "replay failed on provider without messages")

require.PanicsWithError(t, `go-sse: a Message without an ID was given to a provider that doesn't set IDs automatically.
The message is the following:
│ event: panic
└─■`, func() {
p.Put(&sse.Message{Type: sse.Type("panic")}, []string{sse.DefaultTopic})
})

require.PanicsWithError(t, `go-sse: no topics provided for Message.
The message is the following:
│ id: 5
│ event: panic
└─■`, func() {
p.Put(&sse.Message{ID: sse.ID("5"), Type: sse.Type("panic")}, nil)
})

p.Put(msg(t, "", "1"), []string{sse.DefaultTopic})
p.Put(msg(t, "hello", "2"), []string{sse.DefaultTopic})
p.Put(msg(t, "there", "3"), []string{"t"})
Expand Down