Skip to content

Commit

Permalink
Update attestor and aggregator respect epbs intervals (#14454)
Browse files Browse the repository at this point in the history
  • Loading branch information
terencechain authored and potuz committed Nov 1, 2024
1 parent 8be174b commit a98d6e6
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 24 deletions.
20 changes: 11 additions & 9 deletions validator/client/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (v *validator) SubmitAggregateAndProof(ctx context.Context, slot primitives
// As specified in spec, an aggregator should wait until two thirds of the way through slot
// to broadcast the best aggregate to the global aggregate channel.
// https://github.com/ethereum/consensus-specs/blob/v0.9.3/specs/validator/0_beacon-chain-validator.md#broadcast-aggregate
v.waitToSlotTwoThirds(ctx, slot)
v.waitAggregatorDuty(ctx, slot)

postElectra := slots.ToEpoch(slot) >= params.BeaconConfig().ElectraForkEpoch

Expand Down Expand Up @@ -199,16 +199,18 @@ func (v *validator) signSlotWithSelectionProof(ctx context.Context, pubKey [fiel
return sig.Marshal(), nil
}

// waitToSlotTwoThirds waits until two third through the current slot period
// such that any attestations from this slot have time to reach the beacon node
// before creating the aggregated attestation.
func (v *validator) waitToSlotTwoThirds(ctx context.Context, slot primitives.Slot) {
ctx, span := trace.StartSpan(ctx, "validator.waitToSlotTwoThirds")
// waitAggregatorDuty waits for aggregator duty given the slot intervals.
func (v *validator) waitAggregatorDuty(ctx context.Context, slot primitives.Slot) {
ctx, span := trace.StartSpan(ctx, "validator.waitAggregatorDuty")
defer span.End()

oneThird := slots.DivideSlotBy(3 /* one third of slot duration */)
twoThird := oneThird + oneThird
delay := twoThird
var delay time.Duration
if slots.ToEpoch(slot) >= params.BeaconConfig().EPBSForkEpoch {
delay = slots.DivideSlotBy(int64(params.BeaconConfig().IntervalsPerSlotEPBS))
} else {
delay = slots.DivideSlotBy(int64(params.BeaconConfig().IntervalsPerSlot))
}
delay *= 2

startTime := slots.StartTime(v.genesisTime, slot)
finalTime := startTime.Add(delay)
Expand Down
24 changes: 22 additions & 2 deletions validator/client/aggregate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,13 +261,33 @@ func TestWaitForSlotTwoThird_WaitCorrectly(t *testing.T) {
timeToSleep := oneThird + oneThird

twoThirdTime := currentTime.Add(timeToSleep)
validator.waitToSlotTwoThirds(context.Background(), numOfSlots)
validator.waitAggregatorDuty(context.Background(), numOfSlots)
currentTime = time.Now()
assert.Equal(t, twoThirdTime.Unix(), currentTime.Unix())
})
}
}

func TestServer_WaitAggregatorDutyEpbs_CanWait(t *testing.T) {
params.SetupTestConfigCleanup(t)
cfg := params.BeaconConfig()
cfg.EPBSForkEpoch = 0
params.OverrideBeaconConfig(cfg)

validator, _, _, finish := setup(t, true)
defer finish()
currentTime := time.Now()
numOfSlots := primitives.Slot(4)
validator.genesisTime = uint64(currentTime.Unix()) - uint64(numOfSlots.Mul(params.BeaconConfig().SecondsPerSlot))
delay := slots.DivideSlotBy(int64(params.BeaconConfig().IntervalsPerSlotEPBS))
timeToSleep := delay + delay

twoThirdTime := currentTime.Add(timeToSleep)
validator.waitAggregatorDuty(context.Background(), numOfSlots)
currentTime = time.Now()
assert.Equal(t, twoThirdTime.Unix(), currentTime.Unix())
}

func TestWaitForSlotTwoThird_DoneContext_ReturnsImmediately(t *testing.T) {
for _, isSlashingProtectionMinimal := range [...]bool{false, true} {
t.Run(fmt.Sprintf("SlashingProtectionMinimal:%v", isSlashingProtectionMinimal), func(t *testing.T) {
Expand All @@ -280,7 +300,7 @@ func TestWaitForSlotTwoThird_DoneContext_ReturnsImmediately(t *testing.T) {
expectedTime := time.Now()
ctx, cancel := context.WithCancel(context.Background())
cancel()
validator.waitToSlotTwoThirds(ctx, numOfSlots)
validator.waitAggregatorDuty(ctx, numOfSlots)
currentTime = time.Now()
assert.Equal(t, expectedTime.Unix(), currentTime.Unix())
})
Expand Down
19 changes: 11 additions & 8 deletions validator/client/attest.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (v *validator) SubmitAttestation(ctx context.Context, slot primitives.Slot,
defer span.End()
span.SetAttributes(trace.StringAttribute("validator", fmt.Sprintf("%#x", pubKey)))

v.waitOneThirdOrValidBlock(ctx, slot)
v.waitAttesterDuty(ctx, slot)

var b strings.Builder
if err := b.WriteByte(byte(iface.RoleAttester)); err != nil {
Expand Down Expand Up @@ -282,20 +282,23 @@ func (v *validator) setHighestSlot(slot primitives.Slot) {
}
}

// waitOneThirdOrValidBlock waits until (a) or (b) whichever comes first:
//
// (a) the validator has received a valid block that is the same slot as input slot
// (b) one-third of the slot has transpired (SECONDS_PER_SLOT / 3 seconds after the start of slot)
func (v *validator) waitOneThirdOrValidBlock(ctx context.Context, slot primitives.Slot) {
ctx, span := trace.StartSpan(ctx, "validator.waitOneThirdOrValidBlock")
// waitAttesterDuty waits for attestor duty given the slot intervals and if current slot block has arrived.
func (v *validator) waitAttesterDuty(ctx context.Context, slot primitives.Slot) {
ctx, span := trace.StartSpan(ctx, "validator.waitAttesterDuty")
defer span.End()

// Don't need to wait if requested slot is the same as highest valid slot.
if slot <= v.highestSlot() {
return
}

delay := slots.DivideSlotBy(3 /* a third of the slot duration */)
var delay time.Duration
if slots.ToEpoch(slot) >= params.BeaconConfig().EPBSForkEpoch {
delay = slots.DivideSlotBy(int64(params.BeaconConfig().IntervalsPerSlotEPBS))
} else {
delay = slots.DivideSlotBy(int64(params.BeaconConfig().IntervalsPerSlot))
}

startTime := slots.StartTime(v.genesisTime, slot)
finalTime := startTime.Add(delay)
wait := prysmTime.Until(finalTime)
Expand Down
30 changes: 27 additions & 3 deletions validator/client/attest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -614,13 +614,37 @@ func TestServer_WaitToSlotOneThird_CanWait(t *testing.T) {

timeToSleep := params.BeaconConfig().SecondsPerSlot / 3
oneThird := currentTime + timeToSleep
v.waitOneThirdOrValidBlock(context.Background(), currentSlot)
v.waitAttesterDuty(context.Background(), currentSlot)

if oneThird != uint64(time.Now().Unix()) {
t.Errorf("Wanted %d time for slot one third but got %d", oneThird, currentTime)
}
}

func TestServer_WaitAttesterDutyEpbs_CanWait(t *testing.T) {
params.SetupTestConfigCleanup(t)
cfg := params.BeaconConfig()
cfg.EPBSForkEpoch = 0
params.OverrideBeaconConfig(cfg)

currentTime := uint64(time.Now().Unix())
currentSlot := primitives.Slot(4)
genesisTime := currentTime - uint64(currentSlot.Mul(params.BeaconConfig().SecondsPerSlot))

v := &validator{
genesisTime: genesisTime,
slotFeed: new(event.Feed),
}

timeToSleep := params.BeaconConfig().SecondsPerSlot / params.BeaconConfig().IntervalsPerSlotEPBS
dutyTime := currentTime + timeToSleep
v.waitAttesterDuty(context.Background(), currentSlot)

if dutyTime != uint64(time.Now().Unix()) {
t.Errorf("Wanted %d time for slot one third but got %d", dutyTime, currentTime)
}
}

func TestServer_WaitToSlotOneThird_SameReqSlot(t *testing.T) {
currentTime := uint64(time.Now().Unix())
currentSlot := primitives.Slot(4)
Expand All @@ -632,7 +656,7 @@ func TestServer_WaitToSlotOneThird_SameReqSlot(t *testing.T) {
highestValidSlot: currentSlot,
}

v.waitOneThirdOrValidBlock(context.Background(), currentSlot)
v.waitAttesterDuty(context.Background(), currentSlot)

if currentTime != uint64(time.Now().Unix()) {
t.Errorf("Wanted %d time for slot one third but got %d", uint64(time.Now().Unix()), currentTime)
Expand Down Expand Up @@ -660,7 +684,7 @@ func TestServer_WaitToSlotOneThird_ReceiveBlockSlot(t *testing.T) {
wg.Done()
}()

v.waitOneThirdOrValidBlock(context.Background(), currentSlot)
v.waitAttesterDuty(context.Background(), currentSlot)

if currentTime != uint64(time.Now().Unix()) {
t.Errorf("Wanted %d time for slot one third but got %d", uint64(time.Now().Unix()), currentTime)
Expand Down
4 changes: 2 additions & 2 deletions validator/client/sync_committee.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func (v *validator) SubmitSyncCommitteeMessage(ctx context.Context, slot primiti
defer span.End()
span.SetAttributes(trace.StringAttribute("validator", fmt.Sprintf("%#x", pubKey)))

v.waitOneThirdOrValidBlock(ctx, slot)
v.waitAttesterDuty(ctx, slot)

res, err := v.validatorClient.SyncMessageBlockRoot(ctx, &emptypb.Empty{})
if err != nil {
Expand Down Expand Up @@ -125,7 +125,7 @@ func (v *validator) SubmitSignedContributionAndProof(ctx context.Context, slot p
return
}

v.waitToSlotTwoThirds(ctx, slot)
v.waitAggregatorDuty(ctx, slot)

for i, comIdx := range indexRes.Indices {
isAggregator, err := altair.IsSyncCommitteeAggregator(selectionProofs[i])
Expand Down

0 comments on commit a98d6e6

Please sign in to comment.