-
Notifications
You must be signed in to change notification settings - Fork 116
feat: add leadership package #661
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
a1b7c0e
d758b47
d9778e3
3827711
ef690f0
279ff6f
1d6200a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,2 +1,2 @@ | ||
//go:generate gomarkdoc -o README.md --repository.default-branch main | ||
//go:generate gomarkdoc -o README.md --repository.default-branch main --repository.url https://github.com/formancehq/ledger | ||
package ledger |
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,84 @@ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
package leadership | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
import ( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
"sync" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
type listener[T any] struct { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
channel chan T | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
type Broadcaster[T any] struct { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
mu sync.Mutex | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
t *T | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
inner []listener[T] | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
outer chan T | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
func (h *Broadcaster[T]) Actual() T { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
h.mu.Lock() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
defer h.mu.Unlock() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if h.t == nil { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
var t T | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return t | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return *h.t | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
func (h *Broadcaster[T]) Subscribe() (<-chan T, func()) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
h.mu.Lock() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
defer h.mu.Unlock() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
newChannel := make(chan T, 1) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
l := listener[T]{ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
channel: newChannel, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
h.inner = append(h.inner, l) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if h.t != nil { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
newChannel <- *h.t | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return newChannel, func() { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
h.mu.Lock() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
defer h.mu.Unlock() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
for index, listener := range h.inner { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if listener == l { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
if index < len(h.inner)-1 { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
h.inner = append(h.inner[:index], h.inner[index+1:]...) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} else { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
h.inner = h.inner[:index] | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
break | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+30
to
+58
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Optimize subscription cleanup. The unsubscribe function has several areas for improvement:
Consider this improved implementation: func (h *Broadcaster[T]) Subscribe() (<-chan T, func()) {
h.mu.Lock()
defer h.mu.Unlock()
newChannel := make(chan T, 1)
l := listener[T]{
channel: newChannel,
}
h.inner = append(h.inner, l)
if h.t != nil {
newChannel <- *h.t
}
+ var once sync.Once
return newChannel, func() {
+ once.Do(func() {
h.mu.Lock()
defer h.mu.Unlock()
for index, listener := range h.inner {
if listener == l {
- if index < len(h.inner)-1 {
- h.inner = append(h.inner[:index], h.inner[index+1:]...)
- } else {
- h.inner = h.inner[:index]
- }
+ // Use copy for better performance
+ copy(h.inner[index:], h.inner[index+1:])
+ h.inner = h.inner[:len(h.inner)-1]
break
}
}
+ })
}
} 📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
func (h *Broadcaster[T]) Broadcast(t T) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
h.mu.Lock() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
defer h.mu.Unlock() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
h.t = &t | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
for _, inner := range h.inner { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
inner.channel <- t | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+60
to
+69
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Prevent blocking on slow consumers. The current implementation could block if any subscriber is slow to consume messages. Consider using non-blocking sends to prevent this. func (h *Broadcaster[T]) Broadcast(t T) {
h.mu.Lock()
defer h.mu.Unlock()
h.t = &t
for _, inner := range h.inner {
- inner.channel <- t
+ select {
+ case inner.channel <- t:
+ default:
+ // Skip slow consumers
+ }
}
} 📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
func (h *Broadcaster[T]) Close() { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
h.mu.Lock() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
defer h.mu.Unlock() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
for _, inner := range h.inner { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
close(inner.channel) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
func NewBroadcaster[T any]() *Broadcaster[T] { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
return &Broadcaster[T]{ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
outer: make(chan T), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,73 @@ | ||
package leadership | ||
|
||
import ( | ||
"testing" | ||
"time" | ||
) | ||
|
||
func TestBroadcaster(t *testing.T) { | ||
t.Parallel() | ||
|
||
broadcaster := NewBroadcaster[struct{}]() | ||
t.Cleanup(broadcaster.Close) | ||
|
||
const nbSubscriptions = 5 | ||
|
||
subscriptions := make([]<-chan struct{}, nbSubscriptions) | ||
releases := make([]func(), nbSubscriptions) | ||
|
||
for i := 0; i < nbSubscriptions; i++ { | ||
subscriptions[i], releases[i] = broadcaster.Subscribe() | ||
} | ||
|
||
go broadcaster.Broadcast(struct{}{}) | ||
|
||
for _, subscription := range subscriptions { | ||
select { | ||
case <-subscription: | ||
case <-time.After(time.Second): | ||
t.Fatal("timeout waiting for broadcast") | ||
} | ||
} | ||
|
||
releases[2]() | ||
subscriptions = append(subscriptions[:2], subscriptions[3:]...) | ||
releases = append(releases[:2], releases[3:]...) | ||
|
||
go broadcaster.Broadcast(struct{}{}) | ||
|
||
for _, subscription := range subscriptions { | ||
select { | ||
case <-subscription: | ||
case <-time.After(time.Second): | ||
t.Fatal("timeout waiting for broadcast") | ||
} | ||
} | ||
|
||
releases[0]() | ||
subscriptions = subscriptions[1:] | ||
releases = releases[1:] | ||
|
||
go broadcaster.Broadcast(struct{}{}) | ||
|
||
for _, subscription := range subscriptions { | ||
select { | ||
case <-subscription: | ||
case <-time.After(time.Second): | ||
t.Fatal("timeout waiting for broadcast") | ||
} | ||
} | ||
|
||
releases[2]() | ||
subscriptions = subscriptions[:2] | ||
|
||
go broadcaster.Broadcast(struct{}{}) | ||
|
||
for _, subscription := range subscriptions { | ||
select { | ||
case <-subscription: | ||
case <-time.After(time.Second): | ||
t.Fatal("timeout waiting for broadcast") | ||
} | ||
} | ||
} | ||
Comment on lines
+8
to
+73
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Enhance test coverage. While the current test covers basic functionality, consider adding tests for:
Here's an example of additional test cases: func TestBroadcasterConcurrent(t *testing.T) {
t.Parallel()
broadcaster := NewBroadcaster[int]()
t.Cleanup(broadcaster.Close)
const nbGoroutines = 10
done := make(chan struct{})
// Start multiple goroutines that subscribe and unsubscribe
for i := 0; i < nbGoroutines; i++ {
go func() {
defer func() { done <- struct{}{} }()
ch, cancel := broadcaster.Subscribe()
defer cancel()
// Receive some messages
for j := 0; j < 5; j++ {
select {
case <-ch:
case <-time.After(time.Second):
t.Error("timeout waiting for broadcast")
}
}
}()
}
// Broadcast messages while goroutines are subscribing/unsubscribing
for i := 0; i < 10; i++ {
broadcaster.Broadcast(i)
}
// Wait for all goroutines to finish
for i := 0; i < nbGoroutines; i++ {
<-done
}
}
func TestBroadcasterSlowConsumer(t *testing.T) {
t.Parallel()
broadcaster := NewBroadcaster[int]()
t.Cleanup(broadcaster.Close)
// Create a slow consumer
ch, cancel := broadcaster.Subscribe()
defer cancel()
// Broadcast should not block even if consumer is slow
for i := 0; i < 100; i++ {
broadcaster.Broadcast(i)
}
// Verify we can still receive the latest value
select {
case v := <-ch:
if v != 99 {
t.Errorf("expected 99, got %d", v)
}
case <-time.After(time.Second):
t.Error("timeout waiting for broadcast")
}
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
package leadership | ||
|
||
import ( | ||
"context" | ||
"sync" | ||
) | ||
|
||
type contextKey struct{} | ||
|
||
var holderContextKey contextKey = struct{}{} | ||
|
||
func ContextWithLeadershipInfo(ctx context.Context) context.Context { | ||
return context.WithValue(ctx, holderContextKey, &holder{}) | ||
} | ||
|
||
func IsLeader(ctx context.Context) bool { | ||
h := ctx.Value(holderContextKey) | ||
if h == nil { | ||
return false | ||
} | ||
holder := h.(*holder) | ||
holder.Lock() | ||
defer holder.Unlock() | ||
|
||
return holder.isLeader | ||
} | ||
|
||
func setIsLeader(ctx context.Context, isLeader bool) { | ||
h := ctx.Value(holderContextKey) | ||
if h == nil { | ||
return | ||
} | ||
holder := h.(*holder) | ||
holder.Lock() | ||
defer holder.Unlock() | ||
|
||
holder.isLeader = isLeader | ||
} | ||
|
||
type holder struct { | ||
sync.Mutex | ||
isLeader bool | ||
} |
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,25 @@ | ||||||||||||||||||||||||||||||||
package leadership | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
import ( | ||||||||||||||||||||||||||||||||
"github.com/uptrace/bun" | ||||||||||||||||||||||||||||||||
"sync" | ||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
type DatabaseHandle struct { | ||||||||||||||||||||||||||||||||
*sync.Mutex | ||||||||||||||||||||||||||||||||
db DBHandle | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
Comment on lines
+8
to
+11
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Define or import the DBHandle type. The Please either:
🛠️ Refactor suggestion Avoid embedding sync.Mutex to prevent exposing Lock/Unlock methods. Embedding type DatabaseHandle struct {
- *sync.Mutex
+ mu *sync.Mutex
db DBHandle
} 📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
func (m *DatabaseHandle) Exec(fn func(db bun.IDB)) { | ||||||||||||||||||||||||||||||||
m.Mutex.Lock() | ||||||||||||||||||||||||||||||||
defer m.Mutex.Unlock() | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
fn(m.db) | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
Comment on lines
+13
to
+18
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Add context and error handling support to the Exec method. The current implementation has several limitations:
Consider this improved implementation: -func (m *DatabaseHandle) Exec(fn func(db bun.IDB)) {
+func (m *DatabaseHandle) Exec(ctx context.Context, fn func(db bun.IDB) error) error {
m.Mutex.Lock()
defer m.Mutex.Unlock()
- fn(m.db)
+ if err := fn(m.db); err != nil {
+ return fmt.Errorf("database operation failed: %w", err)
+ }
+ return nil
} 📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
func NewDatabaseHandle(db DBHandle) *DatabaseHandle { | ||||||||||||||||||||||||||||||||
return &DatabaseHandle{ | ||||||||||||||||||||||||||||||||
Mutex: &sync.Mutex{}, | ||||||||||||||||||||||||||||||||
db: db, | ||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
package leadership | ||
|
||
type Leadership struct { | ||
Acquired bool | ||
DB *DatabaseHandle | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,54 @@ | ||
package leadership | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"github.com/uptrace/bun" | ||
) | ||
|
||
const leadershipAdvisoryLockKey = 123456789 | ||
|
||
type DBHandle interface { | ||
bun.IDB | ||
Close() error | ||
} | ||
|
||
// Locker take a lock at process level | ||
// It returns a bun.IDB which MUST be invalidated when the lock is lost | ||
type Locker interface { | ||
Take(ctx context.Context) (DBHandle, error) | ||
} | ||
|
||
type defaultLocker struct { | ||
db *bun.DB | ||
} | ||
|
||
func (p *defaultLocker) Take(ctx context.Context) (DBHandle, error) { | ||
conn, err := p.db.Conn(ctx) | ||
if err != nil { | ||
return nil, fmt.Errorf("error opening new connection: %w", err) | ||
} | ||
|
||
ret := conn.QueryRowContext(ctx, "select pg_try_advisory_lock(?)", leadershipAdvisoryLockKey) | ||
if ret.Err() != nil { | ||
_ = conn.Close() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Handle connection close errors. The error from - _ = conn.Close()
+ if closeErr := conn.Close(); closeErr != nil {
+ return nil, fmt.Errorf("error closing connection after lock acquisition failure: %w (original error: %v)", closeErr, err)
+ } Also applies to: 40-40, 45-45 |
||
return nil, fmt.Errorf("error acquiring lock: %w", ret.Err()) | ||
} | ||
|
||
var acquired bool | ||
if err := ret.Scan(&acquired); err != nil { | ||
_ = conn.Close() | ||
return nil, err | ||
} | ||
|
||
if !acquired { | ||
_ = conn.Close() | ||
return nil, nil | ||
} | ||
|
||
return conn, nil | ||
} | ||
|
||
func NewDefaultLocker(db *bun.DB) Locker { | ||
return &defaultLocker{db: db} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
//go:build it | ||
|
||
package leadership | ||
|
||
import ( | ||
. "github.com/formancehq/go-libs/v2/testing/utils" | ||
"testing" | ||
|
||
"github.com/formancehq/go-libs/v2/logging" | ||
"github.com/formancehq/go-libs/v2/testing/docker" | ||
"github.com/formancehq/go-libs/v2/testing/platform/pgtesting" | ||
) | ||
|
||
var ( | ||
srv *pgtesting.PostgresServer | ||
) | ||
|
||
func TestMain(m *testing.M) { | ||
WithTestMain(func(t *TestingTForMain) int { | ||
srv = pgtesting.CreatePostgresServer(t, docker.NewPool(t, logging.Testing()), pgtesting.WithExtension("pgcrypto")) | ||
|
||
return m.Run() | ||
}) | ||
} |
Uh oh!
There was an error while loading. Please reload this page.