16 changes: 10 additions & 6 deletions agreement/asyncVoteVerifier.go
@@ -100,7 +100,7 @@ func (avv *AsyncVoteVerifier) executeVoteVerification(task interface{}) interfac
select {
case <-req.ctx.Done():
// request cancelled, return an error response on the channel
return &asyncVerifyVoteResponse{err: req.ctx.Err(), cancelled: true, req: &req}
return &asyncVerifyVoteResponse{err: req.ctx.Err(), cancelled: true, req: &req, index: req.index}
default:
// request was not cancelled, so we verify it here and return the result on the channel
v, err := req.uv.verify(req.l)
@@ -119,7 +119,7 @@ func (avv *AsyncVoteVerifier) executeEqVoteVerification(task interface{}) interf
select {
case <-req.ctx.Done():
// request cancelled, return an error response on the channel
return &asyncVerifyVoteResponse{err: req.ctx.Err(), cancelled: true, req: &req}
return &asyncVerifyVoteResponse{err: req.ctx.Err(), cancelled: true, req: &req, index: req.index}
default:
// request was not cancelled, so we verify it here and return the result on the channel
ev, err := req.uev.verify(req.l)
@@ -131,7 +131,7 @@ func (avv *AsyncVoteVerifier) executeEqVoteVerification(task interface{}) interf
}
}

func (avv *AsyncVoteVerifier) verifyVote(verctx context.Context, l LedgerReader, uv unauthenticatedVote, index int, message message, out chan<- asyncVerifyVoteResponse) {
func (avv *AsyncVoteVerifier) verifyVote(verctx context.Context, l LedgerReader, uv unauthenticatedVote, index int, message message, out chan<- asyncVerifyVoteResponse) error {
select {
case <-avv.ctx.Done(): // if we're quitting, don't enqueue the request
// case <-verctx.Done(): DO NOT DO THIS! otherwise we will lose the vote (and forget to clean up)!
@@ -140,16 +140,18 @@ func (avv *AsyncVoteVerifier) verifyVote(verctx context.Context, l LedgerReader,
// if we're done while waiting for room in the requests channel, don't queue the request
req := asyncVerifyVoteRequest{ctx: verctx, l: l, uv: &uv, index: index, message: message, out: out}
avv.wg.Add(1)
if avv.backlogExecPool.EnqueueBacklog(avv.ctx, avv.executeVoteVerification, req, avv.execpoolOut) != nil {
if err := avv.backlogExecPool.EnqueueBacklog(avv.ctx, avv.executeVoteVerification, req, avv.execpoolOut); err != nil {
Contributor @cce commented on Nov 16, 2021:

Interestingly, inside of EnqueueBacklog it is also doing a select on case <-avv.ctx.Done() (passed in as enqueueCtx, alongside a select for backlog.ctx.Done()) which will cause EnqueueBacklog to return ctx.Err(). If you have two pending selects like that on the stack, is it deterministic which one will fire first?
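
For context, here is a minimal sketch of the situation the question describes (the enqueue and verify helpers and the backlog channel are illustrative stand-ins, not the real execpool API). The two selects never wait concurrently: they sit on the same goroutine's stack, so the outer one runs first (taking its default branch if the context is not yet cancelled) before the inner one can block, and whichever select happens to be executing when the cancellation becomes visible is the one that reports it.

package main

import (
	"context"
	"fmt"
)

// enqueue stands in for EnqueueBacklog: it selects on the same context the
// caller has already checked once. The names here are illustrative only.
func enqueue(ctx context.Context, backlog chan func()) error {
	select {
	case <-ctx.Done():
		return ctx.Err() // second place the cancellation can be observed
	case backlog <- func() {}:
		return nil
	}
}

// verify stands in for verifyVote: a non-blocking check first, then the
// blocking enqueue that watches the same Done channel.
func verify(ctx context.Context, backlog chan func()) error {
	select {
	case <-ctx.Done():
		return ctx.Err() // only taken if ctx is already cancelled here
	default:
		return enqueue(ctx, backlog)
	}
}

func main() {
	// Case 1: cancelled before the call; the outer select reports it.
	ctx1, cancel1 := context.WithCancel(context.Background())
	cancel1()
	fmt.Println(verify(ctx1, make(chan func()))) // context canceled

	// Case 2: cancelled while enqueue is blocked on a backlog that nobody
	// drains; the inner select reports it.
	ctx2, cancel2 := context.WithCancel(context.Background())
	backlog := make(chan func()) // unbuffered, never received from
	go cancel2()
	fmt.Println(verify(ctx2, backlog)) // context canceled
}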

Contributor commented:

There is this comment warning "instead, enqueue so the worker will set the error value and return the cancelled vote properly." However if the waiting inside EnqueueBacklog() is interrupted by one of the Done()s it's waiting on, it will never reach the worker.

Contributor (PR author) replied:

The fact that we have two pending selects isn't an issue. The one in verifyVote would be evaluated when we get to that point, and if it's not canceled yet, it would go to the default statement, executing EnqueueBacklog, where it would also block on the same channel.

As for the comment, I think it's a bug - I think that the intent was to have a fallthrough here. But I would leave that to a separate PR, as it's not really related to this change. (i.e. in our case, we're passing a TODO context, which would never expire and therefore this statement would never be executed)
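
A small hypothetical sketch of the point about the TODO context (the pool type and method names are invented for illustration and are not the real execpool interface): when the per-request context is context.TODO() it can never expire, so the remaining failure mode is the pool's own shutdown context rejecting the enqueue, which is exactly what the new error return from verifyVote surfaces to the caller.

package main

import (
	"context"
	"fmt"
)

// pool is a made-up stand-in for the backlog execution pool; its ctx plays
// the role of the pool's own shutdown context (avv.ctx in the real code).
type pool struct{ ctx context.Context }

// enqueue mimics EnqueueBacklog refusing new work once the pool is shutting down.
func (p *pool) enqueue(task func()) error {
	select {
	case <-p.ctx.Done():
		return p.ctx.Err()
	default:
		go task()
		return nil
	}
}

// verifyVote mimics the changed signature: an enqueue failure is now returned
// to the caller instead of being silently dropped.
func verifyVote(reqCtx context.Context, p *pool) error {
	select {
	case <-reqCtx.Done():
		return reqCtx.Err() // never taken when reqCtx is context.TODO()
	default:
		return p.enqueue(func() { /* verification would run here */ })
	}
}

func main() {
	poolCtx, shutdown := context.WithCancel(context.Background())
	shutdown() // the pool is already quitting
	p := &pool{ctx: poolCtx}

	// The request context never expires, yet the call still fails; with the
	// new signature the caller can see that failure and account for it.
	fmt.Println(verifyVote(context.TODO(), p)) // context canceled
}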

Contributor replied:

there are other callers (unauthenticatedBundle.verify and cryptoVerifier.voteFillWorker) passing a valid context into verifyVote — just not pseudonode

Contributor (PR author) replied:

yes, I'm aware of that - that's why I did not attempt to change this code ;-)

// we want to call "wg.Done()" here to "fix" the accounting of the number of pending tasks.
// if we got a non-nil, it means that our context has expired, which means that we won't see this task
// getting to the verification function.
avv.wg.Done()
return err
}
}
return nil
}

func (avv *AsyncVoteVerifier) verifyEqVote(verctx context.Context, l LedgerReader, uev unauthenticatedEquivocationVote, index int, message message, out chan<- asyncVerifyVoteResponse) {
func (avv *AsyncVoteVerifier) verifyEqVote(verctx context.Context, l LedgerReader, uev unauthenticatedEquivocationVote, index int, message message, out chan<- asyncVerifyVoteResponse) error {
select {
case <-avv.ctx.Done(): // if we're quitting, don't enqueue the request
// case <-verctx.Done(): DO NOT DO THIS! otherwise we will lose the vote (and forget to clean up)!
@@ -158,13 +160,15 @@ func (avv *AsyncVoteVerifier) verifyEqVote(verctx context.Context, l LedgerReade
// if we're done while waiting for room in the requests channel, don't queue the request
req := asyncVerifyVoteRequest{ctx: verctx, l: l, uev: &uev, index: index, message: message, out: out}
avv.wg.Add(1)
if avv.backlogExecPool.EnqueueBacklog(avv.ctx, avv.executeEqVoteVerification, req, avv.execpoolOut) != nil {
if err := avv.backlogExecPool.EnqueueBacklog(avv.ctx, avv.executeEqVoteVerification, req, avv.execpoolOut); err != nil {
// we want to call "wg.Done()" here to "fix" the accounting of the number of pending tasks.
// if we got a non-nil, it means that our context has expired, which means that we won't see this task
// getting to the verification function.
avv.wg.Done()
return err
}
}
return nil
}

// Quit tells the AsyncVoteVerifier to shutdown and waits until all workers terminate.
50 changes: 50 additions & 0 deletions agreement/asyncVoteVerifier_test.go
@@ -0,0 +1,50 @@
// Copyright (C) 2019-2021 Algorand, Inc.
// This file is part of go-algorand
//
// go-algorand is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// go-algorand is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with go-algorand. If not, see <https://www.gnu.org/licenses/>.

package agreement

import (
"context"
"testing"

"github.com/stretchr/testify/require"

"github.com/algorand/go-algorand/test/partitiontest"
"github.com/algorand/go-algorand/util/execpool"
)

type expiredExecPool struct {
execpool.ExecutionPool
}

func (fp *expiredExecPool) EnqueueBacklog(enqueueCtx context.Context, t execpool.ExecFunc, arg interface{}, out chan interface{}) error {
// generate an error, to see if we correctly report that on the verifyVote() call.
return context.Canceled
}

// Test async vote verifier against a full execution pool.
func TestVerificationAgainstFullExecutionPool(t *testing.T) {
partitiontest.PartitionTest(t)
mainPool := execpool.MakePool(t)
defer mainPool.Shutdown()

voteVerifier := MakeAsyncVoteVerifier(&expiredExecPool{mainPool})
defer voteVerifier.Quit()
verifyErr := voteVerifier.verifyVote(context.Background(), nil, unauthenticatedVote{}, 0, message{}, make(chan<- asyncVerifyVoteResponse, 1))
require.Error(t, context.Canceled, verifyErr)
verifyEqVoteErr := voteVerifier.verifyEqVote(context.Background(), nil, unauthenticatedEquivocationVote{}, 0, message{}, make(chan<- asyncVerifyVoteResponse, 1))
require.Error(t, context.Canceled, verifyEqVoteErr)
}
9 changes: 8 additions & 1 deletion agreement/cryptoVerifier.go
@@ -205,7 +205,14 @@ func (c *poolCryptoVerifier) voteFillWorker(toBundleWait chan<- bundleFuture) {
}

uv := votereq.message.UnauthenticatedVote
c.voteVerifier.verifyVote(votereq.ctx, c.ledger, uv, votereq.TaskIndex, votereq.message, c.votes.out)
err := c.voteVerifier.verifyVote(votereq.ctx, c.ledger, uv, votereq.TaskIndex, votereq.message, c.votes.out)
if err != nil && c.votes.out != nil {
select {
case c.votes.out <- asyncVerifyVoteResponse{index: votereq.TaskIndex, err: err, cancelled: true}:
default:
c.log.Infof("poolCryptoVerifier.voteFillWorker unable to write failed enqueue response to output channel")
}
}
case bundlereq, ok := <-bundlesin:
if !ok {
bundlesin = nil
25 changes: 25 additions & 0 deletions agreement/cryptoVerifier_test.go
@@ -26,6 +26,7 @@ import (

"github.com/algorand/go-deadlock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/algorand/go-algorand/config"
"github.com/algorand/go-algorand/crypto"
@@ -35,6 +36,7 @@ import (
"github.com/algorand/go-algorand/logging"
"github.com/algorand/go-algorand/protocol"
"github.com/algorand/go-algorand/test/partitiontest"
"github.com/algorand/go-algorand/util/execpool"
)

var _ = fmt.Printf
@@ -385,3 +387,26 @@ func BenchmarkCryptoVerifierBundleVertification(b *testing.B) {
<-c
}
}

// TestCryptoVerifierVerificationFailures tests to see that the cryptoVerifier.VerifyVote returns an error in the vote response
// when being unable to enqueue a vote.
func TestCryptoVerifierVerificationFailures(t *testing.T) {
partitiontest.PartitionTest(t)

mainPool := execpool.MakePool(t)
defer mainPool.Shutdown()

voteVerifier := MakeAsyncVoteVerifier(&expiredExecPool{mainPool})
defer voteVerifier.Quit()

cryptoVerifier := makeCryptoVerifier(nil, nil, voteVerifier, logging.TestingLog(t))
defer cryptoVerifier.Quit()

cryptoVerifier.VerifyVote(context.Background(), cryptoVoteRequest{message: message{Tag: protocol.AgreementVoteTag}, Round: basics.Round(8), TaskIndex: 14})
// read the failed response from VerifiedVotes:
votesout := cryptoVerifier.VerifiedVotes()
voteResponse := <-votesout
require.Equal(t, context.Canceled, voteResponse.err)
require.True(t, voteResponse.cancelled)
require.Equal(t, 14, voteResponse.index)
}
114 changes: 79 additions & 35 deletions agreement/pseudonode.go
@@ -18,8 +18,10 @@ package agreement

import (
"context"
"errors"
"fmt"
"sync"
"time"

"github.com/algorand/go-algorand/data/account"
"github.com/algorand/go-algorand/data/basics"
@@ -31,13 +33,14 @@ import (

// TODO put these in config
const (
pseudonodeVerificationBacklog = 32
pseudonodeVerificationBacklog = 32
maxPseudonodeOutputWaitDuration = 2 * time.Second
)

var errPseudonodeBacklogFull = fmt.Errorf("pseudonode input channel is full")
var errPseudonodeVerifierClosedChannel = fmt.Errorf("crypto verifier closed the output channel prematurely")
var errPseudonodeNoVotes = fmt.Errorf("no valid participation keys to generate votes for given round")
var errPseudonodeNoProposals = fmt.Errorf("no valid participation keys to generate proposals for given round")
var errPseudonodeVerifierClosedChannel = errors.New("crypto verifier closed the output channel prematurely")
var errPseudonodeNoVotes = errors.New("no valid participation keys to generate votes for given round")
var errPseudonodeNoProposals = errors.New("no valid participation keys to generate proposals for given round")

// A pseudonode creates proposals and votes with a KeyManager which holds participation keys.
//
Expand Down Expand Up @@ -172,7 +175,7 @@ func (n asyncPseudonode) MakeProposals(ctx context.Context, r round, p period) (
return proposalTask.outputChannel(), nil
default:
proposalTask.close()
return nil, errPseudonodeBacklogFull
return nil, fmt.Errorf("unable to make proposal for (%d, %d): %w", r, p, errPseudonodeBacklogFull)
}
}

Expand All @@ -189,7 +192,7 @@ func (n asyncPseudonode) MakeVotes(ctx context.Context, r round, p period, s ste
return proposalTask.outputChannel(), nil
default:
proposalTask.close()
return nil, errPseudonodeBacklogFull
return nil, fmt.Errorf("unable to make vote for (%d, %d, %d): %w", r, p, s, errPseudonodeBacklogFull)
}
}

Expand Down Expand Up @@ -364,13 +367,20 @@ func (t pseudonodeVotesTask) execute(verifier *AsyncVoteVerifier, quit chan stru
unverifiedVotes := t.node.makeVotes(t.round, t.period, t.step, t.prop, t.participation)
t.node.log.Infof("pseudonode: made %v votes", len(unverifiedVotes))
results := make(chan asyncVerifyVoteResponse, len(unverifiedVotes))
orderedResults := make([]asyncVerifyVoteResponse, len(unverifiedVotes))
asyncVerifyingVotes := len(unverifiedVotes)
for i, uv := range unverifiedVotes {
msg := message{Tag: protocol.AgreementVoteTag, UnauthenticatedVote: uv}
verifier.verifyVote(context.TODO(), t.node.ledger, uv, i, msg, results)
err := verifier.verifyVote(context.TODO(), t.node.ledger, uv, i, msg, results)
if err != nil {
orderedResults[i].err = err
t.node.log.Infof("pseudonode.makeVotes: failed to enqueue vote verification for (%d, %d): %v", t.round, t.period, err)
asyncVerifyingVotes--
continue
}
}

orderedResults := make([]asyncVerifyVoteResponse, len(unverifiedVotes))
for i := 0; i < len(unverifiedVotes); i++ {
for i := 0; i < asyncVerifyingVotes; i++ {
resp := <-results
orderedResults[resp.index] = resp
}
@@ -437,16 +447,26 @@ func (t pseudonodeVotesTask) execute(verifier *AsyncVoteVerifier, quit chan stru
}
t.node.monitor.dec(pseudonodeCoserviceType)

outputTimeout := time.After(maxPseudonodeOutputWaitDuration)

// push results into channel.
verifiedVotesLoop:
for _, r := range verifiedResults {
select {
case t.out <- messageEvent{T: voteVerified, Input: r.message, Err: makeSerErr(r.err)}:
t.node.keys.Record(r.v.R.Sender, r.v.R.Round, account.Vote)
case <-quit:
return
case <-t.context.Done():
// we done care about the output anymore; just exit.
return
for {
select {
case t.out <- messageEvent{T: voteVerified, Input: r.message, Err: makeSerErr(r.err)}:
t.node.keys.Record(r.v.R.Sender, r.v.R.Round, account.Vote)
continue verifiedVotesLoop
case <-quit:
return
case <-t.context.Done():
// we done care about the output anymore; just exit.
return
case <-outputTimeout:
// we've been waiting for too long for this vote to be written to the output.
t.node.log.Warnf("pseudonode.makeVotes: unable to write vote to output channel for round %d, period %d", t.round, t.period)
outputTimeout = nil
}
}
}
}
@@ -475,13 +495,20 @@ func (t pseudonodeProposalsTask) execute(verifier *AsyncVoteVerifier, quit chan
// For now, don't log at all, and revisit when the metric becomes more important.

results := make(chan asyncVerifyVoteResponse, len(votes))
cryptoOutputs := make([]asyncVerifyVoteResponse, len(votes))
asyncVerifyingVotes := len(votes)
for i, uv := range votes {
msg := message{Tag: protocol.AgreementVoteTag, UnauthenticatedVote: uv}
verifier.verifyVote(context.TODO(), t.node.ledger, uv, i, msg, results)
err := verifier.verifyVote(context.TODO(), t.node.ledger, uv, i, msg, results)
if err != nil {
cryptoOutputs[i].err = err
t.node.log.Infof("pseudonode.makeProposals: failed to enqueue vote verification for (%d, %d): %v", t.round, t.period, err)
asyncVerifyingVotes--
continue
}
}

cryptoOutputs := make([]asyncVerifyVoteResponse, len(votes))
for i := 0; i < len(votes); i++ {
for i := 0; i < asyncVerifyingVotes; i++ {
resp := <-results
cryptoOutputs[resp.index] = resp
}
@@ -525,28 +552,45 @@ func (t pseudonodeProposalsTask) execute(verifier *AsyncVoteVerifier, quit chan
}
t.node.monitor.dec(pseudonodeCoserviceType)

outputTimeout := time.After(maxPseudonodeOutputWaitDuration)
// push results into channel.
verifiedVotesLoop:
for _, r := range verifiedVotes {
select {
case t.out <- messageEvent{T: voteVerified, Input: r.message, Err: makeSerErr(r.err)}:
t.node.keys.Record(r.v.R.Sender, r.v.R.Round, account.BlockProposal)
case <-quit:
return
case <-t.context.Done():
// we done care about the output anymore; just exit.
return
for {
select {
case t.out <- messageEvent{T: voteVerified, Input: r.message, Err: makeSerErr(r.err)}:
t.node.keys.Record(r.v.R.Sender, r.v.R.Round, account.BlockProposal)
continue verifiedVotesLoop
case <-quit:
return
case <-t.context.Done():
// we done care about the output anymore; just exit.
return
case <-outputTimeout:
// we've been waiting for too long for this vote to be written to the output.
t.node.log.Warnf("pseudonode.makeProposals: unable to write proposal vote to output channel for round %d, period %d", t.round, t.period)
outputTimeout = nil
}
}
}

verifiedPayloadsLoop:
for _, payload := range verifiedPayloads {
msg := message{Tag: protocol.ProposalPayloadTag, UnauthenticatedProposal: payload.u(), Proposal: payload}
select {
case t.out <- messageEvent{T: payloadVerified, Input: msg}:
case <-quit:
return
case <-t.context.Done():
// we done care about the output anymore; just exit.
return
for {
select {
case t.out <- messageEvent{T: payloadVerified, Input: msg}:
continue verifiedPayloadsLoop
case <-quit:
return
case <-t.context.Done():
// we done care about the output anymore; just exit.
return
case <-outputTimeout:
// we've been waiting for too long for this vote to be written to the output.
t.node.log.Warnf("pseudonode.makeProposals: unable to write proposal payload to output channel for round %d, period %d", t.round, t.period)
outputTimeout = nil
}
}
}
}