Skip to content

Add metric to track the stake weight of block providers #2376

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Nov 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion snow/engine/snowman/issuer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
// issuer issues [blk] into to consensus after its dependencies are met.
type issuer struct {
t *Transitive
nodeID ids.NodeID // nodeID of the peer that provided this block
blk snowman.Block
abandoned bool
deps set.Set[ids.ID]
Expand Down Expand Up @@ -51,5 +52,5 @@ func (i *issuer) Update(ctx context.Context) {
return
}
// Issue the block into consensus
i.t.errs.Add(i.t.deliver(ctx, i.blk, i.push))
i.t.errs.Add(i.t.deliver(ctx, i.nodeID, i.blk, i.push))
}
8 changes: 8 additions & 0 deletions snow/engine/snowman/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type metrics struct {
numProcessingAncestorFetchesUnneeded prometheus.Counter
getAncestorsBlks metric.Averager
selectedVoteIndex metric.Averager
issuerStake metric.Averager
}

func (m *metrics) Initialize(namespace string, reg prometheus.Registerer) error {
Expand Down Expand Up @@ -115,6 +116,13 @@ func (m *metrics) Initialize(namespace string, reg prometheus.Registerer) error
reg,
&errs,
)
m.issuerStake = metric.NewAveragerWithErrs(
namespace,
"issuer_stake",
"stake weight of the peer who provided a block that was issued into consensus",
reg,
&errs,
)

