Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Generalize builder #1767

Merged
merged 14 commits into from
Nov 14, 2024
22 changes: 0 additions & 22 deletions internal/builder/dependencies.go

This file was deleted.

13 changes: 8 additions & 5 deletions internal/builder/manual.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,21 @@ import (
"context"

"github.com/ava-labs/avalanchego/snow/engine/common"
"github.com/ava-labs/avalanchego/utils/logging"
)

var _ Builder = (*Manual)(nil)

type Manual struct {
vm VM
engineCh chan<- common.Message
logger logging.Logger
doneBuild chan struct{}
}

func NewManual(vm VM) *Manual {
func NewManual(engineCh chan<- common.Message, logger logging.Logger) *Manual {
return &Manual{
vm: vm,
engineCh: engineCh,
logger: logger,
doneBuild: make(chan struct{}),
}
}
Expand All @@ -32,9 +35,9 @@ func (*Manual) Queue(context.Context) {}

func (b *Manual) Force(context.Context) error {
select {
case b.vm.EngineChan() <- common.PendingTxs:
case b.engineCh <- common.PendingTxs:
default:
b.vm.Logger().Debug("dropping message to consensus engine")
b.logger.Debug("dropping message to consensus engine")
}
return nil
}
Expand Down
58 changes: 34 additions & 24 deletions internal/builder/time.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/ava-labs/avalanchego/snow/engine/common"
"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ava-labs/avalanchego/utils/timer"
"go.uber.org/zap"
)
Expand All @@ -19,22 +20,32 @@ import (
// TODO: consider replacing this with AvalancheGo block build metering
const minBuildGap int64 = 25 // ms

var _ Builder = (*Time)(nil)
type Mempool interface {
Len(context.Context) int // items
}

type GetPreferedTimestampAndBlockGap func(int64) (int64, int64)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: could we add variable names and a comment to make this more easily readable and an error, so that we don't need to use a special case of the latter return value?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure!


// Time tells the engine when to build blocks and gossip transactions
type Time struct {
vm VM
doneBuild chan struct{}
engineCh chan<- common.Message
logger logging.Logger
mempool Mempool
getPreferedTimestampAndBlockGap GetPreferedTimestampAndBlockGap
doneBuild chan struct{}

timer *timer.Timer
lastQueue int64
waiting atomic.Bool
}

func NewTime(vm VM) *Time {
func NewTime(engineCh chan<- common.Message, logger logging.Logger, mempool Mempool, getPreferedTimestampAndBlockGap GetPreferedTimestampAndBlockGap) *Time {
b := &Time{
vm: vm,
doneBuild: make(chan struct{}),
engineCh: engineCh,
logger: logger,
mempool: mempool,
getPreferedTimestampAndBlockGap: getPreferedTimestampAndBlockGap,
doneBuild: make(chan struct{}),
}
b.timer = timer.NewTimer(b.handleTimerNotify)
return b
Expand All @@ -47,16 +58,15 @@ func (b *Time) Run() {

func (b *Time) handleTimerNotify() {
if err := b.Force(context.TODO()); err != nil {
b.vm.Logger().Warn("unable to build", zap.Error(err))
b.logger.Warn("unable to build", zap.Error(err))
} else {
txs := b.vm.Mempool().Len(context.TODO())
b.vm.Logger().Debug("trigger to notify", zap.Int("txs", txs))
txs := b.mempool.Len(context.TODO())
b.logger.Debug("trigger to notify", zap.Int("txs", txs))
}
b.waiting.Store(false)
}

func (b *Time) nextTime(now int64, preferred int64) int64 {
gap := b.vm.Rules(now).GetMinBlockGap()
func (b *Time) nextTime(now, preferred, gap int64) int64 {
next := max(b.lastQueue+minBuildGap, preferred+gap)
if next < now {
return -1
Expand All @@ -66,39 +76,39 @@ func (b *Time) nextTime(now int64, preferred int64) int64 {

func (b *Time) Queue(ctx context.Context) {
if !b.waiting.CompareAndSwap(false, true) {
b.vm.Logger().Debug("unable to acquire waiting lock")
b.logger.Debug("unable to acquire waiting lock")
return
}
preferredBlk, err := b.vm.PreferredBlock(context.TODO())
if err != nil {
b.waiting.Store(false)
b.vm.Logger().Warn("unable to load preferred block", zap.Error(err))
now := time.Now().UnixMilli()
preferred, gap := b.getPreferedTimestampAndBlockGap(now)
if gap < 0 {
// unable to retrieve block.
aaronbuchwald marked this conversation as resolved.
Show resolved Hide resolved
b.logger.Warn("unable to get preferred timestamp and block gap")
return
}
now := time.Now().UnixMilli()
next := b.nextTime(now, preferredBlk.Tmstmp)
next := b.nextTime(now, preferred, gap)
if next < 0 {
if err := b.Force(ctx); err != nil {
b.vm.Logger().Warn("unable to build", zap.Error(err))
b.logger.Warn("unable to build", zap.Error(err))
} else {
txs := b.vm.Mempool().Len(context.TODO())
b.vm.Logger().Debug("notifying to build without waiting", zap.Int("txs", txs))
txs := b.mempool.Len(context.TODO())
b.logger.Debug("notifying to build without waiting", zap.Int("txs", txs))
}
b.waiting.Store(false)
return
}
sleep := next - now
sleepDur := time.Duration(sleep * int64(time.Millisecond))
b.timer.SetTimeoutIn(sleepDur)
b.vm.Logger().Debug("waiting to notify to build", zap.Duration("t", sleepDur))
b.logger.Debug("waiting to notify to build", zap.Duration("t", sleepDur))
}

func (b *Time) Force(context.Context) error {
select {
case b.vm.EngineChan() <- common.PendingTxs:
case b.engineCh <- common.PendingTxs:
b.lastQueue = time.Now().UnixMilli()
default:
b.vm.Logger().Debug("dropping message to consensus engine")
b.logger.Debug("dropping message to consensus engine")
}
return nil
}
Expand Down
1 change: 0 additions & 1 deletion internal/gossiper/dependencies.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ type VM interface {
Proposers(ctx context.Context, diff int, depth int) (set.Set[ids.NodeID], error)
IsValidator(context.Context, ids.NodeID) (bool, error)
Logger() logging.Logger
PreferredBlock(context.Context) (*chain.ExecutionBlock, error)
ActionCodec() *codec.TypeParser[chain.Action]
AuthCodec() *codec.TypeParser[chain.Auth]
NodeID() ids.NodeID
Expand Down
14 changes: 0 additions & 14 deletions vm/resolutions.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (

"github.com/ava-labs/avalanchego/api/metrics"
"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/snow/engine/common"
"github.com/ava-labs/avalanchego/snow/engine/snowman/block"
"github.com/ava-labs/avalanchego/snow/validators"
"github.com/ava-labs/avalanchego/trace"
Expand All @@ -34,7 +33,6 @@ import (

var (
_ gossiper.VM = (*VM)(nil)
_ builder.VM = (*VM)(nil)
_ block.ChainVM = (*VM)(nil)
_ block.StateSyncableVM = (*VM)(nil)
)
Expand Down Expand Up @@ -310,14 +308,6 @@ func (vm *VM) NodeID() ids.NodeID {
return vm.snowCtx.NodeID
}

func (vm *VM) PreferredBlock(ctx context.Context) (*chain.ExecutionBlock, error) {
blk, err := vm.GetStatefulBlock(ctx, vm.preferred)
if err != nil {
return nil, err
}
return blk.ExecutionBlock, nil
}

func (vm *VM) PreferredHeight(ctx context.Context) (uint64, error) {
preferredBlk, err := vm.GetStatefulBlock(ctx, vm.preferred)
if err != nil {
Expand All @@ -330,10 +320,6 @@ func (vm *VM) StopChan() chan struct{} {
return vm.stop
}

func (vm *VM) EngineChan() chan<- common.Message {
return vm.toEngine
}

// Used for integration and load testing
func (vm *VM) Builder() builder.Builder {
return vm.builder
Expand Down
12 changes: 10 additions & 2 deletions vm/vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,6 @@ func (vm *VM) Initialize(
}

// Set defaults
vm.builder = builder.NewTime(vm)
vm.gossiper = txGossiper
options := &Options{}
for _, Option := range vm.options {
Expand Down Expand Up @@ -324,6 +323,15 @@ func (vm *VM) Initialize(

vm.mempool = mempool.New[*chain.Transaction](vm.tracer, vm.config.MempoolSize, vm.config.MempoolSponsorSize)

vm.builder = builder.NewTime(toEngine, snowCtx.Log, vm.mempool, func(t int64) (int64, int64) {
blk, err := vm.GetStatefulBlock(context.TODO(), vm.preferred)
if err != nil {
vm.snowCtx.Log.Warn("unable to load preferred block", zap.Error(err))
return 0, -1
}
return blk.Tmstmp, vm.ruleFactory.GetRules(t).GetMinBlockGap()
})

vm.chainTimeValidityWindow = chain.NewTimeValidityWindow(vm.snowCtx.Log, vm.tracer, vm)
registerer := prometheus.NewRegistry()
if err := vm.snowCtx.Metrics.Register("chain", registerer); err != nil {
Expand Down Expand Up @@ -524,7 +532,7 @@ func (vm *VM) applyOptions(o *Options) {
vm.blockSubscriptionFactories = o.blockSubscriptionFactories
vm.vmAPIHandlerFactories = o.vmAPIHandlerFactories
if o.builder {
vm.builder = builder.NewManual(vm)
vm.builder = builder.NewManual(vm.toEngine, vm.snowCtx.Log)
}
if o.gossiper {
vm.gossiper = gossiper.NewManual(vm)
Expand Down
Loading