Skip to content

Commit

Permalink
feat: support fetching historical proposer duties (#7130)
Browse files Browse the repository at this point in the history
* feat: support fetching historical proposer duties

* Clone state before creating cached beacon state

* Fix error message

* Update test cases

* Skip syncing pubkeys and sync committe cache

* Update proposers tests epoch

* Use switch/case instead of if/else

* Add comment to clarify when head state can be used

* Use loadState instead of creating a separate ViewDU

* Clarify not yet initialized error for prev proposer duties

* Assert loaded state epoch matches requested
  • Loading branch information
nflaig authored Oct 11, 2024
1 parent ac6edd3 commit cbc7c90
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 34 deletions.
74 changes: 60 additions & 14 deletions packages/beacon-node/src/api/impl/validator/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import {PubkeyIndexMap} from "@chainsafe/pubkey-index-map";
import {routes} from "@lodestar/api";
import {ApplicationMethods} from "@lodestar/api/server";
import {
Expand All @@ -10,6 +11,8 @@ import {
computeEpochAtSlot,
getCurrentSlot,
beaconBlockToBlinded,
createCachedBeaconState,
loadState,
} from "@lodestar/state-transition";
import {
GENESIS_SLOT,
Expand Down Expand Up @@ -62,6 +65,7 @@ import {validateSyncCommitteeGossipContributionAndProof} from "../../../chain/va
import {CommitteeSubscription} from "../../../network/subnets/index.js";
import {ApiModules} from "../types.js";
import {RegenCaller} from "../../../chain/regen/index.js";
import {getStateResponseWithRegen} from "../beacon/state/utils.js";
import {validateGossipFnRetryUnknownRoot} from "../../../network/processor/gossipHandlers.js";
import {SCHEDULER_LOOKAHEAD_FACTOR} from "../../../chain/prepareNextSlot.js";
import {ChainEvent, CheckpointHex, CommonBlockBody} from "../../../chain/index.js";
Expand Down Expand Up @@ -888,15 +892,16 @@ export function getValidatorApi(
async getProposerDuties({epoch}) {
notWhileSyncing();

// Early check that epoch is within [current_epoch, current_epoch + 1], or allow for pre-genesis
// Early check that epoch is no more than current_epoch + 1, or allow for pre-genesis
const currentEpoch = currentEpochWithDisparity();
const nextEpoch = currentEpoch + 1;
if (currentEpoch >= 0 && epoch !== currentEpoch && epoch !== nextEpoch) {
throw Error(`Requested epoch ${epoch} must equal current ${currentEpoch} or next epoch ${nextEpoch}`);
if (currentEpoch >= 0 && epoch > nextEpoch) {
throw new ApiError(400, `Requested epoch ${epoch} must not be more than one epoch in the future`);
}

const head = chain.forkChoice.getHead();
let state: CachedBeaconStateAllForks | undefined = undefined;
const startSlot = computeStartSlotAtEpoch(epoch);
const slotMs = config.SECONDS_PER_SLOT * 1000;
const prepareNextSlotLookAheadMs = slotMs / SCHEDULER_LOOKAHEAD_FACTOR;
const toNextEpochMs = msToNextEpoch();
Expand All @@ -914,21 +919,63 @@ export function getValidatorApi(
}

if (!state) {
state = await chain.getHeadStateAtCurrentEpoch(RegenCaller.getDuties);
if (epoch >= currentEpoch - 1) {
// Cached beacon state stores proposers for previous, current and next epoch. The
// requested epoch is within that range, we can use the head state at current epoch
state = await chain.getHeadStateAtCurrentEpoch(RegenCaller.getDuties);
} else {
const res = await getStateResponseWithRegen(chain, startSlot);

const stateViewDU =
res.state instanceof Uint8Array
? loadState(config, chain.getHeadState(), res.state).state
: res.state.clone();

state = createCachedBeaconState(
stateViewDU,
{
config: chain.config,
// Not required to compute proposers
pubkey2index: new PubkeyIndexMap(),
index2pubkey: [],
},
{skipSyncPubkeys: true, skipSyncCommitteeCache: true}
);

if (state.epochCtx.epoch !== epoch) {
throw Error(`Loaded state epoch ${state.epochCtx.epoch} does not match requested epoch ${epoch}`);
}
}
}

const stateEpoch = state.epochCtx.epoch;
let indexes: ValidatorIndex[] = [];

if (epoch === stateEpoch) {
indexes = state.epochCtx.getBeaconProposers();
} else if (epoch === stateEpoch + 1) {
// Requesting duties for next epoch is allow since they can be predicted with high probabilities.
// @see `epochCtx.getBeaconProposersNextEpoch` JSDocs for rationale.
indexes = state.epochCtx.getBeaconProposersNextEpoch();
} else {
// Should never happen, epoch is checked to be in bounds above
throw Error(`Proposer duties for epoch ${epoch} not supported, current epoch ${stateEpoch}`);
switch (epoch) {
case stateEpoch:
indexes = state.epochCtx.getBeaconProposers();
break;

case stateEpoch + 1:
// Requesting duties for next epoch is allowed since they can be predicted with high probabilities.
// @see `epochCtx.getBeaconProposersNextEpoch` JSDocs for rationale.
indexes = state.epochCtx.getBeaconProposersNextEpoch();
break;

case stateEpoch - 1: {
const indexesPrevEpoch = state.epochCtx.getBeaconProposersPrevEpoch();
if (indexesPrevEpoch === null) {
// Should not happen as previous proposer duties should be initialized for head state
// and if we load state from `Uint8Array` it will always be the state of requested epoch
throw Error(`Proposer duties for previous epoch ${epoch} not yet initialized`);
}
indexes = indexesPrevEpoch;
break;
}

default:
// Should never happen, epoch is checked to be in bounds above
throw Error(`Proposer duties for epoch ${epoch} not supported, current epoch ${stateEpoch}`);
}

// NOTE: this is the fastest way of getting compressed pubkeys.
Expand All @@ -937,7 +984,6 @@ export function getValidatorApi(
// TODO: Add a flag to just send 0x00 as pubkeys since the Lodestar validator does not need them.
const pubkeys = getPubkeysForIndices(state.validators, indexes);

const startSlot = computeStartSlotAtEpoch(epoch);
const duties: routes.validator.ProposerDuty[] = [];
for (let i = 0; i < SLOTS_PER_EPOCH; i++) {
duties.push({slot: startSlot + i, validatorIndex: indexes[i], pubkey: pubkeys[i]});
Expand Down
4 changes: 4 additions & 0 deletions packages/beacon-node/test/mocks/mockedBeaconChain.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import {vi, Mocked, Mock} from "vitest";
import {PubkeyIndexMap} from "@chainsafe/pubkey-index-map";
import {config as defaultConfig} from "@lodestar/config/default";
import {ChainForkConfig} from "@lodestar/config";
import {ForkChoice, ProtoBlock, EpochDifference} from "@lodestar/fork-choice";
Expand Down Expand Up @@ -126,6 +127,8 @@ vi.mock("../../src/chain/chain.js", async (importActual) => {
// @ts-expect-error
beaconProposerCache: new BeaconProposerCache(),
shufflingCache: new ShufflingCache(),
pubkey2index: new PubkeyIndexMap(),
index2pubkey: [],
produceCommonBlockBody: vi.fn(),
getProposerHead: vi.fn(),
produceBlock: vi.fn(),
Expand All @@ -135,6 +138,7 @@ vi.mock("../../src/chain/chain.js", async (importActual) => {
predictProposerHead: vi.fn(),
getHeadStateAtCurrentEpoch: vi.fn(),
getHeadState: vi.fn(),
getStateBySlot: vi.fn(),
updateBuilderStatus: vi.fn(),
processBlock: vi.fn(),
regenStateForAttestationVerification: vi.fn(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {routes} from "@lodestar/api";
import {config} from "@lodestar/config/default";
import {MAX_EFFECTIVE_BALANCE, SLOTS_PER_EPOCH} from "@lodestar/params";
import {BeaconStateAllForks} from "@lodestar/state-transition";
import {Slot} from "@lodestar/types";
import {ApiTestModules, getApiTestModules} from "../../../../../utils/api.js";
import {FAR_FUTURE_EPOCH} from "../../../../../../src/constants/index.js";
import {SYNC_TOLERANCE_EPOCHS, getValidatorApi} from "../../../../../../src/api/impl/validator/index.js";
Expand All @@ -13,19 +14,34 @@ import {SyncState} from "../../../../../../src/sync/interface.js";
import {defaultApiOptions} from "../../../../../../src/api/options.js";

describe("get proposers api impl", function () {
const currentEpoch = 2;
const currentSlot = SLOTS_PER_EPOCH * currentEpoch;

let api: ReturnType<typeof getValidatorApi>;
let modules: ApiTestModules;
let state: BeaconStateAllForks;
let cachedState: ReturnType<typeof createCachedBeaconStateTest>;

beforeEach(function () {
vi.useFakeTimers({now: 0});
vi.advanceTimersByTime(currentSlot * config.SECONDS_PER_SLOT * 1000);
modules = getApiTestModules({clock: "real"});
api = getValidatorApi(defaultApiOptions, modules);

initializeState(currentSlot);

modules.chain.getHeadStateAtCurrentEpoch.mockResolvedValue(cachedState);
modules.forkChoice.getHead.mockReturnValue(zeroProtoBlock);
modules.forkChoice.getFinalizedBlock.mockReturnValue(zeroProtoBlock);
modules.db.block.get.mockResolvedValue({message: {stateRoot: Buffer.alloc(32)}} as any);

vi.spyOn(modules.sync, "state", "get").mockReturnValue(SyncState.Synced);
});

function initializeState(slot: Slot): void {
state = generateState(
{
slot: 0,
slot,
validators: generateValidators(25, {
effectiveBalance: MAX_EFFECTIVE_BALANCE,
activationEpoch: 0,
Expand All @@ -37,14 +53,10 @@ describe("get proposers api impl", function () {
);
cachedState = createCachedBeaconStateTest(state, config);

modules.chain.getHeadStateAtCurrentEpoch.mockResolvedValue(cachedState);
modules.forkChoice.getHead.mockReturnValue(zeroProtoBlock);
modules.db.block.get.mockResolvedValue({message: {stateRoot: Buffer.alloc(32)}} as any);

vi.spyOn(modules.sync, "state", "get").mockReturnValue(SyncState.Synced);
vi.spyOn(cachedState.epochCtx, "getBeaconProposersNextEpoch");
vi.spyOn(cachedState.epochCtx, "getBeaconProposers");
});
vi.spyOn(cachedState.epochCtx, "getBeaconProposersPrevEpoch");
}

afterEach(() => {
vi.useRealTimers();
Expand All @@ -54,7 +66,7 @@ describe("get proposers api impl", function () {
vi.advanceTimersByTime((SYNC_TOLERANCE_EPOCHS * SLOTS_PER_EPOCH + 1) * config.SECONDS_PER_SLOT * 1000);
vi.spyOn(modules.sync, "state", "get").mockReturnValue(SyncState.SyncingHead);

await expect(api.getProposerDuties({epoch: 1})).rejects.toThrow("Node is syncing - headSlot 0 currentSlot 9");
await expect(api.getProposerDuties({epoch: 1})).rejects.toThrow("Node is syncing - headSlot 0 currentSlot 25");
});

it("should raise error if node stalled", async () => {
Expand All @@ -65,34 +77,61 @@ describe("get proposers api impl", function () {
});

it("should get proposers for current epoch", async () => {
const {data: result} = (await api.getProposerDuties({epoch: 0})) as {data: routes.validator.ProposerDutyList};
const {data: result} = (await api.getProposerDuties({epoch: currentEpoch})) as {
data: routes.validator.ProposerDutyList;
};

expect(result.length).toBe(SLOTS_PER_EPOCH);
expect(cachedState.epochCtx.getBeaconProposers).toHaveBeenCalledOnce();
expect(cachedState.epochCtx.getBeaconProposersNextEpoch).not.toHaveBeenCalled();
expect(result.map((p) => p.slot)).toEqual(Array.from({length: SLOTS_PER_EPOCH}, (_, i) => i));
expect(cachedState.epochCtx.getBeaconProposersPrevEpoch).not.toHaveBeenCalled();
expect(result.map((p) => p.slot)).toEqual(
Array.from({length: SLOTS_PER_EPOCH}, (_, i) => currentEpoch * SLOTS_PER_EPOCH + i)
);
});

it("should get proposers for next epoch", async () => {
const {data: result} = (await api.getProposerDuties({epoch: 1})) as {data: routes.validator.ProposerDutyList};
const nextEpoch = currentEpoch + 1;
const {data: result} = (await api.getProposerDuties({epoch: nextEpoch})) as {
data: routes.validator.ProposerDutyList;
};

expect(result.length).toBe(SLOTS_PER_EPOCH);
expect(cachedState.epochCtx.getBeaconProposers).not.toHaveBeenCalled();
expect(cachedState.epochCtx.getBeaconProposersNextEpoch).toHaveBeenCalledOnce();
expect(result.map((p) => p.slot)).toEqual(Array.from({length: SLOTS_PER_EPOCH}, (_, i) => SLOTS_PER_EPOCH + i));
expect(cachedState.epochCtx.getBeaconProposersPrevEpoch).not.toHaveBeenCalled();
expect(result.map((p) => p.slot)).toEqual(
Array.from({length: SLOTS_PER_EPOCH}, (_, i) => nextEpoch * SLOTS_PER_EPOCH + i)
);
});

it("should get proposers for historical epoch", async () => {
const historicalEpoch = currentEpoch - 2;
initializeState(currentSlot - 2 * SLOTS_PER_EPOCH);
modules.chain.getStateBySlot.mockResolvedValue({state, executionOptimistic: false, finalized: true});

const {data: result} = (await api.getProposerDuties({epoch: historicalEpoch})) as {
data: routes.validator.ProposerDutyList;
};

expect(result.length).toBe(SLOTS_PER_EPOCH);
// Spy won't be called as `getProposerDuties` will create a new cached beacon state
expect(result.map((p) => p.slot)).toEqual(
Array.from({length: SLOTS_PER_EPOCH}, (_, i) => historicalEpoch * SLOTS_PER_EPOCH + i)
);
});

it("should raise error for more than one epoch in the future", async () => {
await expect(api.getProposerDuties({epoch: 2})).rejects.toThrow(
"Requested epoch 2 must equal current 0 or next epoch 1"
await expect(api.getProposerDuties({epoch: currentEpoch + 2})).rejects.toThrow(
"Requested epoch 4 must not be more than one epoch in the future"
);
});

it("should have different proposer validator public keys for current and next epoch", async () => {
const {data: currentProposers} = (await api.getProposerDuties({epoch: 0})) as {
const {data: currentProposers} = (await api.getProposerDuties({epoch: currentEpoch})) as {
data: routes.validator.ProposerDutyList;
};
const {data: nextProposers} = (await api.getProposerDuties({epoch: 1})) as {
const {data: nextProposers} = (await api.getProposerDuties({epoch: currentEpoch + 1})) as {
data: routes.validator.ProposerDutyList;
};

Expand All @@ -101,21 +140,21 @@ describe("get proposers api impl", function () {
});

it("should have different proposer validator indexes for current and next epoch", async () => {
const {data: currentProposers} = (await api.getProposerDuties({epoch: 0})) as {
const {data: currentProposers} = (await api.getProposerDuties({epoch: currentEpoch})) as {
data: routes.validator.ProposerDutyList;
};
const {data: nextProposers} = (await api.getProposerDuties({epoch: 1})) as {
const {data: nextProposers} = (await api.getProposerDuties({epoch: currentEpoch + 1})) as {
data: routes.validator.ProposerDutyList;
};

expect(currentProposers.map((p) => p.validatorIndex)).not.toEqual(nextProposers.map((p) => p.validatorIndex));
});

it("should have different proposer slots for current and next epoch", async () => {
const {data: currentProposers} = (await api.getProposerDuties({epoch: 0})) as {
const {data: currentProposers} = (await api.getProposerDuties({epoch: currentEpoch})) as {
data: routes.validator.ProposerDutyList;
};
const {data: nextProposers} = (await api.getProposerDuties({epoch: 1})) as {
const {data: nextProposers} = (await api.getProposerDuties({epoch: currentEpoch + 1})) as {
data: routes.validator.ProposerDutyList;
};

Expand Down
4 changes: 4 additions & 0 deletions packages/state-transition/src/cache/epochCache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -877,6 +877,10 @@ export class EpochCache {
return this.proposers;
}

getBeaconProposersPrevEpoch(): ValidatorIndex[] | null {
return this.proposersPrevEpoch;
}

/**
* We allow requesting proposal duties 1 epoch in the future as in normal network conditions it's possible to predict
* the correct shuffling with high probability. While knowing the proposers in advance is not useful for consensus,
Expand Down

0 comments on commit cbc7c90

Please sign in to comment.