@@ -8,6 +8,10 @@ import (
8
8
"time"
9
9
10
10
vole "github.com/ipfs-shipyard/vole/lib"
11
+ bsmsg "github.com/ipfs/boxo/bitswap/message"
12
+ "github.com/ipfs/boxo/bitswap/message/pb"
13
+ "github.com/ipfs/boxo/bitswap/network"
14
+ "github.com/ipfs/boxo/bitswap/network/httpnet"
11
15
"github.com/ipfs/boxo/ipns"
12
16
"github.com/ipfs/boxo/routing/http/client"
13
17
"github.com/ipfs/boxo/routing/http/contentrouter"
@@ -19,6 +23,7 @@ import (
19
23
record "github.com/libp2p/go-libp2p-record"
20
24
"github.com/libp2p/go-libp2p/core/host"
21
25
"github.com/libp2p/go-libp2p/core/peer"
26
+ "github.com/libp2p/go-libp2p/core/peerstore"
22
27
"github.com/libp2p/go-libp2p/core/routing"
23
28
"github.com/libp2p/go-libp2p/p2p/net/connmgr"
24
29
"github.com/multiformats/go-multiaddr"
@@ -140,17 +145,23 @@ type providerOutput struct {
140
145
Addrs []string
141
146
ConnectionMaddrs []string
142
147
DataAvailableOverBitswap BitswapCheckOutput
148
+ DataAvailableOverHTTP HTTPCheckOutput
143
149
Source string
144
150
}
145
151
146
152
// runCidCheck finds providers of a given CID, using the DHT and IPNI
147
153
// concurrently. A check of connectivity and Bitswap availability is performed
148
154
// for each provider found.
149
- func (d * daemon ) runCidCheck (ctx context.Context , cidKey cid.Cid , ipniURL string ) (cidCheckOutput , error ) {
155
+ func (d * daemon ) runCidCheck (ctx context.Context , cidKey cid.Cid , ipniURL string , httpRetrieval bool ) (cidCheckOutput , error ) {
156
+ protocols := defaultProtocolFilter
157
+ if httpRetrieval {
158
+ protocols = append (protocols , "transport-ipfs-gateway-http" )
159
+ }
160
+
150
161
crClient , err := client .New (ipniURL ,
151
- client .WithStreamResultsRequired (), // // https://specs.ipfs.tech/routing/http-routing-v1/#streaming
152
- client .WithProtocolFilter (defaultProtocolFilter ), // IPIP-484
153
- client .WithDisabledLocalFiltering (false ), // force local filtering in case remote server does not support IPIP-484
162
+ client .WithStreamResultsRequired (), // // https://specs.ipfs.tech/routing/http-routing-v1/#streaming
163
+ client .WithProtocolFilter (protocols ), // IPIP-484
164
+ client .WithDisabledLocalFiltering (false ), // force local filtering in case remote server does not support IPIP-484
154
165
)
155
166
if err != nil {
156
167
return nil , fmt .Errorf ("failed to create content router client: %w" , err )
@@ -210,6 +221,52 @@ func (d *daemon) runCidCheck(ctx context.Context, cidKey cid.Cid, ipniURL string
210
221
wg .Add (1 )
211
222
go func (provider peer.AddrInfo , src string ) {
212
223
defer wg .Done ()
224
+ // Get http retrieval out of the way if this is such
225
+ // provider.
226
+ httpInfo , otherInfo := network .SplitHTTPAddrs (provider )
227
+ if len (httpInfo .Addrs ) > 0 && httpRetrieval {
228
+
229
+ provOutput := providerOutput {
230
+ ID : provider .ID .String (),
231
+ DataAvailableOverBitswap : BitswapCheckOutput {
232
+ Error : "not a bitswap endpoint" ,
233
+ },
234
+ DataAvailableOverHTTP : HTTPCheckOutput {},
235
+ Source : src ,
236
+ }
237
+ for _ , ma := range httpInfo .Addrs {
238
+ provOutput .Addrs = append (provOutput .Addrs , ma .String ())
239
+ }
240
+
241
+ testHost , err := d .createTestHost ()
242
+ if err != nil {
243
+ log .Printf ("Error creating test host: %v\n " , err )
244
+ return
245
+ }
246
+ defer testHost .Close ()
247
+ httpCheck := checkHTTPRetrieval (ctx , testHost , cidKey , httpInfo )
248
+ provOutput .DataAvailableOverHTTP = httpCheck
249
+ if ! httpCheck .Connected {
250
+ provOutput .ConnectionError = httpCheck .Error
251
+ }
252
+ for _ , ma := range httpCheck .Endpoints {
253
+ provOutput .ConnectionMaddrs = append (provOutput .ConnectionMaddrs , ma .String ())
254
+ }
255
+
256
+ mu .Lock ()
257
+ out = append (out , provOutput )
258
+ mu .Unlock ()
259
+
260
+ // Do not continue processing if there are no
261
+ // other addresses as we would trigger dht
262
+ // lookups etc.
263
+ if len (otherInfo .Addrs ) == 0 {
264
+ return
265
+ }
266
+ }
267
+
268
+ // process non-http providers addresses.
269
+ provider = otherInfo
213
270
214
271
outputAddrs := []string {}
215
272
if len (provider .Addrs ) > 0 {
@@ -236,7 +293,10 @@ func (d *daemon) runCidCheck(ctx context.Context, cidKey cid.Cid, ipniURL string
236
293
ID : provider .ID .String (),
237
294
Addrs : outputAddrs ,
238
295
DataAvailableOverBitswap : BitswapCheckOutput {},
239
- Source : src ,
296
+ DataAvailableOverHTTP : HTTPCheckOutput {
297
+ Error : "not an HTTP endpoint" ,
298
+ },
299
+ Source : src ,
240
300
}
241
301
242
302
testHost , err := d .createTestHost ()
@@ -286,10 +346,17 @@ type peerCheckOutput struct {
286
346
ProviderRecordFromPeerInIPNI bool
287
347
ConnectionMaddrs []string
288
348
DataAvailableOverBitswap BitswapCheckOutput
349
+ DataAvailableOverHTTP HTTPCheckOutput
289
350
}
290
351
291
352
// runPeerCheck checks the connectivity and Bitswap availability of a CID from a given peer (either with just peer ID or specific multiaddr)
292
- func (d * daemon ) runPeerCheck (ctx context.Context , ma multiaddr.Multiaddr , ai * peer.AddrInfo , c cid.Cid , ipniURL string ) (* peerCheckOutput , error ) {
353
+ func (d * daemon ) runPeerCheck (ctx context.Context , ma multiaddr.Multiaddr , ai peer.AddrInfo , c cid.Cid , ipniURL string , httpRetrieval bool ) (* peerCheckOutput , error ) {
354
+ testHost , err := d .createTestHost ()
355
+ if err != nil {
356
+ return nil , fmt .Errorf ("server error: %w" , err )
357
+ }
358
+ defer testHost .Close ()
359
+
293
360
addrMap , peerAddrDHTErr := peerAddrsInDHT (ctx , d .dht , d .dhtMessenger , ai .ID )
294
361
295
362
var inDHT , inIPNI bool
@@ -303,6 +370,7 @@ func (d *daemon) runPeerCheck(ctx context.Context, ma multiaddr.Multiaddr, ai *p
303
370
inIPNI = providerRecordFromPeerInIPNI (ctx , ipniURL , c , ai .ID )
304
371
wg .Done ()
305
372
}()
373
+
306
374
wg .Wait ()
307
375
308
376
out := & peerCheckOutput {
@@ -311,38 +379,63 @@ func (d *daemon) runPeerCheck(ctx context.Context, ma multiaddr.Multiaddr, ai *p
311
379
PeerFoundInDHT : addrMap ,
312
380
}
313
381
382
+ // FIXME: without AcceleratedDHT client, we usually timeout the full
383
+ // operation context in the DHT steps. Provide early exit in that
384
+ // case.
385
+ if err := ctx .Err (); err != nil {
386
+ return out , err
387
+ }
388
+
389
+ httpInfo , otherInfo := network .SplitHTTPAddrs (ai )
390
+
391
+ // If they provided an http address and enabled retrieval, try that.
392
+ if len (httpInfo .Addrs ) > 0 && httpRetrieval {
393
+ httpCheck := checkHTTPRetrieval (ctx , testHost , c , httpInfo )
394
+ out .DataAvailableOverHTTP = httpCheck
395
+ if ! httpCheck .Connected {
396
+ out .ConnectionError = httpCheck .Error
397
+ }
398
+ for _ , ma := range httpCheck .Endpoints {
399
+ out .ConnectionMaddrs = append (out .ConnectionMaddrs , ma .String ())
400
+ }
401
+ out .DataAvailableOverBitswap = BitswapCheckOutput {
402
+ Error : "peer multiaddress is an HTTP endpoint" ,
403
+ }
404
+ return out , nil
405
+ }
406
+
407
+ out .DataAvailableOverHTTP = HTTPCheckOutput {
408
+ Error : "peer multiaddress is not an HTTP endpoint or HTTP check not enabled" ,
409
+ }
410
+
411
+ // non-http peers. Try to connect via p2p etc.
314
412
var connectionFailed bool
315
413
316
- // If peerID given,but no addresses check the DHT
317
- if len (ai .Addrs ) == 0 {
414
+ // If this is a non-HTTP peer, try with DHT addresses
415
+ if len (otherInfo .Addrs ) == 0 {
318
416
if peerAddrDHTErr != nil {
319
417
// PeerID is not resolvable via the DHT
320
418
connectionFailed = true
321
419
out .ConnectionError = peerAddrDHTErr .Error ()
322
420
}
421
+
323
422
for a := range addrMap {
324
423
ma , err := multiaddr .NewMultiaddr (a )
325
424
if err != nil {
326
425
log .Println (fmt .Errorf ("error parsing multiaddr %s: %w" , a , err ))
327
426
continue
328
427
}
329
- ai .Addrs = append (ai .Addrs , ma )
428
+ otherInfo .Addrs = append (otherInfo .Addrs , ma )
330
429
}
331
430
}
332
431
333
- testHost , err := d .createTestHost ()
334
- if err != nil {
335
- return nil , fmt .Errorf ("server error: %w" , err )
336
- }
337
- defer testHost .Close ()
338
-
339
432
if ! connectionFailed {
340
433
// Test Is the target connectable
341
434
dialCtx , dialCancel := context .WithTimeout (ctx , time .Second * 120 )
342
435
343
- _ = testHost .Connect (dialCtx , * ai )
436
+ _ = testHost .Connect (dialCtx , otherInfo )
344
437
// Call NewStream to force NAT hole punching. see https://github.com/libp2p/go-libp2p/issues/2714
345
- _ , connErr := testHost .NewStream (dialCtx , ai .ID , "/ipfs/bitswap/1.2.0" , "/ipfs/bitswap/1.1.0" , "/ipfs/bitswap/1.0.0" , "/ipfs/bitswap" )
438
+ _ , connErr := testHost .NewStream (dialCtx , otherInfo .ID , "/ipfs/bitswap/1.2.0" , "/ipfs/bitswap/1.1.0" , "/ipfs/bitswap/1.0.0" , "/ipfs/bitswap" )
346
439
dialCancel ()
347
440
if connErr != nil {
348
441
out .ConnectionError = connErr .Error ()
@@ -389,12 +482,118 @@ func checkBitswapCID(ctx context.Context, host host.Host, c cid.Cid, ma multiadd
389
482
return out
390
483
}
391
484
485
+ type HTTPCheckOutput struct {
486
+ Enabled bool
487
+ Duration time.Duration
488
+ Endpoints []multiaddr.Multiaddr
489
+ Connected bool
490
+ Requested bool
491
+ Found bool
492
+ Error string
493
+ }
494
+
495
+ type httpReceiver struct {
496
+ msgCh chan bsmsg.BitSwapMessage
497
+ errorCh chan error
498
+ }
499
+
500
+ func (recv * httpReceiver ) ReceiveMessage (ctx context.Context , sender peer.ID , incoming bsmsg.BitSwapMessage ) {
501
+ recv .msgCh <- incoming
502
+ }
503
+
504
+ func (recv * httpReceiver ) ReceiveError (err error ) {
505
+ recv .errorCh <- err
506
+ }
507
+
508
+ func (recv * httpReceiver ) PeerConnected (p peer.ID ) { // nop
509
+ }
510
+
511
+ func (recv * httpReceiver ) PeerDisconnected (p peer.ID ) { // nop
512
+ }
513
+
514
+ // FIXME: could expose this directly in Boxo.
515
+ func supportsHEAD (pstore peerstore.Peerstore , p peer.ID ) bool {
516
+ v , err := pstore .Get (p , "http-retrieval-head-support" )
517
+ if err != nil {
518
+ return false
519
+ }
520
+
521
+ b , ok := v .(bool )
522
+ return ok && b
523
+ }
524
+
525
+ func checkHTTPRetrieval (ctx context.Context , host host.Host , c cid.Cid , pinfo peer.AddrInfo ) HTTPCheckOutput {
526
+ log .Printf ("Start of HTTP check for cid %s by attempting to connect to %s" , c , pinfo )
527
+
528
+ out := HTTPCheckOutput {
529
+ Enabled : true ,
530
+ }
531
+
532
+ htnet := httpnet .New (host ,
533
+ httpnet .WithUserAgent (userAgent ),
534
+ httpnet .WithResponseHeaderTimeout (5 * time .Second ), // default: 10
535
+ httpnet .WithHTTPWorkers (1 ),
536
+ )
537
+ defer htnet .Stop ()
538
+
539
+ recv := httpReceiver {
540
+ msgCh : make (chan bsmsg.BitSwapMessage ),
541
+ errorCh : make (chan error ),
542
+ }
543
+ htnet .Start (& recv )
544
+
545
+ pid := pinfo .ID
546
+ err := htnet .Connect (ctx , pinfo )
547
+ defer htnet .DisconnectFrom (ctx , pid )
548
+ if err != nil {
549
+ log .Printf ("End of HTTP check for %s: %s" , c , err )
550
+ out .Error = err .Error ()
551
+ return out
552
+ }
553
+ out .Connected = true
554
+ out .Endpoints = host .Peerstore ().Addrs (pid )
555
+
556
+ if ! supportsHEAD (host .Peerstore (), pid ) {
557
+ log .Printf ("End of HTTP check for %s at %s: no support for HEAD requests" , c , pinfo )
558
+ out .Error = "HTTP endpoint does not support HEAD requests"
559
+ return out
560
+ }
561
+
562
+ // Now we are in a position of sending a HEAD request.
563
+ msg := bsmsg .New (true )
564
+ msg .AddEntry (c , 0 , pb .Message_Wantlist_Have , true )
565
+ start := time .Now ()
566
+ err = htnet .SendMessage (ctx , pid , msg )
567
+ out .Requested = true
568
+ if err != nil {
569
+ log .Printf ("End of HTTP check for %s at . Connected: true. Error: %s" , c , err )
570
+ out .Error = err .Error ()
571
+ return out
572
+ }
573
+
574
+ waitCtx , cancel := context .WithTimeout (ctx , 5 * time .Second )
575
+ defer cancel ()
576
+ select {
577
+ case <- waitCtx .Done ():
578
+ case msg := <- recv .msgCh :
579
+ if len (msg .Haves ()) > 0 {
580
+ out .Found = true
581
+ }
582
+
583
+ case err = <- recv .errorCh :
584
+ out .Error = err .Error ()
585
+ }
586
+
587
+ out .Duration = time .Since (start )
588
+ log .Printf ("End of HTTP check for %s at %s. Connected: true. Requested: true. Found: %t. Error: %s" , c , pinfo , out .Found , out .Error )
589
+ return out
590
+ }
591
+
392
592
func peerAddrsInDHT (ctx context.Context , d kademlia , messenger * dhtpb.ProtocolMessenger , p peer.ID ) (map [string ]int , error ) {
393
593
closestPeers , err := d .GetClosestPeers (ctx , string (p ))
394
594
if err != nil {
395
595
return nil , err
396
596
}
397
-
398
597
resCh := make (chan * peer.AddrInfo , len (closestPeers ))
399
598
400
599
numSuccessfulResponses := execOnMany (ctx , 0.3 , time .Second * 3 , func (ctx context.Context , peerToQuery peer.ID ) error {
0 commit comments