Skip to content

Commit f0726d2

Browse files
dirkmchannahhowardnonsenservagg
authored
bitswap client (#856)
* booster bitswap MVP executable (#707) * feat(booster-bitswap): booster bitswap MVP untested * refactor(booster-bitswap): use API for fetching blocks * fix(deps): update deps to compile * feat(booster-bitswap): makefile & fixes add commands to build booster-bitswap, and very a round tripped successful fetch from booster-bitswap * refactor: clean up unused vars etc * fix: booster-bitsawp - check error when creating libp2p key * refactor(node): avoid FreeAndUnsealed method Co-authored-by: Dirk McCormick <dirkmdev@gmail.com> Co-authored-by: Anton Evangelatov <anton.evangelatov@gmail.com> * booster-bitswap devnet and tracing (#796) * return ipld ErrNotFound from remote blockstore interface (#798) * fix: return ipld ErrNotFound from remote blockstore interface * test: add more tests for ipld ErrNotFound * test: comment out part of TestDummydealOnline that is flaky due to a bug in latest lotus (#802) * fix normaliseError nil ptr dereference (#803) * feat: shard selector (#807) * LoadBalancer for bitswap (and later, more of libp2p) (#786) * feat(loadbalancer): add message types * feat(messages): add utility functions * feat(loadbalancer): initial load balancer impl implementation of the load balancer node itself * feat(loadbalancer): add service node implements code for running a service node * feat(loadbalancer): integrate into boost and booster-bitswap * Update loadbalancer/loadbalancer.go Co-authored-by: Rod Vagg <rod@vagg.org> * Update loadbalancer/servicenode.go Co-authored-by: Rod Vagg <rod@vagg.org> * Update loadbalancer/servicenode.go Co-authored-by: Rod Vagg <rod@vagg.org> * Update loadbalancer/messages/messages.ipldsch Co-authored-by: Rod Vagg <rod@vagg.org> * Update loadbalancer/messages/messages.ipldsch Co-authored-by: Rod Vagg <rod@vagg.org> * refactor(loadbalancer): remove routing protocol remove the routing protocol, instead relying on a set config. also remove forwarding response for inbound requests * fix(loadbalancer): update tests * refactor(loadbalancer): integrate simplified load balancer removed pub keys to minimize network traffic, added api's to configure and update bitswap peer id, added auto config of bitswap peer id in booster-bitswap * docs(gen): regenerate api docs * chore(lint): fix lint errors * fix(loadbalancer): minor bridgestream fix * Update loadbalancer/servicenode.go Co-authored-by: dirkmc <dirkmdev@gmail.com> * refactor(protocolproxy): address PR comments renames, reconfigured architecture, etc * refactor(make init print out peer id): remove apis and transparent peer id setting. have init print Co-authored-by: Rod Vagg <rod@vagg.org> Co-authored-by: dirkmc <dirkmdev@gmail.com> * Add block filter via BadBits (#825) * feat(booster-bitswap): add block filter via BadBits * refactor(booster-bitswap): use bitswap blockfilter for filtering * feat(blockfilter): only update when list is modified * feat(blockFilter): add on disk caching * Update cmd/booster-bitswap/blockfilter/blockfilter.go Co-authored-by: dirkmc <dirkmdev@gmail.com> * fix(blockfilter): minor PR fixups Co-authored-by: dirkmc <dirkmdev@gmail.com> * Libp2p 0.22 upgrade (#837) * chore(deps): upgrade to Lotus RC & libp2p v0.22 * chore(deps): update go to 1.18 * ci(circle): update circle to go 1.18 * style(imports): fix imports * fix(build): update ffi * fix(lint): fix deprecated strings.Title method * fix(mod): mod tidy * Protocol Proxy cleanup (#836) * refactor(booster-bitswap): minor UI fixes for booster-bitswap UI * Update cmd/booster-bitswap/init.go Co-authored-by: dirkmc <dirkmdev@gmail.com> Co-authored-by: dirkmc <dirkmdev@gmail.com> * feat: update to dagstore v0.5.5 (#849) * feat: bitswap client * feat: bitswap client - output car file * refactor: bitswap client - remove tracing * feat: debug logs * fix: write blocks to blockstore * fix: duration output * fix: duration output for block received * feat: add pprof to bitswap client * feat: protocol proxy logging * feat: bitswap client - check host supports bitswap protocol * feat: listen for bitswap requests locally as well as through forwarding protocol Co-authored-by: Hannah Howard <hannah@hannahhoward.net> Co-authored-by: Anton Evangelatov <anton.evangelatov@gmail.com> Co-authored-by: Rod Vagg <rod@vagg.org>
1 parent 6e0ac5c commit f0726d2

File tree

8 files changed

+273
-31
lines changed

8 files changed

+273
-31
lines changed
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package bitswap
2+
3+
import (
4+
"github.com/ipfs/go-bitswap/network"
5+
"github.com/libp2p/go-libp2p/core/protocol"
6+
)
7+
8+
var Protocols = []protocol.ID{
9+
network.ProtocolBitswap,
10+
network.ProtocolBitswapNoVers,
11+
network.ProtocolBitswapOneOne,
12+
network.ProtocolBitswapOneZero,
13+
}
14+
15+
var ProtocolStrings = []string{}
16+
17+
func init() {
18+
for _, p := range Protocols {
19+
ProtocolStrings = append(ProtocolStrings, string(p))
20+
}
21+
}

cmd/booster-bitswap/client.go

Lines changed: 220 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,220 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"crypto/rand"
6+
"fmt"
7+
"net/http"
8+
_ "net/http/pprof"
9+
"sort"
10+
"sync/atomic"
11+
"time"
12+
13+
"github.com/filecoin-project/boost/cmd/booster-bitswap/bitswap"
14+
lcli "github.com/filecoin-project/lotus/cli"
15+
"github.com/ipfs/go-bitswap/client"
16+
bsnetwork "github.com/ipfs/go-bitswap/network"
17+
blocks "github.com/ipfs/go-block-format"
18+
"github.com/ipfs/go-cid"
19+
nilrouting "github.com/ipfs/go-ipfs-routing/none"
20+
ipldlegacy "github.com/ipfs/go-ipld-legacy"
21+
"github.com/ipld/go-car/v2/blockstore"
22+
"github.com/libp2p/go-libp2p"
23+
"github.com/libp2p/go-libp2p/core/crypto"
24+
"github.com/libp2p/go-libp2p/core/network"
25+
"github.com/libp2p/go-libp2p/core/peer"
26+
"github.com/libp2p/go-libp2p/p2p/muxer/mplex"
27+
"github.com/libp2p/go-libp2p/p2p/muxer/yamux"
28+
quic "github.com/libp2p/go-libp2p/p2p/transport/quic"
29+
"github.com/libp2p/go-libp2p/p2p/transport/tcp"
30+
"github.com/urfave/cli/v2"
31+
"golang.org/x/sync/errgroup"
32+
)
33+
34+
var fetchCmd = &cli.Command{
35+
Name: "fetch",
36+
Usage: "fetch <multiaddr> <root cid> <output car path>",
37+
Description: "Fetch all blocks in the DAG under the given root cid from the bitswap node at multiaddr",
38+
Before: before,
39+
Flags: []cli.Flag{
40+
&cli.BoolFlag{
41+
Name: "pprof",
42+
Usage: "run pprof web server on localhost:6071",
43+
},
44+
&cli.IntFlag{
45+
Name: "concurrency",
46+
Usage: "concurrent request limit - 0 means unlimited",
47+
Value: 10,
48+
},
49+
},
50+
Action: func(cctx *cli.Context) error {
51+
if cctx.Bool("pprof") {
52+
go func() {
53+
err := http.ListenAndServe("localhost:6071", nil)
54+
if err != nil {
55+
log.Error(err)
56+
}
57+
}()
58+
}
59+
60+
if cctx.Args().Len() != 3 {
61+
return fmt.Errorf("usage: fetch <multiaddr> <root cid> <output car path>")
62+
}
63+
64+
addrInfoStr := cctx.Args().Get(0)
65+
serverAddrInfo, err := peer.AddrInfoFromString(addrInfoStr)
66+
if err != nil {
67+
return fmt.Errorf("parsing server multiaddr %s: %w", addrInfoStr, err)
68+
}
69+
70+
rootCidStr := cctx.Args().Get(1)
71+
rootCid, err := cid.Parse(rootCidStr)
72+
if err != nil {
73+
return fmt.Errorf("parsing cid %s: %w", rootCidStr, err)
74+
}
75+
76+
outputCarPath := cctx.Args().Get(2)
77+
78+
ctx := lcli.ReqContext(cctx)
79+
80+
// setup libp2p host
81+
log.Infow("generating libp2p key")
82+
privKey, _, err := crypto.GenerateECDSAKeyPair(rand.Reader)
83+
if err != nil {
84+
return err
85+
}
86+
87+
host, err := libp2p.New(
88+
libp2p.Transport(tcp.NewTCPTransport),
89+
libp2p.Transport(quic.NewTransport),
90+
libp2p.Muxer("/mplex/6.7.0", mplex.DefaultTransport),
91+
libp2p.Muxer("/yamux/1.0.0", yamux.DefaultTransport),
92+
libp2p.Identity(privKey),
93+
libp2p.ResourceManager(network.NullResourceManager),
94+
)
95+
if err != nil {
96+
return err
97+
}
98+
99+
// Create a bitswap client
100+
nilRouter, err := nilrouting.ConstructNilRouting(ctx, nil, nil, nil)
101+
if err != nil {
102+
return err
103+
}
104+
net := bsnetwork.NewFromIpfsHost(host, nilRouter)
105+
bs, err := blockstore.OpenReadWrite(outputCarPath, []cid.Cid{rootCid}, blockstore.UseWholeCIDs(true))
106+
if err != nil {
107+
return fmt.Errorf("creating blockstore at %s: %w", outputCarPath, err)
108+
}
109+
110+
ctx, cancel := context.WithCancel(ctx)
111+
defer cancel()
112+
brn := &blockReceiver{bs: bs, ctx: ctx, cancel: cancel}
113+
bsClient := client.New(ctx, net, bs, client.WithBlockReceivedNotifier(brn))
114+
defer bsClient.Close()
115+
net.Start(bsClient)
116+
117+
// Connect to host
118+
connectStart := time.Now()
119+
log.Infow("connecting to server", "server", serverAddrInfo.String())
120+
err = host.Connect(ctx, *serverAddrInfo)
121+
if err != nil {
122+
return fmt.Errorf("connecting to %s: %w", serverAddrInfo, err)
123+
}
124+
log.Debugw("connected to server", "duration", time.Since(connectStart).String())
125+
126+
// Check host's libp2p protocols
127+
protos, err := host.Peerstore().GetProtocols(serverAddrInfo.ID)
128+
if err != nil {
129+
return fmt.Errorf("getting protocols from peer store for %s: %w", serverAddrInfo.ID, err)
130+
}
131+
sort.Slice(protos, func(i, j int) bool {
132+
return protos[i] < protos[j]
133+
})
134+
log.Debugw("host libp2p protocols", "protocols", protos)
135+
p, err := host.Peerstore().FirstSupportedProtocol(serverAddrInfo.ID, bitswap.ProtocolStrings...)
136+
if err != nil {
137+
return fmt.Errorf("getting first supported protocol from peer store for %s: %w", serverAddrInfo.ID, err)
138+
}
139+
if p == "" {
140+
return fmt.Errorf("host %s does not support any know bitswap protocols: %s", serverAddrInfo.ID, bitswap.ProtocolStrings)
141+
}
142+
143+
var throttle chan struct{}
144+
concurrency := cctx.Int("concurrency")
145+
if concurrency > 0 {
146+
throttle = make(chan struct{}, concurrency)
147+
}
148+
149+
// Fetch all blocks under the root cid
150+
log.Infow("fetch", "cid", rootCid, "concurrency", concurrency)
151+
start := time.Now()
152+
count, size, err := getBlocks(ctx, bsClient, rootCid, throttle)
153+
if err != nil {
154+
return fmt.Errorf("getting blocks: %w", err)
155+
}
156+
157+
log.Infow("fetch complete", "count", count, "size", size, "duration", time.Since(start).String())
158+
log.Debug("finalizing")
159+
finalizeStart := time.Now()
160+
defer func() { log.Infow("finalize complete", "duration", time.Since(finalizeStart).String()) }()
161+
return bs.Finalize()
162+
},
163+
}
164+
165+
func getBlocks(ctx context.Context, bsClient *client.Client, c cid.Cid, throttle chan struct{}) (uint64, uint64, error) {
166+
if throttle != nil {
167+
throttle <- struct{}{}
168+
}
169+
// Get the block
170+
start := time.Now()
171+
blk, err := bsClient.GetBlock(ctx, c)
172+
if throttle != nil {
173+
<-throttle
174+
}
175+
if err != nil {
176+
return 0, 0, err
177+
}
178+
179+
var size = uint64(len(blk.RawData()))
180+
log.Debugw("receive", "cid", c, "size", size, "duration", time.Since(start).String())
181+
182+
// Read the links from the block to child nodes in the DAG
183+
var count = uint64(1)
184+
nd, err := ipldlegacy.DecodeNode(ctx, blk)
185+
if err != nil {
186+
return 0, 0, fmt.Errorf("decoding node %s: %w", c, err)
187+
}
188+
189+
var eg errgroup.Group
190+
lnks := nd.Links()
191+
for _, l := range lnks {
192+
l := l
193+
// Launch a go routine to fetch the blocks underneath each link
194+
eg.Go(func() error {
195+
cnt, sz, err := getBlocks(ctx, bsClient, l.Cid, throttle)
196+
if err != nil {
197+
return err
198+
}
199+
atomic.AddUint64(&count, cnt)
200+
atomic.AddUint64(&size, sz)
201+
return nil
202+
})
203+
}
204+
205+
return count, size, eg.Wait()
206+
}
207+
208+
type blockReceiver struct {
209+
bs *blockstore.ReadWrite
210+
ctx context.Context
211+
cancel context.CancelFunc
212+
}
213+
214+
func (b blockReceiver) ReceivedBlocks(id peer.ID, blks []blocks.Block) {
215+
err := b.bs.PutMany(b.ctx, blks)
216+
if err != nil {
217+
log.Errorw("failed to write blocks to blockstore: %s", err)
218+
b.cancel()
219+
}
220+
}

cmd/booster-bitswap/main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ func main() {
3131
Commands: []*cli.Command{
3232
initCmd,
3333
runCmd,
34+
fetchCmd,
3435
},
3536
}
3637
app.Setup()

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ require (
6363
github.com/ipfs/go-ipfs-files v0.1.1
6464
github.com/ipfs/go-ipfs-routing v0.2.1
6565
github.com/ipfs/go-ipld-format v0.4.0
66+
github.com/ipfs/go-ipld-legacy v0.1.1
6667
github.com/ipfs/go-log/v2 v2.5.1
6768
github.com/ipfs/go-merkledag v0.6.0
6869
github.com/ipfs/go-metrics-interface v0.0.1
@@ -217,7 +218,6 @@ require (
217218
github.com/ipfs/go-ipfs-pq v0.0.2 // indirect
218219
github.com/ipfs/go-ipfs-util v0.0.2 // indirect
219220
github.com/ipfs/go-ipld-cbor v0.0.6 // indirect
220-
github.com/ipfs/go-ipld-legacy v0.1.1 // indirect
221221
github.com/ipfs/go-ipns v0.2.0 // indirect
222222
github.com/ipfs/go-log v1.0.5 // indirect
223223
github.com/ipfs/go-path v0.3.0 // indirect

node/impl/boost_legacy.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import (
88
"time"
99

1010
"github.com/filecoin-project/dagstore/shard"
11+
"github.com/multiformats/go-multihash"
12+
1113
"github.com/filecoin-project/go-address"
1214
datatransfer "github.com/filecoin-project/go-data-transfer"
1315
"github.com/filecoin-project/go-fil-markets/piecestore"
@@ -18,7 +20,6 @@ import (
1820
"github.com/filecoin-project/lotus/chain/types"
1921
"github.com/ipfs/go-cid"
2022
peer "github.com/libp2p/go-libp2p/core/peer"
21-
"github.com/multiformats/go-multihash"
2223
)
2324

2425
func (sm *BoostAPI) MarketListDataTransfers(ctx context.Context) ([]lapi.DataTransferChannel, error) {

node/modules/retrieval.go

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,25 +4,18 @@ import (
44
"context"
55
"fmt"
66

7+
"github.com/filecoin-project/boost/cmd/booster-bitswap/bitswap"
78
"github.com/filecoin-project/boost/node/config"
89
"github.com/filecoin-project/boost/protocolproxy"
910
"github.com/filecoin-project/boost/retrievalmarket/lp2pimpl"
1011
"github.com/filecoin-project/boost/retrievalmarket/types"
11-
"github.com/ipfs/go-bitswap/network"
1212
"github.com/libp2p/go-libp2p/core/host"
1313
"github.com/libp2p/go-libp2p/core/peer"
1414
"github.com/libp2p/go-libp2p/core/protocol"
1515
"github.com/multiformats/go-multiaddr"
1616
"go.uber.org/fx"
1717
)
1818

19-
var bitswapProtocols = []protocol.ID{
20-
network.ProtocolBitswap,
21-
network.ProtocolBitswapNoVers,
22-
network.ProtocolBitswapOneOne,
23-
network.ProtocolBitswapOneZero,
24-
}
25-
2619
func NewTransportsListener(cfg *config.Boost) func(h host.Host) (*lp2pimpl.TransportsListener, error) {
2720
return func(h host.Host) (*lp2pimpl.TransportsListener, error) {
2821
protos := []types.Protocol{}
@@ -84,7 +77,7 @@ func NewProtocolProxy(cfg *config.Boost) func(h host.Host) (*protocolproxy.Proto
8477
if err != nil {
8578
return nil, err
8679
}
87-
peerConfig[bsPeerID] = bitswapProtocols
80+
peerConfig[bsPeerID] = bitswap.Protocols
8881
}
8982
return protocolproxy.NewProtocolProxy(h, peerConfig)
9083
}
@@ -93,12 +86,12 @@ func NewProtocolProxy(cfg *config.Boost) func(h host.Host) (*protocolproxy.Proto
9386
func HandleProtocolProxy(lc fx.Lifecycle, pp *protocolproxy.ProtocolProxy) {
9487
lc.Append(fx.Hook{
9588
OnStart: func(ctx context.Context) error {
96-
log.Info("starting load balancer")
89+
log.Info("starting libp2p protocol proxy")
9790
pp.Start(ctx)
9891
return nil
9992
},
10093
OnStop: func(context.Context) error {
101-
log.Info("stopping load balancer")
94+
log.Info("stopping libp2p protocol proxy")
10295
pp.Close()
10396
return nil
10497
},

0 commit comments

Comments
 (0)