Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Generalize builder #1767

Merged
merged 14 commits into from
Nov 14, 2024
22 changes: 0 additions & 22 deletions internal/builder/dependencies.go

This file was deleted.

13 changes: 8 additions & 5 deletions internal/builder/manual.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,21 @@ import (
"context"

"github.com/ava-labs/avalanchego/snow/engine/common"
"github.com/ava-labs/avalanchego/utils/logging"
)

var _ Builder = (*Manual)(nil)

type Manual struct {
vm VM
engineCh chan<- common.Message
logger logging.Logger
doneBuild chan struct{}
}

func NewManual(vm VM) *Manual {
func NewManual(engineCh chan<- common.Message, logger logging.Logger) *Manual {
return &Manual{
vm: vm,
engineCh: engineCh,
logger: logger,
doneBuild: make(chan struct{}),
}
}
Expand All @@ -32,9 +35,9 @@ func (*Manual) Queue(context.Context) {}

func (b *Manual) Force(context.Context) error {
select {
case b.vm.EngineChan() <- common.PendingTxs:
case b.engineCh <- common.PendingTxs:
default:
b.vm.Logger().Debug("dropping message to consensus engine")
b.logger.Debug("dropping message to consensus engine")
}
return nil
}
Expand Down
58 changes: 36 additions & 22 deletions internal/builder/time.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/ava-labs/avalanchego/snow/engine/common"
"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ava-labs/avalanchego/utils/timer"
"go.uber.org/zap"
)
Expand All @@ -19,22 +20,35 @@ import (
// TODO: consider replacing this with AvalancheGo block build metering
const minBuildGap int64 = 25 // ms

var _ Builder = (*Time)(nil)
type Mempool interface {
Len(context.Context) int // items
}

// GetPreferredTimestampAndBlockGap function accepts the current timestamp and returns
// the preferred block's timestamp and the current rule's block gap.
// If it fails to get the block, it should return a non-nil error.
type GetPreferredTimestampAndBlockGap func(now int64) (preferredTimestamp int64, blockGap int64, err error)

// Time tells the engine when to build blocks and gossip transactions
type Time struct {
vm VM
doneBuild chan struct{}
engineCh chan<- common.Message
logger logging.Logger
mempool Mempool
getPreferredTimestampAndBlockGap GetPreferredTimestampAndBlockGap
doneBuild chan struct{}

timer *timer.Timer
lastQueue int64
waiting atomic.Bool
}

func NewTime(vm VM) *Time {
func NewTime(engineCh chan<- common.Message, logger logging.Logger, mempool Mempool, getPreferredTimestampAndBlockGap GetPreferredTimestampAndBlockGap) *Time {
b := &Time{
vm: vm,
doneBuild: make(chan struct{}),
engineCh: engineCh,
logger: logger,
mempool: mempool,
getPreferredTimestampAndBlockGap: getPreferredTimestampAndBlockGap,
doneBuild: make(chan struct{}),
}
b.timer = timer.NewTimer(b.handleTimerNotify)
return b
Expand All @@ -47,16 +61,15 @@ func (b *Time) Run() {

func (b *Time) handleTimerNotify() {
if err := b.Force(context.TODO()); err != nil {
b.vm.Logger().Warn("unable to build", zap.Error(err))
b.logger.Warn("unable to build", zap.Error(err))
} else {
txs := b.vm.Mempool().Len(context.TODO())
b.vm.Logger().Debug("trigger to notify", zap.Int("txs", txs))
txs := b.mempool.Len(context.TODO())
b.logger.Debug("trigger to notify", zap.Int("txs", txs))
}
b.waiting.Store(false)
}

func (b *Time) nextTime(now int64, preferred int64) int64 {
gap := b.vm.Rules(now).GetMinBlockGap()
func (b *Time) nextTime(now, preferred, gap int64) int64 {
next := max(b.lastQueue+minBuildGap, preferred+gap)
if next < now {
return -1
Expand All @@ -66,39 +79,40 @@ func (b *Time) nextTime(now int64, preferred int64) int64 {

func (b *Time) Queue(ctx context.Context) {
if !b.waiting.CompareAndSwap(false, true) {
b.vm.Logger().Debug("unable to acquire waiting lock")
b.logger.Debug("unable to acquire waiting lock")
return
}
preferredBlk, err := b.vm.PreferredBlock(context.TODO())
now := time.Now().UnixMilli()
preferred, gap, err := b.getPreferredTimestampAndBlockGap(now)
if err != nil {
// unable to retrieve block.
aaronbuchwald marked this conversation as resolved.
Show resolved Hide resolved
b.waiting.Store(false)
b.vm.Logger().Warn("unable to load preferred block", zap.Error(err))
b.logger.Warn("unable to get preferred timestamp and block gap", zap.Error(err))
return
}
now := time.Now().UnixMilli()
next := b.nextTime(now, preferredBlk.Tmstmp)
next := b.nextTime(now, preferred, gap)
if next < 0 {
if err := b.Force(ctx); err != nil {
b.vm.Logger().Warn("unable to build", zap.Error(err))
b.logger.Warn("unable to build", zap.Error(err))
} else {
txs := b.vm.Mempool().Len(context.TODO())
b.vm.Logger().Debug("notifying to build without waiting", zap.Int("txs", txs))
txs := b.mempool.Len(context.TODO())
b.logger.Debug("notifying to build without waiting", zap.Int("txs", txs))
}
b.waiting.Store(false)
return
}
sleep := next - now
sleepDur := time.Duration(sleep * int64(time.Millisecond))
b.timer.SetTimeoutIn(sleepDur)
b.vm.Logger().Debug("waiting to notify to build", zap.Duration("t", sleepDur))
b.logger.Debug("waiting to notify to build", zap.Duration("t", sleepDur))
}

func (b *Time) Force(context.Context) error {
select {
case b.vm.EngineChan() <- common.PendingTxs:
case b.engineCh <- common.PendingTxs:
b.lastQueue = time.Now().UnixMilli()
default:
b.vm.Logger().Debug("dropping message to consensus engine")
b.logger.Debug("dropping message to consensus engine")
}
return nil
}
Expand Down
1 change: 0 additions & 1 deletion internal/gossiper/dependencies.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ type VM interface {
Proposers(ctx context.Context, diff int, depth int) (set.Set[ids.NodeID], error)
IsValidator(context.Context, ids.NodeID) (bool, error)
Logger() logging.Logger
PreferredBlock(context.Context) (*chain.ExecutionBlock, error)
ActionCodec() *codec.TypeParser[chain.Action]
AuthCodec() *codec.TypeParser[chain.Auth]
NodeID() ids.NodeID
Expand Down
14 changes: 0 additions & 14 deletions vm/resolutions.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (

"github.com/ava-labs/avalanchego/api/metrics"
"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/snow/engine/common"
"github.com/ava-labs/avalanchego/snow/engine/snowman/block"
"github.com/ava-labs/avalanchego/snow/validators"
"github.com/ava-labs/avalanchego/trace"
Expand All @@ -34,7 +33,6 @@ import (

var (
_ gossiper.VM = (*VM)(nil)
_ builder.VM = (*VM)(nil)
_ block.ChainVM = (*VM)(nil)
_ block.StateSyncableVM = (*VM)(nil)
)
Expand Down Expand Up @@ -310,14 +308,6 @@ func (vm *VM) NodeID() ids.NodeID {
return vm.snowCtx.NodeID
}

func (vm *VM) PreferredBlock(ctx context.Context) (*chain.ExecutionBlock, error) {
blk, err := vm.GetStatefulBlock(ctx, vm.preferred)
if err != nil {
return nil, err
}
return blk.ExecutionBlock, nil
}

func (vm *VM) PreferredHeight(ctx context.Context) (uint64, error) {
preferredBlk, err := vm.GetStatefulBlock(ctx, vm.preferred)
if err != nil {
Expand All @@ -330,10 +320,6 @@ func (vm *VM) StopChan() chan struct{} {
return vm.stop
}

func (vm *VM) EngineChan() chan<- common.Message {
return vm.toEngine
}

// Used for integration and load testing
func (vm *VM) Builder() builder.Builder {
return vm.builder
Expand Down
11 changes: 9 additions & 2 deletions vm/vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,6 @@ func (vm *VM) Initialize(
}

// Set defaults
vm.builder = builder.NewTime(vm)
vm.gossiper = txGossiper
options := &Options{}
for _, Option := range vm.options {
Expand Down Expand Up @@ -324,6 +323,14 @@ func (vm *VM) Initialize(

vm.mempool = mempool.New[*chain.Transaction](vm.tracer, vm.config.MempoolSize, vm.config.MempoolSponsorSize)

vm.builder = builder.NewTime(toEngine, snowCtx.Log, vm.mempool, func(t int64) (int64, int64, error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm moving this below where we apply options means that we will set the builder to this value even if the option to override the builder is specified. Moving this back above options causes the integration tests (which use the manual builder) to fail.

Seems this change may have introduced a bug in the manual builder that is covered up by this change. We could use builder.NewTime rather than manual implementation (clearly we don't need the manual implementation).

blk, err := vm.GetStatefulBlock(context.TODO(), vm.preferred)
if err != nil {
return 0, 0, err
}
return blk.Tmstmp, vm.ruleFactory.GetRules(t).GetMinBlockGap(), nil
})

vm.chainTimeValidityWindow = chain.NewTimeValidityWindow(vm.snowCtx.Log, vm.tracer, vm)
registerer := prometheus.NewRegistry()
if err := vm.snowCtx.Metrics.Register("chain", registerer); err != nil {
Expand Down Expand Up @@ -524,7 +531,7 @@ func (vm *VM) applyOptions(o *Options) {
vm.blockSubscriptionFactories = o.blockSubscriptionFactories
vm.vmAPIHandlerFactories = o.vmAPIHandlerFactories
if o.builder {
vm.builder = builder.NewManual(vm)
vm.builder = builder.NewManual(vm.toEngine, vm.snowCtx.Log)
}
if o.gossiper {
vm.gossiper = gossiper.NewManual(vm)
Expand Down
Loading