Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: poll proposer duties of next epoch in advance #5794

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 67 additions & 11 deletions packages/beacon-node/src/api/impl/validator/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ import {ApiModules} from "../types.js";
import {RegenCaller} from "../../../chain/regen/index.js";
import {getValidatorStatus} from "../beacon/state/utils.js";
import {validateGossipFnRetryUnknownRoot} from "../../../network/processor/gossipHandlers.js";
import {SCHEDULER_LOOKAHEAD_FACTOR} from "../../../chain/prepareNextSlot.js";
import {ChainEvent, CheckpointHex} from "../../../chain/index.js";
import {computeSubnetForCommitteesAtSlot, getPubkeysForIndices} from "./utils.js";

/**
Expand Down Expand Up @@ -66,9 +68,10 @@ export function getValidatorApi({
/**
* Validator clock may be advanced from beacon's clock. If the validator requests a resource in a
* future slot, wait some time instead of rejecting the request because it's in the future.
* This value is the same to MAXIMUM_GOSSIP_CLOCK_DISPARITY_SEC.
* For very fast networks, reduce clock disparity to half a slot.
*/
const MAX_API_CLOCK_DISPARITY_SEC = Math.min(1, config.SECONDS_PER_SLOT / 2);
const MAX_API_CLOCK_DISPARITY_SEC = Math.min(0.5, config.SECONDS_PER_SLOT / 2);
const MAX_API_CLOCK_DISPARITY_MS = MAX_API_CLOCK_DISPARITY_SEC * 1000;

