Skip to content

Commit a58ea5a

Browse files
hannahhowardrvaggdirkmc
authored
LoadBalancer for bitswap (and later, more of libp2p) (#786)
* feat(loadbalancer): add message types * feat(messages): add utility functions * feat(loadbalancer): initial load balancer impl implementation of the load balancer node itself * feat(loadbalancer): add service node implements code for running a service node * feat(loadbalancer): integrate into boost and booster-bitswap * Update loadbalancer/loadbalancer.go Co-authored-by: Rod Vagg <rod@vagg.org> * Update loadbalancer/servicenode.go Co-authored-by: Rod Vagg <rod@vagg.org> * Update loadbalancer/servicenode.go Co-authored-by: Rod Vagg <rod@vagg.org> * Update loadbalancer/messages/messages.ipldsch Co-authored-by: Rod Vagg <rod@vagg.org> * Update loadbalancer/messages/messages.ipldsch Co-authored-by: Rod Vagg <rod@vagg.org> * refactor(loadbalancer): remove routing protocol remove the routing protocol, instead relying on a set config. also remove forwarding response for inbound requests * fix(loadbalancer): update tests * refactor(loadbalancer): integrate simplified load balancer removed pub keys to minimize network traffic, added api's to configure and update bitswap peer id, added auto config of bitswap peer id in booster-bitswap * docs(gen): regenerate api docs * chore(lint): fix lint errors * fix(loadbalancer): minor bridgestream fix * Update loadbalancer/servicenode.go Co-authored-by: dirkmc <dirkmdev@gmail.com> * refactor(protocolproxy): address PR comments renames, reconfigured architecture, etc * refactor(make init print out peer id): remove apis and transparent peer id setting. have init print Co-authored-by: Rod Vagg <rod@vagg.org> Co-authored-by: dirkmc <dirkmdev@gmail.com>
1 parent 58163ca commit a58ea5a

File tree

19 files changed

+1484
-34
lines changed

19 files changed

+1484
-34
lines changed

cmd/booster-bitswap/init.go

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"crypto/rand"
6+
"fmt"
7+
"os"
8+
"path/filepath"
9+
10+
lcli "github.com/filecoin-project/lotus/cli"
11+
"github.com/libp2p/go-libp2p"
12+
crypto "github.com/libp2p/go-libp2p-core/crypto"
13+
"github.com/libp2p/go-libp2p-core/host"
14+
"github.com/libp2p/go-libp2p-core/network"
15+
peer "github.com/libp2p/go-libp2p-core/peer"
16+
"github.com/libp2p/go-libp2p/p2p/muxer/mplex"
17+
"github.com/libp2p/go-libp2p/p2p/muxer/yamux"
18+
quic "github.com/libp2p/go-libp2p/p2p/transport/quic"
19+
"github.com/libp2p/go-libp2p/p2p/transport/tcp"
20+
"github.com/urfave/cli/v2"
21+
)
22+
23+
func configureRepo(ctx context.Context, cfgDir string, createIfNotExist bool) (peer.ID, crypto.PrivKey, error) {
24+
if cfgDir == "" {
25+
return "", nil, fmt.Errorf("dataDir must be set")
26+
}
27+
28+
if err := os.MkdirAll(cfgDir, 0744); err != nil {
29+
return "", nil, err
30+
}
31+
32+
peerkey, err := loadPeerKey(cfgDir, createIfNotExist)
33+
if err != nil {
34+
return "", nil, err
35+
}
36+
37+
selfPid, err := peer.IDFromPrivateKey(peerkey)
38+
if err != nil {
39+
return "", nil, err
40+
}
41+
42+
return selfPid, peerkey, nil
43+
}
44+
45+
func setupHost(ctx context.Context, cfgDir string, port int) (host.Host, error) {
46+
_, peerKey, err := configureRepo(ctx, cfgDir, false)
47+
if err != nil {
48+
return nil, err
49+
}
50+
return libp2p.New(
51+
libp2p.ListenAddrStrings(
52+
fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", port),
53+
fmt.Sprintf("/ip4/0.0.0.0/udp/%d/quic", port),
54+
),
55+
libp2p.Transport(tcp.NewTCPTransport),
56+
libp2p.Transport(quic.NewTransport),
57+
libp2p.Muxer("/mplex/6.7.0", mplex.DefaultTransport),
58+
libp2p.Muxer("/yamux/1.0.0", yamux.DefaultTransport),
59+
libp2p.Identity(peerKey),
60+
libp2p.ResourceManager(network.NullResourceManager),
61+
)
62+
}
63+
64+
func loadPeerKey(cfgDir string, createIfNotExists bool) (crypto.PrivKey, error) {
65+
var peerkey crypto.PrivKey
66+
keyPath := filepath.Join(cfgDir, "libp2p.key")
67+
keyFile, err := os.ReadFile(keyPath)
68+
if err != nil {
69+
if !os.IsNotExist(err) {
70+
return nil, err
71+
}
72+
if os.IsNotExist(err) && !createIfNotExists {
73+
return nil, err
74+
}
75+
log.Infof("Generating new peer key...")
76+
77+
key, _, err := crypto.GenerateEd25519Key(rand.Reader)
78+
if err != nil {
79+
return nil, err
80+
}
81+
peerkey = key
82+
83+
data, err := crypto.MarshalPrivateKey(key)
84+
if err != nil {
85+
return nil, err
86+
}
87+
88+
if err := os.WriteFile(keyPath, data, 0600); err != nil {
89+
return nil, err
90+
}
91+
} else {
92+
key, err := crypto.UnmarshalPrivateKey(keyFile)
93+
if err != nil {
94+
return nil, err
95+
}
96+
97+
peerkey = key
98+
}
99+
100+
if peerkey == nil {
101+
panic("sanity check: peer key is uninitialized")
102+
}
103+
104+
return peerkey, nil
105+
}
106+
107+
var initCmd = &cli.Command{
108+
Name: "init",
109+
Usage: "Init booster-bitswap config",
110+
Before: before,
111+
Flags: []cli.Flag{},
112+
Action: func(cctx *cli.Context) error {
113+
114+
ctx := lcli.ReqContext(cctx)
115+
116+
repoDir := cctx.String(FlagRepo.Name)
117+
118+
peerID, _, err := configureRepo(ctx, repoDir, true)
119+
fmt.Println(peerID)
120+
return err
121+
},
122+
}

