Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use circular buffer in FiniteReplayProvider #23

Merged
merged 9 commits into from
Mar 1, 2024
14 changes: 14 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,20 @@

This file tracks changes to this project. It follows the [Keep a Changelog format](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## Unreleased

### Removed

- `FiniteReplayProvider.{Count, AutoIDs}` – use the constructor instead

hugowetterberg marked this conversation as resolved.
Show resolved Hide resolved
### Added

- `NewFiniteReplayProvider` constructor

### Fixed

- `FiniteReplayProvider` doesn't leak memory anymore and respects the stored messages count it was given. Previously when a new message was put after the messages count was reached and some other messages were removed, the total messages count would grow unexpectedly and `FiniteReplayProvider` would store and replay more events than it was configured to.

## [0.8.0] - 2024-01-30

This version removes all external dependencies of `go-sse`. All our bugs are belong to us! It also does some API and documentation cleanups.
Expand Down
7 changes: 5 additions & 2 deletions joe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,11 @@ data: world
func TestJoe_errors(t *testing.T) {
t.Parallel()

fin, err := sse.NewFiniteReplayProvider(1, false)
tests.Equal(t, err, nil, "should create new FiniteReplayProvider")
hugowetterberg marked this conversation as resolved.
Show resolved Hide resolved

j := &sse.Joe{
ReplayProvider: &sse.FiniteReplayProvider{Count: 1},
ReplayProvider: fin,
}
defer j.Shutdown(context.Background()) //nolint:errcheck // irrelevant

Expand All @@ -247,7 +250,7 @@ func TestJoe_errors(t *testing.T) {
return callErr
})

err := j.Subscribe(context.Background(), sse.Subscription{
err = j.Subscribe(context.Background(), sse.Subscription{
Client: client,
LastEventID: sse.ID("0"),
Topics: []string{sse.DefaultTopic},
Expand Down
111 changes: 91 additions & 20 deletions replay_provider.go
Original file line number Diff line number Diff line change
@@ -1,56 +1,127 @@
package sse

import (
"errors"
"strconv"
"time"
)

// NewFiniteReplayProvider creates a finite replay provider with the given max
// count and auto ID behaviour.
//
// Count is the maximum number of events FiniteReplayProvider should hold as
// valid. It must be greater than zero.
//
// AutoIDs configures FiniteReplayProvider to automatically set the IDs of
// events.
func NewFiniteReplayProvider(
count int, autoIDs bool,
) (*FiniteReplayProvider, error) {
if count < 1 {
hugowetterberg marked this conversation as resolved.
Show resolved Hide resolved
return nil, errors.New("count must be greater than zero")
}

return &FiniteReplayProvider{
cap: count,
buf: make([]messageWithTopics, count),
autoIDs: autoIDs,
}, nil
}

// FiniteReplayProvider is a replay provider that replays at maximum a certain number of events.
// The events must have an ID unless the AutoIDs flag is toggled.
type FiniteReplayProvider struct {
b buffer

// Count is the maximum number of events FiniteReplayProvider should hold as valid.
// It must be a positive integer, or the code will panic.
Count int
// AutoIDs configures FiniteReplayProvider to automatically set the IDs of events.
AutoIDs bool
cap int
buf []messageWithTopics
hugowetterberg marked this conversation as resolved.
Show resolved Hide resolved
head int
tail int
autoIDs bool
currentID int64
}

// Put puts a message into the provider's buffer. If there are more messages than the maximum
// number, the oldest message is removed.
func (f *FiniteReplayProvider) Put(message *Message, topics []string) *Message {
if f.b == nil {
f.b = getBuffer(f.AutoIDs, f.Count)
if len(topics) == 0 {
panic(errors.New(
"go-sse: no topics provided for Message.\n" +
formatMessagePanicString(message)))
}

if f.autoIDs {
f.currentID++

message.ID = ID(strconv.FormatInt(f.currentID, 10))
} else if !message.ID.IsSet() {
panicString := "go-sse: a Message without an ID was given to a provider that doesn't set IDs automatically.\n" + formatMessagePanicString(message)

panic(errors.New(panicString))
}

if f.b.len() == f.b.cap() {
f.b.dequeue()
f.buf[f.tail] = messageWithTopics{message: message, topics: topics}

f.tail++
if f.tail >= f.cap {
f.tail = 0
}

return f.b.queue(message, topics)
if f.tail == f.head {
f.head = f.tail + 1

if f.head > f.cap {
f.head = 0
}
}

return message
}

// Replay replays the messages in the buffer to the listener.
// It doesn't take into account the messages' expiry times.
func (f *FiniteReplayProvider) Replay(subscription Subscription) error {
if f.b == nil {
if f.head == f.tail {
return nil
}

events := f.b.slice(subscription.LastEventID)
if len(events) == 0 {
return nil
// Replay head to end and start to tail when head is after tail.
if f.tail < f.head {
foundFirst, err := replay(subscription, f.buf[f.tail:], false)
if err != nil {
return err
}

_, err = replay(subscription, f.buf[0:f.tail], foundFirst)
if err != nil {
return err
}
} else {
_, err := replay(subscription, f.buf[0:f.tail], false)
if err != nil {
return err
}
}

return subscription.Client.Flush()
}

func replay(
sub Subscription, events []messageWithTopics, foundFirstEvent bool,
) (hasFoundFirstEvent bool, err error) {
for _, e := range events {
if topicsIntersect(subscription.Topics, e.topics) {
if err := subscription.Client.Send(e.message); err != nil {
return err
if !foundFirstEvent && e.message.ID == sub.LastEventID {
foundFirstEvent = true

continue
}

if foundFirstEvent && topicsIntersect(sub.Topics, e.topics) {
if err := sub.Client.Send(e.message); err != nil {
return false, err
}
}
}

return subscription.Client.Flush()
return foundFirstEvent, nil
}

// ValidReplayProvider is a ReplayProvider that replays all the buffered non-expired events.
Expand Down
74 changes: 69 additions & 5 deletions replay_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package sse_test

import (
"fmt"
"strconv"
"testing"
"time"

Expand Down Expand Up @@ -108,7 +110,8 @@ func TestValidReplayProvider(t *testing.T) {
func TestFiniteReplayProvider(t *testing.T) {
t.Parallel()

p := &sse.FiniteReplayProvider{Count: 3}
p, err := sse.NewFiniteReplayProvider(3, false)
hugowetterberg marked this conversation as resolved.
Show resolved Hide resolved
hugowetterberg marked this conversation as resolved.
Show resolved Hide resolved
tests.Equal(t, err, nil, "should create new FiniteReplayProvider")

tests.Equal(t, p.Replay(sse.Subscription{}), nil, "replay failed on provider without messages")

Expand Down Expand Up @@ -142,11 +145,72 @@ The message is the following:
tests.Equal(t, replayed.String(), "id: 4\ndata: world\n\n", "invalid replayed message")

p.Put(msg(t, "", "5"), []string{"t"})
p.Put(msg(t, "", "6"), []string{"t"})
p.Put(msg(t, "again", "7"), []string{sse.DefaultTopic})
p.Put(msg(t, "again", "6"), []string{sse.DefaultTopic})

replayed = replay(t, p, sse.ID("4"), sse.DefaultTopic, "topic with no messages")[0]
tests.Equal(t, replayed.String(), "id: 7\ndata: again\n\n", "invalid replayed message")
tests.Equal(t, replayed.String(), "id: 6\ndata: again\n\n", "invalid replayed message")

testReplayError(t, &sse.FiniteReplayProvider{Count: 10}, nil)
tr, err := sse.NewFiniteReplayProvider(10, false)
tests.Equal(t, err, nil, "should create new FiniteReplayProvider")

testReplayError(t, tr, nil)
}

func TestFiniteReplayProvider_allocations(t *testing.T) {
hugowetterberg marked this conversation as resolved.
Show resolved Hide resolved
p, err := sse.NewFiniteReplayProvider(3, false)
tests.Equal(t, err, nil, "should create new FiniteReplayProvider")

const runs = 100

topics := []string{sse.DefaultTopic}
// Add one to the number of runs to take the warmup run of
// AllocsPerRun() into account.
queue := make([]*sse.Message, runs+1)
lastID := runs

for i := 0; i < len(queue); i++ {
queue[i] = msg(t,
fmt.Sprintf("message %d", i),
strconv.Itoa(i),
)
}

var run int

avgAllocs := testing.AllocsPerRun(runs, func() {
_ = p.Put(queue[run], topics)

run++
})

tests.Equal(t, avgAllocs, 0, "no allocations should be made on Put()")

var replayCount int

cb := mockClient(func(m *sse.Message) error {
if m != nil {
replayCount++
}

return nil
})

sub := sse.Subscription{
Client: cb,
Topics: topics,
}

sub.LastEventID = sse.ID(strconv.Itoa(lastID - 3))

err = p.Replay(sub)
tests.Equal(t, err, nil, "replay from fourth last should succeed")

tests.Equal(t, replayCount, 0, "replay from fourth last should not yield messages")

sub.LastEventID = sse.ID(strconv.Itoa(lastID - 2))

err = p.Replay(sub)
tests.Equal(t, err, nil, "replay from third last should succeed")

tests.Equal(t, replayCount, 2, "replay from third last should yield 2 messages")
}