Skip to content

Commit

Permalink
refactor!: split subscribers and local subscribers
Browse files Browse the repository at this point in the history
  • Loading branch information
dunglas committed Nov 6, 2024
1 parent 3d5743c commit 01c8aab
Show file tree
Hide file tree
Showing 18 changed files with 231 additions and 212 deletions.
8 changes: 4 additions & 4 deletions bolt.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func (t *BoltTransport) persist(updateID string, updateJSON []byte) error {
}

// AddSubscriber adds a new subscriber to the transport.
func (t *BoltTransport) AddSubscriber(s *Subscriber) error {
func (t *BoltTransport) AddSubscriber(s *LocalSubscriber) error {
select {
case <-t.closed:
return ErrClosedTransport
Expand All @@ -226,7 +226,7 @@ func (t *BoltTransport) AddSubscriber(s *Subscriber) error {
}

// RemoveSubscriber removes a new subscriber from the transport.
func (t *BoltTransport) RemoveSubscriber(s *Subscriber) error {
func (t *BoltTransport) RemoveSubscriber(s *LocalSubscriber) error {
select {
case <-t.closed:
return ErrClosedTransport
Expand All @@ -249,7 +249,7 @@ func (t *BoltTransport) GetSubscribers() (string, []*Subscriber, error) {
}

//nolint:gocognit
func (t *BoltTransport) dispatchHistory(s *Subscriber, toSeq uint64) error {
func (t *BoltTransport) dispatchHistory(s *LocalSubscriber, toSeq uint64) error {
err := t.db.View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte(t.bucketName))
if b == nil {
Expand Down Expand Up @@ -311,7 +311,7 @@ func (t *BoltTransport) Close() (err error) {
t.Lock()
defer t.Unlock()

t.subscribers.Walk(0, func(s *Subscriber) bool {
t.subscribers.Walk(0, func(s *LocalSubscriber) bool {
s.Disconnect()

return true
Expand Down
28 changes: 14 additions & 14 deletions bolt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func TestBoltTransportHistory(t *testing.T) {
})
}

s := NewSubscriber("8", transport.logger, &TopicSelectorStore{})
s := NewLocalSubscriber("8", transport.logger, &TopicSelectorStore{})
s.SetTopics(topics, nil)

require.NoError(t, transport.AddSubscriber(s))
Expand Down Expand Up @@ -68,7 +68,7 @@ func TestBoltTransportLogsBogusLastEventID(t *testing.T) {
Topics: topics,
})

s := NewSubscriber("711131", logger, &TopicSelectorStore{})
s := NewLocalSubscriber("711131", logger, &TopicSelectorStore{})
s.SetTopics(topics, nil)

require.NoError(t, transport.AddSubscriber(s))
Expand All @@ -87,7 +87,7 @@ func TestBoltTopicSelectorHistory(t *testing.T) {
transport.Dispatch(&Update{Topics: []string{"http://example.com/subscribed-public-only"}, Private: true, Event: Event{ID: "3"}})
transport.Dispatch(&Update{Topics: []string{"http://example.com/subscribed-public-only"}, Event: Event{ID: "4"}})

s := NewSubscriber(EarliestLastEventID, transport.logger, &TopicSelectorStore{})
s := NewLocalSubscriber(EarliestLastEventID, transport.logger, &TopicSelectorStore{})
s.SetTopics([]string{"http://example.com/subscribed", "http://example.com/subscribed-public-only"}, []string{"http://example.com/subscribed"})

require.NoError(t, transport.AddSubscriber(s))
Expand All @@ -109,7 +109,7 @@ func TestBoltTransportRetrieveAllHistory(t *testing.T) {
})
}

s := NewSubscriber(EarliestLastEventID, transport.logger, &TopicSelectorStore{})
s := NewLocalSubscriber(EarliestLastEventID, transport.logger, &TopicSelectorStore{})
s.SetTopics(topics, nil)
require.NoError(t, transport.AddSubscriber(s))

Expand Down Expand Up @@ -139,7 +139,7 @@ func TestBoltTransportHistoryAndLive(t *testing.T) {
})
}

s := NewSubscriber("8", transport.logger, &TopicSelectorStore{})
s := NewLocalSubscriber("8", transport.logger, &TopicSelectorStore{})
s.SetTopics(topics, nil)
require.NoError(t, transport.AddSubscriber(s))

Expand Down Expand Up @@ -221,7 +221,7 @@ func TestBoltTransportDoNotDispatchUntilListen(t *testing.T) {
defer os.Remove("test.db")
assert.Implements(t, (*Transport)(nil), transport)

s := NewSubscriber("", transport.logger, &TopicSelectorStore{})
s := NewLocalSubscriber("", transport.logger, &TopicSelectorStore{})
require.NoError(t, transport.AddSubscriber(s))

var wg sync.WaitGroup
Expand All @@ -245,7 +245,7 @@ func TestBoltTransportDispatch(t *testing.T) {
defer os.Remove("test.db")
assert.Implements(t, (*Transport)(nil), transport)

s := NewSubscriber("", transport.logger, &TopicSelectorStore{})
s := NewLocalSubscriber("", transport.logger, &TopicSelectorStore{})
s.SetTopics([]string{"https://example.com/foo", "https://example.com/private"}, []string{"https://example.com/private"})

require.NoError(t, transport.AddSubscriber(s))
Expand Down Expand Up @@ -274,7 +274,7 @@ func TestBoltTransportClosed(t *testing.T) {
defer os.Remove("test.db")
assert.Implements(t, (*Transport)(nil), transport)

s := NewSubscriber("", transport.logger, &TopicSelectorStore{})
s := NewLocalSubscriber("", transport.logger, &TopicSelectorStore{})
s.SetTopics([]string{"https://example.com/foo"}, nil)
require.NoError(t, transport.AddSubscriber(s))

Expand All @@ -293,11 +293,11 @@ func TestBoltCleanDisconnectedSubscribers(t *testing.T) {
defer transport.Close()
defer os.Remove("test.db")

s1 := NewSubscriber("", transport.logger, &TopicSelectorStore{})
s1 := NewLocalSubscriber("", transport.logger, &TopicSelectorStore{})
s1.SetTopics([]string{"foo"}, []string{})
require.NoError(t, transport.AddSubscriber(s1))

s2 := NewSubscriber("", transport.logger, &TopicSelectorStore{})
s2 := NewLocalSubscriber("", transport.logger, &TopicSelectorStore{})
s2.SetTopics([]string{"foo"}, []string{})
require.NoError(t, transport.AddSubscriber(s2))

Expand All @@ -318,19 +318,19 @@ func TestBoltGetSubscribers(t *testing.T) {
defer transport.Close()
defer os.Remove("test.db")

s1 := NewSubscriber("", transport.logger, &TopicSelectorStore{})
s1 := NewLocalSubscriber("", transport.logger, &TopicSelectorStore{})
require.NoError(t, transport.AddSubscriber(s1))

s2 := NewSubscriber("", transport.logger, &TopicSelectorStore{})
s2 := NewLocalSubscriber("", transport.logger, &TopicSelectorStore{})
require.NoError(t, transport.AddSubscriber(s2))

lastEventID, subscribers, err := transport.GetSubscribers()
require.NoError(t, err)

assert.Equal(t, EarliestLastEventID, lastEventID)
assert.Len(t, subscribers, 2)
assert.Contains(t, subscribers, s1)
assert.Contains(t, subscribers, s2)
assert.Contains(t, subscribers, &s1.Subscriber)
assert.Contains(t, subscribers, &s2.Subscriber)
}

func TestBoltLastEventID(t *testing.T) {
Expand Down
6 changes: 3 additions & 3 deletions local.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (t *LocalTransport) Dispatch(update *Update) error {
}

// AddSubscriber adds a new subscriber to the transport.
func (t *LocalTransport) AddSubscriber(s *Subscriber) error {
func (t *LocalTransport) AddSubscriber(s *LocalSubscriber) error {
select {
case <-t.closed:
return ErrClosedTransport
Expand All @@ -74,7 +74,7 @@ func (t *LocalTransport) AddSubscriber(s *Subscriber) error {
}

// RemoveSubscriber removes a subscriber from the transport.
func (t *LocalTransport) RemoveSubscriber(s *Subscriber) error {
func (t *LocalTransport) RemoveSubscriber(s *LocalSubscriber) error {
select {
case <-t.closed:
return ErrClosedTransport
Expand Down Expand Up @@ -102,7 +102,7 @@ func (t *LocalTransport) Close() (err error) {
t.Lock()
defer t.Unlock()
close(t.closed)
t.subscribers.Walk(0, func(s *Subscriber) bool {
t.subscribers.Walk(0, func(s *LocalSubscriber) bool {
s.Disconnect()

return true
Expand Down
2 changes: 1 addition & 1 deletion local_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func subBenchLocalTransport(b *testing.B, topics, concurrency, matchPct int, tes
out := make(chan *Update, 50000)
tss := &TopicSelectorStore{}
for i := 0; i < concurrency; i++ {
s := NewSubscriber("", zap.NewNop(), tss)
s := NewLocalSubscriber("", zap.NewNop(), tss)
if i%100 < matchPct {
s.SetTopics(tsMatch, nil)
} else {
Expand Down
28 changes: 14 additions & 14 deletions local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func TestLocalTransportDoNotDispatchUntilListen(t *testing.T) {
err := transport.Dispatch(u)
require.NoError(t, err)

s := NewSubscriber("", logger, &TopicSelectorStore{})
s := NewLocalSubscriber("", logger, &TopicSelectorStore{})
s.SetTopics(u.Topics, nil)
require.NoError(t, transport.AddSubscriber(s))

Expand All @@ -43,7 +43,7 @@ func TestLocalTransportDispatch(t *testing.T) {
defer transport.Close()
assert.Implements(t, (*Transport)(nil), transport)

s := NewSubscriber("", logger, &TopicSelectorStore{})
s := NewLocalSubscriber("", logger, &TopicSelectorStore{})
s.SetTopics([]string{"http://example.com/foo"}, nil)
require.NoError(t, transport.AddSubscriber(s))

Expand All @@ -60,10 +60,10 @@ func TestLocalTransportClosed(t *testing.T) {

tss := &TopicSelectorStore{}

s := NewSubscriber("", logger, tss)
s := NewLocalSubscriber("", logger, tss)
require.NoError(t, transport.AddSubscriber(s))
require.NoError(t, transport.Close())
assert.Equal(t, transport.AddSubscriber(NewSubscriber("", logger, tss)), ErrClosedTransport)
assert.Equal(t, transport.AddSubscriber(NewLocalSubscriber("", logger, tss)), ErrClosedTransport)
assert.Equal(t, transport.Dispatch(&Update{}), ErrClosedTransport)

_, ok := <-s.out
Expand All @@ -78,10 +78,10 @@ func TestLiveCleanDisconnectedSubscribers(t *testing.T) {

tss := &TopicSelectorStore{}

s1 := NewSubscriber("", logger, tss)
s1 := NewLocalSubscriber("", logger, tss)
require.NoError(t, transport.AddSubscriber(s1))

s2 := NewSubscriber("", logger, tss)
s2 := NewLocalSubscriber("", logger, tss)
require.NoError(t, transport.AddSubscriber(s2))

assert.Equal(t, 2, transport.subscribers.Len())
Expand All @@ -101,7 +101,7 @@ func TestLiveReading(t *testing.T) {
defer transport.Close()
assert.Implements(t, (*Transport)(nil), transport)

s := NewSubscriber("", logger, &TopicSelectorStore{})
s := NewLocalSubscriber("", logger, &TopicSelectorStore{})
s.SetTopics([]string{"https://example.com"}, nil)
require.NoError(t, transport.AddSubscriber(s))

Expand All @@ -113,23 +113,23 @@ func TestLiveReading(t *testing.T) {
}

func TestLocalTransportGetSubscribers(t *testing.T) {
logger := zap.NewNop()
transport, _ := DeprecatedNewLocalTransport(&url.URL{Scheme: "local"}, logger)
transport := NewLocalTransport()
defer transport.Close()
require.NotNil(t, transport)

logger := zap.NewNop()
tss := &TopicSelectorStore{}

s1 := NewSubscriber("", logger, tss)
s1 := NewLocalSubscriber("", logger, tss)
require.NoError(t, transport.AddSubscriber(s1))

s2 := NewSubscriber("", logger, tss)
s2 := NewLocalSubscriber("", logger, tss)
require.NoError(t, transport.AddSubscriber(s2))

lastEventID, subscribers, err := transport.(TransportSubscribers).GetSubscribers()
lastEventID, subscribers, err := transport.GetSubscribers()
require.NoError(t, err)
assert.Equal(t, EarliestLastEventID, lastEventID)
assert.Len(t, subscribers, 2)
assert.Contains(t, subscribers, s1)
assert.Contains(t, subscribers, s2)
assert.Contains(t, subscribers, &s1.Subscriber)
assert.Contains(t, subscribers, &s2.Subscriber)
}
Loading

0 comments on commit 01c8aab

Please sign in to comment.