cmd/booster-bitswap/main.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,13 @@ import (
1111

1212
var log = logging.Logger("booster")
1313

14+
var FlagRepo = &cli.StringFlag{
15+
Name: "repo",
16+
Usage: "repo directory for Booster bitswap",
17+
Value: "~/.booster-bitswap",
18+
EnvVars: []string{"BOOST_BITSWAP_REPO"},
19+
}
20+
1421
func main() {
1522
app := &cli.App{
1623
Name: "booster-bitswap",
@@ -19,8 +26,10 @@ func main() {
1926
Version: build.UserVersion(),
2027
Flags: []cli.Flag{
2128
cliutil.FlagVeryVerbose,
29+
FlagRepo,
2230
},
2331
Commands: []*cli.Command{
32+
initCmd,
2433
runCmd,
2534
},
2635
}

cmd/booster-bitswap/run.go

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,14 +83,25 @@ var runCmd = &cli.Command{
8383
remoteStore := remoteblockstore.NewRemoteBlockstore(bapi)
8484
// Create the server API
8585
port := cctx.Int("port")
86-
server := NewBitswapServer(port, remoteStore)
87-
86+
repoDir := cctx.String(FlagRepo.Name)
87+
host, err := setupHost(ctx, repoDir, port)
88+
if err != nil {
89+
return fmt.Errorf("setting up libp2p host: %w", err)
90+
}
8891
// Start the server
92+
server := NewBitswapServer(remoteStore, host)
93+
94+
addrs, err := bapi.NetAddrsListen(ctx)
95+
if err != nil {
96+
return fmt.Errorf("getting boost API addrs: %w", err)
97+
}
98+
8999
log.Infof("Starting booster-bitswap node on port %d", port)
90-
err = server.Start(ctx)
100+
err = server.Start(ctx, addrs)
91101
if err != nil {
92102
return err
93103
}
104+
94105
// Monitor for shutdown.
95106
<-ctx.Done()
96107

cmd/booster-bitswap/server.go

Lines changed: 8 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -2,55 +2,33 @@ package main
22

33
import (
44
"context"
5-
"crypto/rand"
6-
"fmt"
75

6+
"github.com/filecoin-project/boost/protocolproxy"
87
bsnetwork "github.com/ipfs/go-bitswap/network"
98
"github.com/ipfs/go-bitswap/server"
109
blockstore "github.com/ipfs/go-ipfs-blockstore"
1110
nilrouting "github.com/ipfs/go-ipfs-routing/none"
12-
"github.com/libp2p/go-libp2p"
13-
crypto "github.com/libp2p/go-libp2p-core/crypto"
14-
"github.com/libp2p/go-libp2p-core/network"
15-
"github.com/libp2p/go-libp2p/p2p/muxer/mplex"
16-
"github.com/libp2p/go-libp2p/p2p/muxer/yamux"
17-
quic "github.com/libp2p/go-libp2p/p2p/transport/quic"
18-
"github.com/libp2p/go-libp2p/p2p/transport/tcp"
11+
"github.com/libp2p/go-libp2p-core/host"
12+
peer "github.com/libp2p/go-libp2p-core/peer"
1913
)
2014

2115
type BitswapServer struct {
22-
port int
2316
remoteStore blockstore.Blockstore
2417

2518
ctx context.Context
2619
cancel context.CancelFunc
2720
server *server.Server
21+
host host.Host
2822
}
2923

30-
func NewBitswapServer(port int, remoteStore blockstore.Blockstore) *BitswapServer {
31-
return &BitswapServer{port: port, remoteStore: remoteStore}
24+
func NewBitswapServer(remoteStore blockstore.Blockstore, host host.Host) *BitswapServer {
25+
return &BitswapServer{remoteStore: remoteStore, host: host}
3226
}
3327

34-
func (s *BitswapServer) Start(ctx context.Context) error {
28+
func (s *BitswapServer) Start(ctx context.Context, balancer peer.AddrInfo) error {
3529
s.ctx, s.cancel = context.WithCancel(ctx)
36-
// setup libp2p host
37-
privKey, _, err := crypto.GenerateECDSAKeyPair(rand.Reader)
38-
if err != nil {
39-
return err
40-
}
4130

42-
host, err := libp2p.New(
43-
libp2p.ListenAddrStrings(
44-
fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", s.port),
45-
fmt.Sprintf("/ip4/0.0.0.0/udp/%d/quic", s.port),
46-
),
47-
libp2p.Transport(tcp.NewTCPTransport),
48-
libp2p.Transport(quic.NewTransport),
49-
libp2p.Muxer("/mplex/6.7.0", mplex.DefaultTransport),
50-
libp2p.Muxer("/yamux/1.0.0", yamux.DefaultTransport),
51-
libp2p.Identity(privKey),
52-
libp2p.ResourceManager(network.NullResourceManager),
53-
)
31+
host, err := protocolproxy.NewForwardingHost(ctx, s.host, balancer)
5432
if err != nil {
5533
return err
5634
}

go.mod

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ require (
1010
contrib.go.opencensus.io/exporter/prometheus v0.4.0
1111
github.com/BurntSushi/toml v1.1.0
1212
github.com/alecthomas/jsonschema v0.0.0-20200530073317-71f438968921
13+
github.com/benbjohnson/clock v1.3.0
1314
github.com/buger/goterm v1.0.3
1415
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e
1516
github.com/davecgh/go-spew v1.1.1
@@ -83,6 +84,7 @@ require (
8384
github.com/libp2p/go-libp2p-pubsub v0.7.1
8485
github.com/libp2p/go-libp2p-record v0.1.3
8586
github.com/libp2p/go-libp2p-resource-manager v0.5.3
87+
github.com/libp2p/go-msgio v0.2.0
8688
github.com/mattn/go-sqlite3 v1.14.10
8789
github.com/mitchellh/go-homedir v1.1.0
8890
github.com/multiformats/go-multiaddr v0.6.0

node/builder.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"github.com/filecoin-project/boost/node/impl/common"
1919
"github.com/filecoin-project/boost/node/modules"
2020
"github.com/filecoin-project/boost/node/modules/dtypes"
21+
"github.com/filecoin-project/boost/protocolproxy"
2122
"github.com/filecoin-project/boost/retrievalmarket/lp2pimpl"
2223
"github.com/filecoin-project/boost/sealingpipeline"
2324
"github.com/filecoin-project/boost/storagemanager"
@@ -80,7 +81,8 @@ import (
8081
var log = logging.Logger("builder")
8182

8283
// special is a type used to give keys to modules which
83-
// can't really be identified by the returned type
84+
//
85+
// can't really be identified by the returned type
8486
type special struct{ id int }
8587

8688
//nolint:golint
@@ -104,6 +106,7 @@ var (
104106
type invoke int
105107

106108
// Invokes are called in the order they are defined.
109+
//
107110
//nolint:golint
108111
const (
109112
// InitJournal at position 0 initializes the journal global var as soon as
@@ -140,6 +143,7 @@ const (
140143
HandleDealsKey
141144
HandleRetrievalKey
142145
HandleRetrievalTransportsKey
146+
HandleProtocolProxyKey
143147
RunSectorServiceKey
144148

145149
// boost should be started after legacy markets (HandleDealsKey)
@@ -523,7 +527,9 @@ func ConfigBoost(cfg *config.Boost) Option {
523527
Override(new(retrievalmarket.RetrievalProvider), lotus_modules.RetrievalProvider),
524528
Override(HandleRetrievalKey, lotus_modules.HandleRetrieval),
525529
Override(new(*lp2pimpl.TransportsListener), modules.NewTransportsListener(cfg)),
530+
Override(new(*protocolproxy.ProtocolProxy), modules.NewProtocolProxy(cfg)),
526531
Override(HandleRetrievalTransportsKey, modules.HandleRetrievalTransports),
532+
Override(HandleProtocolProxyKey, modules.HandleProtocolProxy),
527533
Override(new(idxprov.MeshCreator), idxprov.NewMeshCreator),
528534
Override(new(provider.Interface), modules.IndexProvider(cfg.IndexProvider)),
529535

node/config/doc_gen.go

Lines changed: 6 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

node/config/types.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,8 @@ type DealmakingConfig struct {
206206
// The time that can elapse before a download is considered stalled (and
207207
// another concurrent download is allowed to start).
208208
HttpTransferStallTimeout Duration
209+
210+
BitswapPeerID string
209211
}
210212

211213
type FeeConfig struct {

node/modules/retrieval.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,24 @@ import (
55
"fmt"
66

77
"github.com/filecoin-project/boost/node/config"
8+
"github.com/filecoin-project/boost/protocolproxy"
89
"github.com/filecoin-project/boost/retrievalmarket/lp2pimpl"
910
"github.com/filecoin-project/boost/retrievalmarket/types"
11+
"github.com/ipfs/go-bitswap/network"
1012
"github.com/libp2p/go-libp2p-core/host"
13+
"github.com/libp2p/go-libp2p-core/peer"
14+
"github.com/libp2p/go-libp2p-core/protocol"
1115
"github.com/multiformats/go-multiaddr"
1216
"go.uber.org/fx"
1317
)
1418

19+
var bitswapProtocols = []protocol.ID{
20+
network.ProtocolBitswap,
21+
network.ProtocolBitswapNoVers,
22+
network.ProtocolBitswapOneOne,
23+
network.ProtocolBitswapOneZero,
24+
}
25+
1526
func NewTransportsListener(cfg *config.Boost) func(h host.Host) (*lp2pimpl.TransportsListener, error) {
1627
return func(h host.Host) (*lp2pimpl.TransportsListener, error) {
1728
protos := []types.Protocol{}
@@ -39,6 +50,12 @@ func NewTransportsListener(cfg *config.Boost) func(h host.Host) (*lp2pimpl.Trans
3950
Addresses: []multiaddr.Multiaddr{maddr},
4051
})
4152
}
53+
if cfg.Dealmaking.BitswapPeerID != "" {
54+
protos = append(protos, types.Protocol{
55+
Name: "bitswap",
56+
Addresses: h.Addrs(),
57+
})
58+
}
4259

4360
return lp2pimpl.NewTransportsListener(h, protos), nil
4461
}
@@ -58,3 +75,32 @@ func HandleRetrievalTransports(lc fx.Lifecycle, l *lp2pimpl.TransportsListener)
5875
},
5976
})
6077
}
78+
79+
func NewProtocolProxy(cfg *config.Boost) func(h host.Host) (*protocolproxy.ProtocolProxy, error) {
80+
return func(h host.Host) (*protocolproxy.ProtocolProxy, error) {
81+
peerConfig := map[peer.ID][]protocol.ID{}
82+
if cfg.Dealmaking.BitswapPeerID != "" {
83+
bsPeerID, err := peer.Decode(cfg.Dealmaking.BitswapPeerID)
84+
if err != nil {
85+
return nil, err
86+
}
87+
peerConfig[bsPeerID] = bitswapProtocols
88+
}
89+
return protocolproxy.NewProtocolProxy(h, peerConfig)
90+
}
91+
}
92+
93+
func HandleProtocolProxy(lc fx.Lifecycle, pp *protocolproxy.ProtocolProxy) {
94+
lc.Append(fx.Hook{
95+
OnStart: func(ctx context.Context) error {
96+
log.Info("starting load balancer")
97+
pp.Start(ctx)
98+
return nil
99+
},
100+
OnStop: func(context.Context) error {
101+
log.Info("stopping load balancer")
102+
pp.Close()
103+
return nil
104+
},
105+
})
106+
}

0 commit comments

Comments
 (0)