Skip to content

Commit

Permalink
Simplify validity window interface (#1875)
Browse files Browse the repository at this point in the history
  • Loading branch information
aaronbuchwald authored Jan 17, 2025
1 parent ab02143 commit ffd9f98
Show file tree
Hide file tree
Showing 12 changed files with 358 additions and 249 deletions.
5 changes: 2 additions & 3 deletions chain/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,7 @@ func (c *Builder) BuildBlock(ctx context.Context, parentOutputBlock *OutputBlock
changesEstimate := min(mempoolSize, maxViewPreallocation)

var (
ts = tstate.New(changesEstimate)
oldestAllowed = nextTime - r.GetValidityWindow()
ts = tstate.New(changesEstimate)

// restorable txs after block attempt finishes
restorableLock sync.Mutex
Expand Down Expand Up @@ -172,7 +171,7 @@ func (c *Builder) BuildBlock(ctx context.Context, parentOutputBlock *OutputBlock
// Perform a batch repeat check
// IsRepeat only returns an error if we fail to fetch the full validity window of blocks.
// This should only happen after startup, so we add the transactions back to the mempool.
dup, err := c.validityWindow.IsRepeat(ctx, parent, txs, oldestAllowed)
dup, err := c.validityWindow.IsRepeat(ctx, parent, nextTime, txs)
if err != nil {
restorable = append(restorable, txs...)
break
Expand Down
3 changes: 1 addition & 2 deletions chain/dependencies.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,13 +211,12 @@ type ValidityWindow interface {
VerifyExpiryReplayProtection(
ctx context.Context,
blk validitywindow.ExecutionBlock[*Transaction],
oldestAllowed int64,
) error
Accept(blk validitywindow.ExecutionBlock[*Transaction])
IsRepeat(
ctx context.Context,
parentBlk validitywindow.ExecutionBlock[*Transaction],
currentTimestamp int64,
txs []*Transaction,
oldestAllowed int64,
) (set.Bits, error)
}
6 changes: 1 addition & 5 deletions chain/pre_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,7 @@ func (p *PreExecutor) PreExecute(
}

// Find repeats
oldestAllowed := now - r.GetValidityWindow()
if oldestAllowed < 0 {
oldestAllowed = 0
}
repeatErrs, err := p.validityWindow.IsRepeat(ctx, parentBlk, []*Transaction{tx}, oldestAllowed)
repeatErrs, err := p.validityWindow.IsRepeat(ctx, parentBlk, now, []*Transaction{tx})
if err != nil {
return err
}
Expand Down
3 changes: 1 addition & 2 deletions chain/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,7 @@ func (p *Processor) Execute(
}

if isNormalOp {
oldestAllowed := max(0, b.Tmstmp-r.GetValidityWindow())
if err := p.validityWindow.VerifyExpiryReplayProtection(ctx, b, oldestAllowed); err != nil {
if err := p.validityWindow.VerifyExpiryReplayProtection(ctx, b); err != nil {
return nil, fmt.Errorf("%w: %w", ErrDuplicateTx, err)
}
}
Expand Down
24 changes: 0 additions & 24 deletions internal/validitywindow/dependencies.go

This file was deleted.

24 changes: 10 additions & 14 deletions internal/validitywindow/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,33 +9,29 @@ import (
"github.com/ava-labs/hypersdk/internal/emap"
)

// GetValidityWindowFunc is a callback function provided by the NewSyncer caller, returning the
// validity window duration for the given timestamp.
type GetValidityWindowFunc func(int64) int64

// Syncer marks sequential blocks as accepted until it has observed a full validity window
// and signals to the caller that it can begin processing blocks from that block forward.
type Syncer[Container emap.Item] struct {
chainIndex ChainIndex[Container]
timeValidityWindow *TimeValidityWindow[Container]
getValidityWindow GetValidityWindowFunc
initialBlock ExecutionBlock[Container]
type Syncer[T emap.Item] struct {
chainIndex ChainIndex[T]
timeValidityWindow *TimeValidityWindow[T]
getValidityWindow GetTimeValidityWindowFunc
initialBlock ExecutionBlock[T]
}

func NewSyncer[Container emap.Item](chainIndex ChainIndex[Container], timeValidityWindow *TimeValidityWindow[Container], getValidityWindow GetValidityWindowFunc) *Syncer[Container] {
return &Syncer[Container]{
func NewSyncer[T emap.Item](chainIndex ChainIndex[T], timeValidityWindow *TimeValidityWindow[T], getValidityWindow GetTimeValidityWindowFunc) *Syncer[T] {
return &Syncer[T]{
chainIndex: chainIndex,
timeValidityWindow: timeValidityWindow,
getValidityWindow: getValidityWindow,
}
}

func (s *Syncer[Container]) start(ctx context.Context, lastAcceptedBlock ExecutionBlock[Container]) (bool, error) {
func (s *Syncer[T]) start(ctx context.Context, lastAcceptedBlock ExecutionBlock[T]) (bool, error) {
s.initialBlock = lastAcceptedBlock
// Attempt to backfill the validity window
var (
parent = s.initialBlock
parents = []ExecutionBlock[Container]{parent}
parents = []ExecutionBlock[T]{parent}
seenValidityWindow = false
validityWindow = s.getValidityWindow(lastAcceptedBlock.GetTimestamp())
err error
Expand Down Expand Up @@ -64,7 +60,7 @@ func (s *Syncer[Container]) start(ctx context.Context, lastAcceptedBlock Executi
return seenValidityWindow, nil
}

func (s *Syncer[Container]) Accept(ctx context.Context, blk ExecutionBlock[Container]) (bool, error) {
func (s *Syncer[T]) Accept(ctx context.Context, blk ExecutionBlock[T]) (bool, error) {
if s.initialBlock == nil {
return s.start(ctx, blk)
}
Expand Down
117 changes: 81 additions & 36 deletions internal/validitywindow/validitywindow.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"errors"
"fmt"
"sync"
"time"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/trace"
Expand All @@ -18,91 +19,131 @@ import (
"github.com/ava-labs/hypersdk/internal/emap"
)

var ErrDuplicateContainer = errors.New("duplicate container")
var (
_ Interface[emap.Item] = (*TimeValidityWindow[emap.Item])(nil)
ErrDuplicateContainer = errors.New("duplicate container")
)

type GetTimeValidityWindowFunc func(timestamp int64) int64

type ExecutionBlock[T emap.Item] interface {
GetID() ids.ID
GetParent() ids.ID
GetTimestamp() int64
GetHeight() uint64
GetContainers() []T
Contains(ids.ID) bool
}

type ChainIndex[T emap.Item] interface {
GetExecutionBlock(ctx context.Context, blkID ids.ID) (ExecutionBlock[T], error)
}

type Interface[T emap.Item] interface {
Accept(blk ExecutionBlock[T])
VerifyExpiryReplayProtection(ctx context.Context, blk ExecutionBlock[T]) error
IsRepeat(ctx context.Context, parentBlk ExecutionBlock[T], currentTimestamp int64, containers []T) (set.Bits, error)
}

type TimeValidityWindow[Container emap.Item] struct {
type TimeValidityWindow[T emap.Item] struct {
log logging.Logger
tracer trace.Tracer

lock sync.Mutex
chainIndex ChainIndex[Container]
seen *emap.EMap[Container]
chainIndex ChainIndex[T]
seen *emap.EMap[T]
lastAcceptedBlockHeight uint64
getTimeValidityWindow GetTimeValidityWindowFunc
}

func NewTimeValidityWindow[Container emap.Item](log logging.Logger, tracer trace.Tracer, chainIndex ChainIndex[Container]) *TimeValidityWindow[Container] {
return &TimeValidityWindow[Container]{
log: log,
tracer: tracer,
chainIndex: chainIndex,
seen: emap.NewEMap[Container](),
func NewTimeValidityWindow[T emap.Item](
log logging.Logger,
tracer trace.Tracer,
chainIndex ChainIndex[T],
getTimeValidityWindowF GetTimeValidityWindowFunc,
) *TimeValidityWindow[T] {
return &TimeValidityWindow[T]{
log: log,
tracer: tracer,
chainIndex: chainIndex,
seen: emap.NewEMap[T](),
getTimeValidityWindow: getTimeValidityWindowF,
}
}

func (v *TimeValidityWindow[Container]) Accept(blk ExecutionBlock[Container]) {
func (v *TimeValidityWindow[T]) Accept(blk ExecutionBlock[T]) {
// Grab the lock before modifiying seen
v.lock.Lock()
defer v.lock.Unlock()

evicted := v.seen.SetMin(blk.GetTimestamp())
v.log.Debug("txs evicted from seen", zap.Int("len", len(evicted)))
v.log.Debug("accepting block to validity window",
zap.Stringer("blkID", blk.GetID()),
zap.Time("minTimestamp", time.UnixMilli(blk.GetTimestamp())),
zap.Int("evicted", len(evicted)),
)
v.seen.Add(blk.GetContainers())
v.lastAcceptedBlockHeight = blk.GetHeight()
}

func (v *TimeValidityWindow[Container]) VerifyExpiryReplayProtection(
func (v *TimeValidityWindow[T]) VerifyExpiryReplayProtection(
ctx context.Context,
blk ExecutionBlock[Container],
oldestAllowed int64,
blk ExecutionBlock[T],
) error {
_, span := v.tracer.Start(ctx, "Chain.VerifyExpiryReplayProtection")
defer span.End()

if blk.GetHeight() <= v.lastAcceptedBlockHeight {
return nil
}

// make sure we have no repeats within the block itself.
blkContainerIDs := set.NewSet[ids.ID](len(blk.GetContainers()))
for _, container := range blk.GetContainers() {
containerID := container.GetID()
if blkContainerIDs.Contains(containerID) {
return fmt.Errorf("%w: %s", ErrDuplicateContainer, containerID)
}
blkContainerIDs.Add(containerID)
}

parent, err := v.chainIndex.GetExecutionBlock(ctx, blk.GetParent())
if err != nil {
return err
}

oldestAllowed := v.calculateOldestAllowed(blk.GetTimestamp())
dup, err := v.isRepeat(ctx, parent, oldestAllowed, blk.GetContainers(), true)
if err != nil {
return err
}
if dup.Len() > 0 {
return fmt.Errorf("%w: duplicate bytes %q for %d txs", ErrDuplicateContainer, dup.Bytes(), len(blk.GetContainers()))
}
// make sure we have no repeats within the block itself.
blkContainerIDs := set.NewSet[ids.ID](len(blk.GetContainers()))
for _, container := range blk.GetContainers() {
id := container.GetID()
if blkContainerIDs.Contains(id) {
return fmt.Errorf("%w: duplicate in block", ErrDuplicateContainer)
}
blkContainerIDs.Add(id)
return fmt.Errorf("%w: contains %d duplicates out of %d containers", ErrDuplicateContainer, dup.BitLen(), len(blk.GetContainers()))
}
return nil
}

func (v *TimeValidityWindow[Container]) IsRepeat(
func (v *TimeValidityWindow[T]) IsRepeat(
ctx context.Context,
parentBlk ExecutionBlock[Container],
txs []Container,
oldestAllowed int64,
parentBlk ExecutionBlock[T],
currentTimestamp int64,
containers []T,
) (set.Bits, error) {
return v.isRepeat(ctx, parentBlk, oldestAllowed, txs, false)
_, span := v.tracer.Start(ctx, "Chain.IsRepeat")
defer span.End()
oldestAllowed := v.calculateOldestAllowed(currentTimestamp)
return v.isRepeat(ctx, parentBlk, oldestAllowed, containers, false)
}

func (v *TimeValidityWindow[Container]) isRepeat(
func (v *TimeValidityWindow[T]) isRepeat(
ctx context.Context,
ancestorBlk ExecutionBlock[Container],
ancestorBlk ExecutionBlock[T],
oldestAllowed int64,
containers []Container,
containers []T,
stop bool,
) (set.Bits, error) {
marker := set.NewBits()

_, span := v.tracer.Start(ctx, "Chain.isRepeat")
defer span.End()

v.lock.Lock()
defer v.lock.Unlock()

Expand Down Expand Up @@ -134,3 +175,7 @@ func (v *TimeValidityWindow[Container]) isRepeat(
}
}
}

func (v *TimeValidityWindow[T]) calculateOldestAllowed(timestamp int64) int64 {
return max(0, timestamp-v.getTimeValidityWindow(timestamp))
}
Loading

0 comments on commit ffd9f98

Please sign in to comment.