Skip to content

Commit ba9c593

Browse files
authored
performance: agreement state serialization using msgp (#4644)
1 parent fb1f498 commit ba9c593

40 files changed

+11177
-3405
lines changed

agreement/actions.go

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,7 @@ import (
2626
)
2727

2828
//go:generate stringer -type=actionType
29-
//msgp:ignore actionType
30-
type actionType int
29+
type actionType uint8
3130

3231
const (
3332
noop actionType = iota
@@ -103,7 +102,7 @@ type networkAction struct {
103102

104103
UnauthenticatedVotes []unauthenticatedVote
105104

106-
Err serializableError
105+
Err *serializableError
107106
}
108107

109108
func (a networkAction) t() actionType {
@@ -181,7 +180,7 @@ type cryptoAction struct {
181180
Period period
182181
Step step
183182
Pinned bool
184-
TaskIndex int
183+
TaskIndex uint64
185184
}
186185

187186
func (a cryptoAction) t() actionType {
@@ -388,7 +387,7 @@ func (a pseudonodeAction) do(ctx context.Context, s *Service) {
388387
case nil:
389388
// no error.
390389
persistCompleteEvents := s.persistState(persistStateDone)
391-
// we want to place there two one after the other. That way, the second would not get executed up until the first one is complete.
390+
// we want to place these two one after the other. That way, the second would not get executed up until the first one is complete.
392391
s.demux.prioritize(persistCompleteEvents)
393392
s.demux.prioritize(voteEvents)
394393
default:
@@ -403,12 +402,12 @@ func (a pseudonodeAction) do(ctx context.Context, s *Service) {
403402
}
404403
}
405404

406-
func ignoreAction(e messageEvent, err serializableError) action {
407-
return networkAction{T: ignore, Err: err, h: e.Input.MessageHandle}
405+
func ignoreAction(e messageEvent, err *serializableError) action {
406+
return networkAction{T: ignore, Err: err, h: e.Input.messageHandle}
408407
}
409408

410-
func disconnectAction(e messageEvent, err serializableError) action {
411-
return networkAction{T: disconnect, Err: err, h: e.Input.MessageHandle}
409+
func disconnectAction(e messageEvent, err *serializableError) action {
410+
return networkAction{T: disconnect, Err: err, h: e.Input.messageHandle}
412411
}
413412

414413
func broadcastAction(tag protocol.Tag, o interface{}) action {
@@ -427,7 +426,7 @@ func broadcastAction(tag protocol.Tag, o interface{}) action {
427426
}
428427

429428
func relayAction(e messageEvent, tag protocol.Tag, o interface{}) action {
430-
a := networkAction{T: relay, h: e.Input.MessageHandle, Tag: tag}
429+
a := networkAction{T: relay, h: e.Input.messageHandle, Tag: tag}
431430
// TODO would be good to have compiler check this (and related) type switch
432431
// by specializing one method per type
433432
switch tag {
@@ -441,7 +440,7 @@ func relayAction(e messageEvent, tag protocol.Tag, o interface{}) action {
441440
return a
442441
}
443442

444-
func verifyVoteAction(e messageEvent, r round, p period, taskIndex int) action {
443+
func verifyVoteAction(e messageEvent, r round, p period, taskIndex uint64) action {
445444
return cryptoAction{T: verifyVote, M: e.Input, Round: r, Period: p, TaskIndex: taskIndex}
446445
}
447446

@@ -479,7 +478,7 @@ type checkpointAction struct {
479478
Round round
480479
Period period
481480
Step step
482-
Err serializableError
481+
Err *serializableError
483482
done chan error // an output channel to let the pseudonode that we're done processing. We don't want to serialize that, since it's not needed in recovery/autopsy
484483
}
485484

agreement/actiontype_string.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

agreement/asyncVoteVerifier.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ type asyncVerifyVoteRequest struct {
2929
l LedgerReader
3030
uv *unauthenticatedVote
3131
uev *unauthenticatedEquivocationVote
32-
index int
32+
index uint64
3333
message message
3434

3535
// a channel that holds the response
@@ -39,7 +39,7 @@ type asyncVerifyVoteRequest struct {
3939
type asyncVerifyVoteResponse struct {
4040
v vote
4141
ev equivocationVote
42-
index int
42+
index uint64
4343
message message
4444
err error
4545
cancelled bool
@@ -131,7 +131,7 @@ func (avv *AsyncVoteVerifier) executeEqVoteVerification(task interface{}) interf
131131
}
132132
}
133133

134-
func (avv *AsyncVoteVerifier) verifyVote(verctx context.Context, l LedgerReader, uv unauthenticatedVote, index int, message message, out chan<- asyncVerifyVoteResponse) error {
134+
func (avv *AsyncVoteVerifier) verifyVote(verctx context.Context, l LedgerReader, uv unauthenticatedVote, index uint64, message message, out chan<- asyncVerifyVoteResponse) error {
135135
select {
136136
case <-avv.ctx.Done(): // if we're quitting, don't enqueue the request
137137
// case <-verctx.Done(): DO NOT DO THIS! otherwise we will lose the vote (and forget to clean up)!
@@ -151,7 +151,7 @@ func (avv *AsyncVoteVerifier) verifyVote(verctx context.Context, l LedgerReader,
151151
return nil
152152
}
153153

154-
func (avv *AsyncVoteVerifier) verifyEqVote(verctx context.Context, l LedgerReader, uev unauthenticatedEquivocationVote, index int, message message, out chan<- asyncVerifyVoteResponse) error {
154+
func (avv *AsyncVoteVerifier) verifyEqVote(verctx context.Context, l LedgerReader, uev unauthenticatedEquivocationVote, index uint64, message message, out chan<- asyncVerifyVoteResponse) error {
155155
select {
156156
case <-avv.ctx.Done(): // if we're quitting, don't enqueue the request
157157
// case <-verctx.Done(): DO NOT DO THIS! otherwise we will lose the vote (and forget to clean up)!

agreement/bundle.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,8 @@ func (b unauthenticatedBundle) verifyAsync(ctx context.Context, l LedgerReader,
202202

203203
rv := rawVote{Sender: auth.Sender, Round: b.Round, Period: b.Period, Step: b.Step, Proposal: b.Proposal}
204204
uv := unauthenticatedVote{R: rv, Cred: auth.Cred, Sig: auth.Sig}
205-
avv.verifyVote(ctx, l, uv, i, message{}, results)
205+
206+
avv.verifyVote(ctx, l, uv, uint64(i), message{}, results) //nolint:errcheck // verifyVote will call EnqueueBacklog, which blocks until the verify task is queued, or returns an error when ctx.Done(), which we are already checking
206207
}
207208

208209
// create verification requests for equivocation votes
@@ -222,7 +223,8 @@ func (b unauthenticatedBundle) verifyAsync(ctx context.Context, l LedgerReader,
222223
Proposals: auth.Proposals,
223224
Sigs: auth.Sigs,
224225
}
225-
avv.verifyEqVote(ctx, l, uev, i, message{}, results)
226+
avv.verifyEqVote(ctx, l, uev, uint64(i), message{}, results) //nolint:errcheck // verifyVote will call EnqueueBacklog, which blocks until the verify task is queued, or returns an error when ctx.Done(), which we are already checking
227+
226228
}
227229

228230
return func() (bundle, error) {

agreement/cryptoVerifier.go

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -82,37 +82,41 @@ type (
8282
Quit()
8383
}
8484

85+
//msgp:ignore cryptoVoteRequest
8586
cryptoVoteRequest struct {
8687
message // the message we would like to verify.
87-
TaskIndex int // Caller specific number that would be passed back in the asyncVerifyVoteResponse.TaskIndex field
88+
TaskIndex uint64 // Caller specific number that would be passed back in the asyncVerifyVoteResponse.TaskIndex field
8889
Round round // The round that we're going to test against.
8990
Period period // The period associated with the message we're going to test.
9091
ctx context.Context // A context for this request, if the context is cancelled then the request is stale.
9192
}
9293

94+
//msgp:ignore cryptoProposalRequest
9395
cryptoProposalRequest struct {
9496
message // the message we would like to verify.
95-
TaskIndex int // Caller specific number that would be passed back in the cryptoResult.TaskIndex field
97+
TaskIndex uint64 // Caller specific number that would be passed back in the cryptoResult.TaskIndex field
9698
Round round // The round that we're going to test against.
9799
Period period // The period associated with the message we're going to test.
98100
Pinned bool // A flag that is set if this is a pinned value for the given round.
99101
ctx context.Context // A context for this request, if the context is cancelled then the request is stale.
100102
}
101103

104+
//msgp:ignore cryptoBundleRequest
102105
cryptoBundleRequest struct {
103106
message // the message we would like to verify.
104-
TaskIndex int // Caller specific number that would be passed back in the asyncVerifyVoteResponse.TaskIndex field
107+
TaskIndex uint64 // Caller specific number that would be passed back in the asyncVerifyVoteResponse.TaskIndex field
105108
Round round // The round that we're going to test against.
106109
Period period // The period associated with the message we're going to test.
107110
Certify bool // A flag that set if this is a cert bundle.
108111
ctx context.Context // A context for this request, if the context is cancelled then the request is stale.
109112
}
110113

114+
//msgp:ignore cryptoResult
111115
cryptoResult struct {
112116
message
113-
Err serializableError
114-
TaskIndex int // the TaskIndex that was passed to the cryptoVerifier during the Verify call on the cryptoRequest.TaskIndex
115-
Cancelled bool // whether the corresponding request was cancelled before verification completed
117+
Err *serializableError
118+
TaskIndex uint64 // the TaskIndex that was passed to the cryptoVerifier during the Verify call on the cryptoRequest.TaskIndex
119+
Cancelled bool // whether the corresponding request was cancelled before verification completed
116120
}
117121

118122
// A poolCryptoVerifier uses asynchronous goroutines to implement cryptoVerifier.
@@ -146,9 +150,10 @@ type (
146150
out chan cryptoResult
147151
}
148152

153+
//msgp:ignore bundleFuture
149154
bundleFuture struct {
150155
message
151-
index int
156+
index uint64
152157
wait func() (bundle, error)
153158
ctx context.Context
154159
}

agreement/cryptoVerifier_test.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ func makeMessage(msgHandle int, tag protocol.Tag, sender basics.Address, l Ledge
9393
}
9494

9595
return message{
96-
MessageHandle: MessageHandle(msgHandle),
96+
messageHandle: MessageHandle(msgHandle),
9797
Tag: tag,
9898
UnauthenticatedVote: makeUnauthenticatedVote(l, sender, selection, voting, Round, Period, Step, proposal),
9999
}
@@ -103,13 +103,13 @@ func makeMessage(msgHandle int, tag protocol.Tag, sender basics.Address, l Ledge
103103
Block: e,
104104
}
105105
return message{
106-
MessageHandle: MessageHandle(msgHandle),
106+
messageHandle: MessageHandle(msgHandle),
107107
Tag: tag,
108108
UnauthenticatedProposal: payload,
109109
}
110110
default: // protocol.VoteBundleTag
111111
return message{
112-
MessageHandle: MessageHandle(msgHandle),
112+
messageHandle: MessageHandle(msgHandle),
113113
Tag: tag,
114114
UnauthenticatedBundle: unauthenticatedBundle{
115115
Round: Round,
@@ -180,9 +180,9 @@ func TestCryptoVerifierBuffers(t *testing.T) {
180180
for _, msgType := range msgTypes {
181181
for i := getSelectorCapacity(msgType) * 5; i > 0; i-- {
182182
msg := <-verifier.Verified(msgType)
183-
_, has := usedMsgIDs[msg.MessageHandle]
183+
_, has := usedMsgIDs[msg.messageHandle]
184184
assert.True(t, has)
185-
delete(usedMsgIDs, msg.MessageHandle)
185+
delete(usedMsgIDs, msg.messageHandle)
186186
}
187187
assert.False(t, verifier.ChannelFull(msgType))
188188
assert.Zero(t, len(verifier.Verified(msgType)))
@@ -230,8 +230,8 @@ func TestCryptoVerifierBuffers(t *testing.T) {
230230
}
231231
msgIDMutex.Lock()
232232
defer msgIDMutex.Unlock()
233-
_, has := usedMsgIDs[msg.MessageHandle]
234-
delete(usedMsgIDs, msg.MessageHandle)
233+
_, has := usedMsgIDs[msg.messageHandle]
234+
delete(usedMsgIDs, msg.messageHandle)
235235
return assert.True(t, has)
236236
}
237237

@@ -333,7 +333,7 @@ func BenchmarkCryptoVerifierProposalVertification(b *testing.B) {
333333
c := verifier.Verified(protocol.ProposalPayloadTag)
334334
request := cryptoProposalRequest{
335335
message: message{
336-
MessageHandle: MessageHandle(0),
336+
messageHandle: MessageHandle(0),
337337
Tag: protocol.ProposalPayloadTag,
338338
UnauthenticatedProposal: proposals[0].unauthenticatedProposal,
339339
},
@@ -402,11 +402,11 @@ func TestCryptoVerifierVerificationFailures(t *testing.T) {
402402
cryptoVerifier := makeCryptoVerifier(nil, nil, voteVerifier, logging.TestingLog(t))
403403
defer cryptoVerifier.Quit()
404404

405-
cryptoVerifier.VerifyVote(context.Background(), cryptoVoteRequest{message: message{Tag: protocol.AgreementVoteTag}, Round: basics.Round(8), TaskIndex: 14})
405+
cryptoVerifier.VerifyVote(context.Background(), cryptoVoteRequest{message: message{Tag: protocol.AgreementVoteTag}, Round: basics.Round(8), TaskIndex: uint64(14)})
406406
// read the failed response from VerifiedVotes:
407407
votesout := cryptoVerifier.VerifiedVotes()
408408
voteResponse := <-votesout
409409
require.Equal(t, context.Canceled, voteResponse.err)
410410
require.True(t, voteResponse.cancelled)
411-
require.Equal(t, 14, voteResponse.index)
411+
require.Equal(t, uint64(14), voteResponse.index)
412412
}

agreement/demux.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -140,11 +140,11 @@ func (d *demux) tokenizeMessages(ctx context.Context, net Network, tag protocol.
140140
var msg message
141141
switch tag {
142142
case protocol.AgreementVoteTag:
143-
msg = message{MessageHandle: raw.MessageHandle, Tag: tag, UnauthenticatedVote: o.(unauthenticatedVote)}
143+
msg = message{messageHandle: raw.MessageHandle, Tag: tag, UnauthenticatedVote: o.(unauthenticatedVote)}
144144
case protocol.VoteBundleTag:
145-
msg = message{MessageHandle: raw.MessageHandle, Tag: tag, UnauthenticatedBundle: o.(unauthenticatedBundle)}
145+
msg = message{messageHandle: raw.MessageHandle, Tag: tag, UnauthenticatedBundle: o.(unauthenticatedBundle)}
146146
case protocol.ProposalPayloadTag:
147-
msg = message{MessageHandle: raw.MessageHandle, Tag: tag, CompoundMessage: o.(compoundMessage)}
147+
msg = message{messageHandle: raw.MessageHandle, Tag: tag, CompoundMessage: o.(compoundMessage)}
148148
default:
149149
err := fmt.Errorf("bad message tag: %v", tag)
150150
d.UpdateEventsQueue(fmt.Sprintf("Tokenizing-%s", tag), 0)
@@ -167,7 +167,7 @@ func (d *demux) tokenizeMessages(ctx context.Context, net Network, tag protocol.
167167
}
168168

169169
// verifyVote enqueues a vote message to be verified.
170-
func (d *demux) verifyVote(ctx context.Context, m message, taskIndex int, r round, p period) {
170+
func (d *demux) verifyVote(ctx context.Context, m message, taskIndex uint64, r round, p period) {
171171
d.UpdateEventsQueue(eventQueueCryptoVerifierVote, 1)
172172
d.monitor.inc(cryptoVerifierCoserviceType)
173173
d.crypto.VerifyVote(ctx, cryptoVoteRequest{message: m, TaskIndex: taskIndex, Round: r, Period: p})
@@ -367,7 +367,7 @@ func setupCompoundMessage(l LedgerReader, m message) (res externalEvent) {
367367
return
368368
}
369369

370-
tailmsg := message{MessageHandle: m.MessageHandle, Tag: protocol.ProposalPayloadTag, UnauthenticatedProposal: compound.Proposal}
370+
tailmsg := message{messageHandle: m.messageHandle, Tag: protocol.ProposalPayloadTag, UnauthenticatedProposal: compound.Proposal}
371371
synthetic := messageEvent{T: payloadPresent, Input: tailmsg}
372372
proto, err := l.ConsensusVersion(ParamsRound(synthetic.ConsensusRound()))
373373
synthetic = synthetic.AttachConsensusVersion(ConsensusVersionView{Err: makeSerErr(err), Version: proto}).(messageEvent)

agreement/errors.go

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,38 +16,39 @@
1616

1717
package agreement
1818

19-
import "fmt"
19+
import (
20+
"fmt"
21+
)
2022

2123
// serializableError, or state machine error, is a serializable error that
2224
// is correctly written to cadaver files.
23-
type serializableErrorUnderlying string
24-
type serializableError = *serializableErrorUnderlying
25+
type serializableError string
2526

2627
// implement error interface
27-
func (e serializableErrorUnderlying) Error() string {
28+
func (e serializableError) Error() string {
2829
return string(e)
2930
}
3031

31-
func (e serializableErrorUnderlying) String() string {
32+
func (e serializableError) String() string {
3233
return e.Error()
3334
}
3435

3536
// makeSerErrStr returns an serializableError that formats as the given text.
36-
func makeSerErrStr(text string) serializableError {
37-
s := serializableErrorUnderlying(text)
37+
func makeSerErrStr(text string) *serializableError {
38+
s := serializableError(text)
3839
return &s
3940
}
4041

41-
func makeSerErrf(format string, a ...interface{}) serializableError {
42-
s := serializableErrorUnderlying(fmt.Sprintf(format, a...))
42+
func makeSerErrf(format string, a ...interface{}) *serializableError {
43+
s := serializableError(fmt.Sprintf(format, a...))
4344
return &s
4445
}
4546

4647
// makeSerErr returns an serializableError that formats as the given error.
47-
func makeSerErr(err error) serializableError {
48+
func makeSerErr(err error) *serializableError {
4849
if err == nil {
4950
return nil
5051
}
51-
s := serializableErrorUnderlying(err.Error())
52+
s := serializableError(err.Error())
5253
return &s
5354
}

0 commit comments

Comments
 (0)