- 
                Notifications
    You must be signed in to change notification settings 
- Fork 76
bitswap client #856
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
bitswap client #856
Changes from all commits
85b7358
              98c2e19
              14ed4a5
              87dc643
              5ddd9de
              e0e1d73
              58163ca
              a58ea5a
              c9d91e6
              0e75016
              957e65d
              e2c6012
              e3557d7
              5b425ea
              e9da747
              0bd7736
              1cfa691
              d3b5593
              f2f9729
              55c278d
              c1f57f3
              c4c3f1d
              bf618c6
              f1a1ccd
              f4246da
              File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
| @@ -0,0 +1,21 @@ | ||
| package bitswap | ||
|  | ||
| import ( | ||
| "github.com/ipfs/go-bitswap/network" | ||
| "github.com/libp2p/go-libp2p/core/protocol" | ||
| ) | ||
|  | ||
| var Protocols = []protocol.ID{ | ||
| network.ProtocolBitswap, | ||
| network.ProtocolBitswapNoVers, | ||
| network.ProtocolBitswapOneOne, | ||
| network.ProtocolBitswapOneZero, | ||
| } | ||
|  | ||
| var ProtocolStrings = []string{} | ||
|  | ||
| func init() { | ||
| for _, p := range Protocols { | ||
| ProtocolStrings = append(ProtocolStrings, string(p)) | ||
| } | ||
| } | 
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
| @@ -0,0 +1,220 @@ | ||
| package main | ||
|  | ||
| import ( | ||
| "context" | ||
| "crypto/rand" | ||
| "fmt" | ||
| "net/http" | ||
| _ "net/http/pprof" | ||
| "sort" | ||
| "sync/atomic" | ||
| "time" | ||
|  | ||
| "github.com/filecoin-project/boost/cmd/booster-bitswap/bitswap" | ||
| lcli "github.com/filecoin-project/lotus/cli" | ||
| "github.com/ipfs/go-bitswap/client" | ||
| bsnetwork "github.com/ipfs/go-bitswap/network" | ||
| blocks "github.com/ipfs/go-block-format" | ||
| "github.com/ipfs/go-cid" | ||
| nilrouting "github.com/ipfs/go-ipfs-routing/none" | ||
| ipldlegacy "github.com/ipfs/go-ipld-legacy" | ||
| "github.com/ipld/go-car/v2/blockstore" | ||
| "github.com/libp2p/go-libp2p" | ||
| "github.com/libp2p/go-libp2p/core/crypto" | ||
| "github.com/libp2p/go-libp2p/core/network" | ||
| "github.com/libp2p/go-libp2p/core/peer" | ||
| "github.com/libp2p/go-libp2p/p2p/muxer/mplex" | ||
| "github.com/libp2p/go-libp2p/p2p/muxer/yamux" | ||
| quic "github.com/libp2p/go-libp2p/p2p/transport/quic" | ||
| "github.com/libp2p/go-libp2p/p2p/transport/tcp" | ||
| "github.com/urfave/cli/v2" | ||
| "golang.org/x/sync/errgroup" | ||
| ) | ||
|  | ||
| var fetchCmd = &cli.Command{ | ||
| Name: "fetch", | ||
| Usage: "fetch <multiaddr> <root cid> <output car path>", | ||
| Description: "Fetch all blocks in the DAG under the given root cid from the bitswap node at multiaddr", | ||
| Before: before, | ||
| Flags: []cli.Flag{ | ||
| &cli.BoolFlag{ | ||
| Name: "pprof", | ||
| Usage: "run pprof web server on localhost:6071", | ||
| }, | ||
| &cli.IntFlag{ | ||
| Name: "concurrency", | ||
| Usage: "concurrent request limit - 0 means unlimited", | ||
| Value: 10, | ||
| }, | ||
| }, | ||
| Action: func(cctx *cli.Context) error { | ||
| if cctx.Bool("pprof") { | ||
| go func() { | ||
| err := http.ListenAndServe("localhost:6071", nil) | ||
| if err != nil { | ||
| log.Error(err) | ||
| } | ||
| }() | ||
| } | ||
|  | ||
| if cctx.Args().Len() != 3 { | ||
| return fmt.Errorf("usage: fetch <multiaddr> <root cid> <output car path>") | ||
| } | ||
|  | ||
| addrInfoStr := cctx.Args().Get(0) | ||
| serverAddrInfo, err := peer.AddrInfoFromString(addrInfoStr) | ||
| if err != nil { | ||
| return fmt.Errorf("parsing server multiaddr %s: %w", addrInfoStr, err) | ||
| } | ||
|  | ||
| rootCidStr := cctx.Args().Get(1) | ||
| rootCid, err := cid.Parse(rootCidStr) | ||
| if err != nil { | ||
| return fmt.Errorf("parsing cid %s: %w", rootCidStr, err) | ||
| } | ||
|  | ||
| outputCarPath := cctx.Args().Get(2) | ||
|  | ||
| ctx := lcli.ReqContext(cctx) | ||
|  | ||
| // setup libp2p host | ||
| log.Infow("generating libp2p key") | ||
| privKey, _, err := crypto.GenerateECDSAKeyPair(rand.Reader) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|  | ||
| host, err := libp2p.New( | ||
| libp2p.Transport(tcp.NewTCPTransport), | ||
| libp2p.Transport(quic.NewTransport), | ||
| 
      Comment on lines
    
      +88
     to 
      +89
    
   There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are these the only transports you're supporting because they're the only ones Boost supports? | ||
| libp2p.Muxer("/mplex/6.7.0", mplex.DefaultTransport), | ||
| libp2p.Muxer("/yamux/1.0.0", yamux.DefaultTransport), | ||
| libp2p.Identity(privKey), | ||
| libp2p.ResourceManager(network.NullResourceManager), | ||
| ) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|  | ||
| // Create a bitswap client | ||
| nilRouter, err := nilrouting.ConstructNilRouting(ctx, nil, nil, nil) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| net := bsnetwork.NewFromIpfsHost(host, nilRouter) | ||
| bs, err := blockstore.OpenReadWrite(outputCarPath, []cid.Cid{rootCid}, blockstore.UseWholeCIDs(true)) | ||
| if err != nil { | ||
| return fmt.Errorf("creating blockstore at %s: %w", outputCarPath, err) | ||
| } | ||
|  | ||
| ctx, cancel := context.WithCancel(ctx) | ||
| defer cancel() | ||
| brn := &blockReceiver{bs: bs, ctx: ctx, cancel: cancel} | ||
| bsClient := client.New(ctx, net, bs, client.WithBlockReceivedNotifier(brn)) | ||
| defer bsClient.Close() | ||
| net.Start(bsClient) | ||
|  | ||
| // Connect to host | ||
| connectStart := time.Now() | ||
| log.Infow("connecting to server", "server", serverAddrInfo.String()) | ||
| err = host.Connect(ctx, *serverAddrInfo) | ||
| if err != nil { | ||
| return fmt.Errorf("connecting to %s: %w", serverAddrInfo, err) | ||
| 
      Comment on lines
    
      +120
     to 
      +122
    
   There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This will likely cause you problems with large files since if the connection gets pruned on the server side it'll never get re-established and your download will stall (e.g. like ipfs/ipget#103). It's resolvable by trying to keep the connection alive. | ||
| } | ||
| log.Debugw("connected to server", "duration", time.Since(connectStart).String()) | ||
|  | ||
| // Check host's libp2p protocols | ||
| protos, err := host.Peerstore().GetProtocols(serverAddrInfo.ID) | ||
| if err != nil { | ||
| return fmt.Errorf("getting protocols from peer store for %s: %w", serverAddrInfo.ID, err) | ||
| } | ||
| sort.Slice(protos, func(i, j int) bool { | ||
| return protos[i] < protos[j] | ||
| }) | ||
| log.Debugw("host libp2p protocols", "protocols", protos) | ||
| p, err := host.Peerstore().FirstSupportedProtocol(serverAddrInfo.ID, bitswap.ProtocolStrings...) | ||
| if err != nil { | ||
| return fmt.Errorf("getting first supported protocol from peer store for %s: %w", serverAddrInfo.ID, err) | ||
| } | ||
| if p == "" { | ||
| return fmt.Errorf("host %s does not support any know bitswap protocols: %s", serverAddrInfo.ID, bitswap.ProtocolStrings) | ||
| } | ||
|  | ||
| var throttle chan struct{} | ||
| concurrency := cctx.Int("concurrency") | ||
| if concurrency > 0 { | ||
| throttle = make(chan struct{}, concurrency) | ||
| } | ||
|  | ||
| // Fetch all blocks under the root cid | ||
| log.Infow("fetch", "cid", rootCid, "concurrency", concurrency) | ||
| start := time.Now() | ||
| count, size, err := getBlocks(ctx, bsClient, rootCid, throttle) | ||
| if err != nil { | ||
| return fmt.Errorf("getting blocks: %w", err) | ||
| } | ||
|  | ||
| log.Infow("fetch complete", "count", count, "size", size, "duration", time.Since(start).String()) | ||
| log.Debug("finalizing") | ||
| finalizeStart := time.Now() | ||
| defer func() { log.Infow("finalize complete", "duration", time.Since(finalizeStart).String()) }() | ||
| return bs.Finalize() | ||
| }, | ||
| } | ||
|  | ||
| func getBlocks(ctx context.Context, bsClient *client.Client, c cid.Cid, throttle chan struct{}) (uint64, uint64, error) { | ||
| if throttle != nil { | ||
| throttle <- struct{}{} | ||
| } | ||
| // Get the block | ||
| start := time.Now() | ||
| blk, err := bsClient.GetBlock(ctx, c) | ||
| if throttle != nil { | ||
| <-throttle | ||
| } | ||
| if err != nil { | ||
| return 0, 0, err | ||
| } | ||
|  | ||
| var size = uint64(len(blk.RawData())) | ||
| log.Debugw("receive", "cid", c, "size", size, "duration", time.Since(start).String()) | ||
|  | ||
| // Read the links from the block to child nodes in the DAG | ||
| var count = uint64(1) | ||
| nd, err := ipldlegacy.DecodeNode(ctx, blk) | ||
| if err != nil { | ||
| return 0, 0, fmt.Errorf("decoding node %s: %w", c, err) | ||
| } | ||
|  | ||
| var eg errgroup.Group | ||
| lnks := nd.Links() | ||
| for _, l := range lnks { | ||
| l := l | ||
| // Launch a go routine to fetch the blocks underneath each link | ||
| eg.Go(func() error { | ||
| cnt, sz, err := getBlocks(ctx, bsClient, l.Cid, throttle) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| atomic.AddUint64(&count, cnt) | ||
| atomic.AddUint64(&size, sz) | ||
| return nil | ||
| }) | ||
| } | ||
|  | ||
| return count, size, eg.Wait() | ||
| } | ||
|  | ||
| type blockReceiver struct { | ||
| bs *blockstore.ReadWrite | ||
| ctx context.Context | ||
| cancel context.CancelFunc | ||
| } | ||
|  | ||
| func (b blockReceiver) ReceivedBlocks(id peer.ID, blks []blocks.Block) { | ||
| err := b.bs.PutMany(b.ctx, blks) | ||
| if err != nil { | ||
| log.Errorw("failed to write blocks to blockstore: %s", err) | ||
| b.cancel() | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
|  | @@ -31,6 +31,7 @@ func main() { | |
| Commands: []*cli.Command{ | ||
| initCmd, | ||
| runCmd, | ||
| fetchCmd, | ||
| }, | ||
| } | ||
| app.Setup() | ||
|  | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any reason you went with ECDSA over ED25519, I tend to see the latter as the default in more places.