|  | 
|  | 1 | +package main | 
|  | 2 | + | 
|  | 3 | +import ( | 
|  | 4 | +	"context" | 
|  | 5 | +	"crypto/rand" | 
|  | 6 | +	"fmt" | 
|  | 7 | +	"net/http" | 
|  | 8 | +	_ "net/http/pprof" | 
|  | 9 | +	"sync/atomic" | 
|  | 10 | +	"time" | 
|  | 11 | + | 
|  | 12 | +	"github.com/filecoin-project/boost/tracing" | 
|  | 13 | +	lcli "github.com/filecoin-project/lotus/cli" | 
|  | 14 | +	"github.com/ipfs/go-bitswap/client" | 
|  | 15 | +	bsnetwork "github.com/ipfs/go-bitswap/network" | 
|  | 16 | +	"github.com/ipfs/go-cid" | 
|  | 17 | +	"github.com/ipfs/go-datastore" | 
|  | 18 | +	bstore "github.com/ipfs/go-ipfs-blockstore" | 
|  | 19 | +	nilrouting "github.com/ipfs/go-ipfs-routing/none" | 
|  | 20 | +	ipldlegacy "github.com/ipfs/go-ipld-legacy" | 
|  | 21 | +	"github.com/libp2p/go-libp2p" | 
|  | 22 | +	"github.com/libp2p/go-libp2p/core/crypto" | 
|  | 23 | +	"github.com/libp2p/go-libp2p/core/network" | 
|  | 24 | +	"github.com/libp2p/go-libp2p/core/peer" | 
|  | 25 | +	"github.com/libp2p/go-libp2p/p2p/muxer/mplex" | 
|  | 26 | +	"github.com/libp2p/go-libp2p/p2p/muxer/yamux" | 
|  | 27 | +	quic "github.com/libp2p/go-libp2p/p2p/transport/quic" | 
|  | 28 | +	"github.com/libp2p/go-libp2p/p2p/transport/tcp" | 
|  | 29 | +	"github.com/pkg/profile" | 
|  | 30 | +	"github.com/urfave/cli/v2" | 
|  | 31 | +	"golang.org/x/sync/errgroup" | 
|  | 32 | +) | 
|  | 33 | + | 
|  | 34 | +var fetchCmd = &cli.Command{ | 
|  | 35 | +	Name:        "fetch", | 
|  | 36 | +	Usage:       "fetch <multiaddr> <root cid>", | 
|  | 37 | +	Description: "Fetch all blocks in the DAG under the given root cid from the bitswap node at multiaddr", | 
|  | 38 | +	Before:      before, | 
|  | 39 | +	Flags: []cli.Flag{ | 
|  | 40 | +		&cli.BoolFlag{ | 
|  | 41 | +			Name:  "pprof", | 
|  | 42 | +			Usage: "run pprof web server on localhost:6070", | 
|  | 43 | +		}, | 
|  | 44 | +		&cli.IntFlag{ | 
|  | 45 | +			Name:  "concurrency", | 
|  | 46 | +			Usage: "concurrent request limit - 0 means unlimited", | 
|  | 47 | +			Value: 10, | 
|  | 48 | +		}, | 
|  | 49 | +		&cli.BoolFlag{ | 
|  | 50 | +			Name:  "tracing", | 
|  | 51 | +			Usage: "enables tracing of booster-bitswap calls", | 
|  | 52 | +			Value: false, | 
|  | 53 | +		}, | 
|  | 54 | +		&cli.StringFlag{ | 
|  | 55 | +			Name:  "tracing-endpoint", | 
|  | 56 | +			Usage: "the endpoint for the tracing exporter", | 
|  | 57 | +			Value: "http://tempo:14268/api/traces", | 
|  | 58 | +		}, | 
|  | 59 | +	}, | 
|  | 60 | +	Action: func(cctx *cli.Context) error { | 
|  | 61 | +		if cctx.Args().Len() != 2 { | 
|  | 62 | +			return fmt.Errorf("usage: fetch <multiaddr> <cid>") | 
|  | 63 | +		} | 
|  | 64 | + | 
|  | 65 | +		addrInfoStr := cctx.Args().Get(0) | 
|  | 66 | +		serverAddrInfo, err := peer.AddrInfoFromString(addrInfoStr) | 
|  | 67 | +		if err != nil { | 
|  | 68 | +			return fmt.Errorf("parsing server multiaddr %s: %w", addrInfoStr, err) | 
|  | 69 | +		} | 
|  | 70 | + | 
|  | 71 | +		rootCidStr := cctx.Args().Get(1) | 
|  | 72 | +		rootCid, err := cid.Parse(rootCidStr) | 
|  | 73 | +		if err != nil { | 
|  | 74 | +			return fmt.Errorf("parsing cid %s: %w", rootCidStr, err) | 
|  | 75 | +		} | 
|  | 76 | + | 
|  | 77 | +		defer profile.Start(profile.TraceProfile, profile.ProfilePath(".")).Stop() | 
|  | 78 | + | 
|  | 79 | +		if cctx.Bool("pprof") { | 
|  | 80 | +			go func() { | 
|  | 81 | +				err := http.ListenAndServe("localhost:6065", nil) | 
|  | 82 | +				if err != nil { | 
|  | 83 | +					log.Error(err) | 
|  | 84 | +				} | 
|  | 85 | +			}() | 
|  | 86 | +		} | 
|  | 87 | + | 
|  | 88 | +		ctx := lcli.ReqContext(cctx) | 
|  | 89 | + | 
|  | 90 | +		// Instantiate the tracer and exporter | 
|  | 91 | +		if cctx.Bool("tracing") { | 
|  | 92 | +			tracingStopper, err := tracing.New("booster-bsclient", cctx.String("tracing-endpoint")) | 
|  | 93 | +			if err != nil { | 
|  | 94 | +				return fmt.Errorf("failed to instantiate tracer: %w", err) | 
|  | 95 | +			} | 
|  | 96 | +			log.Info("Tracing exporter enabled") | 
|  | 97 | + | 
|  | 98 | +			defer func() { | 
|  | 99 | +				_ = tracingStopper(ctx) | 
|  | 100 | +			}() | 
|  | 101 | +		} | 
|  | 102 | + | 
|  | 103 | +		// setup libp2p host | 
|  | 104 | +		log.Infow("generating libp2p key") | 
|  | 105 | +		privKey, _, err := crypto.GenerateECDSAKeyPair(rand.Reader) | 
|  | 106 | +		if err != nil { | 
|  | 107 | +			return err | 
|  | 108 | +		} | 
|  | 109 | + | 
|  | 110 | +		host, err := libp2p.New( | 
|  | 111 | +			libp2p.Transport(tcp.NewTCPTransport), | 
|  | 112 | +			libp2p.Transport(quic.NewTransport), | 
|  | 113 | +			libp2p.Muxer("/mplex/6.7.0", mplex.DefaultTransport), | 
|  | 114 | +			libp2p.Muxer("/yamux/1.0.0", yamux.DefaultTransport), | 
|  | 115 | +			libp2p.Identity(privKey), | 
|  | 116 | +			libp2p.ResourceManager(network.NullResourceManager), | 
|  | 117 | +		) | 
|  | 118 | +		if err != nil { | 
|  | 119 | +			return err | 
|  | 120 | +		} | 
|  | 121 | + | 
|  | 122 | +		// Create a bitswap client | 
|  | 123 | +		nilRouter, err := nilrouting.ConstructNilRouting(ctx, nil, nil, nil) | 
|  | 124 | +		if err != nil { | 
|  | 125 | +			return err | 
|  | 126 | +		} | 
|  | 127 | +		net := bsnetwork.NewFromIpfsHost(host, nilRouter) | 
|  | 128 | +		bs := bstore.NewBlockstore(datastore.NewNullDatastore()) | 
|  | 129 | +		bsClient := client.New(ctx, net, bs) | 
|  | 130 | +		net.Start(bsClient) | 
|  | 131 | + | 
|  | 132 | +		log.Infow("connecting to server", "server", serverAddrInfo.String()) | 
|  | 133 | +		err = host.Connect(ctx, *serverAddrInfo) | 
|  | 134 | +		if err != nil { | 
|  | 135 | +			return fmt.Errorf("connecting to %s: %w", serverAddrInfo, err) | 
|  | 136 | +		} | 
|  | 137 | + | 
|  | 138 | +		var throttle chan struct{} | 
|  | 139 | +		concurrency := cctx.Int("concurrency") | 
|  | 140 | +		if concurrency > 0 { | 
|  | 141 | +			throttle = make(chan struct{}, concurrency) | 
|  | 142 | +		} | 
|  | 143 | + | 
|  | 144 | +		log.Infow("fetch", "cid", rootCid, "concurrency", concurrency) | 
|  | 145 | +		start := time.Now() | 
|  | 146 | +		count, size, err := getBlocks(ctx, bsClient, rootCid, throttle) | 
|  | 147 | +		if err != nil { | 
|  | 148 | +			return fmt.Errorf("getting blocks: %w", err) | 
|  | 149 | +		} | 
|  | 150 | + | 
|  | 151 | +		log.Infow("complete", "count", count, "size", size, "duration", time.Since(start)) | 
|  | 152 | +		return nil | 
|  | 153 | +	}, | 
|  | 154 | +} | 
|  | 155 | + | 
|  | 156 | +func getBlocks(ctx context.Context, bsClient *client.Client, c cid.Cid, throttle chan struct{}) (uint64, uint64, error) { | 
|  | 157 | +	if throttle != nil { | 
|  | 158 | +		throttle <- struct{}{} | 
|  | 159 | +	} | 
|  | 160 | +	// Get the block | 
|  | 161 | +	start := time.Now() | 
|  | 162 | +	blk, err := bsClient.GetBlock(ctx, c) | 
|  | 163 | +	if throttle != nil { | 
|  | 164 | +		<-throttle | 
|  | 165 | +	} | 
|  | 166 | +	if err != nil { | 
|  | 167 | +		return 0, 0, err | 
|  | 168 | +	} | 
|  | 169 | + | 
|  | 170 | +	var size = uint64(len(blk.RawData())) | 
|  | 171 | +	log.Debugw("receive", "cid", c, "size", size, "duration", time.Since(start)) | 
|  | 172 | + | 
|  | 173 | +	// Read the links from the block to child nodes in the DAG | 
|  | 174 | +	var count = uint64(1) | 
|  | 175 | +	nd, err := ipldlegacy.DecodeNode(ctx, blk) | 
|  | 176 | +	if err != nil { | 
|  | 177 | +		return 0, 0, fmt.Errorf("decoding node %s: %w", c, err) | 
|  | 178 | +	} | 
|  | 179 | + | 
|  | 180 | +	var eg errgroup.Group | 
|  | 181 | +	lnks := nd.Links() | 
|  | 182 | +	for _, l := range lnks { | 
|  | 183 | +		l := l | 
|  | 184 | +		// Launch a go routine to fetch the blocks underneath each link | 
|  | 185 | +		eg.Go(func() error { | 
|  | 186 | +			cnt, sz, err := getBlocks(ctx, bsClient, l.Cid, throttle) | 
|  | 187 | +			if err != nil { | 
|  | 188 | +				return err | 
|  | 189 | +			} | 
|  | 190 | +			atomic.AddUint64(&count, cnt) | 
|  | 191 | +			atomic.AddUint64(&size, sz) | 
|  | 192 | +			return nil | 
|  | 193 | +		}) | 
|  | 194 | +	} | 
|  | 195 | +	return count, size, eg.Wait() | 
|  | 196 | +} | 
0 commit comments