Skip to content

Commit c3429ee

Browse files
committed
test: add tests
1 parent 935699b commit c3429ee

File tree

6 files changed

+147
-47
lines changed

6 files changed

+147
-47
lines changed

internal/leadership/broadcaster.go

Lines changed: 23 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -4,34 +4,35 @@ import (
44
"sync"
55
)
66

7-
type listener struct {
8-
channel chan Leadership
7+
type listener[T any] struct {
8+
channel chan T
99
}
1010

11-
type Broadcaster struct {
11+
type Broadcaster[T any] struct {
1212
mu sync.Mutex
13-
t *Leadership
13+
t *T
1414

15-
inner []listener
16-
outer chan Leadership
15+
inner []listener[T]
16+
outer chan T
1717
}
1818

19-
func (h *Broadcaster) Actual() Leadership {
19+
func (h *Broadcaster[T]) Actual() T {
2020
h.mu.Lock()
2121
defer h.mu.Unlock()
2222

2323
if h.t == nil {
24-
return Leadership{}
24+
var t T
25+
return t
2526
}
2627
return *h.t
2728
}
2829

29-
func (h *Broadcaster) Subscribe() (<-chan Leadership, func()) {
30+
func (h *Broadcaster[T]) Subscribe() (<-chan T, func()) {
3031
h.mu.Lock()
3132
defer h.mu.Unlock()
3233

33-
newChannel := make(chan Leadership, 1)
34-
l := listener{
34+
newChannel := make(chan T, 1)
35+
l := listener[T]{
3536
channel: newChannel,
3637
}
3738
h.inner = append(h.inner, l)
@@ -43,23 +44,20 @@ func (h *Broadcaster) Subscribe() (<-chan Leadership, func()) {
4344
h.mu.Lock()
4445
defer h.mu.Unlock()
4546

46-
index := -1
47-
for i, listener := range h.inner {
47+
for index, listener := range h.inner {
4848
if listener == l {
49-
index = i
49+
if index < len(h.inner)-1 {
50+
h.inner = append(h.inner[:index], h.inner[index+1:]...)
51+
} else {
52+
h.inner = h.inner[:index]
53+
}
5054
break
5155
}
5256
}
53-
54-
if index < len(h.inner)-1 {
55-
h.inner = append(h.inner[:index], h.inner[index+1:]...)
56-
} else {
57-
h.inner = h.inner[:index]
58-
}
5957
}
6058
}
6159

62-
func (h *Broadcaster) Broadcast(t Leadership) {
60+
func (h *Broadcaster[T]) Broadcast(t T) {
6361
h.mu.Lock()
6462
defer h.mu.Unlock()
6563

@@ -70,7 +68,7 @@ func (h *Broadcaster) Broadcast(t Leadership) {
7068
}
7169
}
7270

73-
func (h *Broadcaster) Close() {
71+
func (h *Broadcaster[T]) Close() {
7472
h.mu.Lock()
7573
defer h.mu.Unlock()
7674

@@ -79,15 +77,8 @@ func (h *Broadcaster) Close() {
7977
}
8078
}
8179

82-
func (h *Broadcaster) CountListeners() int {
83-
h.mu.Lock()
84-
defer h.mu.Unlock()
85-
86-
return len(h.inner)
87-
}
88-
89-
func NewSignal() *Broadcaster {
90-
return &Broadcaster{
91-
outer: make(chan Leadership),
80+
func NewBroadcaster[T any]() *Broadcaster[T] {
81+
return &Broadcaster[T]{
82+
outer: make(chan T),
9283
}
9384
}
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
package leadership
2+
3+
import (
4+
"testing"
5+
"time"
6+
)
7+
8+
func TestBroadcaster(t *testing.T) {
9+
t.Parallel()
10+
11+
broadcaster := NewBroadcaster[struct{}]()
12+
t.Cleanup(broadcaster.Close)
13+
14+
const nbSubscriptions = 5
15+
16+
subscriptions := make([]<-chan struct{}, nbSubscriptions)
17+
releases := make([]func(), nbSubscriptions)
18+
19+
for i := 0; i < nbSubscriptions; i++ {
20+
subscriptions[i], releases[i] = broadcaster.Subscribe()
21+
}
22+
23+
go broadcaster.Broadcast(struct{}{})
24+
25+
for _, subscription := range subscriptions {
26+
select {
27+
case <-subscription:
28+
case <-time.After(time.Second):
29+
t.Fatal("timeout waiting for broadcast")
30+
}
31+
}
32+
33+
releases[2]()
34+
subscriptions = append(subscriptions[:2], subscriptions[3:]...)
35+
releases = append(releases[:2], releases[3:]...)
36+
37+
go func() {
38+
broadcaster.Broadcast(struct{}{})
39+
}()
40+
41+
for _, subscription := range subscriptions {
42+
select {
43+
case <-subscription:
44+
case <-time.After(time.Second):
45+
t.Fatal("timeout waiting for broadcast")
46+
}
47+
}
48+
49+
releases[0]()
50+
subscriptions = subscriptions[1:]
51+
releases = releases[1:]
52+
53+
go broadcaster.Broadcast(struct{}{})
54+
55+
for _, subscription := range subscriptions {
56+
select {
57+
case <-subscription:
58+
case <-time.After(time.Second):
59+
t.Fatal("timeout waiting for broadcast")
60+
}
61+
}
62+
63+
releases[2]()
64+
subscriptions = subscriptions[:2]
65+
66+
go broadcaster.Broadcast(struct{}{})
67+
68+
for _, subscription := range subscriptions {
69+
select {
70+
case <-subscription:
71+
case <-time.After(time.Second):
72+
t.Fatal("timeout waiting for broadcast")
73+
}
74+
}
75+
}

internal/leadership/mutex.go renamed to internal/leadership/database_handle.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,20 +5,20 @@ import (
55
"sync"
66
)
77

8-
type Mutex struct {
8+
type DatabaseHandle struct {
99
*sync.Mutex
1010
db DBHandle
1111
}
1212

13-
func (m *Mutex) Exec(fn func(db bun.IDB)) {
13+
func (m *DatabaseHandle) Exec(fn func(db bun.IDB)) {
1414
m.Mutex.Lock()
1515
defer m.Mutex.Unlock()
1616

1717
fn(m.db)
1818
}
1919

20-
func NewMutex(db DBHandle) *Mutex {
21-
return &Mutex{
20+
func NewDatabaseHandle(db DBHandle) *DatabaseHandle {
21+
return &DatabaseHandle{
2222
Mutex: &sync.Mutex{},
2323
db: db,
2424
}

internal/leadership/leadership.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,5 @@ package leadership
22

33
type Leadership struct {
44
Acquired bool
5-
DB *Mutex
5+
DB *DatabaseHandle
66
}

internal/leadership/manager.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,15 @@ import (
99

1010
type Manager struct {
1111
locker Locker
12-
changes *Broadcaster
12+
changes *Broadcaster[Leadership]
1313
logger logging.Logger
1414
retryPeriod time.Duration
1515
stopChannel chan chan struct{}
1616
}
1717

1818
func (m *Manager) Run(ctx context.Context) {
1919
var (
20-
dbMutex *Mutex
20+
dbMutex *DatabaseHandle
2121
nextRetry = time.After(time.Duration(0))
2222
nextPing <-chan time.Time
2323
)
@@ -46,7 +46,7 @@ func (m *Manager) Run(ctx context.Context) {
4646
continue
4747
}
4848

49-
dbMutex = NewMutex(db)
49+
dbMutex = NewDatabaseHandle(db)
5050

5151
m.changes.Broadcast(Leadership{
5252
DB: dbMutex,
@@ -67,7 +67,7 @@ func (m *Manager) Run(ctx context.Context) {
6767
ColumnExpr("1 as v").
6868
Count(ctx)
6969
if err != nil {
70-
m.logger.Error("error pinging db", err)
70+
m.logger.Errorf("error pinging db: %s", err)
7171
_ = dbMutex.db.Close()
7272
dbMutex = nil
7373

@@ -87,6 +87,7 @@ func (m *Manager) Stop(ctx context.Context) error {
8787
select {
8888
// if already closed
8989
case <-m.stopChannel:
90+
m.changes.Close()
9091
return nil
9192
default:
9293
ch := make(chan struct{})
@@ -100,15 +101,15 @@ func (m *Manager) Stop(ctx context.Context) error {
100101
}
101102
}
102103

103-
func (m *Manager) GetSignal() *Broadcaster {
104+
func (m *Manager) GetBroadcaster() *Broadcaster[Leadership] {
104105
return m.changes
105106
}
106107

107108
func NewManager(locker Locker, logger logging.Logger, options ...Option) *Manager {
108109
l := &Manager{
109110
locker: locker,
110111
logger: logger,
111-
changes: NewSignal(),
112+
changes: NewBroadcaster[Leadership](),
112113
retryPeriod: 2 * time.Second,
113114
stopChannel: make(chan chan struct{}),
114115
}

internal/leadership/manager_test.go

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ func TestLeaderShip(t *testing.T) {
3636
selectedLeader := -1
3737
require.Eventually(t, func() bool {
3838
for index, manager := range instances {
39-
actual := manager.GetSignal().Actual()
39+
actual := manager.GetBroadcaster().Actual()
4040
if actual.Acquired {
4141
selectedLeader = index
4242
return true
@@ -46,7 +46,7 @@ func TestLeaderShip(t *testing.T) {
4646
}, 2*time.Second, 10*time.Millisecond)
4747
leaderCount := 0
4848
for _, manager := range instances {
49-
if manager.GetSignal().Actual().Acquired {
49+
if manager.GetBroadcaster().Actual().Acquired {
5050
leaderCount++
5151
}
5252
}
@@ -55,7 +55,7 @@ func TestLeaderShip(t *testing.T) {
5555

5656
// ensure the provided db connection is still functional
5757
instances[selectedLeader].
58-
GetSignal().
58+
GetBroadcaster().
5959
Actual().DB.
6060
Exec(func(db bun.IDB) {
6161
require.NoError(t, db.
@@ -66,11 +66,44 @@ func TestLeaderShip(t *testing.T) {
6666
)
6767
})
6868

69+
// Stop the instance to trigger a new leader election
6970
require.NoError(t, instances[selectedLeader].Stop(ctx))
7071

7172
require.Eventually(t, func() bool {
7273
for index, manager := range instances {
73-
if manager.GetSignal().Actual().Acquired {
74+
if manager.GetBroadcaster().Actual().Acquired {
75+
selectedLeader = index
76+
return true
77+
}
78+
}
79+
return false
80+
}, 2*time.Second, 10*time.Millisecond)
81+
82+
broadcaster := instances[selectedLeader].GetBroadcaster()
83+
subscription, release := broadcaster.Subscribe()
84+
t.Cleanup(release)
85+
86+
// We will receive the leadership on the subscription
87+
select {
88+
case <-subscription:
89+
case <-time.After(time.Second):
90+
t.Fatal("timeout waiting for leadership acquirement")
91+
}
92+
93+
// Close the database connection of the actual leader to check the manager is able to detect the connection loss
94+
require.NoError(t, instances[selectedLeader].GetBroadcaster().Actual().DB.db.Close())
95+
96+
select {
97+
case leadership := <-subscription:
98+
require.Equal(t, Leadership{}, leadership)
99+
case <-time.After(time.Second):
100+
t.Fatal("timeout waiting for leadership loss")
101+
}
102+
release()
103+
104+
require.Eventually(t, func() bool {
105+
for index, manager := range instances {
106+
if manager.GetBroadcaster().Actual().Acquired {
74107
selectedLeader = index
75108
return true
76109
}

0 commit comments

Comments
 (0)