Skip to content

Commit

Permalink
Remove start time dependency for announce protocol and proxy (ethereu…
Browse files Browse the repository at this point in the history
…m#1035)

* removed start time dependency for announce protocol and proxy

* created new files gossip_cache.go, message_senders.go, protocol.go

* removed istanbul protocol constants from eth package

* maintain validator connections even if core is not started

* fixed some unit tests

* fixed more unit tests

* removed the candidates property from the istanbul rpc api

* addressed PR comments

* addressed PR commentts

Co-authored-by: Kevin Jue <kevin@celo.org>
  • Loading branch information
kevjue and Kevin Jue authored Jun 2, 2020
1 parent 484f45d commit 96f6904
Show file tree
Hide file tree
Showing 29 changed files with 463 additions and 292 deletions.
6 changes: 6 additions & 0 deletions cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,12 @@ func startNode(ctx *cli.Context, stack *node.Node) {
if err := ethereum.StartMining(threads); err != nil {
utils.Fatalf("Failed to start mining: %v", err)
}
// Start the proxy handler if this is a node is proxied and "mining"
if ctx.GlobalBool(utils.ProxiedFlag.Name) {
if err := ethereum.StartProxyHandler(); err != nil {
utils.Fatalf("Failed to start the proxy handler: %v", err)
}
}
}
if !ctx.GlobalBool(utils.VersionCheckFlag.Name) {
blockchain_parameters.SpawnCheck()
Expand Down
4 changes: 2 additions & 2 deletions cmd/geth/misccmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"strings"

"github.com/ethereum/go-ethereum/cmd/utils"
"github.com/ethereum/go-ethereum/eth"
"github.com/ethereum/go-ethereum/consensus/istanbul"
"github.com/ethereum/go-ethereum/params"
"gopkg.in/urfave/cli.v1"
)
Expand Down Expand Up @@ -58,7 +58,7 @@ func version(ctx *cli.Context) error {
fmt.Println("Git Commit Date:", gitDate)
}
fmt.Println("Architecture:", runtime.GOARCH)
fmt.Println("Protocol Versions:", eth.ProtocolVersions)
fmt.Println("Protocol Versions:", istanbul.ProtocolVersions)
fmt.Println("Go Version:", runtime.Version())
fmt.Println("Operating System:", runtime.GOOS)
fmt.Printf("GOPATH=%s\n", os.Getenv("GOPATH"))
Expand Down
1 change: 1 addition & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -1387,6 +1387,7 @@ func setIstanbul(ctx *cli.Context, stack *node.Node, cfg *eth.Config) {
cfg.Istanbul.ValidatorEnodeDBPath = stack.ResolvePath(cfg.Istanbul.ValidatorEnodeDBPath)
cfg.Istanbul.VersionCertificateDBPath = stack.ResolvePath(cfg.Istanbul.VersionCertificateDBPath)
cfg.Istanbul.RoundStateDBPath = stack.ResolvePath(cfg.Istanbul.RoundStateDBPath)
cfg.Istanbul.Validator = ctx.GlobalIsSet(MiningEnabledFlag.Name)
}

func setProxyP2PConfig(ctx *cli.Context, proxyCfg *p2p.Config) {
Expand Down
6 changes: 6 additions & 0 deletions consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,12 @@ type Istanbul interface {
// StopAnnouncing stops the announcing
StopAnnouncing() error

// StartProxyHandler starts the proxy handler
StartProxyHandler() error

// StopProxyHandler stops the proxy handler
StopProxyHandler() error

// This is only implemented for Istanbul.
// It will update the validator set diff in the header, if the mined header is the last block of the epoch.
// The changes are executed inline.
Expand Down
8 changes: 4 additions & 4 deletions consensus/istanbul/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,13 @@ type Backend interface {
// EventMux returns the event mux in backend
EventMux() *event.TypeMux

// BroadcastConsensusMsg sends a message to all validators (include self)
BroadcastConsensusMsg(validators []common.Address, payload []byte) error
// Gossip will send a message to all connnected peers
Gossip(payload []byte, ethMsgCode uint64) error

// Multicast sends a message to it's connected nodes filtered on the 'addresses' parameter (where each address
// is associated with those node's signing key)
// If that parameter is nil, then it will send the message to all it's connected peers.
Multicast(addresses []common.Address, payload []byte, ethMsgCode uint64) error
// If sendToSelf is set to true, then the function will send an event to self via a message event
Multicast(addresses []common.Address, payload []byte, ethMsgCode uint64, sendToSelf bool) error

// Commit delivers an approved proposal to backend.
// The delivered proposal will be put into blockchain.
Expand Down
40 changes: 27 additions & 13 deletions consensus/istanbul/backend/announce.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ func (sb *Backend) shouldSaveAndPublishValEnodeURLs() (bool, error) {
return false, err
}

return sb.coreStarted && validatorConnSet[sb.Address()], nil
return validatorConnSet[sb.Address()], nil
}

// pruneAnnounceDataStructures will remove entries that are not in the validator connection set from all announce related data structures.
Expand Down Expand Up @@ -419,7 +419,7 @@ func (sb *Backend) generateAndGossipQueryEnode(version uint, enforceRetryBackoff
return err
}

if err := sb.Multicast(nil, payload, istanbulQueryEnodeMsg); err != nil {
if err := sb.Gossip(payload, istanbul.QueryEnodeMsg); err != nil {
return err
}

Expand Down Expand Up @@ -501,7 +501,7 @@ func (sb *Backend) generateQueryEnodeMsg(version uint, queryEnodeDestAddresses [
}

msg := &istanbul.Message{
Code: istanbulQueryEnodeMsg,
Code: istanbul.QueryEnodeMsg,
Msg: queryEnodeBytes,
Address: sb.Address(),
Signature: []byte{},
Expand Down Expand Up @@ -544,11 +544,18 @@ func (sb *Backend) generateEncryptedEnodeURLs(enodeURL string, queryEnodeDestAdd
}

// This function will handle a queryEnode message.
func (sb *Backend) handleQueryEnodeMsg(peer consensus.Peer, payload []byte) error {
func (sb *Backend) handleQueryEnodeMsg(addr common.Address, peer consensus.Peer, payload []byte) error {
logger := sb.logger.New("func", "handleQueryEnodeMsg")

msg := new(istanbul.Message)

// Since this is a gossiped messaged, mark that the peer gossiped it (and presumably processed it) and check to see if this node already processed it
sb.markMessageProcessedByPeer(addr, payload)
if sb.checkIfMessageProcessedBySelf(payload) {
return nil
}
defer sb.markMessageProcessedBySelf(payload)

// Decode message
err := msg.FromPayload(payload, istanbul.GetSignatureAddress)
if err != nil {
Expand Down Expand Up @@ -713,7 +720,7 @@ func (sb *Backend) regossipQueryEnode(msg *istanbul.Message, msgTimestamp uint,
}

logger.Trace("Regossiping the istanbul queryEnode message", "IstanbulMsg", msg.String())
if err := sb.Multicast(nil, payload, istanbulQueryEnodeMsg); err != nil {
if err := sb.Gossip(payload, istanbul.QueryEnodeMsg); err != nil {
return err
}

Expand Down Expand Up @@ -835,7 +842,7 @@ func (sb *Backend) encodeVersionCertificatesMsg(versionCertificates []*versionCe
return nil, err
}
msg := &istanbul.Message{
Code: istanbulVersionCertificatesMsg,
Code: istanbul.VersionCertificatesMsg,
Msg: payload,
}
msgPayload, err := msg.Payload()
Expand All @@ -853,7 +860,7 @@ func (sb *Backend) gossipVersionCertificatesMsg(versionCertificates []*versionCe
logger.Warn("Error encoding version certificate msg", "err", err)
return err
}
return sb.Multicast(nil, payload, istanbulVersionCertificatesMsg)
return sb.Gossip(payload, istanbul.VersionCertificatesMsg)
}

func (sb *Backend) getAllVersionCertificates() ([]*versionCertificate, error) {
Expand Down Expand Up @@ -882,13 +889,20 @@ func (sb *Backend) sendVersionCertificateTable(peer consensus.Peer) error {
logger.Warn("Error encoding version certificate msg", "err", err)
return err
}
return peer.Send(istanbulVersionCertificatesMsg, payload)
return peer.Send(istanbul.VersionCertificatesMsg, payload)
}

func (sb *Backend) handleVersionCertificatesMsg(peer consensus.Peer, payload []byte) error {
func (sb *Backend) handleVersionCertificatesMsg(addr common.Address, peer consensus.Peer, payload []byte) error {
logger := sb.logger.New("func", "handleVersionCertificatesMsg")
logger.Trace("Handling version certificates msg")

// Since this is a gossiped messaged, mark that the peer gossiped it (and presumably processed it) and check to see if this node already processed it
sb.markMessageProcessedByPeer(addr, payload)
if sb.checkIfMessageProcessedBySelf(payload) {
return nil
}
defer sb.markMessageProcessedBySelf(payload)

var msg istanbul.Message
if err := msg.FromPayload(payload, nil); err != nil {
logger.Error("Error in decoding version certificates message", "err", err, "payload", hex.EncodeToString(payload))
Expand Down Expand Up @@ -1043,7 +1057,7 @@ func (sb *Backend) setAndShareUpdatedAnnounceVersion(version uint) error {
destAddresses[i] = address
i++
}
err = sb.Multicast(destAddresses, payload, istanbulEnodeCertificateMsg)
err = sb.Multicast(destAddresses, payload, istanbul.EnodeCertificateMsg, false)
if err != nil {
return err
}
Expand Down Expand Up @@ -1139,7 +1153,7 @@ func (sb *Backend) generateEnodeCertificateMsg(version uint) (*istanbul.Message,
return nil, err
}
msg := &istanbul.Message{
Code: istanbulEnodeCertificateMsg,
Code: istanbul.EnodeCertificateMsg,
Address: sb.Address(),
Msg: enodeCertificateBytes,
}
Expand All @@ -1159,7 +1173,7 @@ func (sb *Backend) generateEnodeCertificateMsg(version uint) (*istanbul.Message,
// will send it back to this node.
// If the proxied validator sends an enode certificate for itself to this node,
// this node will set the enode certificate as its own for handshaking.
func (sb *Backend) handleEnodeCertificateMsg(peer consensus.Peer, payload []byte) error {
func (sb *Backend) handleEnodeCertificateMsg(_ common.Address, peer consensus.Peer, payload []byte) error {
logger := sb.logger.New("func", "handleEnodeCertificateMsg")

var msg istanbul.Message
Expand Down Expand Up @@ -1272,7 +1286,7 @@ func (sb *Backend) sendEnodeCertificateMsg(peer consensus.Peer, msg *istanbul.Me
logger.Error("Error getting payload of enode certificate message", "err", err)
return err
}
return peer.Send(istanbulEnodeCertificateMsg, payload)
return peer.Send(istanbul.EnodeCertificateMsg, payload)
}

func (sb *Backend) setEnodeCertificateMsg(msg *istanbul.Message) error {
Expand Down
2 changes: 1 addition & 1 deletion consensus/istanbul/backend/announce_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func TestHandleIstAnnounce(t *testing.T) {
b.address = val2Address

// Handle val1's announce message
if err = b.handleQueryEnodeMsg(nil, payload); err != nil {
if err = b.handleQueryEnodeMsg(common.Address{}, nil, payload); err != nil {
t.Errorf("error %v", err)
}

Expand Down
Loading

0 comments on commit 96f6904

Please sign in to comment.