-
-
Notifications
You must be signed in to change notification settings - Fork 287
/
attestationDuties.ts
209 lines (182 loc) · 8.15 KB
/
attestationDuties.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
import {computeEpochAtSlot, isAggregatorFromCommitteeLength} from "@chainsafe/lodestar-beacon-state-transition";
import {BLSSignature, Epoch, Root, Slot, ssz, ValidatorIndex} from "@chainsafe/lodestar-types";
import {Api, routes} from "@chainsafe/lodestar-api";
import {toHexString} from "@chainsafe/ssz";
import {IndicesService} from "./indices";
import {IClock, extendError, ILoggerVc} from "../util";
import {ValidatorStore} from "./validatorStore";
/**
 * Number of epochs prior to the current epoch for which duties are retained.
 * `pruneOldDuties` deletes a duty once `epoch + HISTORICAL_DUTIES_EPOCHS < currentEpoch`.
 */
const HISTORICAL_DUTIES_EPOCHS = 2;
/** Joins the `AttesterDuty` obtained from the beacon node with the locally-generated `selectionProof`. */
export type AttDutyAndProof = {
/** Attester duty as returned by the beacon node. */
duty: routes.validator.AttesterDuty;
/** Non-null only when the proof indicates that the validator is an aggregator; `null` otherwise. */
selectionProof: BLSSignature | null;
};
// To assist with readability: a duty plus the `dependentRoot` it was downloaded with.
// The stored root is compared against the root of later downloads to detect re-orgs.
type AttDutyAtEpoch = {dependentRoot: Root; dutyAndProof: AttDutyAndProof};
/**
 * Tracks the attester duties of the locally managed validators.
 *
 * Once per epoch (driven by the `clock` passed to the constructor) it downloads the duties for the
 * current and next epoch, signs a selection proof for each duty, pushes beacon committee subnet
 * subscriptions to the beacon node and prunes duties that are too old.
 */
export class AttestationDutiesService {
  /** Maps a validator index to that validator's duty for each epoch */
  private readonly dutiesByEpochByIndex = new Map<ValidatorIndex, Map<Epoch, AttDutyAtEpoch>>();

  constructor(
    private readonly logger: ILoggerVc,
    private readonly api: Api,
    clock: IClock,
    private readonly validatorStore: ValidatorStore,
    private readonly indicesService: IndicesService
  ) {
    // Running this task every epoch is safe since a re-org of two epochs is very unlikely
    // TODO: If the re-org event is reliable consider re-running then
    clock.runEveryEpoch(this.runDutiesTasks);
  }

  /**
   * Returns the duty (and selection proof) of every tracked validator whose duty falls on `slot`.
   *
   * @param slot Slot to look up; only duties stored under the epoch containing `slot` are considered.
   * @returns Possibly empty list of duties scheduled exactly at `slot`.
   */
  getDutiesAtSlot(slot: Slot): AttDutyAndProof[] {
    const epoch = computeEpochAtSlot(slot);
    const duties: AttDutyAndProof[] = [];
    for (const dutiesByEpoch of this.dutiesByEpochByIndex.values()) {
      const dutyAtEpoch = dutiesByEpoch.get(epoch);
      if (dutyAtEpoch && dutyAtEpoch.dutyAndProof.duty.slot === slot) {
        duties.push(dutyAtEpoch.dutyAndProof);
      }
    }
    return duties;
  }

  /**
   * Per-epoch task. Declared as an arrow property so `this` stays bound when handed to the clock.
   * Errors from either branch are logged and never propagate, so pruning always runs.
   */
  private runDutiesTasks = async (epoch: Epoch): Promise<void> => {
    await Promise.all([
      // Run pollBeaconAttesters immediately for all known local indices
      this.pollBeaconAttesters(epoch, this.indicesService.getAllLocalIndices()).catch((e: Error) => {
        this.logger.error("Error on poll attesters", {epoch}, e);
      }),

      // At the same time fetch any remaining unknown validator indices, then poll duties for those newIndices only
      this.indicesService
        .pollValidatorIndices()
        .then((newIndices) => this.pollBeaconAttesters(epoch, newIndices))
        .catch((e: Error) => {
          this.logger.error("Error on poll indices and attesters", {epoch}, e);
        }),
    ]);

    // After both, prune
    this.pruneOldDuties(epoch);
  };

  /**
   * Query the beacon node for attestation duties for the validators in `indexArr`.
   *
   * This function will perform (in the following order):
   *
   * 1. Poll for current-epoch duties and update the local duties map.
   * 2. As above, but for the next-epoch.
   * 3. Push out any attestation subnet subscriptions to the BN.
   *
   * Pruning of old duties is NOT done here; `runDutiesTasks` prunes after all polling has finished.
   *
   * @throws If pushing the subnet subscriptions fails. Failures to download duties are only logged.
   */
  private async pollBeaconAttesters(currentEpoch: Epoch, indexArr: ValidatorIndex[]): Promise<void> {
    const nextEpoch = currentEpoch + 1;

    // No need to bother the BN if we don't have any validators.
    if (indexArr.length === 0) {
      return;
    }

    for (const epoch of [currentEpoch, nextEpoch]) {
      // Download the duties and update the duties for the current and next epoch.
      // A failure for one epoch is logged so the other epoch is still attempted.
      await this.pollBeaconAttestersForEpoch(epoch, indexArr).catch((e: Error) => {
        this.logger.error("Failed to download attester duties", {epoch}, e);
      });
    }

    const beaconCommitteeSubscriptions: routes.validator.BeaconCommitteeSubscription[] = [];

    // For this epoch and the next epoch, produce any beacon committee subscriptions.
    //
    // We are *always* pushing out subscriptions, even if we've subscribed before. This is
    // potentially excessive on the BN in normal cases, but it will help with fast re-subscriptions
    // if the BN goes offline or we swap to a different one.
    const indexSet = new Set(indexArr);
    for (const epoch of [currentEpoch, nextEpoch]) {
      for (const dutiesByEpoch of this.dutiesByEpochByIndex.values()) {
        const dutyAtEpoch = dutiesByEpoch.get(epoch);
        if (dutyAtEpoch) {
          const {duty, selectionProof} = dutyAtEpoch.dutyAndProof;
          // Only subscribe for the validators this call was asked about
          if (indexSet.has(duty.validatorIndex)) {
            beaconCommitteeSubscriptions.push({
              validatorIndex: duty.validatorIndex,
              committeesAtSlot: duty.committeesAtSlot,
              committeeIndex: duty.committeeIndex,
              slot: duty.slot,
              isAggregator: selectionProof !== null,
            });
          }
        }
      }
    }

    // If there are any subscriptions, push them out to the beacon node.
    if (beaconCommitteeSubscriptions.length > 0) {
      // TODO: Should log or throw?
      await this.api.validator.prepareBeaconCommitteeSubnet(beaconCommitteeSubscriptions).catch((e: Error) => {
        throw extendError(e, "Failed to subscribe to beacon committee subnets");
      });
    }
  }

