Skip to content

Commit 355d62e

Browse files
committed
Remove dead code
1 parent 2916083 commit 355d62e

File tree

8 files changed

+75
-137
lines changed

8 files changed

+75
-137
lines changed

components/mocks/mockNetwork.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,8 +91,8 @@ func (network *MockNetwork) RegisterHandlers(dispatch []network.TaggedMessageHan
9191
func (network *MockNetwork) ClearHandlers() {
9292
}
9393

94-
// RegisterProcessors - empty implementation.
95-
func (network *MockNetwork) RegisterProcessors(dispatch []network.TaggedMessageProcessor) {
94+
// RegisterValidatorHandlers - empty implementation.
95+
func (network *MockNetwork) RegisterValidatorHandlers(dispatch []network.TaggedMessageValidatorHandler) {
9696
}
9797

9898
// ClearProcessors - empty implementation

data/txHandler.go

Lines changed: 19 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -262,16 +262,14 @@ func (handler *TxHandler) Start() {
262262

263263
// libp2p pubsub validator and handler abstracted as TaggedMessageProcessor
264264
// TODO: rename to validators
265-
handler.net.RegisterProcessors([]network.TaggedMessageProcessor{
265+
handler.net.RegisterValidatorHandlers([]network.TaggedMessageValidatorHandler{
266266
{
267267
Tag: protocol.TxnTag,
268268
// create anonymous struct to hold the two functions and satisfy the network.MessageProcessor interface
269269
MessageHandler: struct {
270-
network.ProcessorValidateFunc
271-
network.ProcessorHandleFunc
270+
network.ValidateHandleFunc
272271
}{
273-
network.ProcessorValidateFunc(handler.validateIncomingTxMessage),
274-
network.ProcessorHandleFunc(handler.processIncomingTxMessage),
272+
network.ValidateHandleFunc(handler.validateIncomingTxMessage),
275273
},
276274
},
277275
})
@@ -788,31 +786,24 @@ func (handler *TxHandler) processIncomingTxn(rawmsg network.IncomingMessage) net
788786
return network.OutgoingMessage{Action: network.Ignore}
789787
}
790788

791-
type validatedIncomingTxMessage struct {
792-
rawmsg network.IncomingMessage
793-
unverifiedTxGroup []transactions.SignedTxn
794-
msgKey *crypto.Digest
795-
canonicalKey *crypto.Digest
796-
}
797-
798789
// validateIncomingTxMessage is the validator for the MessageProcessor implementation used by P2PNetwork.
799-
func (handler *TxHandler) validateIncomingTxMessage(rawmsg network.IncomingMessage) network.ValidatedMessage {
790+
func (handler *TxHandler) validateIncomingTxMessage(rawmsg network.IncomingMessage) network.OutgoingMessage {
800791
msgKey, isDup := handler.incomingMsgDupCheck(rawmsg.Data)
801792
if isDup {
802-
return network.ValidatedMessage{Action: network.Ignore, ValidatedMessage: nil}
793+
return network.OutgoingMessage{Action: network.Ignore}
803794
}
804795

805796
unverifiedTxGroup, consumed, invalid := decodeMsg(rawmsg.Data)
806797
if invalid {
807798
// invalid encoding or exceeding txgroup, disconnect from this peer
808-
return network.ValidatedMessage{Action: network.Disconnect, ValidatedMessage: nil}
799+
return network.OutgoingMessage{Action: network.Disconnect}
809800
}
810801

811802
canonicalKey, drop := handler.incomingTxGroupDupRateLimit(unverifiedTxGroup, consumed, rawmsg.Sender)
812803
if drop {
813804
// this re-serialized txgroup was detected as a duplicate by the canonical message cache,
814805
// or it was rate-limited by the per-app rate limiter
815-
return network.ValidatedMessage{Action: network.Ignore, ValidatedMessage: nil}
806+
return network.OutgoingMessage{Action: network.Ignore}
816807
}
817808

818809
// apply backlog worker logic
@@ -827,9 +818,8 @@ func (handler *TxHandler) validateIncomingTxMessage(rawmsg network.IncomingMessa
827818

828819
if handler.checkAlreadyCommitted(wi) {
829820
transactionMessagesAlreadyCommitted.Inc(nil)
830-
return network.ValidatedMessage{
831-
Action: network.Ignore,
832-
ValidatedMessage: nil,
821+
return network.OutgoingMessage{
822+
Action: network.Ignore,
833823
}
834824
}
835825

@@ -842,9 +832,8 @@ func (handler *TxHandler) validateIncomingTxMessage(rawmsg network.IncomingMessa
842832
if wi.Err != nil {
843833
handler.postProcessReportErrors(wi.Err)
844834
logging.Base().Warnf("Received a malformed tx group %v: %v", m.unverifiedTxGroup, wi.Err)
845-
return network.ValidatedMessage{
846-
Action: network.Disconnect,
847-
ValidatedMessage: nil,
835+
return network.OutgoingMessage{
836+
Action: network.Disconnect,
848837
}
849838
}
850839
// at this point, we've verified the transaction, so we can safely treat the transaction as a verified transaction.
@@ -855,9 +844,8 @@ func (handler *TxHandler) validateIncomingTxMessage(rawmsg network.IncomingMessa
855844
if err != nil {
856845
handler.rememberReportErrors(err)
857846
logging.Base().Debugf("could not remember tx: %v", err)
858-
return network.ValidatedMessage{
859-
Action: network.Ignore,
860-
ValidatedMessage: nil,
847+
return network.OutgoingMessage{
848+
Action: network.Ignore,
861849
}
862850
}
863851

@@ -868,32 +856,23 @@ func (handler *TxHandler) validateIncomingTxMessage(rawmsg network.IncomingMessa
868856
if err != nil {
869857
logging.Base().Infof("unable to pin transaction: %v", err)
870858
}
871-
return network.ValidatedMessage{
872-
Action: network.Accept,
873-
ValidatedMessage: nil,
859+
return network.OutgoingMessage{
860+
Action: network.Accept,
874861
}
875862

876863
case <-handler.streamVerifierDropped2:
877864
transactionMessagesDroppedFromBacklog.Inc(nil)
878-
return network.ValidatedMessage{
879-
Action: network.Ignore,
880-
ValidatedMessage: nil,
865+
return network.OutgoingMessage{
866+
Action: network.Ignore,
881867
}
882868
case <-handler.ctx.Done():
883869
transactionMessagesDroppedFromBacklog.Inc(nil)
884-
return network.ValidatedMessage{
885-
Action: network.Ignore,
886-
ValidatedMessage: nil,
870+
return network.OutgoingMessage{
871+
Action: network.Ignore,
887872
}
888873
}
889874
}
890875

891-
// processIncomingTxMessage is the handler for the MessageProcessor implementation used by P2PNetwork.
892-
func (handler *TxHandler) processIncomingTxMessage(validatedMessage network.ValidatedMessage) network.OutgoingMessage {
893-
// process is noop, all work is done in validateIncomingTxMessage above
894-
return network.OutgoingMessage{Action: network.Ignore}
895-
}
896-
897876
var errBackLogFullLocal = errors.New("backlog full")
898877

899878
// LocalTransaction is a special shortcut handler for local transactions and intended to be used

network/gossipNode.go

Lines changed: 15 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -80,8 +80,10 @@ type GossipNode interface {
8080
// ClearHandlers deregisters all the existing message handlers.
8181
ClearHandlers()
8282

83-
// RegisterProcessors adds to the set of given message processors.
84-
RegisterProcessors(dispatch []TaggedMessageProcessor)
83+
// RegisterValidatorHandlers adds to the set of given message validation handlers.
84+
// A difference with regular handlers is validation ones perform synchronous validation.
85+
// Currently used as p2p pubsub topic validators.
86+
RegisterValidatorHandlers(dispatch []TaggedMessageValidatorHandler)
8587

8688
// ClearProcessors deregisters all the existing message processors.
8789
ClearProcessors()
@@ -156,14 +158,6 @@ type OutgoingMessage struct {
156158
OnRelease func()
157159
}
158160

159-
// ValidatedMessage is a message that has been validated and is ready to be processed.
160-
// Think as an intermediate one between IncomingMessage and OutgoingMessage
161-
type ValidatedMessage struct {
162-
Action ForwardingPolicy
163-
Tag Tag
164-
ValidatedMessage interface{}
165-
}
166-
167161
// ForwardingPolicy is an enum indicating to whom we should send a message
168162
//
169163
//msgp:ignore ForwardingPolicy
@@ -202,28 +196,19 @@ func (f HandlerFunc) Handle(message IncomingMessage) OutgoingMessage {
202196
return f(message)
203197
}
204198

205-
// MessageProcessor takes a IncomingMessage (e.g., vote, transaction), processes it, and returns what (if anything)
199+
// MessageValidatorHandler takes a IncomingMessage (e.g., vote, transaction), processes it, and returns what (if anything)
206200
// to send to the network in response.
207-
// This is an extension of the MessageHandler that works in two stages: validate ->[result]-> handle.
208-
type MessageProcessor interface {
209-
Validate(message IncomingMessage) ValidatedMessage
210-
Handle(message ValidatedMessage) OutgoingMessage
201+
// it supposed to perform synchronous validation and return the result of the validation
202+
// so that network knows immediately if the message should be be broadcasted or not.
203+
type MessageValidatorHandler interface {
204+
ValidateHandle(message IncomingMessage) OutgoingMessage
211205
}
212206

213-
// ProcessorValidateFunc represents an implementation of the MessageProcessor interface
214-
type ProcessorValidateFunc func(message IncomingMessage) ValidatedMessage
215-
216-
// ProcessorHandleFunc represents an implementation of the MessageProcessor interface
217-
type ProcessorHandleFunc func(message ValidatedMessage) OutgoingMessage
218-
219-
// Validate implements MessageProcessor.Validate, calling the validator with the IncomingMessage and returning the action
220-
// and validation extra data that can be use as the handler input.
221-
func (f ProcessorValidateFunc) Validate(message IncomingMessage) ValidatedMessage {
222-
return f(message)
223-
}
207+
// ValidateHandleFunc represents an implementation of the MessageProcessor interface
208+
type ValidateHandleFunc func(message IncomingMessage) OutgoingMessage
224209

225-
// Handle implements MessageProcessor.Handle calling the handler with the ValidatedMessage and returning the OutgoingMessage
226-
func (f ProcessorHandleFunc) Handle(message ValidatedMessage) OutgoingMessage {
210+
// ValidateHandle implements MessageValidatorHandler.ValidateHandle, calling the validator with the IncomingMessage and returning the action.
211+
func (f ValidateHandleFunc) ValidateHandle(message IncomingMessage) OutgoingMessage {
227212
return f(message)
228213
}
229214

@@ -235,9 +220,9 @@ type taggedMessageDispatcher[T any] struct {
235220
// TaggedMessageHandler receives one type of broadcast messages
236221
type TaggedMessageHandler = taggedMessageDispatcher[MessageHandler]
237222

238-
// TaggedMessageProcessor receives one type of broadcast messages
223+
// TaggedMessageValidatorHandler receives one type of broadcast messages
239224
// and performs two stage processing: validating and handling
240-
type TaggedMessageProcessor = taggedMessageDispatcher[MessageProcessor]
225+
type TaggedMessageValidatorHandler = taggedMessageDispatcher[MessageValidatorHandler]
241226

242227
// Propagate is a convenience function to save typing in the common case of a message handler telling us to propagate an incoming message
243228
// "return network.Propagate(msg)" instead of "return network.OutgoingMsg{network.Broadcast, msg.Tag, msg.Data}"

network/hybridNetwork.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -180,10 +180,10 @@ func (n *HybridP2PNetwork) ClearHandlers() {
180180
n.wsNetwork.ClearHandlers()
181181
}
182182

183-
// RegisterProcessors adds to the set of given message processors.
184-
func (n *HybridP2PNetwork) RegisterProcessors(dispatch []TaggedMessageProcessor) {
185-
n.p2pNetwork.RegisterProcessors(dispatch)
186-
n.wsNetwork.RegisterProcessors(dispatch)
183+
// RegisterValidatorHandlers adds to the set of given message processors.
184+
func (n *HybridP2PNetwork) RegisterValidatorHandlers(dispatch []TaggedMessageValidatorHandler) {
185+
n.p2pNetwork.RegisterValidatorHandlers(dispatch)
186+
n.wsNetwork.RegisterValidatorHandlers(dispatch)
187187
}
188188

189189
// ClearProcessors deregisters all the existing message processors.

network/multiplexer.go

Lines changed: 7 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,8 @@ func (m *Multiplexer) getHandler(tag Tag) (MessageHandler, bool) {
6161
}
6262

6363
// Retrieves the processor for the given message Tag from the processors array.
64-
func (m *Multiplexer) getProcessor(tag Tag) (MessageProcessor, bool) {
65-
return getHandler[MessageProcessor](&m.msgProcessors, tag)
64+
func (m *Multiplexer) getProcessor(tag Tag) (MessageValidatorHandler, bool) {
65+
return getHandler[MessageValidatorHandler](&m.msgProcessors, tag)
6666
}
6767

6868
// Handle is the "input" side of the multiplexer. It dispatches the message to the previously defined handler.
@@ -74,17 +74,9 @@ func (m *Multiplexer) Handle(msg IncomingMessage) OutgoingMessage {
7474
}
7575

7676
// Validate is an alternative "input" side of the multiplexer. It dispatches the message to the previously defined validator.
77-
func (m *Multiplexer) Validate(msg IncomingMessage) ValidatedMessage {
77+
func (m *Multiplexer) Validate(msg IncomingMessage) OutgoingMessage {
7878
if handler, ok := m.getProcessor(msg.Tag); ok {
79-
return handler.Validate(msg)
80-
}
81-
return ValidatedMessage{}
82-
}
83-
84-
// Process is the second step of message handling after validation. It dispatches the message to the previously defined processor.
85-
func (m *Multiplexer) Process(msg ValidatedMessage) OutgoingMessage {
86-
if handler, ok := m.getProcessor(msg.Tag); ok {
87-
return handler.Handle(msg)
79+
return handler.ValidateHandle(msg)
8880
}
8981
return OutgoingMessage{}
9082
}
@@ -110,8 +102,8 @@ func (m *Multiplexer) RegisterHandlers(dispatch []TaggedMessageHandler) {
110102
registerMultiplexer(&m.msgHandlers, dispatch)
111103
}
112104

113-
// RegisterProcessors registers the set of given message handlers.
114-
func (m *Multiplexer) RegisterProcessors(dispatch []TaggedMessageProcessor) {
105+
// RegisterValidatorHandlers registers the set of given message handlers.
106+
func (m *Multiplexer) RegisterValidatorHandlers(dispatch []TaggedMessageValidatorHandler) {
115107
registerMultiplexer(&m.msgProcessors, dispatch)
116108
}
117109

@@ -145,5 +137,5 @@ func (m *Multiplexer) ClearHandlers(excludeTags []Tag) {
145137

146138
// ClearProcessors deregisters all the existing message handlers other than the one provided in the excludeTags list
147139
func (m *Multiplexer) ClearProcessors(excludeTags []Tag) {
148-
clearMultiplexer[MessageProcessor](&m.msgProcessors, excludeTags)
140+
clearMultiplexer[MessageValidatorHandler](&m.msgProcessors, excludeTags)
149141
}

network/p2pNetwork.go

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -685,9 +685,9 @@ func (n *P2PNetwork) ClearHandlers() {
685685
n.handler.ClearHandlers([]Tag{})
686686
}
687687

688-
// RegisterProcessors adds to the set of given message handlers.
689-
func (n *P2PNetwork) RegisterProcessors(dispatch []TaggedMessageProcessor) {
690-
n.handler.RegisterProcessors(dispatch)
688+
// RegisterValidatorHandlers adds to the set of given message handlers.
689+
func (n *P2PNetwork) RegisterValidatorHandlers(dispatch []TaggedMessageValidatorHandler) {
690+
n.handler.RegisterValidatorHandlers(dispatch)
691691
}
692692

693693
// ClearProcessors deregisters all the existing message handlers.
@@ -880,7 +880,8 @@ func (n *P2PNetwork) txTopicHandleLoop() {
880880
n.log.Debugf("Subscribed to topic %s", p2p.TXTopicName)
881881

882882
for {
883-
msg, err := sub.Next(n.ctx)
883+
// msg from sub.Next not used since all work done by txTopicValidator
884+
_, err := sub.Next(n.ctx)
884885
if err != nil {
885886
if err != pubsub.ErrSubscriptionCancelled && err != context.Canceled {
886887
n.log.Errorf("Error reading from subscription %v, peerId %s", err, n.service.ID())
@@ -889,13 +890,6 @@ func (n *P2PNetwork) txTopicHandleLoop() {
889890
sub.Cancel()
890891
return
891892
}
892-
// if there is a self-sent the message no need to process it.
893-
if msg.ReceivedFrom == n.service.ID() {
894-
continue
895-
}
896-
897-
_ = n.handler.Process(msg.ValidatorData.(ValidatedMessage))
898-
899893
// participation or configuration change, cancel subscription and quit
900894
if !n.wantTXGossip.Load() {
901895
n.log.Debugf("Cancelling subscription to topic %s due participation change", p2p.TXTopicName)

0 commit comments

Comments
 (0)