Skip to content

Commit b022f1b

Browse files
committed
feat: bitswap client
1 parent e2c6012 commit b022f1b

File tree

1 file changed

+192
-0
lines changed

1 file changed

+192
-0
lines changed

cmd/booster-bitswap/client.go

Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"crypto/rand"
6+
"fmt"
7+
"net/http"
8+
_ "net/http/pprof"
9+
"sync/atomic"
10+
"time"
11+
12+
"github.com/filecoin-project/boost/tracing"
13+
lcli "github.com/filecoin-project/lotus/cli"
14+
"github.com/ipfs/go-bitswap/client"
15+
bsnetwork "github.com/ipfs/go-bitswap/network"
16+
"github.com/ipfs/go-cid"
17+
"github.com/ipfs/go-datastore"
18+
bstore "github.com/ipfs/go-ipfs-blockstore"
19+
nilrouting "github.com/ipfs/go-ipfs-routing/none"
20+
ipldlegacy "github.com/ipfs/go-ipld-legacy"
21+
"github.com/libp2p/go-libp2p"
22+
"github.com/libp2p/go-libp2p/core/crypto"
23+
"github.com/libp2p/go-libp2p/core/network"
24+
"github.com/libp2p/go-libp2p/core/peer"
25+
"github.com/libp2p/go-libp2p/p2p/muxer/mplex"
26+
"github.com/libp2p/go-libp2p/p2p/muxer/yamux"
27+
quic "github.com/libp2p/go-libp2p/p2p/transport/quic"
28+
"github.com/libp2p/go-libp2p/p2p/transport/tcp"
29+
"github.com/pkg/profile"
30+
"github.com/urfave/cli/v2"
31+
"golang.org/x/sync/errgroup"
32+
)
33+
34+
var fetchCmd = &cli.Command{
35+
Name: "fetch",
36+
Usage: "fetch <multiaddr> <root cid>",
37+
Description: "Fetch all blocks in the DAG under the given root cid from the bitswap node at multiaddr",
38+
Before: before,
39+
Flags: []cli.Flag{
40+
&cli.BoolFlag{
41+
Name: "pprof",
42+
Usage: "run pprof web server on localhost:6070",
43+
},
44+
&cli.IntFlag{
45+
Name: "concurrency",
46+
Usage: "concurrent request limit - 0 means unlimited",
47+
Value: 10,
48+
},
49+
&cli.BoolFlag{
50+
Name: "tracing",
51+
Usage: "enables tracing of booster-bitswap calls",
52+
Value: false,
53+
},
54+
&cli.StringFlag{
55+
Name: "tracing-endpoint",
56+
Usage: "the endpoint for the tracing exporter",
57+
Value: "http://tempo:14268/api/traces",
58+
},
59+
},
60+
Action: func(cctx *cli.Context) error {
61+
if cctx.Args().Len() != 2 {
62+
return fmt.Errorf("usage: fetch <multiaddr> <cid>")
63+
}
64+
65+
addrInfoStr := cctx.Args().Get(0)
66+
serverAddrInfo, err := peer.AddrInfoFromString(addrInfoStr)
67+
if err != nil {
68+
return fmt.Errorf("parsing server multiaddr %s: %w", addrInfoStr, err)
69+
}
70+
71+
rootCidStr := cctx.Args().Get(1)
72+
rootCid, err := cid.Parse(rootCidStr)
73+
if err != nil {
74+
return fmt.Errorf("parsing cid %s: %w", rootCidStr, err)
75+
}
76+
77+
defer profile.Start(profile.TraceProfile, profile.ProfilePath(".")).Stop()
78+
79+
if cctx.Bool("pprof") {
80+
go func() {
81+
err := http.ListenAndServe("localhost:6065", nil)
82+
if err != nil {
83+
log.Error(err)
84+
}
85+
}()
86+
}
87+
88+
ctx := lcli.ReqContext(cctx)
89+
90+
// Instantiate the tracer and exporter
91+
if cctx.Bool("tracing") {
92+
tracingStopper, err := tracing.New("booster-bsclient", cctx.String("tracing-endpoint"))
93+
if err != nil {
94+
return fmt.Errorf("failed to instantiate tracer: %w", err)
95+
}
96+
log.Info("Tracing exporter enabled")
97+
98+
defer func() {
99+
_ = tracingStopper(ctx)
100+
}()
101+
}
102+
103+
// setup libp2p host
104+
log.Infow("generating libp2p key")
105+
privKey, _, err := crypto.GenerateECDSAKeyPair(rand.Reader)
106+
if err != nil {
107+
return err
108+
}
109+
110+
host, err := libp2p.New(
111+
libp2p.Transport(tcp.NewTCPTransport),
112+
libp2p.Transport(quic.NewTransport),
113+
libp2p.Muxer("/mplex/6.7.0", mplex.DefaultTransport),
114+
libp2p.Muxer("/yamux/1.0.0", yamux.DefaultTransport),
115+
libp2p.Identity(privKey),
116+
libp2p.ResourceManager(network.NullResourceManager),
117+
)
118+
if err != nil {
119+
return err
120+
}
121+
122+
// Create a bitswap client
123+
nilRouter, err := nilrouting.ConstructNilRouting(ctx, nil, nil, nil)
124+
if err != nil {
125+
return err
126+
}
127+
net := bsnetwork.NewFromIpfsHost(host, nilRouter)
128+
bs := bstore.NewBlockstore(datastore.NewNullDatastore())
129+
bsClient := client.New(ctx, net, bs)
130+
net.Start(bsClient)
131+
132+
log.Infow("connecting to server", "server", serverAddrInfo.String())
133+
err = host.Connect(ctx, *serverAddrInfo)
134+
if err != nil {
135+
return fmt.Errorf("connecting to %s: %w", serverAddrInfo, err)
136+
}
137+
138+
var throttle chan struct{}
139+
concurrency := cctx.Int("concurrency")
140+
if concurrency > 0 {
141+
throttle = make(chan struct{}, concurrency)
142+
}
143+
144+
log.Infow("fetch", "cid", rootCid, "concurrency", concurrency)
145+
start := time.Now()
146+
count, size, err := getBlocks(ctx, bsClient, rootCid, throttle)
147+
if err != nil {
148+
return fmt.Errorf("getting blocks: %w", err)
149+
}
150+
151+
log.Infow("complete", "count", count, "size", size, "duration", time.Since(start))
152+
return nil
153+
},
154+
}
155+
156+
func getBlocks(ctx context.Context, bsClient *client.Client, c cid.Cid, throttle chan struct{}) (uint64, uint64, error) {
157+
if throttle != nil {
158+
throttle <- struct{}{}
159+
}
160+
// Get the block
161+
start := time.Now()
162+
blk, err := bsClient.GetBlock(ctx, c)
163+
if throttle != nil {
164+
<-throttle
165+
}
166+
if err != nil {
167+
return 0, 0, err
168+
}
169+
170+
var size = uint64(len(blk.RawData()))
171+
log.Debugw("receive", "cid", c, "size", size, "duration", time.Since(start))
172+
173+
// Read the links from the block to child nodes in the DAG
174+
var count = uint64(1)
175+
nd, err := ipldlegacy.DecodeNode(ctx, blk)
176+
var eg errgroup.Group
177+
lnks := nd.Links()
178+
for _, l := range lnks {
179+
l := l
180+
// Launch a go routine to fetch the blocks underneath each link
181+
eg.Go(func() error {
182+
cnt, sz, err := getBlocks(ctx, bsClient, l.Cid, throttle)
183+
if err != nil {
184+
return err
185+
}
186+
atomic.AddUint64(&count, cnt)
187+
atomic.AddUint64(&size, sz)
188+
return nil
189+
})
190+
}
191+
return count, size, eg.Wait()
192+
}

0 commit comments

Comments
 (0)