Skip to content
3 changes: 0 additions & 3 deletions authmailbox/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,6 @@ func (c *Client) connectAndAuthenticate(ctx context.Context,
msgChan chan<- *ReceivedMessages, acctKey keychain.KeyDescriptor,
filter MessageFilter) (*receiveSubscription, error) {

var receiverKey [33]byte
copy(receiverKey[:], acctKey.PubKey.SerializeCompressed())

// Before we can expect to receive any updates, we need to perform the
// 3-way authentication handshake.
sub := newReceiveSubscription(c.cfg, msgChan, acctKey, filter, c.client)
Expand Down
4 changes: 3 additions & 1 deletion authmailbox/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,9 @@ func TestServerClientAuthAndRestart(t *testing.T) {

// We also add a multi-subscription to the same two keys, so we can make
// sure we can receive messages from multiple clients at once.
multiSub := NewMultiSubscription(*clientCfg)
multiSub := NewMultiSubscription(MultiSubscriptionConfig{
BaseClientConfig: *clientCfg,
})
err := multiSub.Subscribe(
ctx, url.URL{Host: clientCfg.ServerAddress}, clientKey1, filter,
)
Expand Down
266 changes: 211 additions & 55 deletions authmailbox/multi_subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,20 @@ import (
"fmt"
"net/url"
"sync"
"time"

"github.com/lightninglabs/taproot-assets/asset"
"github.com/lightninglabs/taproot-assets/fn"
lfn "github.com/lightningnetwork/lnd/fn/v2"
"github.com/lightningnetwork/lnd/keychain"
)

const (
// DefaultTimeout is the default timeout we use for RPC and database
// operations.
DefaultTimeout = 30 * time.Second
)

// clientSubscriptions holds the subscriptions and cancel functions for a
// specific mailbox client.
type clientSubscriptions struct {
Expand All @@ -26,82 +34,229 @@ type clientSubscriptions struct {
cancels map[asset.SerializedKey]context.CancelFunc
}

// clientRegistry is a thread-safe registry for managing mailbox clients.
// It encapsulates the clients map and provides a safe API for accessing
// and modifying client subscriptions.
type clientRegistry struct {
sync.RWMutex

// clients holds the active mailbox clients, keyed by their server URL.
clients map[url.URL]*clientSubscriptions
}

// newClientRegistry creates a new client registry instance.
func newClientRegistry() *clientRegistry {
return &clientRegistry{
clients: make(map[url.URL]*clientSubscriptions),
}
}

// Get retrieves an existing client or creates a new one if it doesn't
// exist. It returns the client and a boolean indicating whether the client
// was newly created.
func (r *clientRegistry) Get(serverURL url.URL,
cfgCopy ClientConfig) (*clientSubscriptions, bool, error) {

r.Lock()
defer r.Unlock()

client, ok := r.clients[serverURL]
if ok {
return client, false, nil
}

// Create a new client connection.
cfgCopy.ServerAddress = serverURL.Host
mboxClient := NewClient(&cfgCopy)

client = &clientSubscriptions{
client: mboxClient,
subscriptions: make(
map[asset.SerializedKey]ReceiveSubscription,
),
cancels: make(
map[asset.SerializedKey]context.CancelFunc,
),
}
r.clients[serverURL] = client

return client, true, nil
}

// RemoveClient removes a client from the registry.
func (r *clientRegistry) RemoveClient(serverURL url.URL) {
r.Lock()
defer r.Unlock()

delete(r.clients, serverURL)
}

// AddSubscription adds a subscription and its cancel function to a client. If
// the client does not exist, an error is returned.
func (r *clientRegistry) AddSubscription(serverURL url.URL,
key asset.SerializedKey, subscription ReceiveSubscription,
cancel context.CancelFunc) error {

r.Lock()
defer r.Unlock()

client, ok := r.clients[serverURL]
if !ok {
return fmt.Errorf("no client found for %s", serverURL.String())
}

client.subscriptions[key] = subscription
client.cancels[key] = cancel

return nil
}

// ForEach executes a function for each client in the registry. The function
// receives a copy of the client subscriptions to avoid holding the lock
// during potentially long operations.
func (r *clientRegistry) ForEach(fn func(*clientSubscriptions)) {
r.RLock()
defer r.RUnlock()

for _, client := range r.clients {
fn(client)
}
}

// MultiSubscription is a subscription manager that can handle multiple mailbox
// clients, allowing subscriptions to different accounts across different
// mailbox servers. It manages subscriptions and message queues for each client
// and provides a unified interface for receiving messages.
type MultiSubscription struct {
// baseClientConfig holds the basic configuration for the mailbox
// clients. All fields except the ServerAddress are used to create
// new mailbox clients when needed.
baseClientConfig ClientConfig
// cfg holds the configuration for the MultiSubscription instance.
cfg MultiSubscriptionConfig

// clients holds the active mailbox clients, keyed by their server URL.
clients map[url.URL]*clientSubscriptions
// registry manages the active mailbox clients in a thread-safe manner.
registry *clientRegistry

// msgQueue is the concurrent queue that holds received messages from
// all subscriptions across all clients. This allows for a unified
// message channel that can be used to receive messages from any
// subscribed account, regardless of which mailbox server it belongs to.
msgQueue *lfn.ConcurrentQueue[*ReceivedMessages]

sync.RWMutex
// ContextGuard provides a wait group and main quit channel that can be
// used to create guarded contexts.
*fn.ContextGuard
}

// MultiSubscriptionConfig holds the configuration parameters for creating a
// MultiSubscription instance.
type MultiSubscriptionConfig struct {
// baseClientConfig holds the basic configuration for the mailbox
// clients. All fields except the ServerAddress are used to create
// new mailbox clients when needed.
BaseClientConfig ClientConfig

// FallbackMboxURLs are fallback proof courier AuthMailbox services.
FallbackMboxURLs []url.URL
}

// NewMultiSubscription creates a new MultiSubscription instance.
func NewMultiSubscription(baseClientConfig ClientConfig) *MultiSubscription {
func NewMultiSubscription(cfg MultiSubscriptionConfig) *MultiSubscription {
queue := lfn.NewConcurrentQueue[*ReceivedMessages](lfn.DefaultQueueSize)
queue.Start()

return &MultiSubscription{
baseClientConfig: baseClientConfig,
clients: make(map[url.URL]*clientSubscriptions),
msgQueue: queue,
cfg: cfg,
registry: newClientRegistry(),
msgQueue: queue,
ContextGuard: &fn.ContextGuard{
DefaultTimeout: DefaultTimeout,
Quit: make(chan struct{}),
},
}
}

// Subscribe adds a new subscription for the specified client URL and receiver
// key. It starts a new mailbox client if one does not already exist for the
// given URL. The subscription will receive messages that match the provided
// filter and will send them to the shared message queue.
func (m *MultiSubscription) Subscribe(ctx context.Context, serverURL url.URL,
receiverKey keychain.KeyDescriptor, filter MessageFilter) error {
// Subscribe adds a subscription for the given client URL and receiver key.
// It launches a goroutine to asynchronously establish any fallback
// subscriptions.
func (m *MultiSubscription) Subscribe(ctx context.Context,
primaryServerURL url.URL, receiverKey keychain.KeyDescriptor,
filter MessageFilter) error {

// We hold the mutex for access to common resources.
m.Lock()
cfgCopy := m.baseClientConfig
client, ok := m.clients[serverURL]
// Attempt to subscribe to all fallback mailbox servers in parallel and
// in a non-blocking manner.
m.Goroutine(func() error {
errMap, err := fn.ParSliceErrCollect(
ctx, m.cfg.FallbackMboxURLs,
func(ctx context.Context, serverURL url.URL) error {
return m.establishSubscription(
ctx, serverURL, receiverKey, filter,
)
},
)
if err != nil {
return fmt.Errorf("parallel subscription attempt "+
"failed: %w", err)
}

// If this is the first time we're seeing a server URL, we first create
// a network connection to the mailbox server.
if !ok {
cfgCopy.ServerAddress = serverURL.Host

mboxClient := NewClient(&cfgCopy)
client = &clientSubscriptions{
client: mboxClient,
subscriptions: make(
map[asset.SerializedKey]ReceiveSubscription,
),
cancels: make(
map[asset.SerializedKey]context.CancelFunc,
),
for idx, subErr := range errMap {
serverURL := m.cfg.FallbackMboxURLs[idx]

log.ErrorS(ctx, "Subscription to fallback server "+
"failed", subErr, "server_addr",
serverURL.String())
}
m.clients[serverURL] = client

err := mboxClient.Start()
return nil
}, func(err error) {
log.ErrorS(ctx, "Fallback server subscription goroutine "+
"exited with error", err)
})

// Subscribe to the primary mailbox server in a blocking manner. This
// ensures that we have at least one active subscription before
// returning.
err := m.establishSubscription(
ctx, primaryServerURL, receiverKey, filter,
)
if err != nil {
return fmt.Errorf("primary server subscription failed: %w", err)
}

return nil
}

// establishSubscription synchronously subscribes to a server.
// It creates a mailbox client for the URL if none exists.
// The subscription routes messages matching the filter to the shared queue.
func (m *MultiSubscription) establishSubscription(ctx context.Context,
serverURL url.URL, receiverKey keychain.KeyDescriptor,
filter MessageFilter) error {

// Get or create a client for the given server URL. This call is
// thread-safe and will handle locking internally.
cfgCopy := m.cfg.BaseClientConfig
client, isNewClient, err := m.registry.Get(serverURL, cfgCopy)
if err != nil {
return err
}

// Start the mailbox client if it's not already started. This is safe to
// do without holding any locks since the client itself manages its own
// state.
if isNewClient {
log.Debugf("Starting new mailbox client for %s",
serverURL.String())

err = client.client.Start()
if err != nil {
m.Unlock()
return fmt.Errorf("unable to create mailbox client: %w",
// Remove the client from the map if we failed to start
// it.
m.registry.RemoveClient(serverURL)
return fmt.Errorf("unable to start mailbox client: %w",
err)
}
}

// We release the lock here again, because StartAccountSubscription
// might block for a while, and we don't want to hold the lock
// unnecessarily long.
m.Unlock()

// Start the subscription. We don't hold any locks during this call
// since StartAccountSubscription might block for a while.
ctx, cancel := context.WithCancel(ctx)
subscription, err := client.client.StartAccountSubscription(
ctx, m.msgQueue.ChanIn(), receiverKey, filter,
Expand All @@ -112,13 +267,15 @@ func (m *MultiSubscription) Subscribe(ctx context.Context, serverURL url.URL,
err)
}

// We hold the lock again to safely add the subscription and cancel
// function to the client's maps.
m.Lock()
// Add the subscription and cancel function to the client's maps.
// This is thread-safe and handled internally by the registry.
key := asset.ToSerialized(receiverKey.PubKey)
client.subscriptions[key] = subscription
client.cancels[key] = cancel
m.Unlock()
err = m.registry.AddSubscription(serverURL, key, subscription, cancel)
if err != nil {
cancel()
return fmt.Errorf("unable to add subscription to registry: %w",
err)
}

return nil
}
Expand All @@ -138,11 +295,10 @@ func (m *MultiSubscription) Stop() error {

log.Info("Stopping all mailbox clients and subscriptions...")

m.RLock()
defer m.RUnlock()

var lastErr error
for _, client := range m.clients {

// Iterate through all clients in a thread-safe manner and stop them.
m.registry.ForEach(func(client *clientSubscriptions) {
for _, cancel := range client.cancels {
cancel()
}
Expand All @@ -160,7 +316,7 @@ func (m *MultiSubscription) Stop() error {
log.Errorf("Error stopping client: %v", err)
lastErr = err
}
}
})

return lastErr
}
Loading
Loading