Skip to content

Commit

Permalink
node: governor publish gossip (wormhole-foundation#1538)
Browse files Browse the repository at this point in the history
* Governor publish gossip

Change-Id: I2b8b1ea84a0c411101a7027acd3a27a6d6464d59

* Update the config publish time

Change-Id: Ic6abf84befb1c20756da2ff66b15a8325dc46067

* Not setting value on enqueued VAAs correctly

Change-Id: I9fd3a5d8fc574f8382125445fa688efdae45b88c

* Publish at most 20 VAAs, not 20 per chain

Change-Id: Ic9dff99c59ee89d57fd79158844a1fe1a0003112

* Switch to using signed messages

Change-Id: I66cddc7477cd477aa77bdadfc346b588f2ae645b

* Publish status only once per minute

Change-Id: I972fb0cf868e89c6f74ae4441471a55df389f4dd

* Minor comment change

Change-Id: I0d3e443cbec7edd282f89c1a5cce5d5ec8776d55
  • Loading branch information
bruce-riley authored Sep 26, 2022
1 parent fb9f93e commit 2b582b1
Show file tree
Hide file tree
Showing 4 changed files with 258 additions and 12 deletions.
24 changes: 14 additions & 10 deletions node/pkg/governor/governor.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,16 +112,20 @@ func (ce *chainEntry) isBigTransfer(value uint64) bool {
}

type ChainGovernor struct {
db db.GovernorDB
logger *zap.Logger
mutex sync.Mutex
tokens map[tokenKey]*tokenEntry
tokensByCoinGeckoId map[string][]*tokenEntry
chains map[vaa.ChainID]*chainEntry
msgsToPublish []*common.MessagePublication
dayLengthInMinutes int
coinGeckoQuery string
env int
db db.GovernorDB
logger *zap.Logger
mutex sync.Mutex
tokens map[tokenKey]*tokenEntry
tokensByCoinGeckoId map[string][]*tokenEntry
chains map[vaa.ChainID]*chainEntry
msgsToPublish []*common.MessagePublication
dayLengthInMinutes int
coinGeckoQuery string
env int
nextStatusPublishTime time.Time
nextConfigPublishTime time.Time
statusPublishCounter int64
configPublishCounter int64
}

func NewChainGovernor(
Expand Down
168 changes: 167 additions & 1 deletion node/pkg/governor/governor_monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,22 @@
// - This is a single metric that indicates the total number of enqueued VAAs across all chains. This provides a quick check if
// anything is currently being limited.

// The chain governor also publishes the following messages to the gossip network
//
// SignedChainGovernorConfig
// - Published once every five minutes.
// - Contains a list of configured chains, along with the daily limit, big transaction size and current price.
//
// - SignedChainGovernorStatus
// - Published once a minute.
// - Contains a list of configured chains along with their remaining available notional value, the number of enqueued VAAs
// and information on zero or more enqueued VAAs.
// - Only the first 20 enqueued VAAs are include, to constrain the message size.

package governor

import (
"crypto/ecdsa"
"fmt"
"sort"
"time"
Expand All @@ -73,8 +86,13 @@ import (
"github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.uber.org/zap"

ethCommon "github.com/ethereum/go-ethereum/common"
ethCrypto "github.com/ethereum/go-ethereum/crypto"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"google.golang.org/protobuf/proto"
)

// Admin command to display status to the log.
Expand Down Expand Up @@ -378,7 +396,7 @@ var (
})
)

func (gov *ChainGovernor) CollectMetrics(hb *gossipv1.Heartbeat) {
func (gov *ChainGovernor) CollectMetrics(hb *gossipv1.Heartbeat, sendC chan []byte, gk *ecdsa.PrivateKey, ourAddr ethCommon.Address) {
gov.mutex.Lock()
defer gov.mutex.Unlock()

Expand Down Expand Up @@ -431,4 +449,152 @@ func (gov *ChainGovernor) CollectMetrics(hb *gossipv1.Heartbeat) {
}

metricTotalEnqueuedVAAs.Set(float64(totalPending))

if startTime.After(gov.nextConfigPublishTime) {
gov.publishConfig(hb, sendC, gk, ourAddr)
gov.nextConfigPublishTime = startTime.Add(time.Minute * time.Duration(5))
}

if startTime.After(gov.nextStatusPublishTime) {
gov.publishStatus(hb, sendC, startTime, gk, ourAddr)
gov.nextStatusPublishTime = startTime.Add(time.Minute)
}
}

var governorMessagePrefix = []byte("governor|")

func (gov *ChainGovernor) publishConfig(hb *gossipv1.Heartbeat, sendC chan []byte, gk *ecdsa.PrivateKey, ourAddr ethCommon.Address) {
chains := make([]*gossipv1.ChainGovernorConfig_Chain, 0)
for _, ce := range gov.chains {
chains = append(chains, &gossipv1.ChainGovernorConfig_Chain{
ChainId: uint32(ce.emitterChainId),
NotionalLimit: ce.dailyLimit,
BigTransactionSize: ce.bigTransactionSize,
})
}

tokens := make([]*gossipv1.ChainGovernorConfig_Token, 0)
for tk, te := range gov.tokens {
price, _ := te.price.Float32()
tokens = append(tokens, &gossipv1.ChainGovernorConfig_Token{
OriginChainId: uint32(tk.chain),
OriginAddress: "0x" + tk.addr.String(),
Price: price,
})
}

gov.configPublishCounter += 1
payload := &gossipv1.ChainGovernorConfig{
NodeName: hb.NodeName,
Counter: gov.configPublishCounter,
Timestamp: hb.Timestamp,
Chains: chains,
Tokens: tokens,
}

b, err := proto.Marshal(payload)
if err != nil {
gov.logger.Error("cgov: failed to marshal config message", zap.Error(err))
return
}

digest := ethCrypto.Keccak256Hash(append(governorMessagePrefix, b...))

sig, err := ethCrypto.Sign(digest.Bytes(), gk)
if err != nil {
panic(err)
}

msg := gossipv1.GossipMessage{Message: &gossipv1.GossipMessage_SignedChainGovernorConfig{
SignedChainGovernorConfig: &gossipv1.SignedChainGovernorConfig{
Config: b,
Signature: sig,
GuardianAddr: ourAddr.Bytes(),
}}}

b, err = proto.Marshal(&msg)
if err != nil {
panic(err)
}

sendC <- b
}

func (gov *ChainGovernor) publishStatus(hb *gossipv1.Heartbeat, sendC chan []byte, startTime time.Time, gk *ecdsa.PrivateKey, ourAddr ethCommon.Address) {
chains := make([]*gossipv1.ChainGovernorStatus_Chain, 0)
numEnqueued := 0
for _, ce := range gov.chains {
value := sumValue(ce.transfers, startTime)
if value >= ce.dailyLimit {
value = 0
} else {
value = ce.dailyLimit - value
}

enqueuedVaas := make([]*gossipv1.ChainGovernorStatus_EnqueuedVAA, 0)
for _, pe := range ce.pending {
value, err := computeValue(pe.amount, pe.token)
if err != nil {
gov.logger.Error("cgov: failed to compute value of pending transfer", zap.String("msgID", pe.dbData.Msg.MessageIDString()), zap.Error(err))
value = 0
}

if numEnqueued < 20 {
numEnqueued = numEnqueued + 1
enqueuedVaas = append(enqueuedVaas, &gossipv1.ChainGovernorStatus_EnqueuedVAA{
Sequence: pe.dbData.Msg.Sequence,
ReleaseTime: uint32(pe.dbData.ReleaseTime.Unix()),
NotionalValue: value,
TxHash: pe.dbData.Msg.TxHash.String(),
})
}
}

emitter := gossipv1.ChainGovernorStatus_Emitter{
EmitterAddress: "0x" + ce.emitterAddr.String(),
TotalEnqueuedVaas: uint64(len(ce.pending)),
EnqueuedVaas: enqueuedVaas,
}

chains = append(chains, &gossipv1.ChainGovernorStatus_Chain{
ChainId: uint32(ce.emitterChainId),
RemainingAvailableNotional: value,
Emitters: []*gossipv1.ChainGovernorStatus_Emitter{&emitter},
})
}

gov.statusPublishCounter += 1
payload := &gossipv1.ChainGovernorStatus{
NodeName: hb.NodeName,
Counter: gov.statusPublishCounter,
Timestamp: hb.Timestamp,
Chains: chains,
}

b, err := proto.Marshal(payload)
if err != nil {
gov.logger.Error("cgov: failed to marshal status message", zap.Error(err))
return
}

digest := ethCrypto.Keccak256Hash(append(governorMessagePrefix, b...))

sig, err := ethCrypto.Sign(digest.Bytes(), gk)
if err != nil {
panic(err)
}

msg := gossipv1.GossipMessage{Message: &gossipv1.GossipMessage_SignedChainGovernorStatus{
SignedChainGovernorStatus: &gossipv1.SignedChainGovernorStatus{
Status: b,
Signature: sig,
GuardianAddr: ourAddr.Bytes(),
}}}

b, err = proto.Marshal(&msg)
if err != nil {
panic(err)
}

sendC <- b
}
6 changes: 5 additions & 1 deletion node/pkg/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ func Run(obsvC chan *gossipv1.SignedObservation, obsvReqC chan *gossipv1.Observa
collectNodeMetrics(ourAddr, h.ID(), heartbeat)

if gov != nil {
gov.CollectMetrics(heartbeat)
gov.CollectMetrics(heartbeat, sendC, gk, ourAddr)
}

b, err := proto.Marshal(heartbeat)
Expand Down Expand Up @@ -407,6 +407,10 @@ func Run(obsvC chan *gossipv1.SignedObservation, obsvReqC chan *gossipv1.Observa

obsvReqC <- r
}
case *gossipv1.GossipMessage_SignedChainGovernorConfig:
logger.Debug("cgov: received config message")
case *gossipv1.GossipMessage_SignedChainGovernorStatus:
logger.Debug("cgov: received status message")
default:
p2pMessagesReceived.WithLabelValues("unknown").Inc()
logger.Warn("received unknown message type (running outdated software?)",
Expand Down
72 changes: 72 additions & 0 deletions proto/gossip/v1/gossip.proto
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ message GossipMessage {
SignedObservationRequest signed_observation_request = 5;
SignedBatchObservation signed_batch_observation = 6;
SignedBatchVAAWithQuorum signed_batch_vaa_with_quorum = 7;
SignedChainGovernorConfig signed_chain_governor_config = 8;
SignedChainGovernorStatus signed_chain_governor_status = 9;
}
}

Expand Down Expand Up @@ -145,3 +147,73 @@ message SignedBatchObservation {
message SignedBatchVAAWithQuorum {
bytes batch_vaa = 1;
}

// This message is published every five minutes.
message SignedChainGovernorConfig {
// Serialized ChainGovernorConfig message.
bytes config = 1;

// ECDSA signature using the node's guardian key.
bytes signature = 2;

// Guardian address that signed this payload (truncated Eth address).
bytes guardian_addr = 3;
}

message ChainGovernorConfig {
message Chain {
uint32 chain_id = 1;
uint64 notional_limit = 2;
uint64 big_transaction_size = 3;
}

message Token {
uint32 origin_chain_id = 1;
string origin_address = 2; // human-readable hex-encoded (leading 0x)
float price = 3;
}

string node_name = 1;
int64 counter = 2;
int64 timestamp = 3;
repeated Chain chains = 4;
repeated Token tokens = 5;
}

// This message is published every minute.
message SignedChainGovernorStatus {
// Serialized ChainGovernorStatus message.
bytes status = 1;

// ECDSA signature using the node's guardian key.
bytes signature = 2;

// Guardian address that signed this payload (truncated Eth address).
bytes guardian_addr = 3;
}

message ChainGovernorStatus {
message EnqueuedVAA {
uint64 sequence = 1; // Chain and emitter address are assumed.
uint32 release_time = 2;
uint64 notional_value = 3;
string tx_hash = 4;
}

message Emitter {
string emitter_address = 1; // human-readable hex-encoded (leading 0x)
uint64 total_enqueued_vaas = 2;
repeated EnqueuedVAA enqueued_vaas = 3; // Only the first 20 will be included.
}

message Chain {
uint32 chain_id = 1;
uint64 remaining_available_notional = 2;
repeated Emitter emitters = 3;
}

string node_name = 1;
int64 counter = 2;
int64 timestamp = 3;
repeated Chain chains = 4;
}

0 comments on commit 2b582b1

Please sign in to comment.