/** Compute and cache the genesis block root */
Expand Down Expand Up @@ -118,19 +121,55 @@ export function getValidatorApi({
* Prevents a validator from not being able to get the attestater duties correctly if the beacon and validator clocks are off
*/
async function waitForNextClosestEpoch(): Promise<void> {
const toNextEpochMs = msToNextEpoch();
if (toNextEpochMs > 0 && toNextEpochMs < MAX_API_CLOCK_DISPARITY_MS) {
const nextEpoch = chain.clock.currentEpoch + 1;
await chain.clock.waitForSlot(computeStartSlotAtEpoch(nextEpoch));
}
}

/**
* Compute ms to the next epoch.
*/
function msToNextEpoch(): number {
const nextEpoch = chain.clock.currentEpoch + 1;
const secPerEpoch = SLOTS_PER_EPOCH * config.SECONDS_PER_SLOT;
const nextEpochStartSec = chain.genesisTime + nextEpoch * secPerEpoch;
const msToNextEpoch = nextEpochStartSec * 1000 - Date.now();
if (msToNextEpoch > 0 && msToNextEpoch < MAX_API_CLOCK_DISPARITY_MS) {
await chain.clock.waitForSlot(computeStartSlotAtEpoch(nextEpoch));
}
return nextEpochStartSec * 1000 - Date.now();
}

function currentEpochWithDisparity(): Epoch {
return computeEpochAtSlot(getCurrentSlot(config, chain.genesisTime - MAX_API_CLOCK_DISPARITY_SEC));
}

/**
* This function is called 1s before next epoch, usually at that time PrepareNextSlotScheduler finishes
* so we should have checkpoint state, otherwise wait for up to `timeoutMs`.
*/
async function waitForCheckpointState(
cpHex: CheckpointHex,
timeoutMs: number
): Promise<CachedBeaconStateAllForks | null> {
const cpState = chain.regen.getCheckpointStateSync(cpHex);
if (cpState) {
return cpState;
}
const cp = {
epoch: cpHex.epoch,
root: fromHexString(cpHex.rootHex),
};
// if not, wait for ChainEvent.checkpoint event until timeoutMs
return new Promise<CachedBeaconStateAllForks | null>((resolve) => {
const timer = setTimeout(() => resolve(null), timeoutMs);
chain.emitter.on(ChainEvent.checkpoint, (eventCp, cpState) => {
if (ssz.phase0.Checkpoint.equals(eventCp, cp)) {
clearTimeout(timer);
resolve(cpState);
}
});
});
}

/**
* Reject any request while the node is syncing
*/
Expand Down Expand Up @@ -387,15 +426,32 @@ export function getValidatorApi({

// Early check that epoch is within [current_epoch, current_epoch + 1], or allow for pre-genesis
const currentEpoch = currentEpochWithDisparity();
if (currentEpoch >= 0 && epoch !== currentEpoch && epoch !== currentEpoch + 1) {
throw Error(`Requested epoch ${epoch} must equal current ${currentEpoch} or next epoch ${currentEpoch + 1}`);
const nextEpoch = currentEpoch + 1;
if (currentEpoch >= 0 && epoch !== currentEpoch && epoch !== nextEpoch) {
throw Error(`Requested epoch ${epoch} must equal current ${currentEpoch} or next epoch ${nextEpoch}`);
}

// May request for an epoch that's in the future, for getBeaconProposersNextEpoch()
await waitForNextClosestEpoch();

const head = chain.forkChoice.getHead();
const state = await chain.getHeadStateAtCurrentEpoch(RegenCaller.getDuties);
let state: CachedBeaconStateAllForks | undefined = undefined;
const slotMs = config.SECONDS_PER_SLOT * 1000;
const prepareNextSlotLookAheadMs = slotMs / SCHEDULER_LOOKAHEAD_FACTOR;
const toNextEpochMs = msToNextEpoch();
// validators may request next epoch's duties when it's close to next epoch
// this is to avoid missed block proposal due to 0 epoch look ahead
if (epoch === nextEpoch && toNextEpochMs < prepareNextSlotLookAheadMs) {
// wait for maximum 1 slot for cp state which is the timeout of validator api
const cpState = await waitForCheckpointState({rootHex: head.blockRoot, epoch}, slotMs);
if (cpState) {
state = cpState;
metrics?.duties.requestNextEpochProposalDutiesHit.inc();
} else {
metrics?.duties.requestNextEpochProposalDutiesMiss.inc();
}
}

if (!state) {
state = await chain.getHeadStateAtCurrentEpoch(RegenCaller.getDuties);
}

const stateEpoch = state.epochCtx.epoch;
let indexes: ValidatorIndex[] = [];
Expand Down
2 changes: 1 addition & 1 deletion packages/beacon-node/src/chain/prepareNextSlot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import {IBeaconChain} from "./interface.js";
import {RegenCaller} from "./regen/index.js";

/* With 12s slot times, this scheduler will run 4s before the start of each slot (`12 / 3 = 4`). */
const SCHEDULER_LOOKAHEAD_FACTOR = 3;
export const SCHEDULER_LOOKAHEAD_FACTOR = 3;

/* We don't want to do more epoch transition than this */
const PREPARE_EPOCH_LIMIT = 1;
Expand Down
11 changes: 11 additions & 0 deletions packages/beacon-node/src/metrics/metrics/lodestar.ts
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,17 @@ export function createLodestarMetrics(
}),
},

duties: {
requestNextEpochProposalDutiesHit: register.gauge({
name: "lodestar_duties_request_next_epoch_proposal_duties_hit_total",
help: "Total count of requestNextEpochProposalDuties hit",
}),
requestNextEpochProposalDutiesMiss: register.gauge({
name: "lodestar_duties_request_next_epoch_proposal_duties_miss_total",
help: "Total count of requestNextEpochProposalDuties miss",
}),
},

// Beacon state transition metrics

epochTransitionTime: register.histogram({
Expand Down
1 change: 1 addition & 0 deletions packages/validator/src/services/block.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ export class BlockProposingService {
private readonly metrics: Metrics | null
) {
this.dutiesService = new BlockDutiesService(
config,
logger,
api,
clock,
Expand Down
40 changes: 35 additions & 5 deletions packages/validator/src/services/blockDuties.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
import {toHexString} from "@chainsafe/ssz";
import {computeEpochAtSlot} from "@lodestar/state-transition";
import {computeEpochAtSlot, computeStartSlotAtEpoch} from "@lodestar/state-transition";
import {BLSPubkey, Epoch, RootHex, Slot} from "@lodestar/types";
import {Api, ApiError, routes} from "@lodestar/api";
import {sleep} from "@lodestar/utils";
import {ChainConfig} from "@lodestar/config";
import {IClock, differenceHex, LoggerVc} from "../util/index.js";
import {PubkeyHex} from "../types.js";
import {Metrics} from "../metrics.js";
import {ValidatorStore} from "./validatorStore.js";

/** This polls block duties 1s before the next epoch */
// TODO: change to 6 to do it 2s before the next epoch
// once we have some improvement on epoch transition time
// see https://github.com/ChainSafe/lodestar/issues/5792#issuecomment-1647457442
const BLOCK_DUTIES_LOOKAHEAD_FACTOR = 12;
/** Only retain `HISTORICAL_DUTIES_EPOCHS` duties prior to the current epoch */
const HISTORICAL_DUTIES_EPOCHS = 2;
// Re-declaring to not have to depend on `lodestar-params` just for this 0
Expand All @@ -24,9 +31,10 @@ export class BlockDutiesService {
private readonly proposers = new Map<Epoch, BlockDutyAtEpoch>();

constructor(
private readonly config: ChainConfig,
private readonly logger: LoggerVc,
private readonly api: Api,
clock: IClock,
private readonly clock: IClock,
private readonly validatorStore: ValidatorStore,
private readonly metrics: Metrics | null,
notifyBlockProductionFn: NotifyBlockProductionFn
Expand Down Expand Up @@ -75,7 +83,7 @@ export class BlockDutiesService {
}
}

private runBlockDutiesTask = async (slot: Slot): Promise<void> => {
private runBlockDutiesTask = async (slot: Slot, signal: AbortSignal): Promise<void> => {
try {
if (slot < 0) {
// Before genesis, fetch the genesis duties but don't notify block production
Expand All @@ -84,7 +92,7 @@ export class BlockDutiesService {
await this.pollBeaconProposers(GENESIS_EPOCH);
}
} else {
await this.pollBeaconProposersAndNotify(slot);
await this.pollBeaconProposersAndNotify(slot, signal);
}
} catch (e) {
this.logger.error("Error on pollBeaconProposers", {}, e as Error);
Expand Down Expand Up @@ -117,8 +125,17 @@ export class BlockDutiesService {
* through the slow path every time. I.e., the proposal will only happen after we've been able to
* download and process the duties from the BN. This means it is very important to ensure this
* function is as fast as possible.
* - Starting from Jul 2023, we poll proposers 1s before the next epoch thanks to PrepareNextSlotScheduler
* usually finishes in 3s.
*/
private async pollBeaconProposersAndNotify(currentSlot: Slot): Promise<void> {
private async pollBeaconProposersAndNotify(currentSlot: Slot, signal: AbortSignal): Promise<void> {
const nextEpoch = computeEpochAtSlot(currentSlot) + 1;
const isLastSlotEpoch = computeStartSlotAtEpoch(nextEpoch) === currentSlot + 1;
if (isLastSlotEpoch) {
// no need to await for other steps, just poll proposers for next epoch
void this.pollBeaconProposersNextEpoch(currentSlot, nextEpoch, signal);
}

// Notify the block proposal service for any proposals that we have in our cache.
const initialBlockProposers = this.getblockProposersAtSlot(currentSlot);
if (initialBlockProposers.length > 0) {
Expand All @@ -145,6 +162,19 @@ export class BlockDutiesService {
}
}

/**
* This is to avoid some delay on the first slot of the opoch when validators has proposal duties.
* See https://github.com/ChainSafe/lodestar/issues/5792
*/
private async pollBeaconProposersNextEpoch(currentSlot: Slot, nextEpoch: Epoch, signal: AbortSignal): Promise<void> {
const nextSlot = currentSlot + 1;
const lookAheadMs = (this.config.SECONDS_PER_SLOT * 1000) / BLOCK_DUTIES_LOOKAHEAD_FACTOR;
await sleep(this.clock.msToSlot(nextSlot) - lookAheadMs, signal);
this.logger.debug("Polling proposers for next epoch", {nextEpoch, nextSlot});
// Poll proposers for the next epoch
await this.pollBeaconProposers(nextEpoch);
}

private async pollBeaconProposers(epoch: Epoch): Promise<void> {
// Only download duties and push out additional block production events if we have some validators.
if (!this.validatorStore.hasSomeValidators()) {
Expand Down
31 changes: 28 additions & 3 deletions packages/validator/test/unit/services/blockDuties.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import bls from "@chainsafe/bls";
import {toHexString} from "@chainsafe/ssz";
import {RootHex} from "@lodestar/types";
import {HttpStatusCode, routes} from "@lodestar/api";
import {chainConfig} from "@lodestar/config/default";
import {toHex} from "@lodestar/utils";
import {BlockDutiesService} from "../../../src/services/blockDuties.js";
import {ValidatorStore} from "../../../src/services/validatorStore.js";
Expand Down Expand Up @@ -49,7 +50,15 @@ describe("BlockDutiesService", function () {
const notifyBlockProductionFn = sinon.stub(); // Returns void

const clock = new ClockMock();
const dutiesService = new BlockDutiesService(loggerVc, api, clock, validatorStore, null, notifyBlockProductionFn);
const dutiesService = new BlockDutiesService(
chainConfig,
loggerVc,
api,
clock,
validatorStore,
null,
notifyBlockProductionFn
);

// Trigger clock onSlot for slot 0
await clock.tickSlotFns(0, controller.signal);
Expand Down Expand Up @@ -84,7 +93,15 @@ describe("BlockDutiesService", function () {

// Clock will call runAttesterDutiesTasks() immediately
const clock = new ClockMock();
const dutiesService = new BlockDutiesService(loggerVc, api, clock, validatorStore, null, notifyBlockProductionFn);
const dutiesService = new BlockDutiesService(
chainConfig,
loggerVc,
api,
clock,
validatorStore,
null,
notifyBlockProductionFn
);

// Trigger clock onSlot for slot 0
api.validator.getProposerDuties.resolves({
Expand Down Expand Up @@ -151,7 +168,15 @@ describe("BlockDutiesService", function () {
const notifyBlockProductionFn = sinon.stub(); // Returns void

const clock = new ClockMock();
const dutiesService = new BlockDutiesService(loggerVc, api, clock, validatorStore, null, notifyBlockProductionFn);
const dutiesService = new BlockDutiesService(
chainConfig,
loggerVc,
api,
clock,
validatorStore,
null,
notifyBlockProductionFn
);

// Trigger clock onSlot for slot 0
await clock.tickSlotFns(0, controller.signal);
Expand Down
Loading