Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: verify attestation gossip messages in batch #5729

Closed
wants to merge 16 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions packages/beacon-node/src/chain/blocks/importBlock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,14 @@ export async function importBlock(

this.metrics?.importBlock.bySource.inc({source});
this.logger.verbose("Added block to forkchoice and state cache", {slot: block.message.slot, root: blockRootHex});
this.emitter.emit(routes.events.EventType.block, {
block: toHexString(this.config.getForkTypes(block.message.slot).BeaconBlock.hashTreeRoot(block.message)),
slot: block.message.slot,
executionOptimistic: blockSummary != null && isOptimisticBlock(blockSummary),
});
// We want to import block asap so call all event handler in the next event loop
setTimeout(() => {
this.emitter.emit(routes.events.EventType.block, {
block: toHexString(this.config.getForkTypes(block.message.slot).BeaconBlock.hashTreeRoot(block.message)),
slot: block.message.slot,
executionOptimistic: blockSummary != null && isOptimisticBlock(blockSummary),
});
}, 0);

// 3. Import attestations to fork choice
//
Expand Down
3 changes: 2 additions & 1 deletion packages/beacon-node/src/chain/bls/multithread/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier {
// THe worker is not able to deserialize from uncompressed
// `Error: err _wrapDeserialize`
this.format = implementation === "blst-native" ? PointFormat.uncompressed : PointFormat.compressed;
this.workers = this.createWorkers(implementation, defaultPoolSize);
// 1 worker for the main thread
this.workers = this.createWorkers(implementation, defaultPoolSize - 1);

if (metrics) {
metrics.blsThreadPool.queueLength.addCollect(() => {
Expand Down
5 changes: 5 additions & 0 deletions packages/beacon-node/src/chain/options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ export type IChainOptions = BlockProcessOpts &
/** Option to load a custom kzg trusted setup in txt format */
trustedSetup?: string;
broadcastValidationStrictness?: string;
minSameMessageSignatureSetsToBatch: number;
};

export type BlockProcessOpts = {
Expand Down Expand Up @@ -83,4 +84,8 @@ export const defaultChainOptions: IChainOptions = {
// for attestation validation, having this value ensures we don't have to regen states most of the time
maxSkipSlots: 32,
broadcastValidationStrictness: "warn",
// should be less than or equal to MIN_SIGNATURE_SETS_TO_BATCH_VERIFY
// batching too much may block the I/O thread so if useWorker=false, suggest this value to be 32
// since this batch attestation work is designed to work with useWorker=true, make this the lowest value
minSameMessageSignatureSetsToBatch: 2,
};
185 changes: 150 additions & 35 deletions packages/beacon-node/src/chain/validation/attestation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@ import {ATTESTATION_SUBNET_COUNT, SLOTS_PER_EPOCH, ForkName, ForkSeq} from "@lod
import {
computeEpochAtSlot,
CachedBeaconStateAllForks,
ISignatureSet,
getAttestationDataSigningRoot,
createSingleSignatureSetFromComponents,
SingleSignatureSet,
} from "@lodestar/state-transition";
import {IBeaconChain} from "..";
import {AttestationError, AttestationErrorCode, GossipAction} from "../errors/index.js";
import {MAXIMUM_GOSSIP_CLOCK_DISPARITY_SEC} from "../../constants/index.js";
import {RegenCaller} from "../regen/index.js";
Expand All @@ -21,6 +20,13 @@ import {
} from "../../util/sszBytes.js";
import {AttestationDataCacheEntry} from "../seenCache/seenAttestationData.js";
import {sszDeserializeAttestation} from "../../network/gossip/topic.js";
import {Result, wrapError} from "../../util/wrapError.js";
import {IBeaconChain} from "../interface.js";

export type BatchResult = {
results: Result<AttestationValidationResult>[];
batchableBls: boolean;
};

export type AttestationValidationResult = {
attestation: phase0.Attestation;
Expand All @@ -40,6 +46,12 @@ export type GossipAttestation = {
serializedData: Uint8Array;
// available in NetworkProcessor since we check for unknown block root attestations
attSlot: Slot;
attDataBase64?: string | null;
};

export type Phase0Result = AttestationValidationResult & {
signatureSet: SingleSignatureSet;
validatorIndex: number;
};

/**
Expand All @@ -48,13 +60,6 @@ export type GossipAttestation = {
*/
const SHUFFLING_LOOK_AHEAD_EPOCHS = 1;

/**
* Validate attestations from gossip
* - Only deserialize the attestation if needed, use the cached AttestationData instead
* - This is to avoid deserializing similar attestation multiple times which could help the gc
* - subnet is required
* - do not prioritize bls signature set
*/
export async function validateGossipAttestation(
fork: ForkName,
chain: IBeaconChain,
Expand All @@ -65,6 +70,105 @@ export async function validateGossipAttestation(
return validateAttestation(fork, chain, attestationOrBytes, subnet);
}

/**
* Verify gossip attestations of the same attestation data. The main advantage is we can batch verify bls signatures
* through verifySignatureSetsSameMessage bls api to improve performance.
* - If there are less than 2 signatures (minSameMessageSignatureSetsToBatch), verify each signature individually with batchable = true
* - do not prioritize bls signature set
*/
export async function validateGossipAttestationsSameAttData(
fork: ForkName,
chain: IBeaconChain,
attestationOrBytesArr: AttestationOrBytes[],
subnet: number,
// for unit test, consumers do not need to pass this
phase0ValidationFn = validateGossipAttestationNoSignatureCheck
): Promise<BatchResult> {
if (attestationOrBytesArr.length === 0) {
return {results: [], batchableBls: false};
}

// phase0: do all verifications except for signature verification
const phase0ResultOrErrors = await Promise.all(
attestationOrBytesArr.map((attestationOrBytes) =>
wrapError(phase0ValidationFn(fork, chain, attestationOrBytes, subnet))
)
);

// phase1: verify signatures of all valid attestations
// map new index to index in resultOrErrors
const newIndexToOldIndex = new Map<number, number>();
const signatureSets: SingleSignatureSet[] = [];
let newIndex = 0;
const phase0Results: Phase0Result[] = [];
for (const [i, resultOrError] of phase0ResultOrErrors.entries()) {
if (resultOrError.err) {
continue;
}
phase0Results.push(resultOrError.result);
newIndexToOldIndex.set(newIndex, i);
signatureSets.push(resultOrError.result.signatureSet);
newIndex++;
}

let signatureValids: boolean[];
const batchableBls = signatureSets.length >= chain.opts.minSameMessageSignatureSetsToBatch;
if (batchableBls) {
// all signature sets should have same signing root since we filtered in network processor
signatureValids = await chain.bls.verifySignatureSetsSameMessage(
signatureSets.map((set) => ({publicKey: set.pubkey, signature: set.signature})),
signatureSets[0].signingRoot
);
} else {
// don't want to block the main thread if there are too few signatures
signatureValids = await Promise.all(
signatureSets.map((set) => chain.bls.verifySignatureSets([set], {batchable: true}))
);
}

// phase0 post validation
for (const [i, sigValid] of signatureValids.entries()) {
const oldIndex = newIndexToOldIndex.get(i);
if (oldIndex == null) {
// should not happen
throw Error(`Cannot get old index for index ${i}`);
}

const {validatorIndex, attestation} = phase0Results[i];
const targetEpoch = attestation.data.target.epoch;
if (sigValid) {
// Now that the attestation has been fully verified, store that we have received a valid attestation from this validator.
//
// It's important to double check that the attestation still hasn't been observed, since
// there can be a race-condition if we receive two attestations at the same time and
// process them in different threads.
if (chain.seenAttesters.isKnown(targetEpoch, validatorIndex)) {
phase0ResultOrErrors[oldIndex] = {
err: new AttestationError(GossipAction.IGNORE, {
code: AttestationErrorCode.ATTESTATION_ALREADY_KNOWN,
targetEpoch,
validatorIndex,
}),
};
}

// valid
chain.seenAttesters.add(targetEpoch, validatorIndex);
} else {
phase0ResultOrErrors[oldIndex] = {
err: new AttestationError(GossipAction.IGNORE, {
code: AttestationErrorCode.INVALID_SIGNATURE,
}),
};
}
}

return {
results: phase0ResultOrErrors,
batchableBls,
};
}

/**
* Validate attestations from api
* - no need to deserialize attestation
Expand All @@ -81,17 +185,42 @@ export async function validateApiAttestation(
}

/**
* Only deserialize the attestation if needed, use the cached AttestationData instead
* This is to avoid deserializing similar attestation multiple times which could help the gc
* Validate a single unaggregated attestation
* subnet is null for api attestations
*/
async function validateAttestation(
export async function validateAttestation(
fork: ForkName,
chain: IBeaconChain,
attestationOrBytes: AttestationOrBytes,
/** Optional, to allow verifying attestations through API with unknown subnet */
subnet: number | null,
prioritizeBls = false
): Promise<AttestationValidationResult> {
const phase0Result = await validateGossipAttestationNoSignatureCheck(fork, chain, attestationOrBytes, subnet);
const {attestation, signatureSet, validatorIndex} = phase0Result;
const isValid = await chain.bls.verifySignatureSets([signatureSet], {batchable: true, priority: prioritizeBls});

if (isValid) {
const targetEpoch = attestation.data.target.epoch;
chain.seenAttesters.add(targetEpoch, validatorIndex);
return phase0Result;
} else {
throw new AttestationError(GossipAction.IGNORE, {
code: AttestationErrorCode.INVALID_SIGNATURE,
});
}
}

/**
* Only deserialize the attestation if needed, use the cached AttestationData instead
* This is to avoid deserializing similar attestation multiple times which could help the gc
*/
async function validateGossipAttestationNoSignatureCheck(
fork: ForkName,
chain: IBeaconChain,
attestationOrBytes: AttestationOrBytes,
/** Optional, to allow verifying attestations through API with unknown subnet */
subnet: number | null
): Promise<Phase0Result> {
// Do checks in this order:
// - do early checks (w/o indexed attestation)
// - > obtain indexed attestation and committes per slot
Expand All @@ -105,11 +234,14 @@ async function validateAttestation(
let attestationOrCache:
| {attestation: phase0.Attestation; cache: null}
| {attestation: null; cache: AttestationDataCacheEntry; serializedData: Uint8Array};
let attDataBase64: AttDataBase64 | null;
let attDataBase64: AttDataBase64 | null = null;
if (attestationOrBytes.serializedData) {
// gossip
attDataBase64 = getAttDataBase64FromAttestationSerialized(attestationOrBytes.serializedData);
const attSlot = attestationOrBytes.attSlot;
// for old LIFO linear gossip queue we don't have attDataBase64
// for indexed gossip queue we have attDataBase64
attDataBase64 =
attestationOrBytes.attDataBase64 ?? getAttDataBase64FromAttestationSerialized(attestationOrBytes.serializedData);
const cachedAttData = attDataBase64 !== null ? chain.seenAttestationDatas.get(attSlot, attDataBase64) : null;
if (cachedAttData === null) {
const attestation = sszDeserializeAttestation(attestationOrBytes.serializedData);
Expand Down Expand Up @@ -263,7 +395,7 @@ async function validateAttestation(

// [REJECT] The signature of attestation is valid.
const attestingIndices = [validatorIndex];
let signatureSet: ISignatureSet;
let signatureSet: SingleSignatureSet;
let attDataRootHex: RootHex;
const signature = attestationOrCache.attestation
? attestationOrCache.attestation.signature
Expand Down Expand Up @@ -304,24 +436,7 @@ async function validateAttestation(
}
}

if (!(await chain.bls.verifySignatureSets([signatureSet], {batchable: true, priority: prioritizeBls}))) {
throw new AttestationError(GossipAction.REJECT, {code: AttestationErrorCode.INVALID_SIGNATURE});
}

// Now that the attestation has been fully verified, store that we have received a valid attestation from this validator.
//
// It's important to double check that the attestation still hasn't been observed, since
// there can be a race-condition if we receive two attestations at the same time and
// process them in different threads.
if (chain.seenAttesters.isKnown(targetEpoch, validatorIndex)) {
throw new AttestationError(GossipAction.IGNORE, {
code: AttestationErrorCode.ATTESTATION_ALREADY_KNOWN,
targetEpoch,
validatorIndex,
});
}

chain.seenAttesters.add(targetEpoch, validatorIndex);
// no signature check, leave that for phase1

const indexedAttestation: phase0.IndexedAttestation = {
attestingIndices,
Expand All @@ -336,7 +451,7 @@ async function validateAttestation(
data: attData,
signature,
};
return {attestation, indexedAttestation, subnet: expectedSubnet, attDataRootHex};
return {attestation, indexedAttestation, subnet: expectedSubnet, attDataRootHex, signatureSet, validatorIndex};
}

/**
Expand Down
15 changes: 12 additions & 3 deletions packages/beacon-node/src/metrics/metrics/lodestar.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ export function createLodestarMetrics(
help: "Count of total gossip validation queue length",
labelNames: ["topic"],
}),
dropRatio: register.gauge<"topic">({
name: "lodestar_gossip_validation_queue_current_drop_ratio",
help: "Current drop ratio of gossip validation queue",
keySize: register.gauge<"topic">({
name: "lodestar_gossip_validation_queue_key_size",
help: "Count of total gossip validation queue key size",
labelNames: ["topic"],
}),
droppedJobs: register.gauge<"topic">({
Expand Down Expand Up @@ -575,6 +575,15 @@ export function createLodestarMetrics(
labelNames: ["caller"],
buckets: [0, 1, 2, 4, 8, 16, 32, 64],
}),
attestationBatchHistogram: register.histogram({
name: "lodestar_gossip_attestation_verified_in_batch_histogram",
help: "Number of attestations verified in batch",
buckets: [1, 2, 4, 8, 16, 32, 64, 128],
}),
attestationNonBatchCount: register.gauge({
name: "lodestar_gossip_attestation_verified_non_batch_count",
help: "Count of attestations NOT verified in batch",
}),
},

// Gossip block
Expand Down
Loading
Loading