Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update attestor and aggregator to respect epbs intervals #14454

Merged
merged 1 commit into from
Sep 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 11 additions & 9 deletions validator/client/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (v *validator) SubmitAggregateAndProof(ctx context.Context, slot primitives
// As specified in spec, an aggregator should wait until two thirds of the way through slot
// to broadcast the best aggregate to the global aggregate channel.
// https://github.com/ethereum/consensus-specs/blob/v0.9.3/specs/validator/0_beacon-chain-validator.md#broadcast-aggregate
v.waitToSlotTwoThirds(ctx, slot)
v.waitAggregatorDuty(ctx, slot)

postElectra := slots.ToEpoch(slot) >= params.BeaconConfig().ElectraForkEpoch

Expand Down Expand Up @@ -199,16 +199,18 @@ func (v *validator) signSlotWithSelectionProof(ctx context.Context, pubKey [fiel
return sig.Marshal(), nil
}

// waitToSlotTwoThirds waits until two third through the current slot period
// such that any attestations from this slot have time to reach the beacon node
// before creating the aggregated attestation.
func (v *validator) waitToSlotTwoThirds(ctx context.Context, slot primitives.Slot) {
ctx, span := trace.StartSpan(ctx, "validator.waitToSlotTwoThirds")
// waitAggregatorDuty waits for aggregator duty given the slot intervals.
func (v *validator) waitAggregatorDuty(ctx context.Context, slot primitives.Slot) {
ctx, span := trace.StartSpan(ctx, "validator.waitAggregatorDuty")
defer span.End()

oneThird := slots.DivideSlotBy(3 /* one third of slot duration */)
twoThird := oneThird + oneThird
delay := twoThird
var delay time.Duration
if slots.ToEpoch(slot) >= params.BeaconConfig().EPBSForkEpoch {
delay = slots.DivideSlotBy(int64(params.BeaconConfig().IntervalsPerSlotEPBS))
} else {
delay = slots.DivideSlotBy(int64(params.BeaconConfig().IntervalsPerSlot))
}
delay *= 2

startTime := slots.StartTime(v.genesisTime, slot)
finalTime := startTime.Add(delay)
Expand Down
24 changes: 22 additions & 2 deletions validator/client/aggregate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,13 +261,33 @@ func TestWaitForSlotTwoThird_WaitCorrectly(t *testing.T) {
timeToSleep := oneThird + oneThird

twoThirdTime := currentTime.Add(timeToSleep)
validator.waitToSlotTwoThirds(context.Background(), numOfSlots)
validator.waitAggregatorDuty(context.Background(), numOfSlots)
currentTime = time.Now()
assert.Equal(t, twoThirdTime.Unix(), currentTime.Unix())
})
}
}

func TestServer_WaitAggregatorDutyEpbs_CanWait(t *testing.T) {
params.SetupTestConfigCleanup(t)
cfg := params.BeaconConfig()
cfg.EPBSForkEpoch = 0
params.OverrideBeaconConfig(cfg)

validator, _, _, finish := setup(t, true)
defer finish()
currentTime := time.Now()
numOfSlots := primitives.Slot(4)
validator.genesisTime = uint64(currentTime.Unix()) - uint64(numOfSlots.Mul(params.BeaconConfig().SecondsPerSlot))
delay := slots.DivideSlotBy(int64(params.BeaconConfig().IntervalsPerSlotEPBS))
timeToSleep := delay + delay

twoThirdTime := currentTime.Add(timeToSleep)
validator.waitAggregatorDuty(context.Background(), numOfSlots)
currentTime = time.Now()
assert.Equal(t, twoThirdTime.Unix(), currentTime.Unix())
}

func TestWaitForSlotTwoThird_DoneContext_ReturnsImmediately(t *testing.T) {
for _, isSlashingProtectionMinimal := range [...]bool{false, true} {
t.Run(fmt.Sprintf("SlashingProtectionMinimal:%v", isSlashingProtectionMinimal), func(t *testing.T) {
Expand All @@ -280,7 +300,7 @@ func TestWaitForSlotTwoThird_DoneContext_ReturnsImmediately(t *testing.T) {
expectedTime := time.Now()
ctx, cancel := context.WithCancel(context.Background())
cancel()
validator.waitToSlotTwoThirds(ctx, numOfSlots)
validator.waitAggregatorDuty(ctx, numOfSlots)
currentTime = time.Now()
assert.Equal(t, expectedTime.Unix(), currentTime.Unix())
})
Expand Down
19 changes: 11 additions & 8 deletions validator/client/attest.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (v *validator) SubmitAttestation(ctx context.Context, slot primitives.Slot,
defer span.End()
span.SetAttributes(trace.StringAttribute("validator", fmt.Sprintf("%#x", pubKey)))

v.waitOneThirdOrValidBlock(ctx, slot)
v.waitAttesterDuty(ctx, slot)

var b strings.Builder
if err := b.WriteByte(byte(iface.RoleAttester)); err != nil {
Expand Down Expand Up @@ -282,20 +282,23 @@ func (v *validator) setHighestSlot(slot primitives.Slot) {
}
}

// waitOneThirdOrValidBlock waits until (a) or (b) whichever comes first:
//
// (a) the validator has received a valid block that is the same slot as input slot
// (b) one-third of the slot has transpired (SECONDS_PER_SLOT / 3 seconds after the start of slot)
func (v *validator) waitOneThirdOrValidBlock(ctx context.Context, slot primitives.Slot) {
ctx, span := trace.StartSpan(ctx, "validator.waitOneThirdOrValidBlock")
// waitAttesterDuty waits for attestor duty given the slot intervals and if current slot block has arrived.
func (v *validator) waitAttesterDuty(ctx context.Context, slot primitives.Slot) {
ctx, span := trace.StartSpan(ctx, "validator.waitAttesterDuty")
defer span.End()

// Don't need to wait if requested slot is the same as highest valid slot.
if slot <= v.highestSlot() {
return
}

delay := slots.DivideSlotBy(3 /* a third of the slot duration */)
var delay time.Duration
if slots.ToEpoch(slot) >= params.BeaconConfig().EPBSForkEpoch {
delay = slots.DivideSlotBy(int64(params.BeaconConfig().IntervalsPerSlotEPBS))
} else {
delay = slots.DivideSlotBy(int64(params.BeaconConfig().IntervalsPerSlot))
}

startTime := slots.StartTime(v.genesisTime, slot)
finalTime := startTime.Add(delay)
wait := prysmTime.Until(finalTime)
Expand Down
30 changes: 27 additions & 3 deletions validator/client/attest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -614,13 +614,37 @@ func TestServer_WaitToSlotOneThird_CanWait(t *testing.T) {

timeToSleep := params.BeaconConfig().SecondsPerSlot / 3
oneThird := currentTime + timeToSleep
v.waitOneThirdOrValidBlock(context.Background(), currentSlot)
v.waitAttesterDuty(context.Background(), currentSlot)

if oneThird != uint64(time.Now().Unix()) {
t.Errorf("Wanted %d time for slot one third but got %d", oneThird, currentTime)
}
}

func TestServer_WaitAttesterDutyEpbs_CanWait(t *testing.T) {
params.SetupTestConfigCleanup(t)
cfg := params.BeaconConfig()
cfg.EPBSForkEpoch = 0
params.OverrideBeaconConfig(cfg)

currentTime := uint64(time.Now().Unix())
currentSlot := primitives.Slot(4)
genesisTime := currentTime - uint64(currentSlot.Mul(params.BeaconConfig().SecondsPerSlot))

v := &validator{
genesisTime: genesisTime,
slotFeed: new(event.Feed),
}

timeToSleep := params.BeaconConfig().SecondsPerSlot / params.BeaconConfig().IntervalsPerSlotEPBS
dutyTime := currentTime + timeToSleep
v.waitAttesterDuty(context.Background(), currentSlot)

if dutyTime != uint64(time.Now().Unix()) {
t.Errorf("Wanted %d time for slot one third but got %d", dutyTime, currentTime)
}
}

func TestServer_WaitToSlotOneThird_SameReqSlot(t *testing.T) {
currentTime := uint64(time.Now().Unix())
currentSlot := primitives.Slot(4)
Expand All @@ -632,7 +656,7 @@ func TestServer_WaitToSlotOneThird_SameReqSlot(t *testing.T) {
highestValidSlot: currentSlot,
}

v.waitOneThirdOrValidBlock(context.Background(), currentSlot)
v.waitAttesterDuty(context.Background(), currentSlot)

if currentTime != uint64(time.Now().Unix()) {
t.Errorf("Wanted %d time for slot one third but got %d", uint64(time.Now().Unix()), currentTime)
Expand Down Expand Up @@ -660,7 +684,7 @@ func TestServer_WaitToSlotOneThird_ReceiveBlockSlot(t *testing.T) {
wg.Done()
}()

v.waitOneThirdOrValidBlock(context.Background(), currentSlot)
v.waitAttesterDuty(context.Background(), currentSlot)

if currentTime != uint64(time.Now().Unix()) {
t.Errorf("Wanted %d time for slot one third but got %d", uint64(time.Now().Unix()), currentTime)
Expand Down
4 changes: 2 additions & 2 deletions validator/client/sync_committee.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func (v *validator) SubmitSyncCommitteeMessage(ctx context.Context, slot primiti
defer span.End()
span.SetAttributes(trace.StringAttribute("validator", fmt.Sprintf("%#x", pubKey)))

v.waitOneThirdOrValidBlock(ctx, slot)
v.waitAttesterDuty(ctx, slot)

res, err := v.validatorClient.SyncMessageBlockRoot(ctx, &emptypb.Empty{})
if err != nil {
Expand Down Expand Up @@ -125,7 +125,7 @@ func (v *validator) SubmitSignedContributionAndProof(ctx context.Context, slot p
return
}

v.waitToSlotTwoThirds(ctx, slot)
v.waitAggregatorDuty(ctx, slot)

for i, comIdx := range indexRes.Indices {
isAggregator, err := altair.IsSyncCommitteeAggregator(selectionProofs[i])
Expand Down
Loading