Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 34 additions & 10 deletions packages/validator/src/services/attestationDuties.ts
Original file line number Diff line number Diff line change
Expand Up @@ -219,16 +219,7 @@ export class AttestationDutiesService {
}

// If there are any subscriptions, push them out to the beacon node.
if (beaconCommitteeSubscriptions.length > 0) {
const subscriptionsBatches = batchItems(beaconCommitteeSubscriptions, {batchSize: SUBSCRIPTIONS_PER_REQUEST});
const responses = await Promise.all(
subscriptionsBatches.map((subscriptions) => this.api.validator.prepareBeaconCommitteeSubnet({subscriptions}))
);

for (const res of responses) {
res.assertOk();
}
}
await this.subscribeToBeaconCommitteeSubnets(beaconCommitteeSubscriptions);
}

/**
Expand Down Expand Up @@ -371,6 +362,39 @@ export class AttestationDutiesService {
.catch((e: Error) => {
this.logger.error("Failed to redownload attester duties when reorg happens", logContext, e);
});

const beaconCommitteeSubscriptions: routes.validator.BeaconCommitteeSubscription[] = [];
const epochDuties = this.dutiesByIndexByEpoch.get(dutyEpoch)?.dutiesByIndex;

if (epochDuties) {
for (const {duty, selectionProof} of epochDuties.values()) {
beaconCommitteeSubscriptions.push({
validatorIndex: duty.validatorIndex,
committeesAtSlot: duty.committeesAtSlot,
committeeIndex: duty.committeeIndex,
slot: duty.slot,
isAggregator: selectionProof !== null,
});
}
}

// Previous subscriptions are no longer valid and need to be updated
await this.subscribeToBeaconCommitteeSubnets(beaconCommitteeSubscriptions);
}

private async subscribeToBeaconCommitteeSubnets(
beaconCommitteeSubscriptions: routes.validator.BeaconCommitteeSubscription[]
): Promise<void> {
if (beaconCommitteeSubscriptions.length > 0) {
const subscriptionsBatches = batchItems(beaconCommitteeSubscriptions, {batchSize: SUBSCRIPTIONS_PER_REQUEST});
const responses = await Promise.all(
subscriptionsBatches.map((subscriptions) => this.api.validator.prepareBeaconCommitteeSubnet({subscriptions}))
);

for (const res of responses) {
res.assertOk();
}
}
}

private async getDutyAndProof(duty: routes.validator.AttesterDuty): Promise<AttDutyAndProof> {
Expand Down
213 changes: 212 additions & 1 deletion packages/validator/test/unit/services/attestationDuties.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ import {SecretKey} from "@chainsafe/blst";
import {toHexString} from "@chainsafe/ssz";
import {routes} from "@lodestar/api";
import {chainConfig} from "@lodestar/config/default";
import {SLOTS_PER_EPOCH} from "@lodestar/params";
import {computeEpochAtSlot} from "@lodestar/state-transition";
import {ssz} from "@lodestar/types";
import {toRootHex} from "@lodestar/utils";
import {AttestationDutiesService} from "../../../src/services/attestationDuties.js";
import {ChainHeaderTracker} from "../../../src/services/chainHeaderTracker.js";
import {ChainHeaderTracker, HeadEventData} from "../../../src/services/chainHeaderTracker.js";
import {SyncingStatusTracker} from "../../../src/services/syncingStatusTracker.js";
import {ValidatorStore} from "../../../src/services/validatorStore.js";
import {getApiClientStub, mockApiResponse} from "../../utils/apiStub.js";
Expand Down Expand Up @@ -241,4 +243,213 @@ describe("AttestationDutiesService", () => {

expect(api.validator.prepareBeaconCommitteeSubnet).toHaveBeenCalledOnce();
});

describe("Reorg handling", () => {
const oldDependentRoot = toRootHex(Buffer.alloc(32, 1));
const newDependentRoot = toRootHex(Buffer.alloc(32, 2));
const headBlockRoot = toRootHex(Buffer.alloc(32, 3));

let clock: ClockMock;
let dutiesService: AttestationDutiesService;
let onNewHeadCallback: (headEvent: HeadEventData) => Promise<void>;

beforeEach(() => {
api.validator.prepareBeaconCommitteeSubnet.mockResolvedValue(mockApiResponse({}));

clock = new ClockMock();
const syncingStatusTracker = new SyncingStatusTracker(loggerVc, api, clock, null);

vi.spyOn(chainHeadTracker, "runOnNewHead");
chainHeadTracker.runOnNewHead.mockImplementation((callback) => {
onNewHeadCallback = callback;
});

dutiesService = new AttestationDutiesService(
loggerVc,
api,
clock,
validatorStore,
chainHeadTracker,
syncingStatusTracker,
null
);
});

it("Should resubscribe to beacon subnets when current epoch dependent root changes", async () => {
const slot = 5;
const currentEpoch = computeEpochAtSlot(slot);

const duty: routes.validator.AttesterDuty = {
slot,
committeeIndex: 1,
committeeLength: 120,
committeesAtSlot: 120,
validatorCommitteeIndex: 1,
validatorIndex: index,
pubkey: pubkeys[0],
};

api.validator.getAttesterDuties.mockResolvedValue(
mockApiResponse({
data: [duty],
meta: {dependentRoot: oldDependentRoot, executionOptimistic: false},
})
);

await clock.tickEpochFns(currentEpoch, controller.signal);

expect(dutiesService["dutiesByIndexByEpoch"].get(currentEpoch)?.dutiesByIndex.get(index)?.duty).toEqual(duty);
expect(api.validator.prepareBeaconCommitteeSubnet).toHaveBeenCalledTimes(1);

const reorgedDuty: routes.validator.AttesterDuty = {...duty, slot: slot + 1, committeeIndex: 3};
api.validator.getAttesterDuties.mockResolvedValue(
mockApiResponse({
data: [reorgedDuty],
meta: {dependentRoot: newDependentRoot, executionOptimistic: false},
})
);

await onNewHeadCallback({
slot,
head: headBlockRoot,
previousDutyDependentRoot: newDependentRoot,
currentDutyDependentRoot: oldDependentRoot,
});

expect(api.validator.prepareBeaconCommitteeSubnet).toHaveBeenCalledTimes(2);
expect(api.validator.prepareBeaconCommitteeSubnet).toHaveBeenLastCalledWith({
subscriptions: [
{
validatorIndex: reorgedDuty.validatorIndex,
committeesAtSlot: reorgedDuty.committeesAtSlot,
committeeIndex: reorgedDuty.committeeIndex,
slot: reorgedDuty.slot,
isAggregator: false,
},
],
});
expect(dutiesService["dutiesByIndexByEpoch"].get(currentEpoch)?.dutiesByIndex.get(index)?.duty).toEqual(
reorgedDuty
);
});

it("Should resubscribe to beacon subnets when next epoch dependent root changes", async () => {
const slot = 5;
const currentEpoch = computeEpochAtSlot(slot);
const nextEpoch = currentEpoch + 1;

const currentEpochDuty: routes.validator.AttesterDuty = {
slot,
committeeIndex: 1,
committeeLength: 120,
committeesAtSlot: 120,
validatorCommitteeIndex: 1,
validatorIndex: index,
pubkey: pubkeys[0],
};

const nextEpochDuty: routes.validator.AttesterDuty = {
slot: slot + SLOTS_PER_EPOCH,
committeeIndex: 2,
committeeLength: 120,
committeesAtSlot: 120,
validatorCommitteeIndex: 1,
validatorIndex: index,
pubkey: pubkeys[0],
};

// First call for current epoch
api.validator.getAttesterDuties.mockResolvedValueOnce(
mockApiResponse({
data: [currentEpochDuty],
meta: {dependentRoot: oldDependentRoot, executionOptimistic: false},
})
);

// Second call for next epoch
api.validator.getAttesterDuties.mockResolvedValueOnce(
mockApiResponse({
data: [nextEpochDuty],
meta: {dependentRoot: oldDependentRoot, executionOptimistic: false},
})
);

await clock.tickEpochFns(currentEpoch, controller.signal);

expect(dutiesService["dutiesByIndexByEpoch"].get(currentEpoch)?.dutiesByIndex.get(index)?.duty).toEqual(
currentEpochDuty
);
expect(dutiesService["dutiesByIndexByEpoch"].get(nextEpoch)?.dutiesByIndex.get(index)?.duty).toEqual(
nextEpochDuty
);
expect(api.validator.prepareBeaconCommitteeSubnet).toHaveBeenCalledTimes(1);

const reorgedNextEpochDuty: routes.validator.AttesterDuty = {...nextEpochDuty, committeeIndex: 4};
api.validator.getAttesterDuties.mockResolvedValue(
mockApiResponse({
data: [reorgedNextEpochDuty],
meta: {dependentRoot: newDependentRoot, executionOptimistic: false},
})
);

await onNewHeadCallback({
slot,
head: headBlockRoot,
previousDutyDependentRoot: oldDependentRoot,
currentDutyDependentRoot: newDependentRoot,
});

expect(api.validator.prepareBeaconCommitteeSubnet).toHaveBeenCalledTimes(2);
expect(api.validator.prepareBeaconCommitteeSubnet).toHaveBeenLastCalledWith({
subscriptions: [
{
validatorIndex: reorgedNextEpochDuty.validatorIndex,
committeesAtSlot: reorgedNextEpochDuty.committeesAtSlot,
committeeIndex: reorgedNextEpochDuty.committeeIndex,
slot: reorgedNextEpochDuty.slot,
isAggregator: false,
},
],
});
expect(dutiesService["dutiesByIndexByEpoch"].get(nextEpoch)?.dutiesByIndex.get(index)?.duty).toEqual(
reorgedNextEpochDuty
);
});

it("Should not resubscribe to beacon subnets when dependent root is unchanged", async () => {
const slot = 5;
const currentEpoch = computeEpochAtSlot(slot);

const duty: routes.validator.AttesterDuty = {
slot,
committeeIndex: 1,
committeeLength: 120,
committeesAtSlot: 120,
validatorCommitteeIndex: 1,
validatorIndex: index,
pubkey: pubkeys[0],
};

api.validator.getAttesterDuties.mockResolvedValue(
mockApiResponse({
data: [duty],
meta: {dependentRoot: oldDependentRoot, executionOptimistic: false},
})
);

await clock.tickEpochFns(currentEpoch, controller.signal);

expect(dutiesService["dutiesByIndexByEpoch"].get(currentEpoch)?.dutiesByIndex.get(index)?.duty).toEqual(duty);
const initialCalls = api.validator.prepareBeaconCommitteeSubnet.mock.calls.length;

await onNewHeadCallback({
slot,
head: headBlockRoot,
previousDutyDependentRoot: oldDependentRoot,
currentDutyDependentRoot: oldDependentRoot,
});

expect(api.validator.prepareBeaconCommitteeSubnet).toHaveBeenCalledTimes(initialCalls);
});
});
});
Loading