Skip to content

Commit

Permalink
Merge pull request #6547 from The-K-R-O-K/UlyanaAndrukhiv/6344-ambigu…
Browse files Browse the repository at this point in the history
…ous-references

[Observer] Ambiguous references
  • Loading branch information
Guitarheroua authored Oct 23, 2024
2 parents 6a254ed + 02134af commit 1d55978
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 219 deletions.
103 changes: 7 additions & 96 deletions cmd/observer/node_builder/observer_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,7 @@ import (
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
badgerds "github.com/ipfs/go-ds-badger2"
dht "github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/routing"
"github.com/onflow/crypto"
"github.com/rs/zerolog"
"github.com/spf13/pflag"
Expand Down Expand Up @@ -92,17 +89,12 @@ import (
"github.com/onflow/flow-go/network/converter"
"github.com/onflow/flow-go/network/p2p"
"github.com/onflow/flow-go/network/p2p/blob"
p2pbuilder "github.com/onflow/flow-go/network/p2p/builder"
p2pbuilderconfig "github.com/onflow/flow-go/network/p2p/builder/config"
"github.com/onflow/flow-go/network/p2p/cache"
"github.com/onflow/flow-go/network/p2p/conduit"
p2pdht "github.com/onflow/flow-go/network/p2p/dht"
"github.com/onflow/flow-go/network/p2p/keyutils"
p2plogging "github.com/onflow/flow-go/network/p2p/logging"
networkingsubscription "github.com/onflow/flow-go/network/p2p/subscription"
"github.com/onflow/flow-go/network/p2p/translator"
"github.com/onflow/flow-go/network/p2p/unicast/protocols"
"github.com/onflow/flow-go/network/p2p/utils"
"github.com/onflow/flow-go/network/slashing"
"github.com/onflow/flow-go/network/underlay"
"github.com/onflow/flow-go/network/validator"
Expand Down Expand Up @@ -137,8 +129,6 @@ import (
// For a node running as a standalone process, the config fields will be populated from the command line params,
// while for a node running as a library, the config fields are expected to be initialized by the caller.
type ObserverServiceConfig struct {
bootstrapNodeAddresses []string
bootstrapNodePublicKeys []string
observerNetworkingKeyPath string
bootstrapIdentities flow.IdentitySkeletonList // the identity list of bootstrap peers the node uses to discover other nodes
apiRatelimits map[string]int
Expand Down Expand Up @@ -222,8 +212,6 @@ func DefaultObserverServiceConfig() *ObserverServiceConfig {
rpcMetricsEnabled: false,
apiRatelimits: nil,
apiBurstlimits: nil,
bootstrapNodeAddresses: []string{},
bootstrapNodePublicKeys: []string{},
observerNetworkingKeyPath: cmd.NotSet,
apiTimeout: 3 * time.Second,
upstreamNodeAddresses: []string{},
Expand Down Expand Up @@ -333,7 +321,7 @@ func (builder *ObserverServiceBuilder) deriveBootstrapPeerIdentities() error {
return nil
}

ids, err := cmd.BootstrapIdentities(builder.bootstrapNodeAddresses, builder.bootstrapNodePublicKeys)
ids, err := builder.DeriveBootstrapPeerIdentities()
if err != nil {
return fmt.Errorf("failed to derive bootstrap peer identities: %w", err)
}
Expand Down Expand Up @@ -654,14 +642,6 @@ func (builder *ObserverServiceBuilder) extraFlags() {
"observer-networking-key-path",
defaultConfig.observerNetworkingKeyPath,
"path to the networking key for observer")
flags.StringSliceVar(&builder.bootstrapNodeAddresses,
"bootstrap-node-addresses",
defaultConfig.bootstrapNodeAddresses,
"the network addresses of the bootstrap access node if this is an observer e.g. access-001.mainnet.flow.org:9653,access-002.mainnet.flow.org:9653")
flags.StringSliceVar(&builder.bootstrapNodePublicKeys,
"bootstrap-node-public-keys",
defaultConfig.bootstrapNodePublicKeys,
"the networking public key of the bootstrap access node if this is an observer (in the same order as the bootstrap node addresses) e.g. \"d57a5e9c5.....\",\"44ded42d....\"")
flags.DurationVar(&builder.apiTimeout, "upstream-api-timeout", defaultConfig.apiTimeout, "tcp timeout for Flow API gRPC sockets to upstrem nodes")
flags.StringSliceVar(&builder.upstreamNodeAddresses,
"upstream-node-addresses",
Expand Down Expand Up @@ -1001,10 +981,10 @@ func (builder *ObserverServiceBuilder) validateParams() error {
if len(builder.bootstrapIdentities) > 0 {
return nil
}
if len(builder.bootstrapNodeAddresses) == 0 {
if len(builder.BootstrapNodeAddresses) == 0 {
return errors.New("no bootstrap node address provided")
}
if len(builder.bootstrapNodeAddresses) != len(builder.bootstrapNodePublicKeys) {
if len(builder.BootstrapNodeAddresses) != len(builder.BootstrapNodePublicKeys) {
return errors.New("number of bootstrap node addresses and public keys should match")
}
if len(builder.upstreamNodePublicKeys) > 0 && len(builder.upstreamNodeAddresses) != len(builder.upstreamNodePublicKeys) {
Expand All @@ -1013,77 +993,6 @@ func (builder *ObserverServiceBuilder) validateParams() error {
return nil
}

// initPublicLibp2pNode creates a libp2p node for the observer service in the public (unstaked) network.
// The factory function is later passed into the initMiddleware function to eventually instantiate the p2p.LibP2PNode instance
// The LibP2P host is created with the following options:
// * DHT as client and seeded with the given bootstrap peers
// * The specified bind address as the listen address
// * The passed in private key as the libp2p key
// * No connection gater
// * No connection manager
// * No peer manager
// * Default libp2p pubsub options.
// Args:
// - networkKey: the private key to use for the libp2p node
// Returns:
// - p2p.LibP2PNode: the libp2p node
// - error: if any error occurs. Any error returned is considered irrecoverable.
func (builder *ObserverServiceBuilder) initPublicLibp2pNode(networkKey crypto.PrivateKey) (p2p.LibP2PNode, error) {
var pis []peer.AddrInfo

for _, b := range builder.bootstrapIdentities {
pi, err := utils.PeerAddressInfo(*b)
if err != nil {
return nil, fmt.Errorf("could not extract peer address info from bootstrap identity %v: %w", b, err)
}

pis = append(pis, pi)
}

node, err := p2pbuilder.NewNodeBuilder(
builder.Logger,
&builder.FlowConfig.NetworkConfig.GossipSub,
&p2pbuilderconfig.MetricsConfig{
HeroCacheFactory: builder.HeroCacheMetricsFactory(),
Metrics: builder.Metrics.Network,
},
network.PublicNetwork,
builder.BaseConfig.BindAddr,
networkKey,
builder.SporkID,
builder.IdentityProvider,
&builder.FlowConfig.NetworkConfig.ResourceManager,
p2pbuilderconfig.PeerManagerDisableConfig(), // disable peer manager for observer node.
&p2p.DisallowListCacheConfig{
MaxSize: builder.FlowConfig.NetworkConfig.DisallowListNotificationCacheSize,
Metrics: metrics.DisallowListCacheMetricsFactory(builder.HeroCacheMetricsFactory(), network.PublicNetwork),
},
&p2pbuilderconfig.UnicastConfig{
Unicast: builder.FlowConfig.NetworkConfig.Unicast,
}).
SetSubscriptionFilter(
networkingsubscription.NewRoleBasedFilter(
networkingsubscription.UnstakedRole, builder.IdentityProvider,
),
).
SetRoutingSystem(func(ctx context.Context, h host.Host) (routing.Routing, error) {
return p2pdht.NewDHT(ctx, h, protocols.FlowPublicDHTProtocolID(builder.SporkID),
builder.Logger,
builder.Metrics.Network,
p2pdht.AsClient(),
dht.BootstrapPeers(pis...),
)
}).
Build()
if err != nil {
return nil, fmt.Errorf("could not initialize libp2p node for observer: %w", err)
}

builder.LibP2PNode = node

return builder.LibP2PNode, nil
}

// initObserverLocal initializes the observer's ID, network key and network address
// Currently, it reads a node-info.priv.json like any other node.
// TODO: read the node ID from the special bootstrap files
Expand Down Expand Up @@ -1672,11 +1581,13 @@ func (builder *ObserverServiceBuilder) enqueuePublicNetworkInit() {
builder.
Component("public libp2p node", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
var err error
publicLibp2pNode, err = builder.initPublicLibp2pNode(node.NetworkKey)
publicLibp2pNode, err = builder.BuildPublicLibp2pNode(builder.BaseConfig.BindAddr, builder.bootstrapIdentities)
if err != nil {
return nil, fmt.Errorf("could not create public libp2p node: %w", err)
return nil, fmt.Errorf("could not build public libp2p node: %w", err)
}

builder.LibP2PNode = publicLibp2pNode

return publicLibp2pNode, nil
}).
Component("public network", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
Expand Down
52 changes: 27 additions & 25 deletions cmd/scaffold.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,8 @@ type FlowNodeBuilder struct {
adminCommandBootstrapper *admin.CommandRunnerBootstrapper
adminCommands map[string]func(config *NodeConfig) commands.AdminCommand
componentBuilder component.ComponentManagerBuilder
bootstrapNodeAddresses []string
bootstrapNodePublicKeys []string
BootstrapNodeAddresses []string
BootstrapNodePublicKeys []string
}

var _ NodeBuilder = (*FlowNodeBuilder)(nil)
Expand Down Expand Up @@ -254,13 +254,13 @@ func (fnb *FlowNodeBuilder) BaseFlags() {

// observer mode allows a unstaked execution node to fetch blocks from a public staked access node, and being able to execute blocks
fnb.flags.BoolVar(&fnb.BaseConfig.ObserverMode, "observer-mode", defaultConfig.ObserverMode, "whether the node is running in observer mode")
fnb.flags.StringSliceVar(&fnb.bootstrapNodePublicKeys,
fnb.flags.StringSliceVar(&fnb.BootstrapNodePublicKeys,
"observer-mode-bootstrap-node-public-keys",
nil,
[]string{},
"the networking public key of the bootstrap access node if this is an observer (in the same order as the bootstrap node addresses) e.g. \"d57a5e9c5.....\",\"44ded42d....\"")
fnb.flags.StringSliceVar(&fnb.bootstrapNodeAddresses,
fnb.flags.StringSliceVar(&fnb.BootstrapNodeAddresses,
"observer-mode-bootstrap-node-addresses",
nil,
[]string{},
"the network addresses of the bootstrap access node if this is an observer e.g. access-001.mainnet.flow.org:9653,access-002.mainnet.flow.org:9653")
}

Expand Down Expand Up @@ -413,8 +413,13 @@ func (fnb *FlowNodeBuilder) EnqueueNetworkInit() {
}

if fnb.ObserverMode {
// observer mode only init pulbic libp2p node
publicLibp2pNode, err := fnb.BuildPublicLibp2pNode(myAddr)
// observer mode only init public libp2p node
ids, err := fnb.DeriveBootstrapPeerIdentities()
if err != nil {
return nil, fmt.Errorf("failed to derive bootstrap peer identities: %w", err)
}

publicLibp2pNode, err := fnb.BuildPublicLibp2pNode(myAddr, ids)
if err != nil {
return nil, fmt.Errorf("could not build public libp2p node: %w", err)
}
Expand Down Expand Up @@ -500,7 +505,18 @@ func (fnb *FlowNodeBuilder) HeroCacheMetricsFactory() metrics.HeroCacheMetricsFa
return metrics.NewNoopHeroCacheMetricsFactory()
}

// initPublicLibp2pNode creates a libp2p node for the observer service in the public (unstaked) network.
// DeriveBootstrapPeerIdentities derives the Flow Identity of the bootstrap peers from the parameters.
// These are the identities of the observers also acting as the DHT bootstrap server
func (fnb *FlowNodeBuilder) DeriveBootstrapPeerIdentities() (flow.IdentitySkeletonList, error) {
ids, err := BootstrapIdentities(fnb.BootstrapNodeAddresses, fnb.BootstrapNodePublicKeys)
if err != nil {
return nil, fmt.Errorf("failed to derive bootstrap peer identities: %w", err)
}

return ids, nil
}

// BuildPublicLibp2pNode creates a libp2p node for the observer service in the public (unstaked) network.
// The factory function is later passed into the initMiddleware function to eventually instantiate the p2p.LibP2PNode instance
// The LibP2P host is created with the following options:
// * DHT as client and seeded with the given bootstrap peers
Expand All @@ -515,24 +531,10 @@ func (fnb *FlowNodeBuilder) HeroCacheMetricsFactory() metrics.HeroCacheMetricsFa
// Returns:
// - p2p.LibP2PNode: the libp2p node
// - error: if any error occurs. Any error returned is considered irrecoverable.
func (fnb *FlowNodeBuilder) BuildPublicLibp2pNode(address string) (p2p.LibP2PNode, error) {
func (fnb *FlowNodeBuilder) BuildPublicLibp2pNode(address string, bootstrapIdentities flow.IdentitySkeletonList) (p2p.LibP2PNode, error) {
var pis []peer.AddrInfo

ids, err := BootstrapIdentities(fnb.bootstrapNodeAddresses, fnb.bootstrapNodePublicKeys)
if err != nil {
return nil, fmt.Errorf("could not create bootstrap identities: %w", err)
}

for _, b := range ids {
pi, err := utils.PeerAddressInfo(*b)
if err != nil {
return nil, fmt.Errorf("could not extract peer address info from bootstrap identity %v: %w", b, err)
}

pis = append(pis, pi)
}

for _, b := range ids {
for _, b := range bootstrapIdentities {
pi, err := utils.PeerAddressInfo(*b)
if err != nil {
return nil, fmt.Errorf("could not extract peer address info from bootstrap identity %v: %w", b, err)
Expand Down
Loading

0 comments on commit 1d55978

Please sign in to comment.