Skip to content

Commit 571a891

Browse files
committed
authmailbox: Subscribe method connects to fallback servers
Update MultiSubscription.Subscribe to also initiate subscriptions to fallback servers. These connections are established synchronously in a fire-and-forget non-blocking background routine.
1 parent 7a3f1e6 commit 571a891

File tree

1 file changed

+72
-6
lines changed

1 file changed

+72
-6
lines changed

authmailbox/multi_subscription.go

Lines changed: 72 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,20 @@ import (
55
"fmt"
66
"net/url"
77
"sync"
8+
"time"
89

910
"github.com/lightninglabs/taproot-assets/asset"
11+
"github.com/lightninglabs/taproot-assets/fn"
1012
lfn "github.com/lightningnetwork/lnd/fn/v2"
1113
"github.com/lightningnetwork/lnd/keychain"
1214
)
1315

16+
const (
17+
// DefaultTimeout is the default timeout we use for RPC and database
18+
// operations.
19+
DefaultTimeout = 30 * time.Second
20+
)
21+
1422
// clientSubscriptions holds the subscriptions and cancel functions for a
1523
// specific mailbox client.
1624
type clientSubscriptions struct {
@@ -131,6 +139,10 @@ type MultiSubscription struct {
131139
// message channel that can be used to receive messages from any
132140
// subscribed account, regardless of which mailbox server it belongs to.
133141
msgQueue *lfn.ConcurrentQueue[*ReceivedMessages]
142+
143+
// ContextGuard provides a wait group and main quit channel that can be
144+
// used to create guarded contexts.
145+
*fn.ContextGuard
134146
}
135147

136148
// MultiSubscriptionConfig holds the configuration parameters for creating a
@@ -154,15 +166,69 @@ func NewMultiSubscription(cfg MultiSubscriptionConfig) *MultiSubscription {
154166
cfg: cfg,
155167
registry: newClientRegistry(),
156168
msgQueue: queue,
169+
ContextGuard: &fn.ContextGuard{
170+
DefaultTimeout: DefaultTimeout,
171+
Quit: make(chan struct{}),
172+
},
173+
}
174+
}
175+
176+
// Subscribe adds a subscription for the given client URL and receiver key.
177+
// It launches a goroutine to asynchronously establish any fallback
178+
// subscriptions.
179+
func (m *MultiSubscription) Subscribe(ctx context.Context,
180+
primaryServerURL url.URL, receiverKey keychain.KeyDescriptor,
181+
filter MessageFilter) error {
182+
183+
// Attempt to subscribe to all fallback mailbox servers in parallel and
184+
// in a non-blocking manner.
185+
m.Goroutine(func() error {
186+
errMap, err := fn.ParSliceErrCollect(
187+
ctx, m.cfg.FallbackMboxURLs,
188+
func(ctx context.Context, serverURL url.URL) error {
189+
return m.establishSubscription(
190+
ctx, serverURL, receiverKey, filter,
191+
)
192+
},
193+
)
194+
if err != nil {
195+
return fmt.Errorf("parallel subscription attempt "+
196+
"failed: %w", err)
197+
}
198+
199+
for idx, subErr := range errMap {
200+
serverURL := m.cfg.FallbackMboxURLs[idx]
201+
202+
log.ErrorS(ctx, "Subscription to fallback server "+
203+
"failed", subErr, "server_addr",
204+
serverURL.String())
205+
}
206+
207+
return nil
208+
}, func(err error) {
209+
log.ErrorS(ctx, "Fallback server subscription goroutine "+
210+
"exited with error", err)
211+
})
212+
213+
// Subscribe to the primary mailbox server in a blocking manner. This
214+
// ensures that we have at least one active subscription before
215+
// returning.
216+
err := m.establishSubscription(
217+
ctx, primaryServerURL, receiverKey, filter,
218+
)
219+
if err != nil {
220+
return fmt.Errorf("primary server subscription failed: %w", err)
157221
}
222+
223+
return nil
158224
}
159225

160-
// Subscribe adds a new subscription for the specified client URL and receiver
161-
// key. It starts a new mailbox client if one does not already exist for the
162-
// given URL. The subscription will receive messages that match the provided
163-
// filter and will send them to the shared message queue.
164-
func (m *MultiSubscription) Subscribe(ctx context.Context, serverURL url.URL,
165-
receiverKey keychain.KeyDescriptor, filter MessageFilter) error {
226+
// establishSubscription synchronously subscribes to a server.
227+
// It creates a mailbox client for the URL if none exists.
228+
// The subscription routes messages matching the filter to the shared queue.
229+
func (m *MultiSubscription) establishSubscription(ctx context.Context,
230+
serverURL url.URL, receiverKey keychain.KeyDescriptor,
231+
filter MessageFilter) error {
166232

167233
// Get or create a client for the given server URL. This call is
168234
// thread-safe and will handle locking internally.

0 commit comments

Comments
 (0)