Skip to content

Commit 85b7358

Browse files
hannahhowarddirkmcnonsense
committed
booster bitswap MVP executable (#707)
* feat(booster-bitswap): booster bitswap MVP untested * refactor(booster-bitswap): use API for fetching blocks * fix(deps): update deps to compile * feat(booster-bitswap): makefile & fixes add commands to build booster-bitswap, and very a round tripped successful fetch from booster-bitswap * refactor: clean up unused vars etc * fix: booster-bitsawp - check error when creating libp2p key * refactor(node): avoid FreeAndUnsealed method Co-authored-by: Dirk McCormick <dirkmdev@gmail.com> Co-authored-by: Anton Evangelatov <anton.evangelatov@gmail.com>
1 parent 847ccc9 commit 85b7358

File tree

18 files changed

+681
-94
lines changed

18 files changed

+681
-94
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
/boostd
44
/devnet
55
/booster-http
6+
/booster-bitswap
67
/docgen-md
78
/docgen-openrpc
89
extern/filecoin-ffi/rust/target

Makefile

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,12 @@ booster-http: $(BUILD_DEPS)
9898
.PHONY: booster-http
9999
BINS+=booster-http
100100

101+
booster-bitswap: $(BUILD_DEPS)
102+
rm -f booster-bitswap
103+
$(GOCC) build $(GOFLAGS) -o booster-bitswap ./cmd/booster-bitswap
104+
.PHONY: booster-bitswap
105+
BINS+=booster-bitswap
106+
101107
devnet: $(BUILD_DEPS)
102108
rm -f devnet
103109
$(GOCC) build $(GOFLAGS) -o devnet ./cmd/devnet

api/api.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,11 @@ type Boost interface {
4848
BoostDagstorePiecesContainingMultihash(ctx context.Context, mh multihash.Multihash) ([]cid.Cid, error) //perm:read
4949
BoostDagstoreListShards(ctx context.Context) ([]DagstoreShardInfo, error) //perm:read
5050

51+
// MethodGroup: Blockstore
52+
BlockstoreGet(ctx context.Context, c cid.Cid) ([]byte, error) //perm:read
53+
BlockstoreHas(ctx context.Context, c cid.Cid) (bool, error) //perm:read
54+
BlockstoreGetSize(ctx context.Context, c cid.Cid) (int, error) //perm:read
55+
5156
// RuntimeSubsystems returns the subsystems that are enabled
5257
// in this instance.
5358
RuntimeSubsystems(ctx context.Context) (lapi.MinerSubsystems, error) //perm:read

api/proxy_gen.go

Lines changed: 39 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

build/openrpc/boost.json.gz

148 Bytes
Binary file not shown.

cmd/booster-bitswap/main.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package main
2+
3+
import (
4+
"os"
5+
6+
"github.com/filecoin-project/boost/build"
7+
cliutil "github.com/filecoin-project/boost/cli/util"
8+
logging "github.com/ipfs/go-log/v2"
9+
"github.com/urfave/cli/v2"
10+
)
11+
12+
var log = logging.Logger("booster")
13+
14+
func main() {
15+
app := &cli.App{
16+
Name: "booster-bitswap",
17+
Usage: "Bitswap endpoint for retrieval from Filecoin",
18+
EnableBashCompletion: true,
19+
Version: build.UserVersion(),
20+
Flags: []cli.Flag{
21+
cliutil.FlagVeryVerbose,
22+
},
23+
Commands: []*cli.Command{
24+
runCmd,
25+
},
26+
}
27+
app.Setup()
28+
29+
if err := app.Run(os.Args); err != nil {
30+
os.Stderr.WriteString("Error: " + err.Error() + "\n")
31+
}
32+
}
33+
34+
func before(cctx *cli.Context) error {
35+
_ = logging.SetLogLevel("booster", "INFO")
36+
37+
if cliutil.IsVeryVerbose {
38+
_ = logging.SetLogLevel("booster", "DEBUG")
39+
}
40+
41+
return nil
42+
}
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package remoteblockstore
2+
3+
import (
4+
"context"
5+
"errors"
6+
7+
blocks "github.com/ipfs/go-block-format"
8+
logging "github.com/ipfs/go-log/v2"
9+
10+
"github.com/ipfs/go-cid"
11+
blockstore "github.com/ipfs/go-ipfs-blockstore"
12+
)
13+
14+
var log = logging.Logger("remote-blockstore")
15+
16+
var _ blockstore.Blockstore = (*RemoteBlockstore)(nil)
17+
18+
type RemoteBlockstoreAPI interface {
19+
BlockstoreGet(ctx context.Context, c cid.Cid) ([]byte, error)
20+
BlockstoreHas(ctx context.Context, c cid.Cid) (bool, error)
21+
BlockstoreGetSize(ctx context.Context, c cid.Cid) (int, error)
22+
}
23+
24+
// RemoteBlockstore is a read-only blockstore over all cids across all pieces on a provider.
25+
type RemoteBlockstore struct {
26+
api RemoteBlockstoreAPI
27+
}
28+
29+
func NewRemoteBlockstore(api RemoteBlockstoreAPI) blockstore.Blockstore {
30+
return &RemoteBlockstore{
31+
api: api,
32+
}
33+
}
34+
35+
func (ro *RemoteBlockstore) Get(ctx context.Context, c cid.Cid) (b blocks.Block, err error) {
36+
log.Debugw("Get", "cid", c)
37+
data, err := ro.api.BlockstoreGet(ctx, c)
38+
log.Debugw("Get response", "cid", c, "error", err)
39+
if err != nil {
40+
return nil, err
41+
}
42+
return blocks.NewBlockWithCid(data, c)
43+
}
44+
45+
func (ro *RemoteBlockstore) Has(ctx context.Context, c cid.Cid) (bool, error) {
46+
log.Debugw("Has", "cid", c)
47+
has, err := ro.api.BlockstoreHas(ctx, c)
48+
log.Debugw("Has response", "cid", c, "has", has, "error", err)
49+
return has, err
50+
}
51+
52+
func (ro *RemoteBlockstore) GetSize(ctx context.Context, c cid.Cid) (int, error) {
53+
log.Debugw("GetSize", "cid", c)
54+
size, err := ro.api.BlockstoreGetSize(ctx, c)
55+
log.Debugw("GetSize response", "cid", c, "size", size, "error", err)
56+
return size, err
57+
}
58+
59+
// --- UNSUPPORTED BLOCKSTORE METHODS -------
60+
func (ro *RemoteBlockstore) DeleteBlock(context.Context, cid.Cid) error {
61+
return errors.New("unsupported operation DeleteBlock")
62+
}
63+
func (ro *RemoteBlockstore) HashOnRead(_ bool) {}
64+
func (ro *RemoteBlockstore) Put(context.Context, blocks.Block) error {
65+
return errors.New("unsupported operation Put")
66+
}
67+
func (ro *RemoteBlockstore) PutMany(context.Context, []blocks.Block) error {
68+
return errors.New("unsupported operation PutMany")
69+
}
70+
func (ro *RemoteBlockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
71+
return nil, errors.New("unsupported operation AllKeysChan")
72+
}

cmd/booster-bitswap/run.go

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"net/http"
7+
_ "net/http/pprof"
8+
"strings"
9+
10+
"github.com/filecoin-project/boost/api"
11+
bclient "github.com/filecoin-project/boost/api/client"
12+
cliutil "github.com/filecoin-project/boost/cli/util"
13+
"github.com/filecoin-project/boost/cmd/booster-bitswap/remoteblockstore"
14+
"github.com/filecoin-project/go-jsonrpc"
15+
lcli "github.com/filecoin-project/lotus/cli"
16+
"github.com/urfave/cli/v2"
17+
)
18+
19+
var runCmd = &cli.Command{
20+
Name: "run",
21+
Usage: "Start a booster-bitswap process",
22+
Before: before,
23+
Flags: []cli.Flag{
24+
&cli.BoolFlag{
25+
Name: "pprof",
26+
Usage: "run pprof web server on localhost:6070",
27+
},
28+
&cli.UintFlag{
29+
Name: "port",
30+
Usage: "the port to listen for bitswap requests on",
31+
Value: 8888,
32+
},
33+
&cli.StringFlag{
34+
Name: "api-boost",
35+
Usage: "the endpoint for the boost API",
36+
Required: true,
37+
},
38+
},
39+
Action: func(cctx *cli.Context) error {
40+
if cctx.Bool("pprof") {
41+
go func() {
42+
err := http.ListenAndServe("localhost:6070", nil)
43+
if err != nil {
44+
log.Error(err)
45+
}
46+
}()
47+
}
48+
49+
// Connect to the Boost API
50+
ctx := lcli.ReqContext(cctx)
51+
boostAPIInfo := cctx.String("api-boost")
52+
bapi, bcloser, err := getBoostAPI(ctx, boostAPIInfo)
53+
if err != nil {
54+
return fmt.Errorf("getting boost API: %w", err)
55+
}
56+
defer bcloser()
57+
58+
remoteStore := remoteblockstore.NewRemoteBlockstore(bapi)
59+
// Create the server API
60+
port := cctx.Int("port")
61+
server := NewBitswapServer(port, remoteStore)
62+
63+
// Start the server
64+
log.Infof("Starting booster-bitswap node on port %d", port)
65+
err = server.Start(ctx)
66+
if err != nil {
67+
return err
68+
}
69+
// Monitor for shutdown.
70+
<-ctx.Done()
71+
72+
log.Info("Shutting down...")
73+
74+
err = server.Stop()
75+
if err != nil {
76+
return err
77+
}
78+
log.Info("Graceful shutdown successful")
79+
80+
// Sync all loggers.
81+
_ = log.Sync() //nolint:errcheck
82+
83+
return nil
84+
},
85+
}
86+
87+
func getBoostAPI(ctx context.Context, ai string) (api.Boost, jsonrpc.ClientCloser, error) {
88+
ai = strings.TrimPrefix(strings.TrimSpace(ai), "BOOST_API_INFO=")
89+
info := cliutil.ParseApiInfo(ai)
90+
addr, err := info.DialArgs("v0")
91+
if err != nil {
92+
return nil, nil, fmt.Errorf("could not get DialArgs: %w", err)
93+
}
94+
95+
log.Infof("Using boost API at %s", addr)
96+
api, closer, err := bclient.NewBoostRPCV0(ctx, addr, info.AuthHeader())
97+
if err != nil {
98+
return nil, nil, fmt.Errorf("creating full node service API: %w", err)
99+
}
100+
101+
return api, closer, nil
102+
}

cmd/booster-bitswap/server.go

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"crypto/rand"
6+
"fmt"
7+
8+
bsnetwork "github.com/ipfs/go-bitswap/network"
9+
"github.com/ipfs/go-bitswap/server"
10+
blockstore "github.com/ipfs/go-ipfs-blockstore"
11+
nilrouting "github.com/ipfs/go-ipfs-routing/none"
12+
"github.com/libp2p/go-libp2p"
13+
crypto "github.com/libp2p/go-libp2p-core/crypto"
14+
"github.com/libp2p/go-libp2p-core/network"
15+
"github.com/libp2p/go-libp2p/p2p/muxer/mplex"
16+
"github.com/libp2p/go-libp2p/p2p/muxer/yamux"
17+
quic "github.com/libp2p/go-libp2p/p2p/transport/quic"
18+
"github.com/libp2p/go-libp2p/p2p/transport/tcp"
19+
)
20+
21+
type BitswapServer struct {
22+
port int
23+
remoteStore blockstore.Blockstore
24+
25+
ctx context.Context
26+
cancel context.CancelFunc
27+
server *server.Server
28+
}
29+
30+
func NewBitswapServer(port int, remoteStore blockstore.Blockstore) *BitswapServer {
31+
return &BitswapServer{port: port, remoteStore: remoteStore}
32+
}
33+
34+
func (s *BitswapServer) Start(ctx context.Context) error {
35+
s.ctx, s.cancel = context.WithCancel(ctx)
36+
// setup libp2p host
37+
privKey, _, err := crypto.GenerateECDSAKeyPair(rand.Reader)
38+
if err != nil {
39+
return err
40+
}
41+
42+
host, err := libp2p.New(
43+
libp2p.ListenAddrStrings(
44+
fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", s.port),
45+
fmt.Sprintf("/ip4/0.0.0.0/udp/%d/quic", s.port),
46+
),
47+
libp2p.Transport(tcp.NewTCPTransport),
48+
libp2p.Transport(quic.NewTransport),
49+
libp2p.Muxer("/mplex/6.7.0", mplex.DefaultTransport),
50+
libp2p.Muxer("/yamux/1.0.0", yamux.DefaultTransport),
51+
libp2p.Identity(privKey),
52+
libp2p.ResourceManager(network.NullResourceManager),
53+
)
54+
if err != nil {
55+
return err
56+
}
57+
58+
// start a bitswap session on the provider
59+
nilRouter, err := nilrouting.ConstructNilRouting(ctx, nil, nil, nil)
60+
if err != nil {
61+
return err
62+
}
63+
bsopts := []server.Option{server.MaxOutstandingBytesPerPeer(1 << 20)}
64+
net := bsnetwork.NewFromIpfsHost(host, nilRouter)
65+
s.server = server.New(ctx, net, s.remoteStore, bsopts...)
66+
net.Start(s.server)
67+
68+
log.Infow("bitswap server running", "multiaddrs", host.Addrs(), "peerId", host.ID())
69+
return nil
70+
}
71+
72+
func (s *BitswapServer) Stop() error {
73+
s.cancel()
74+
return s.server.Close()
75+
}

0 commit comments

Comments
 (0)