Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 122 additions & 0 deletions cmd/booster-bitswap/init.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package main

import (
"context"
"crypto/rand"
"fmt"
"os"
"path/filepath"

lcli "github.com/filecoin-project/lotus/cli"
"github.com/libp2p/go-libp2p"
crypto "github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
peer "github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p/p2p/muxer/mplex"
"github.com/libp2p/go-libp2p/p2p/muxer/yamux"
quic "github.com/libp2p/go-libp2p/p2p/transport/quic"
"github.com/libp2p/go-libp2p/p2p/transport/tcp"
"github.com/urfave/cli/v2"
)

func configureRepo(ctx context.Context, cfgDir string, createIfNotExist bool) (peer.ID, crypto.PrivKey, error) {
if cfgDir == "" {
return "", nil, fmt.Errorf("dataDir must be set")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggest changing this error message to `FlagRepo.name + " is a required flag"

}

if err := os.MkdirAll(cfgDir, 0744); err != nil {
return "", nil, err
}

peerkey, err := loadPeerKey(cfgDir, createIfNotExist)
if err != nil {
return "", nil, err
}

selfPid, err := peer.IDFromPrivateKey(peerkey)
if err != nil {
return "", nil, err
}

return selfPid, peerkey, nil
}

func setupHost(ctx context.Context, cfgDir string, port int) (host.Host, error) {
_, peerKey, err := configureRepo(ctx, cfgDir, false)
if err != nil {
return nil, err
}
return libp2p.New(
libp2p.ListenAddrStrings(
fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", port),
fmt.Sprintf("/ip4/0.0.0.0/udp/%d/quic", port),
),
libp2p.Transport(tcp.NewTCPTransport),
libp2p.Transport(quic.NewTransport),
libp2p.Muxer("/mplex/6.7.0", mplex.DefaultTransport),
libp2p.Muxer("/yamux/1.0.0", yamux.DefaultTransport),
libp2p.Identity(peerKey),
libp2p.ResourceManager(network.NullResourceManager),
)
}

func loadPeerKey(cfgDir string, createIfNotExists bool) (crypto.PrivKey, error) {
var peerkey crypto.PrivKey
keyPath := filepath.Join(cfgDir, "libp2p.key")
keyFile, err := os.ReadFile(keyPath)
if err != nil {
if !os.IsNotExist(err) {
return nil, err
}
if os.IsNotExist(err) && !createIfNotExists {
return nil, err
}
log.Infof("Generating new peer key...")

key, _, err := crypto.GenerateEd25519Key(rand.Reader)
if err != nil {
return nil, err
}
peerkey = key

data, err := crypto.MarshalPrivateKey(key)
if err != nil {
return nil, err
}

if err := os.WriteFile(keyPath, data, 0600); err != nil {
return nil, err
}
} else {
key, err := crypto.UnmarshalPrivateKey(keyFile)
if err != nil {
return nil, err
}

peerkey = key
}

if peerkey == nil {
panic("sanity check: peer key is uninitialized")
}

return peerkey, nil
}

var initCmd = &cli.Command{
Name: "init",
Usage: "Init booster-bitswap config",
Before: before,
Flags: []cli.Flag{},
Action: func(cctx *cli.Context) error {

ctx := lcli.ReqContext(cctx)

repoDir := cctx.String(FlagRepo.Name)

peerID, _, err := configureRepo(ctx, repoDir, true)
fmt.Println(peerID)
return err
},
}
9 changes: 9 additions & 0 deletions cmd/booster-bitswap/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@ import (

var log = logging.Logger("booster")

var FlagRepo = &cli.StringFlag{
Name: "repo",
Usage: "repo directory for Booster bitswap",
Value: "~/.booster-bitswap",
EnvVars: []string{"BOOST_BITSWAP_REPO"},
}

func main() {
app := &cli.App{
Name: "booster-bitswap",
Expand All @@ -19,8 +26,10 @@ func main() {
Version: build.UserVersion(),
Flags: []cli.Flag{
cliutil.FlagVeryVerbose,
FlagRepo,
},
Commands: []*cli.Command{
initCmd,
runCmd,
},
}
Expand Down
17 changes: 14 additions & 3 deletions cmd/booster-bitswap/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,25 @@ var runCmd = &cli.Command{
remoteStore := remoteblockstore.NewRemoteBlockstore(bapi)
// Create the server API
port := cctx.Int("port")
server := NewBitswapServer(port, remoteStore)

repoDir := cctx.String(FlagRepo.Name)
host, err := setupHost(ctx, repoDir, port)
if err != nil {
return fmt.Errorf("setting up libp2p host: %w", err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the peer key is not found, I think this will output an error like "setting up libp2p host: libp2p.key not found"
I'd suggest we show something more informative, eg

if os.IsNotExist(err) {
	return nil, fmt.Errorf("booster-bitswap has not been initialized. Run the booster-bitswap init command.")
}

}
// Start the server
server := NewBitswapServer(remoteStore, host)

addrs, err := bapi.NetAddrsListen(ctx)
if err != nil {
return fmt.Errorf("getting boost API addrs: %w", err)
}

log.Infof("Starting booster-bitswap node on port %d", port)
err = server.Start(ctx)
err = server.Start(ctx, addrs)
if err != nil {
return err
}

// Monitor for shutdown.
<-ctx.Done()

Expand Down
38 changes: 8 additions & 30 deletions cmd/booster-bitswap/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,55 +2,33 @@ package main

import (
"context"
"crypto/rand"
"fmt"

"github.com/filecoin-project/boost/protocolproxy"
bsnetwork "github.com/ipfs/go-bitswap/network"
"github.com/ipfs/go-bitswap/server"
blockstore "github.com/ipfs/go-ipfs-blockstore"
nilrouting "github.com/ipfs/go-ipfs-routing/none"
"github.com/libp2p/go-libp2p"
crypto "github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p/p2p/muxer/mplex"
"github.com/libp2p/go-libp2p/p2p/muxer/yamux"
quic "github.com/libp2p/go-libp2p/p2p/transport/quic"
"github.com/libp2p/go-libp2p/p2p/transport/tcp"
"github.com/libp2p/go-libp2p-core/host"
peer "github.com/libp2p/go-libp2p-core/peer"
)

type BitswapServer struct {
port int
remoteStore blockstore.Blockstore

ctx context.Context
cancel context.CancelFunc
server *server.Server
host host.Host
}

func NewBitswapServer(port int, remoteStore blockstore.Blockstore) *BitswapServer {
return &BitswapServer{port: port, remoteStore: remoteStore}
func NewBitswapServer(remoteStore blockstore.Blockstore, host host.Host) *BitswapServer {
return &BitswapServer{remoteStore: remoteStore, host: host}
}

func (s *BitswapServer) Start(ctx context.Context) error {
func (s *BitswapServer) Start(ctx context.Context, balancer peer.AddrInfo) error {
s.ctx, s.cancel = context.WithCancel(ctx)
// setup libp2p host
privKey, _, err := crypto.GenerateECDSAKeyPair(rand.Reader)
if err != nil {
return err
}

host, err := libp2p.New(
libp2p.ListenAddrStrings(
fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", s.port),
fmt.Sprintf("/ip4/0.0.0.0/udp/%d/quic", s.port),
),
libp2p.Transport(tcp.NewTCPTransport),
libp2p.Transport(quic.NewTransport),
libp2p.Muxer("/mplex/6.7.0", mplex.DefaultTransport),
libp2p.Muxer("/yamux/1.0.0", yamux.DefaultTransport),
libp2p.Identity(privKey),
libp2p.ResourceManager(network.NullResourceManager),
)
host, err := protocolproxy.NewForwardingHost(ctx, s.host, balancer)
if err != nil {
return err
}
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
contrib.go.opencensus.io/exporter/prometheus v0.4.0
github.com/BurntSushi/toml v1.1.0
github.com/alecthomas/jsonschema v0.0.0-20200530073317-71f438968921
github.com/benbjohnson/clock v1.3.0
github.com/buger/goterm v1.0.3
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e
github.com/davecgh/go-spew v1.1.1
Expand Down Expand Up @@ -83,6 +84,7 @@ require (
github.com/libp2p/go-libp2p-pubsub v0.7.1
github.com/libp2p/go-libp2p-record v0.1.3
github.com/libp2p/go-libp2p-resource-manager v0.5.3
github.com/libp2p/go-msgio v0.2.0
github.com/mattn/go-sqlite3 v1.14.10
github.com/mitchellh/go-homedir v1.1.0
github.com/multiformats/go-multiaddr v0.6.0
Expand Down
8 changes: 7 additions & 1 deletion node/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/filecoin-project/boost/node/impl/common"
"github.com/filecoin-project/boost/node/modules"
"github.com/filecoin-project/boost/node/modules/dtypes"
"github.com/filecoin-project/boost/protocolproxy"
"github.com/filecoin-project/boost/retrievalmarket/lp2pimpl"
"github.com/filecoin-project/boost/sealingpipeline"
"github.com/filecoin-project/boost/storagemanager"
Expand Down Expand Up @@ -80,7 +81,8 @@ import (
var log = logging.Logger("builder")

// special is a type used to give keys to modules which
// can't really be identified by the returned type
//
// can't really be identified by the returned type
type special struct{ id int }

//nolint:golint
Expand All @@ -104,6 +106,7 @@ var (
type invoke int

// Invokes are called in the order they are defined.
//
//nolint:golint
const (
// InitJournal at position 0 initializes the journal global var as soon as
Expand Down Expand Up @@ -140,6 +143,7 @@ const (
HandleDealsKey
HandleRetrievalKey
HandleRetrievalTransportsKey
HandleProtocolProxyKey
RunSectorServiceKey

// boost should be started after legacy markets (HandleDealsKey)
Expand Down Expand Up @@ -523,7 +527,9 @@ func ConfigBoost(cfg *config.Boost) Option {
Override(new(retrievalmarket.RetrievalProvider), lotus_modules.RetrievalProvider),
Override(HandleRetrievalKey, lotus_modules.HandleRetrieval),
Override(new(*lp2pimpl.TransportsListener), modules.NewTransportsListener(cfg)),
Override(new(*protocolproxy.ProtocolProxy), modules.NewProtocolProxy(cfg)),
Override(HandleRetrievalTransportsKey, modules.HandleRetrievalTransports),
Override(HandleProtocolProxyKey, modules.HandleProtocolProxy),
Override(new(idxprov.MeshCreator), idxprov.NewMeshCreator),
Override(new(provider.Interface), modules.IndexProvider(cfg.IndexProvider)),

Expand Down
6 changes: 6 additions & 0 deletions node/config/doc_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions node/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,8 @@ type DealmakingConfig struct {
// The time that can elapse before a download is considered stalled (and
// another concurrent download is allowed to start).
HttpTransferStallTimeout Duration

BitswapPeerID string
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add a detailed comment about what this config value is. The comment ends up being copied into the config file itself, so it should be easy for users to understand what the value is and know what to set it to.

}

type FeeConfig struct {
Expand Down
46 changes: 46 additions & 0 deletions node/modules/retrieval.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,24 @@ import (
"fmt"

"github.com/filecoin-project/boost/node/config"
"github.com/filecoin-project/boost/protocolproxy"
"github.com/filecoin-project/boost/retrievalmarket/lp2pimpl"
"github.com/filecoin-project/boost/retrievalmarket/types"
"github.com/ipfs/go-bitswap/network"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
"github.com/multiformats/go-multiaddr"
"go.uber.org/fx"
)

var bitswapProtocols = []protocol.ID{
network.ProtocolBitswap,
network.ProtocolBitswapNoVers,
network.ProtocolBitswapOneOne,
network.ProtocolBitswapOneZero,
}

func NewTransportsListener(cfg *config.Boost) func(h host.Host) (*lp2pimpl.TransportsListener, error) {
return func(h host.Host) (*lp2pimpl.TransportsListener, error) {
protos := []types.Protocol{}
Expand Down Expand Up @@ -39,6 +50,12 @@ func NewTransportsListener(cfg *config.Boost) func(h host.Host) (*lp2pimpl.Trans
Addresses: []multiaddr.Multiaddr{maddr},
})
}
if cfg.Dealmaking.BitswapPeerID != "" {
protos = append(protos, types.Protocol{
Name: "bitswap",
Addresses: h.Addrs(),
})
}

return lp2pimpl.NewTransportsListener(h, protos), nil
}
Expand All @@ -58,3 +75,32 @@ func HandleRetrievalTransports(lc fx.Lifecycle, l *lp2pimpl.TransportsListener)
},
})
}

func NewProtocolProxy(cfg *config.Boost) func(h host.Host) (*protocolproxy.ProtocolProxy, error) {
return func(h host.Host) (*protocolproxy.ProtocolProxy, error) {
peerConfig := map[peer.ID][]protocol.ID{}
if cfg.Dealmaking.BitswapPeerID != "" {
bsPeerID, err := peer.Decode(cfg.Dealmaking.BitswapPeerID)
if err != nil {
return nil, err
}
peerConfig[bsPeerID] = bitswapProtocols
}
return protocolproxy.NewProtocolProxy(h, peerConfig)
}
}

func HandleProtocolProxy(lc fx.Lifecycle, pp *protocolproxy.ProtocolProxy) {
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
log.Info("starting load balancer")
pp.Start(ctx)
return nil
},
OnStop: func(context.Context) error {
log.Info("stopping load balancer")
pp.Close()
return nil
},
})
}
Loading