errs.Add(
reg.Register(m.bootstrapFinished),
Expand Down
26 changes: 15 additions & 11 deletions snow/engine/snowman/transitive.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ func (t *Transitive) Start(ctx context.Context, startReqID uint32) error {
default:
for _, blk := range options {
// note that deliver will set the VM's preference
if err := t.deliver(ctx, blk, false); err != nil {
if err := t.deliver(ctx, t.Ctx.NodeID, blk, false); err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similarly assume this is only after BuildBlock?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is called during the startup of the engine. This piece of code is really only for completeness w.r.t. oracle blocks. Specifically, if the ProposalBlock is accepted during bootstrapping but neither of the Option blocks are - then we initially add the options with our preferences. For large networks like mainnet / fuji this code path probably isn't needed because other nodes would gossip the Option blocks and we can just adopt either preference... But this avoids any weird livelock or preference biasing cases.

As far as this goes though - the only options would either be considering ourselves as the block builder or trying to somehow track where we got this block from during bootstrapping (which could cross the restart boundary)

return err
}
}
Expand Down Expand Up @@ -650,7 +650,7 @@ func (t *Transitive) issueFrom(ctx context.Context, nodeID ids.NodeID, blk snowm
// issue [blk] and its ancestors to consensus.
blkID := blk.ID()
for !t.wasIssued(blk) {
if err := t.issue(ctx, blk, false); err != nil {
if err := t.issue(ctx, nodeID, blk, false); err != nil {
return false, err
}

Expand Down Expand Up @@ -690,7 +690,7 @@ func (t *Transitive) issueWithAncestors(ctx context.Context, blk snowman.Block)
// issue [blk] and its ancestors into consensus
status := blk.Status()
for status.Fetched() && !t.wasIssued(blk) {
err := t.issue(ctx, blk, true)
err := t.issue(ctx, t.Ctx.NodeID, blk, true)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm assuming this is only called after BuildBlock?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct - issueWithAncestors is only called by buildBlocks.

if err != nil {
return false, err
}
Expand Down Expand Up @@ -732,7 +732,7 @@ func (t *Transitive) wasIssued(blk snowman.Block) bool {
// Issue [blk] to consensus once its ancestors have been issued.
// If [push] is true, a push query will be used. Otherwise, a pull query will be
// used.
func (t *Transitive) issue(ctx context.Context, blk snowman.Block, push bool) error {
func (t *Transitive) issue(ctx context.Context, nodeID ids.NodeID, blk snowman.Block, push bool) error {
blkID := blk.ID()

// mark that the block is queued to be added to consensus once its ancestors have been
Expand All @@ -743,9 +743,10 @@ func (t *Transitive) issue(ctx context.Context, blk snowman.Block, push bool) er

// Will add [blk] to consensus once its ancestors have been
i := &issuer{
t: t,
blk: blk,
push: push,
t: t,
nodeID: nodeID,
blk: blk,
push: push,
}

// block on the parent if needed
Expand Down Expand Up @@ -849,7 +850,7 @@ func (t *Transitive) sendQuery(
// issue [blk] to consensus
// If [push] is true, a push query will be used. Otherwise, a pull query will be
// used.
func (t *Transitive) deliver(ctx context.Context, blk snowman.Block, push bool) error {
func (t *Transitive) deliver(ctx context.Context, nodeID ids.NodeID, blk snowman.Block, push bool) error {
blkID := blk.ID()
if t.Consensus.Decided(blk) || t.Consensus.Processing(blkID) {
return nil
Expand All @@ -875,7 +876,7 @@ func (t *Transitive) deliver(ctx context.Context, blk snowman.Block, push bool)
// By ensuring that the parent is either processing or accepted, it is
// guaranteed that the parent was successfully verified. This means that
// calling Verify on this block is allowed.
blkAdded, err := t.addUnverifiedBlockToConsensus(ctx, blk)
blkAdded, err := t.addUnverifiedBlockToConsensus(ctx, nodeID, blk)
if err != nil {
return err
}
Expand All @@ -899,7 +900,7 @@ func (t *Transitive) deliver(ctx context.Context, blk snowman.Block, push bool)
}

for _, blk := range options {
blkAdded, err := t.addUnverifiedBlockToConsensus(ctx, blk)
blkAdded, err := t.addUnverifiedBlockToConsensus(ctx, nodeID, blk)
if err != nil {
return err
}
Expand Down Expand Up @@ -979,12 +980,13 @@ func (t *Transitive) addToNonVerifieds(blk snowman.Block) {

// addUnverifiedBlockToConsensus returns whether the block was added and an
// error if one occurred while adding it to consensus.
func (t *Transitive) addUnverifiedBlockToConsensus(ctx context.Context, blk snowman.Block) (bool, error) {
func (t *Transitive) addUnverifiedBlockToConsensus(ctx context.Context, nodeID ids.NodeID, blk snowman.Block) (bool, error) {
blkID := blk.ID()

// make sure this block is valid
if err := blk.Verify(ctx); err != nil {
t.Ctx.Log.Debug("block verification failed",
zap.Stringer("nodeID", nodeID),
zap.Stringer("blkID", blkID),
zap.Error(err),
)
Expand All @@ -997,7 +999,9 @@ func (t *Transitive) addUnverifiedBlockToConsensus(ctx context.Context, blk snow
t.nonVerifieds.Remove(blkID)
t.nonVerifiedCache.Evict(blkID)
t.metrics.numNonVerifieds.Set(float64(t.nonVerifieds.Len()))
t.metrics.issuerStake.Observe(float64(t.Validators.GetWeight(t.Ctx.SubnetID, nodeID)))
t.Ctx.Log.Verbo("adding block to consensus",
zap.Stringer("nodeID", nodeID),
zap.Stringer("blkID", blkID),
)
return true, t.Consensus.Add(ctx, &memoryBlock{
Expand Down
34 changes: 17 additions & 17 deletions snow/engine/snowman/transitive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ func TestEngineMultipleQuery(t *testing.T) {
}
}

require.NoError(te.issue(context.Background(), blk0, false))
require.NoError(te.issue(context.Background(), te.Ctx.NodeID, blk0, false))

blk1 := &snowman.TestBlock{
TestDecidable: choices.TestDecidable{
Expand Down Expand Up @@ -522,10 +522,10 @@ func TestEngineBlockedIssue(t *testing.T) {
}
}

require.NoError(te.issue(context.Background(), blk1, false))
require.NoError(te.issue(context.Background(), te.Ctx.NodeID, blk1, false))

blk0.StatusV = choices.Processing
require.NoError(te.issue(context.Background(), blk0, false))
require.NoError(te.issue(context.Background(), te.Ctx.NodeID, blk0, false))

require.Equal(blk1.ID(), te.Consensus.Preference())
}
Expand Down Expand Up @@ -558,7 +558,7 @@ func TestEngineAbandonResponse(t *testing.T) {
return nil, errUnknownBlock
}

require.NoError(te.issue(context.Background(), blk, false))
require.NoError(te.issue(context.Background(), te.Ctx.NodeID, blk, false))
require.NoError(te.QueryFailed(context.Background(), vdr, 1))

require.Empty(te.blocked)
Expand Down Expand Up @@ -797,7 +797,7 @@ func TestVoteCanceling(t *testing.T) {
require.Equal(uint64(1), requestedHeight)
}

require.NoError(te.issue(context.Background(), blk, true))
require.NoError(te.issue(context.Background(), te.Ctx.NodeID, blk, true))

require.Equal(1, te.polls.Len())

Expand Down Expand Up @@ -858,7 +858,7 @@ func TestEngineNoQuery(t *testing.T) {
BytesV: []byte{1},
}

require.NoError(te.issue(context.Background(), blk, false))
require.NoError(te.issue(context.Background(), te.Ctx.NodeID, blk, false))
}

func TestEngineNoRepollQuery(t *testing.T) {
Expand Down Expand Up @@ -961,7 +961,7 @@ func TestEngineAbandonChit(t *testing.T) {
reqID = requestID
}

require.NoError(te.issue(context.Background(), blk, false))
require.NoError(te.issue(context.Background(), te.Ctx.NodeID, blk, false))

fakeBlkID := ids.GenerateTestID()
vm.GetBlockF = func(_ context.Context, id ids.ID) (snowman.Block, error) {
Expand Down Expand Up @@ -1016,7 +1016,7 @@ func TestEngineAbandonChitWithUnexpectedPutBlock(t *testing.T) {
reqID = requestID
}

require.NoError(te.issue(context.Background(), blk, true))
require.NoError(te.issue(context.Background(), te.Ctx.NodeID, blk, true))

fakeBlkID := ids.GenerateTestID()
vm.GetBlockF = func(_ context.Context, id ids.ID) (snowman.Block, error) {
Expand Down Expand Up @@ -1099,7 +1099,7 @@ func TestEngineBlockingChitRequest(t *testing.T) {
return blockingBlk, nil
}

require.NoError(te.issue(context.Background(), parentBlk, false))
require.NoError(te.issue(context.Background(), te.Ctx.NodeID, parentBlk, false))

sender.CantSendChits = false

Expand All @@ -1110,7 +1110,7 @@ func TestEngineBlockingChitRequest(t *testing.T) {
sender.CantSendPullQuery = false

missingBlk.StatusV = choices.Processing
require.NoError(te.issue(context.Background(), missingBlk, false))
require.NoError(te.issue(context.Background(), te.Ctx.NodeID, missingBlk, false))

require.Empty(te.blocked)
}
Expand Down Expand Up @@ -1163,7 +1163,7 @@ func TestEngineBlockingChitResponse(t *testing.T) {
}
}

require.NoError(te.issue(context.Background(), blockingBlk, false))
require.NoError(te.issue(context.Background(), te.Ctx.NodeID, blockingBlk, false))

queryRequestID := new(uint32)
sender.SendPullQueryF = func(_ context.Context, inVdrs set.Set[ids.NodeID], requestID uint32, blkID ids.ID, requestedHeight uint64) {
Expand All @@ -1174,7 +1174,7 @@ func TestEngineBlockingChitResponse(t *testing.T) {
require.Equal(uint64(1), requestedHeight)
}

require.NoError(te.issue(context.Background(), issuedBlk, false))
require.NoError(te.issue(context.Background(), te.Ctx.NodeID, issuedBlk, false))

sender.SendPushQueryF = nil
sender.CantSendPushQuery = false
Expand All @@ -1185,7 +1185,7 @@ func TestEngineBlockingChitResponse(t *testing.T) {
sender.CantSendPullQuery = false

missingBlk.StatusV = choices.Processing
require.NoError(te.issue(context.Background(), missingBlk, false))
require.NoError(te.issue(context.Background(), te.Ctx.NodeID, missingBlk, false))
}

func TestEngineRetryFetch(t *testing.T) {
Expand Down Expand Up @@ -1281,9 +1281,9 @@ func TestEngineUndeclaredDependencyDeadlock(t *testing.T) {
return nil, errUnknownBlock
}
}
require.NoError(te.issue(context.Background(), validBlk, false))
require.NoError(te.issue(context.Background(), te.Ctx.NodeID, validBlk, false))
sender.SendPushQueryF = nil
require.NoError(te.issue(context.Background(), invalidBlk, false))
require.NoError(te.issue(context.Background(), te.Ctx.NodeID, invalidBlk, false))
require.NoError(te.Chits(context.Background(), vdr, *reqID, invalidBlkID, invalidBlkID, invalidBlkID))

require.Equal(choices.Accepted, validBlk.Status())
Expand Down Expand Up @@ -1666,7 +1666,7 @@ func TestEngineDoubleChit(t *testing.T) {
require.Equal(blk.ID(), blkID)
require.Equal(uint64(1), requestedHeight)
}
require.NoError(te.issue(context.Background(), blk, false))
require.NoError(te.issue(context.Background(), te.Ctx.NodeID, blk, false))

vm.GetBlockF = func(_ context.Context, id ids.ID) (snowman.Block, error) {
switch id {
Expand Down Expand Up @@ -2785,7 +2785,7 @@ func TestEngineApplyAcceptedFrontierInQueryFailed(t *testing.T) {
require.Equal(uint64(1), requestedHeight)
}

require.NoError(te.issue(context.Background(), blk, true))
require.NoError(te.issue(context.Background(), te.Ctx.NodeID, blk, true))

vm.GetBlockF = func(_ context.Context, id ids.ID) (snowman.Block, error) {
switch id {
Expand Down