From 0121483f5ea89ee51df71885e241bbbaea8358b4 Mon Sep 17 00:00:00 2001 From: Uh Sado Date: Sun, 31 Oct 2021 00:07:12 +0000 Subject: [PATCH] eth, eth/protocols/eth: added metadium message handlers --- eth/handler.go | 5 +- eth/protocols/eth/handler.go | 14 +++++ eth/protocols/eth/handlers.go | 28 +++++++++ eth/protocols/eth/metadium_handlers.go | 78 ++++++++++++++++++++++++++ eth/protocols/eth/peer.go | 33 +++++++++++ eth/protocols/eth/protocol.go | 67 ++++++++++++++++++++-- 6 files changed, 216 insertions(+), 9 deletions(-) create mode 100644 eth/protocols/eth/metadium_handlers.go diff --git a/eth/handler.go b/eth/handler.go index a5722d3bbd5c..e119bf48ba6c 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -18,7 +18,6 @@ package eth import ( "errors" - "fmt" "math" "math/big" "sync" @@ -538,7 +537,7 @@ func (h *handler) txBroadcastLoop() { // RequestMinerStatus sends GetStatusExMsg to the given peer func (h *handler) RequestMinerStatus(id enode.ID) error { - if p := h.peers.peer(fmt.Sprintf("%x", id[:8])); p != nil { + if p := h.peers.peer(id.String()); p != nil { return p.RequestStatusEx() } else { return ethereum.NotFound @@ -547,7 +546,7 @@ func (h *handler) RequestMinerStatus(id enode.ID) error { // RequestEtcdAddMember is an internal protocol level command to add a node to the etcd cluster func (h *handler) RequestEtcdAddMember(id enode.ID) error { - if p := h.peers.peer(fmt.Sprintf("%x", id[:8])); p != nil { + if p := h.peers.peer(id.String()); p != nil { return p.RequestEtcdAddMember() } else { return ethereum.NotFound diff --git a/eth/protocols/eth/handler.go b/eth/protocols/eth/handler.go index 6bbaa2f555f1..fbff97f4423a 100644 --- a/eth/protocols/eth/handler.go +++ b/eth/protocols/eth/handler.go @@ -186,6 +186,13 @@ var eth65 = map[uint64]msgHandler{ NewPooledTransactionHashesMsg: handleNewPooledTransactionHashes, GetPooledTransactionsMsg: handleGetPooledTransactions, PooledTransactionsMsg: handlePooledTransactions, + // metadium message handlers + GetPendingTxsMsg: handleGetPendingTxs, + GetStatusExMsg: handleGetStatusEx, + StatusExMsg: handleStatusEx, + EtcdAddMemberMsg: handleEtcdAddMember, + EtcdClusterMsg: handleEtcdCluster, + TransactionsExMsg: handleTransactionsEx, } var eth66 = map[uint64]msgHandler{ @@ -204,6 +211,13 @@ var eth66 = map[uint64]msgHandler{ ReceiptsMsg: handleReceipts66, GetPooledTransactionsMsg: handleGetPooledTransactions66, PooledTransactionsMsg: handlePooledTransactions66, + // metadium message handlers - not eth/66 yet + GetPendingTxsMsg: handleGetPendingTxs, + GetStatusExMsg: handleGetStatusEx, + StatusExMsg: handleStatusEx, + EtcdAddMemberMsg: handleEtcdAddMember, + EtcdClusterMsg: handleEtcdCluster, + TransactionsExMsg: handleTransactionsEx, } // handleMessage is invoked whenever an inbound message is received from a remote diff --git a/eth/protocols/eth/handlers.go b/eth/protocols/eth/handlers.go index d7d993a23dee..84edabf177af 100644 --- a/eth/protocols/eth/handlers.go +++ b/eth/protocols/eth/handlers.go @@ -23,6 +23,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/log" + metaminer "github.com/ethereum/go-ethereum/metadium/miner" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/trie" ) @@ -477,6 +478,33 @@ func handleTransactions(backend Backend, msg Decoder, peer *Peer) error { return backend.Handle(peer, &txs) } +func handleTransactionsEx(backend Backend, msg Decoder, peer *Peer) error { + // Transactions arrived, make sure we have a valid and fresh chain to handle them + if !backend.AcceptTxs() { + return nil + } + // Transactions can be processed, parse all of them and deliver to the pool + var txexs TransactionsExPacket + if err := msg.Decode(&txexs); err != nil { + return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) + } + + go func() error { + signer := types.MakeSigner(backend.Chain().Config(), backend.Chain().CurrentBlock().Number()) + txs := types.TxExs2Txs(signer, txexs, metaminer.IsPartner(peer.ID())) + for i, tx := range txs { + // Validate and mark the remote transaction + if tx == nil { + return fmt.Errorf("%w: transaction %d is nil", errDecode, i) + } + peer.markTransaction(tx.Hash()) + } + txsp := TransactionsPacket(txs) + return backend.Handle(peer, &txsp) + }() + return nil +} + func handlePooledTransactions(backend Backend, msg Decoder, peer *Peer) error { // Transactions arrived, make sure we have a valid and fresh chain to handle them if !backend.AcceptTxs() { diff --git a/eth/protocols/eth/metadium_handlers.go b/eth/protocols/eth/metadium_handlers.go new file mode 100644 index 000000000000..c65a01d84e74 --- /dev/null +++ b/eth/protocols/eth/metadium_handlers.go @@ -0,0 +1,78 @@ +package eth + +import ( + "fmt" + + metaapi "github.com/ethereum/go-ethereum/metadium/api" + metaminer "github.com/ethereum/go-ethereum/metadium/miner" +) + +func handleGetPendingTxs(backend Backend, msg Decoder, peer *Peer) error { + // not supported, just ignore it. + return nil +} + +func handleGetStatusEx(backend Backend, msg Decoder, peer *Peer) error { + if !metaminer.AmPartner() || !metaminer.IsPartner(peer.ID()) { + return nil + } + + go func() { + statusEx := metaapi.GetMinerStatus() + statusEx.LatestBlockTd = backend.Chain().GetTd(statusEx.LatestBlockHash, + statusEx.LatestBlockHeight.Uint64()) + if err := peer.SendStatusEx(statusEx); err != nil { + // ignore the error + } + }() + + return nil +} + +func handleStatusEx(backend Backend, msg Decoder, peer *Peer) error { + if !metaminer.AmPartner() || !metaminer.IsPartner(peer.ID()) { + return nil + } + var status metaapi.MetadiumMinerStatus + if err := msg.Decode(&status); err != nil { + return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) + } + + go func() { + if _, td := peer.Head(); status.LatestBlockTd.Cmp(td) > 0 { + peer.SetHead(status.LatestBlockHash, status.LatestBlockTd) + } + metaapi.GotStatusEx(&status) + }() + + return nil +} + +func handleEtcdAddMember(backend Backend, msg Decoder, peer *Peer) error { + if !metaminer.AmPartner() || !metaminer.IsPartner(peer.ID()) { + return nil + } + + go func() { + cluster, _ := metaapi.EtcdAddMember(peer.ID()) + if err := peer.SendEtcdCluster(cluster); err != nil { + // ignore the error + } + }() + + return nil +} + +func handleEtcdCluster(backend Backend, msg Decoder, peer *Peer) error { + if !metaminer.AmPartner() || !metaminer.IsPartner(peer.ID()) { + return nil + } + var cluster string + if err := msg.Decode(&cluster); err != nil { + return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) + } + + go metaapi.GotEtcdCluster(cluster) + + return nil +} diff --git a/eth/protocols/eth/peer.go b/eth/protocols/eth/peer.go index 60e8534d54dc..808103f58204 100644 --- a/eth/protocols/eth/peer.go +++ b/eth/protocols/eth/peer.go @@ -24,6 +24,7 @@ import ( mapset "github.com/deckarep/golang-set" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" + metaapi "github.com/ethereum/go-ethereum/metadium/api" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/rlp" ) @@ -402,6 +403,32 @@ func (p *Peer) ReplyReceiptsRLP(id uint64, receipts []rlp.RawValue) error { }) } +// SendStatusEx sends this node's miner status +func (p *Peer) SendStatusEx(status *metaapi.MetadiumMinerStatus) error { + return p2p.Send(p.rw, StatusExMsg, status) +} + +// ReplyStatusEx is the eth/66 response to GetStatusEx +func (p *Peer) ReplyStatusEx(id uint64, status *metaapi.MetadiumMinerStatus) error { + return p2p.Send(p.rw, StatusExMsg, StatusExPacket66{ + RequestId: id, + StatusExPacket: StatusExPacket(*status), + }) +} + +// SendEtcdCluster sends this node's etcd cluster +func (p *Peer) SendEtcdCluster(cluster string) error { + return p2p.Send(p.rw, EtcdClusterMsg, cluster) +} + +// ReplyEtcdCluster is the eth/66 response to EtcdAddMember +func (p *Peer) ReplyEtcdCluster(id uint64, cluster string) error { + return p2p.Send(p.rw, EtcdClusterMsg, EtcdClusterPacket66{ + RequestId: id, + EtcdClusterPacket: EtcdClusterPacket(cluster), + }) +} + // RequestOneHeader is a wrapper around the header query functions to fetch a // single header. It is used solely by the fetcher. func (p *Peer) RequestOneHeader(hash common.Hash) error { @@ -545,11 +572,17 @@ func (p *Peer) RequestTxs(hashes []common.Hash) error { // RequestStatusEx fetches extended status of the peer func (p *Peer) RequestStatusEx() error { p.Log().Debug("Fetching extended status") + id := rand.Uint64() + + requestTracker.Track(p.id, p.version, GetStatusExMsg, StatusExMsg, id) return p2p.Send(p.rw, GetStatusExMsg, common.Big1) } // RequestEtcdAddMember requests the peer to add this node to the cluster func (p *Peer) RequestEtcdAddMember() error { p.Log().Debug("Trying to join etcd network") + id := rand.Uint64() + + requestTracker.Track(p.id, p.version, EtcdAddMemberMsg, EtcdClusterMsg, id) return p2p.Send(p.rw, EtcdAddMemberMsg, common.Big1) } diff --git a/eth/protocols/eth/protocol.go b/eth/protocols/eth/protocol.go index e56f6ed79546..6a8615fe2c9f 100644 --- a/eth/protocols/eth/protocol.go +++ b/eth/protocols/eth/protocol.go @@ -25,6 +25,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/forkid" "github.com/ethereum/go-ethereum/core/types" + metaapi "github.com/ethereum/go-ethereum/metadium/api" "github.com/ethereum/go-ethereum/rlp" ) @@ -70,12 +71,12 @@ const ( PooledTransactionsMsg = 0x0a // Added by Metadium, meta/64 - GetPendingTxsMsg = 0x11 - GetStatusExMsg = 0x12 - StatusExMsg = 0x13 - EtcdAddMemberMsg = 0x14 - EtcdClusterMsg = 0x15 - TxExMsg = 0x16 + GetPendingTxsMsg = 0x11 + GetStatusExMsg = 0x12 + StatusExMsg = 0x13 + EtcdAddMemberMsg = 0x14 + EtcdClusterMsg = 0x15 + TransactionsExMsg = 0x16 ) var ( @@ -128,6 +129,9 @@ func (p *NewBlockHashesPacket) Unpack() ([]common.Hash, []uint64) { // TransactionsPacket is the network packet for broadcasting new transactions. type TransactionsPacket []*types.Transaction +// TransactionsExPacket is the network packet for broadcasting new extended transactions. +type TransactionsExPacket []*types.TransactionEx + // GetBlockHeadersPacket represents a block header query. type GetBlockHeadersPacket struct { Origin HashOrNumber // Block from which to retrieve headers @@ -329,6 +333,42 @@ type PooledTransactionsRLPPacket66 struct { PooledTransactionsRLPPacket } +// GetStatusExPacket is the network packet for GetStatusEx +type GetStatusExPacket int + +// GetStatusExPacket66 is the eth/66 form of GetSTatusExPacket +type GetStatusExPacket66 struct { + RequestId uint64 + GetStatusExPacket +} + +// StatusExPacket is the network packet for extended status of a node +type StatusExPacket metaapi.MetadiumMinerStatus + +// StatusExPacket66 is the eth/66 form of StatusExPacket +type StatusExPacket66 struct { + RequestId uint64 + StatusExPacket +} + +// EtcdAddMemberPacket is the netowkr packet for EtcdAddMember +type EtcdAddMemberPacket int + +// EtcdAddMemberPacket66 is the eth/66 form of EtcdAddMember +type EtcdAddMemberPacket66 struct { + RequestId uint64 + EtcdAddMemberPacket +} + +// EtcdClusterPacket is the network packet for EtcdAddMember / EtcdCluster exchange +type EtcdClusterPacket string + +// EtcdClusterPacket66 is the eth/66 form of EtcdClusterPacket +type EtcdClusterPacket66 struct { + RequestId uint64 + EtcdClusterPacket +} + func (*StatusPacket) Name() string { return "Status" } func (*StatusPacket) Kind() byte { return StatusMsg } @@ -338,6 +378,9 @@ func (*NewBlockHashesPacket) Kind() byte { return NewBlockHashesMsg } func (*TransactionsPacket) Name() string { return "Transactions" } func (*TransactionsPacket) Kind() byte { return TransactionsMsg } +func (*TransactionsExPacket) Name() string { return "TransactionsEx" } +func (*TransactionsExPacket) Kind() byte { return TransactionsExMsg } + func (*GetBlockHeadersPacket) Name() string { return "GetBlockHeaders" } func (*GetBlockHeadersPacket) Kind() byte { return GetBlockHeadersMsg } @@ -373,3 +416,15 @@ func (*GetPooledTransactionsPacket) Kind() byte { return GetPooledTransactions func (*PooledTransactionsPacket) Name() string { return "PooledTransactions" } func (*PooledTransactionsPacket) Kind() byte { return PooledTransactionsMsg } + +func (*GetStatusExPacket) Name() string { return "GetStatusEx" } +func (*GetStatusExPacket) Kind() byte { return GetStatusExMsg } + +func (*StatusExPacket) Name() string { return "StatusEx" } +func (*StatusExPacket) Kind() byte { return StatusExMsg } + +func (*EtcdAddMemberPacket) Name() string { return "EtcdAddMember" } +func (*EtcdAddMemberPacket) Kind() byte { return EtcdAddMemberMsg } + +func (*EtcdClusterPacket) Name() string { return "EtcdCluster" } +func (*EtcdClusterPacket) Kind() byte { return EtcdClusterMsg }