Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions agreement/asyncVoteVerifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func (avv *AsyncVoteVerifier) executeEqVoteVerification(task interface{}) interf
}
}

func (avv *AsyncVoteVerifier) verifyVote(verctx context.Context, l LedgerReader, uv unauthenticatedVote, index int, message message, out chan<- asyncVerifyVoteResponse) {
func (avv *AsyncVoteVerifier) verifyVote(verctx context.Context, l LedgerReader, uv unauthenticatedVote, index int, message message, out chan<- asyncVerifyVoteResponse) error {
select {
case <-avv.ctx.Done(): // if we're quitting, don't enqueue the request
// case <-verctx.Done(): DO NOT DO THIS! otherwise we will lose the vote (and forget to clean up)!
Expand All @@ -140,16 +140,18 @@ func (avv *AsyncVoteVerifier) verifyVote(verctx context.Context, l LedgerReader,
// if we're done while waiting for room in the requests channel, don't queue the request
req := asyncVerifyVoteRequest{ctx: verctx, l: l, uv: &uv, index: index, message: message, out: out}
avv.wg.Add(1)
if avv.backlogExecPool.EnqueueBacklog(avv.ctx, avv.executeVoteVerification, req, avv.execpoolOut) != nil {
if err := avv.backlogExecPool.EnqueueBacklog(avv.ctx, avv.executeVoteVerification, req, avv.execpoolOut); err != nil {
// we want to call "wg.Done()" here to "fix" the accounting of the number of pending tasks.
// if we got a non-nil, it means that our context has expired, which means that we won't see this task
// getting to the verification function.
avv.wg.Done()
return err
}
}
return nil
}

func (avv *AsyncVoteVerifier) verifyEqVote(verctx context.Context, l LedgerReader, uev unauthenticatedEquivocationVote, index int, message message, out chan<- asyncVerifyVoteResponse) {
func (avv *AsyncVoteVerifier) verifyEqVote(verctx context.Context, l LedgerReader, uev unauthenticatedEquivocationVote, index int, message message, out chan<- asyncVerifyVoteResponse) error {
select {
case <-avv.ctx.Done(): // if we're quitting, don't enqueue the request
// case <-verctx.Done(): DO NOT DO THIS! otherwise we will lose the vote (and forget to clean up)!
Expand All @@ -158,13 +160,15 @@ func (avv *AsyncVoteVerifier) verifyEqVote(verctx context.Context, l LedgerReade
// if we're done while waiting for room in the requests channel, don't queue the request
req := asyncVerifyVoteRequest{ctx: verctx, l: l, uev: &uev, index: index, message: message, out: out}
avv.wg.Add(1)
if avv.backlogExecPool.EnqueueBacklog(avv.ctx, avv.executeEqVoteVerification, req, avv.execpoolOut) != nil {
if err := avv.backlogExecPool.EnqueueBacklog(avv.ctx, avv.executeEqVoteVerification, req, avv.execpoolOut); err != nil {
// we want to call "wg.Done()" here to "fix" the accounting of the number of pending tasks.
// if we got a non-nil, it means that our context has expired, which means that we won't see this task
// getting to the verification function.
avv.wg.Done()
return err
}
}
return nil
}

// Quit tells the AsyncVoteVerifier to shutdown and waits until all workers terminate.
Expand Down
9 changes: 8 additions & 1 deletion agreement/cryptoVerifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,14 @@ func (c *poolCryptoVerifier) voteFillWorker(toBundleWait chan<- bundleFuture) {
}

uv := votereq.message.UnauthenticatedVote
c.voteVerifier.verifyVote(votereq.ctx, c.ledger, uv, votereq.TaskIndex, votereq.message, c.votes.out)
err := c.voteVerifier.verifyVote(votereq.ctx, c.ledger, uv, votereq.TaskIndex, votereq.message, c.votes.out)
if err != nil && c.votes.out != nil {
select {
case c.votes.out <- asyncVerifyVoteResponse{index: votereq.TaskIndex, err: err}:
default:
c.log.Infof("poolCryptoVerifier.voteFillWorker unable to write failed enqueue response to output channel")
}
}
case bundlereq, ok := <-bundlesin:
if !ok {
bundlesin = nil
Expand Down
97 changes: 90 additions & 7 deletions agreement/pseudonode.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ import (

// TODO put these in config
const (
pseudonodeVerificationBacklog = 32
pseudonodeVerificationBacklog = 32
maxPseudonodeOutputWaitDuration = 2 * time.Second
)

var errPseudonodeBacklogFull = fmt.Errorf("pseudonode input channel is full")
Expand Down Expand Up @@ -367,13 +368,20 @@ func (t pseudonodeVotesTask) execute(verifier *AsyncVoteVerifier, quit chan stru
unverifiedVotes := t.node.makeVotes(t.round, t.period, t.step, t.prop, t.participation)
t.node.log.Infof("pseudonode: made %v votes", len(unverifiedVotes))
results := make(chan asyncVerifyVoteResponse, len(unverifiedVotes))
orderedResults := make([]asyncVerifyVoteResponse, len(unverifiedVotes))
asyncVerifyingVotes := len(unverifiedVotes)
for i, uv := range unverifiedVotes {
msg := message{Tag: protocol.AgreementVoteTag, UnauthenticatedVote: uv}
verifier.verifyVote(context.TODO(), t.node.ledger, uv, i, msg, results)
err := verifier.verifyVote(context.TODO(), t.node.ledger, uv, i, msg, results)
if err != nil {
orderedResults[i].err = err
t.node.log.Infof("pseudonode.makeVotes: failed to enqueue vote verification for (%d, %d): %v", t.round, t.period, err)
asyncVerifyingVotes--
continue
}
}

orderedResults := make([]asyncVerifyVoteResponse, len(unverifiedVotes))
for i := 0; i < len(unverifiedVotes); i++ {
for i := 0; i < asyncVerifyingVotes; i++ {
resp := <-results
orderedResults[resp.index] = resp
}
Expand Down Expand Up @@ -441,14 +449,37 @@ func (t pseudonodeVotesTask) execute(verifier *AsyncVoteVerifier, quit chan stru
t.node.monitor.dec(pseudonodeCoserviceType)

// push results into channel.
var outputTimeout <-chan time.Time
for _, r := range verifiedResults {
select {
case t.out <- messageEvent{T: voteVerified, Input: r.message, Err: makeSerErr(r.err)}:
continue
case <-quit:
return
case <-t.context.Done():
// we done care about the output anymore; just exit.
return
default:
}

if outputTimeout == nil {
outputTimeout = time.After(maxPseudonodeOutputWaitDuration)
}

select {
case t.out <- messageEvent{T: voteVerified, Input: r.message, Err: makeSerErr(r.err)}:
continue
case <-quit:
return
case <-t.context.Done():
// we done care about the output anymore; just exit.
return
case <-outputTimeout:
// we've been waiting for too long for this vote to be written to the output.
t.node.log.Infof("pseudonode.makeVotes: unable to write vote to output channel for round %d, period %d", t.round, t.period)
tmpTimeoutCh := make(chan time.Time)
close(tmpTimeoutCh)
outputTimeout = tmpTimeoutCh
}
}
}
Expand Down Expand Up @@ -477,13 +508,20 @@ func (t pseudonodeProposalsTask) execute(verifier *AsyncVoteVerifier, quit chan
// For now, don't log at all, and revisit when the metric becomes more important.

results := make(chan asyncVerifyVoteResponse, len(votes))
cryptoOutputs := make([]asyncVerifyVoteResponse, len(votes))
asyncVerifyingVotes := len(votes)
for i, uv := range votes {
msg := message{Tag: protocol.AgreementVoteTag, UnauthenticatedVote: uv}
verifier.verifyVote(context.TODO(), t.node.ledger, uv, i, msg, results)
err := verifier.verifyVote(context.TODO(), t.node.ledger, uv, i, msg, results)
if err != nil {
cryptoOutputs[i].err = err
t.node.log.Infof("pseudonode.makeProposals: failed to enqueue vote verification for (%d, %d): %v", t.round, t.period, err)
asyncVerifyingVotes--
continue
}
}

cryptoOutputs := make([]asyncVerifyVoteResponse, len(votes))
for i := 0; i < len(votes); i++ {
for i := 0; i < asyncVerifyingVotes; i++ {
resp := <-results
cryptoOutputs[resp.index] = resp
}
Expand Down Expand Up @@ -527,27 +565,72 @@ func (t pseudonodeProposalsTask) execute(verifier *AsyncVoteVerifier, quit chan
}
t.node.monitor.dec(pseudonodeCoserviceType)

var outputTimeout <-chan time.Time
// push results into channel.
for _, r := range verifiedVotes {
select {
case t.out <- messageEvent{T: voteVerified, Input: r.message, Err: makeSerErr(r.err)}:
continue
case <-quit:
return
case <-t.context.Done():
// we done care about the output anymore; just exit.
return
default:
}
if outputTimeout == nil {
outputTimeout = time.After(maxPseudonodeOutputWaitDuration)
}

// the out channel was full. Place a limit on the time we want to wait for this channel.
select {
case t.out <- messageEvent{T: voteVerified, Input: r.message, Err: makeSerErr(r.err)}:
continue
case <-quit:
return
case <-t.context.Done():
// we done care about the output anymore; just exit.
return
case <-outputTimeout:
// we've been waiting for too long for this vote to be written to the output.
t.node.log.Infof("pseudonode.makeProposals: unable to write proposal vote to output channel for round %d, period %d", t.round, t.period)
tmpTimeoutCh := make(chan time.Time)
close(tmpTimeoutCh)
outputTimeout = tmpTimeoutCh
}
}

for _, payload := range verifiedPayloads {
msg := message{Tag: protocol.ProposalPayloadTag, UnauthenticatedProposal: payload.u(), Proposal: payload}
select {
case t.out <- messageEvent{T: payloadVerified, Input: msg}:
continue
case <-quit:
return
case <-t.context.Done():
// we done care about the output anymore; just exit.
return
default:
}

if outputTimeout == nil {
outputTimeout = time.After(maxPseudonodeOutputWaitDuration)
}
// the out channel was full. Place a limit on the time we want to wait for this channel.
select {
case t.out <- messageEvent{T: payloadVerified, Input: msg}:
continue
case <-quit:
return
case <-t.context.Done():
// we done care about the output anymore; just exit.
return
case <-outputTimeout:
// we've been waiting for too long for this vote to be written to the output.
t.node.log.Infof("pseudonode.makeProposals: unable to write proposal payload to output channel for round %d, period %d", t.round, t.period)
tmpTimeoutCh := make(chan time.Time)
close(tmpTimeoutCh)
outputTimeout = tmpTimeoutCh
}
}
}
2 changes: 1 addition & 1 deletion agreement/pseudonode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func compareEventChannels(t *testing.T, ch1, ch2 <-chan externalEvent) bool {
}
}
default:
assert.NoError(t, fmt.Errorf("Unexpected tag %v encountered", ev1.Input.Tag))
assert.NoError(t, fmt.Errorf("Unexpected tag '%v' encountered", ev1.Input.Tag))
}
}
return true
Expand Down