  /**
   * For the given `indexArr`, download the duties for the given `epoch` and store them in duties.
   *
   * A stored duty is only replaced when none was known for `epoch` or when the dependent root
   * returned by the beacon node differs from the stored one.
   *
   * @throws If the duties cannot be downloaded or a selection proof cannot be produced.
   */
  private async pollBeaconAttestersForEpoch(epoch: Epoch, indexArr: ValidatorIndex[]): Promise<void> {
    // Don't fetch duties for epochs before genesis. However, should fetch epoch 0 duties at epoch -1
    if (epoch < 0) {
      return;
    }

    // TODO: Implement dependentRoot logic
    const attesterDuties = await this.api.validator.getAttesterDuties(epoch, indexArr).catch((e: Error) => {
      throw extendError(e, "Failed to obtain attester duty");
    });
    const dependentRoot = attesterDuties.dependentRoot;
    // Ignore duties for pubkeys this validator client does not hold
    const relevantDuties = attesterDuties.data.filter((duty) =>
      this.validatorStore.hasVotingPubkey(toHexString(duty.pubkey))
    );

    this.logger.debug("Downloaded attester duties", {
      epoch,
      dependentRoot: toHexString(dependentRoot),
      count: relevantDuties.length,
    });

    let alreadyWarnedReorg = false;
    for (const duty of relevantDuties) {
      const knownDuties = this.dutiesByEpochByIndex.get(duty.validatorIndex);

      // Only update the duties if either is true:
      //
      // - There were no known duties for this epoch.
      // - The dependent root has changed, signalling a re-org.
      const prior = knownDuties ? knownDuties.get(epoch) : undefined;
      const dependentRootChanged = prior && !ssz.Root.equals(prior.dependentRoot, dependentRoot);

      if (!prior || dependentRootChanged) {
        const dutyAndProof = await this.getDutyAndProof(duty);

        // Resolve (or create) the per-validator map only after the await: this avoids leaving an
        // empty entry behind when signing fails, and avoids writing into a map that was pruned
        // from `dutiesByEpochByIndex` while the signature was pending.
        let dutiesByEpoch = this.dutiesByEpochByIndex.get(duty.validatorIndex);
        if (!dutiesByEpoch) {
          dutiesByEpoch = new Map<Epoch, AttDutyAtEpoch>();
          this.dutiesByEpochByIndex.set(duty.validatorIndex, dutiesByEpoch);
        }
        dutiesByEpoch.set(epoch, {dependentRoot, dutyAndProof});

        // Using `alreadyWarnedReorg` avoids excessive logs.
        if (prior && dependentRootChanged && !alreadyWarnedReorg) {
          alreadyWarnedReorg = true;
          this.logger.warn("Attester duties re-org. This may happen from time to time", {
            priorDependentRoot: toHexString(prior.dependentRoot),
            dependentRoot: toHexString(dependentRoot),
          });
        }
      }
    }
  }

  /**
   * Signs the selection proof for `duty` and pairs it with the duty.
   *
   * @returns The duty with its selection proof, or with `null` when the proof does not make the
   * validator an aggregator.
   */
  private async getDutyAndProof(duty: routes.validator.AttesterDuty): Promise<AttDutyAndProof> {
    const selectionProof = await this.validatorStore.signAttestationSelectionProof(duty.pubkey, duty.slot);
    const isAggregator = isAggregatorFromCommitteeLength(duty.committeeLength, selectionProof);

    return {
      duty,
      // selectionProof === null is used to check if is aggregator
      selectionProof: isAggregator ? selectionProof : null,
    };
  }

  /**
   * Run once per epoch to prune duties map.
   *
   * Deletes every duty whose epoch is more than `HISTORICAL_DUTIES_EPOCHS` behind `currentEpoch`,
   * then drops validators that are left without any duty so the outer map does not keep
   * accumulating empty entries.
   */
  private pruneOldDuties(currentEpoch: Epoch): void {
    // Deleting entries of a Map while iterating over it is well-defined in JavaScript
    for (const [index, attMap] of this.dutiesByEpochByIndex.entries()) {
      for (const epoch of attMap.keys()) {
        if (epoch + HISTORICAL_DUTIES_EPOCHS < currentEpoch) {
          attMap.delete(epoch);
        }
      }

      if (attMap.size === 0) {
        this.dutiesByEpochByIndex.delete(index);
      }
    }
  }
}