@@ -5,12 +5,20 @@ import (
55 "fmt"
66 "net/url"
77 "sync"
8+ "time"
89
910 "github.com/lightninglabs/taproot-assets/asset"
11+ "github.com/lightninglabs/taproot-assets/fn"
1012 lfn "github.com/lightningnetwork/lnd/fn/v2"
1113 "github.com/lightningnetwork/lnd/keychain"
1214)
1315
16+ const (
17+ // DefaultTimeout is the default timeout we use for RPC and database
18+ // operations.
19+ DefaultTimeout = 30 * time .Second
20+ )
21+
1422// clientSubscriptions holds the subscriptions and cancel functions for a
1523// specific mailbox client.
1624type clientSubscriptions struct {
@@ -131,6 +139,10 @@ type MultiSubscription struct {
131139 // message channel that can be used to receive messages from any
132140 // subscribed account, regardless of which mailbox server it belongs to.
133141 msgQueue * lfn.ConcurrentQueue [* ReceivedMessages ]
142+
143+ // ContextGuard provides a wait group and main quit channel that can be
144+ // used to create guarded contexts.
145+ * fn.ContextGuard
134146}
135147
136148// MultiSubscriptionConfig holds the configuration parameters for creating a
@@ -154,15 +166,64 @@ func NewMultiSubscription(cfg MultiSubscriptionConfig) *MultiSubscription {
154166 cfg : cfg ,
155167 registry : newClientRegistry (),
156168 msgQueue : queue ,
169+ ContextGuard : & fn.ContextGuard {
170+ DefaultTimeout : DefaultTimeout ,
171+ Quit : make (chan struct {}),
172+ },
157173 }
158174}
159175
160176// Subscribe adds a new subscription for the specified client URL and receiver
161- // key. It starts a new mailbox client if one does not already exist for the
162- // given URL. The subscription will receive messages that match the provided
163- // filter and will send them to the shared message queue.
164- func (m * MultiSubscription ) Subscribe (ctx context.Context , serverURL url.URL ,
165- receiverKey keychain.KeyDescriptor , filter MessageFilter ) error {
177+ // key in a non-blocking manner. It starts a goroutine that will establish the
178+ // subscription asynchronously.
179+ func (m * MultiSubscription ) Subscribe (ctx context.Context ,
180+ primaryServerURL url.URL , receiverKey keychain.KeyDescriptor ,
181+ filter MessageFilter ) error {
182+
183+ // Formulate the list of server URLs to attempt subscription to. This
184+ // includes the primary server URL followed by any fallback URLs.
185+ mboxServerURLs := append (
186+ []url.URL {primaryServerURL }, m .cfg .FallbackMboxURLs ... ,
187+ )
188+
189+ // Attempt to subscribe to all mailbox servers in parallel and in a
190+ // non-blocking manner.
191+ m .Goroutine (func () error {
192+ errMap , err := fn .ParSliceErrCollect (
193+ ctx , mboxServerURLs ,
194+ func (ctx context.Context , serverURL url.URL ) error {
195+ return m .subscribeBlocking (
196+ ctx , serverURL , receiverKey , filter ,
197+ )
198+ },
199+ )
200+ if err != nil {
201+ return fmt .Errorf ("parallel subscription attempt " +
202+ "failed: %w" , err )
203+ }
204+
205+ for idx , subErr := range errMap {
206+ serverURL := mboxServerURLs [idx ]
207+
208+ log .ErrorS (ctx , "Subscription to server failed" , subErr ,
209+ "server_addr" , serverURL .String ())
210+ }
211+
212+ return nil
213+ }, func (err error ) {
214+ log .ErrorS (ctx , "Subscription goroutine exited with error" , err )
215+ })
216+
217+ return nil
218+ }
219+
220+ // subscribeBlocking is the blocking implementation of Subscribe. It starts a
221+ // new mailbox client if one does not already exist for the given URL. The
222+ // subscription will receive messages that match the provided filter and will
223+ // send them to the shared message queue.
224+ func (m * MultiSubscription ) subscribeBlocking (ctx context.Context ,
225+ serverURL url.URL , receiverKey keychain.KeyDescriptor ,
226+ filter MessageFilter ) error {
166227
167228 // Get or create a client for the given server URL. This call is
168229 // thread-safe and will handle locking internally.
0 commit comments