Skip to content

Commit

Permalink
feat: validate gossip attestations same att data in batch
Browse files Browse the repository at this point in the history
  • Loading branch information
twoeths committed Aug 15, 2023
1 parent 4d5f777 commit 8966fbb
Show file tree
Hide file tree
Showing 18 changed files with 891 additions and 260 deletions.
5 changes: 4 additions & 1 deletion packages/beacon-node/src/chain/errors/attestationError.ts
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ export enum AttestationErrorCode {
INVALID_SERIALIZED_BYTES = "ATTESTATION_ERROR_INVALID_SERIALIZED_BYTES",
/** Too many skipped slots. */
TOO_MANY_SKIPPED_SLOTS = "ATTESTATION_ERROR_TOO_MANY_SKIPPED_SLOTS",
/** attDataBase64 is not available */
NO_INDEXED_DATA = "ATTESTATION_ERROR_NO_INDEXED_DATA",
}

export type AttestationErrorType =
Expand Down Expand Up @@ -166,7 +168,8 @@ export type AttestationErrorType =
| {code: AttestationErrorCode.INVALID_AGGREGATOR}
| {code: AttestationErrorCode.INVALID_INDEXED_ATTESTATION}
| {code: AttestationErrorCode.INVALID_SERIALIZED_BYTES}
| {code: AttestationErrorCode.TOO_MANY_SKIPPED_SLOTS; headBlockSlot: Slot; attestationSlot: Slot};
| {code: AttestationErrorCode.TOO_MANY_SKIPPED_SLOTS; headBlockSlot: Slot; attestationSlot: Slot}
| {code: AttestationErrorCode.NO_INDEXED_DATA};

export class AttestationError extends GossipActionError<AttestationErrorType> {
getMetadata(): Record<string, string | number | null> {
Expand Down
200 changes: 168 additions & 32 deletions packages/beacon-node/src/chain/validation/attestation.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import {toHexString} from "@chainsafe/ssz";
import bls from "@chainsafe/bls";
import {phase0, Epoch, Root, Slot, RootHex, ssz} from "@lodestar/types";
import {ProtoBlock} from "@lodestar/fork-choice";
import {ATTESTATION_SUBNET_COUNT, SLOTS_PER_EPOCH, ForkName, ForkSeq} from "@lodestar/params";
import {
computeEpochAtSlot,
CachedBeaconStateAllForks,
ISignatureSet,
getAttestationDataSigningRoot,
createSingleSignatureSetFromComponents,
SingleSignatureSet,
} from "@lodestar/state-transition";
import {IBeaconChain} from "..";
import {AttestationError, AttestationErrorCode, GossipAction} from "../errors/index.js";
Expand All @@ -16,11 +17,19 @@ import {RegenCaller} from "../regen/index.js";
import {
AttDataBase64,
getAggregationBitsFromAttestationSerialized,
getAttDataBase64FromAttestationSerialized,
getSignatureFromAttestationSerialized,
} from "../../util/sszBytes.js";
import {AttestationDataCacheEntry} from "../seenCache/seenAttestationData.js";
import {sszDeserializeAttestation} from "../../network/gossip/topic.js";
import {Result, wrapError} from "../../util/wrapError.js";
import {MIN_SIGNATURE_SETS_TO_BATCH_VERIFY} from "../../network/processor/gossipQueues/index.js";
import {signatureFromBytesNoCheck} from "../opPools/utils.js";

export type BatchResult = {
results: Result<AttestationValidationResult>[];
batchableBls: boolean;
fallbackBls: boolean;
};

export type AttestationValidationResult = {
attestation: phase0.Attestation;
Expand All @@ -40,6 +49,12 @@ export type GossipAttestation = {
serializedData: Uint8Array;
// available in NetworkProcessor since we check for unknown block root attestations
attSlot: Slot;
attDataBase64?: string | null;
};

export type Phase0Result = AttestationValidationResult & {
signatureSet: SingleSignatureSet;
validatorIndex: number;
};

/**
Expand All @@ -49,11 +64,13 @@ export type GossipAttestation = {
const SHUFFLING_LOOK_AHEAD_EPOCHS = 1;

/**
* Validate attestations from gossip
* - Only deserialize the attestation if needed, use the cached AttestationData instead
* - This is to avoid deserializing similar attestation multiple times which could help the gc
* - subnet is required
* - do not prioritize bls signature set
* Verify gossip attestations of the same attestation data.
* - If there are less than 32 signatures, verify each signature individually with batchable = true
* - If there are not less than 32 signatures
* - do a quick verify by aggregate all signatures and pubkeys, this takes 4.6ms for 32 signatures and 7.6ms for 64 signatures
* - if one of the signature is invalid, do a fallback verify by verify each signature individually with batchable = false
* - subnet is required
* - do not prioritize bls signature set
*/
export async function validateGossipAttestation(
fork: ForkName,
Expand All @@ -65,6 +82,112 @@ export async function validateGossipAttestation(
return validateAttestation(fork, chain, attestationOrBytes, subnet);
}

export async function validateGossipAttestationsSameAttData(
fork: ForkName,
chain: IBeaconChain,
attestationOrBytesArr: AttestationOrBytes[],
subnet: number,
// for unit test, consumers do not need to pass this
phase0ValidationFn = validateGossipAttestationNoSignatureCheck
): Promise<BatchResult> {
if (attestationOrBytesArr.length === 0) {
return {results: [], batchableBls: false, fallbackBls: false};
}

// phase0: do all verifications except for signature verification
const phase0ResultOrErrors = await Promise.all(
attestationOrBytesArr.map((attestationOrBytes) =>
wrapError(phase0ValidationFn(fork, chain, attestationOrBytes, subnet))
)
);

// phase1: verify signatures of all valid attestations
// map new index to index in resultOrErrors
const newIndexToOldIndex = new Map<number, number>();
const signatureSets: SingleSignatureSet[] = [];
let newIndex = 0;
const phase0Results: Phase0Result[] = [];
for (const [i, resultOrError] of phase0ResultOrErrors.entries()) {
if (resultOrError.err) {
continue;
}
phase0Results.push(resultOrError.result);
newIndexToOldIndex.set(newIndex, i);
signatureSets.push(resultOrError.result.signatureSet);
newIndex++;
}

let signatureValids: boolean[];
let batchableBls = false;
let fallbackBls = false;
if (signatureSets.length >= MIN_SIGNATURE_SETS_TO_BATCH_VERIFY) {
// all signature sets should have same signing root since we filtered in network processor
const aggregatedPubkey = bls.PublicKey.aggregate(signatureSets.map((set) => set.pubkey));
const aggregatedSignature = bls.Signature.aggregate(
// no need to check signature, will do a final verify later
signatureSets.map((set) => signatureFromBytesNoCheck(set.signature))
);

// quick check, it's likely this is valid most of the time
batchableBls = true;
const isAllValid = aggregatedSignature.verify(aggregatedPubkey, signatureSets[0].signingRoot);
fallbackBls = !isAllValid;
signatureValids = isAllValid
? new Array<boolean>(signatureSets.length).fill(true)
: // batchable is false because one of the signature is invalid
await Promise.all(signatureSets.map((set) => chain.bls.verifySignatureSets([set], {batchable: false})));
} else {
batchableBls = false;
// don't want to block the main thread if there are too few signatures
signatureValids = await Promise.all(
signatureSets.map((set) => chain.bls.verifySignatureSets([set], {batchable: true}))
);
}

// phase0 post validation
for (const [i, isValid] of signatureValids.entries()) {
const oldIndex = newIndexToOldIndex.get(i);
if (oldIndex == null) {
// should not happen
throw Error(`Cannot get old index for index ${i}`);
}

const {validatorIndex, attestation} = phase0Results[i];
const targetEpoch = attestation.data.target.epoch;
if (isValid) {
// Now that the attestation has been fully verified, store that we have received a valid attestation from this validator.
//
// It's important to double check that the attestation still hasn't been observed, since
// there can be a race-condition if we receive two attestations at the same time and
// process them in different threads.
if (chain.seenAttesters.isKnown(targetEpoch, validatorIndex)) {
phase0ResultOrErrors[oldIndex] = {
err: new AttestationError(GossipAction.IGNORE, {
code: AttestationErrorCode.ATTESTATION_ALREADY_KNOWN,
targetEpoch,
validatorIndex,
}),
};
}

// valid
chain.seenAttesters.add(targetEpoch, validatorIndex);
} else {
phase0ResultOrErrors[oldIndex] = {
err: new AttestationError(GossipAction.IGNORE, {
code: AttestationErrorCode.INVALID_SIGNATURE,
}),
};
}
}

return {
results: phase0ResultOrErrors,
batchableBls,
fallbackBls,
};
}

/**
* Validate attestations from api
* - no need to deserialize attestation
Expand All @@ -81,17 +204,42 @@ export async function validateApiAttestation(
}

/**
* Only deserialize the attestation if needed, use the cached AttestationData instead
* This is to avoid deserializing similar attestation multiple times which could help the gc
* Validate a single unaggregated attestation
* subnet is null for api attestations
*/
async function validateAttestation(
export async function validateAttestation(
fork: ForkName,
chain: IBeaconChain,
attestationOrBytes: AttestationOrBytes,
/** Optional, to allow verifying attestations through API with unknown subnet */
subnet: number | null,
prioritizeBls = false
): Promise<AttestationValidationResult> {
const phase0Result = await validateGossipAttestationNoSignatureCheck(fork, chain, attestationOrBytes, subnet);
const {attestation, signatureSet, validatorIndex} = phase0Result;
const isValid = await chain.bls.verifySignatureSets([signatureSet], {batchable: true, priority: prioritizeBls});

if (isValid) {
const targetEpoch = attestation.data.target.epoch;
chain.seenAttesters.add(targetEpoch, validatorIndex);
return phase0Result;
} else {
throw new AttestationError(GossipAction.IGNORE, {
code: AttestationErrorCode.INVALID_SIGNATURE,
});
}
}

/**
* Only deserialize the attestation if needed, use the cached AttestationData instead
* This is to avoid deserializing similar attestation multiple times which could help the gc
*/
async function validateGossipAttestationNoSignatureCheck(
fork: ForkName,
chain: IBeaconChain,
attestationOrBytes: AttestationOrBytes,
/** Optional, to allow verifying attestations through API with unknown subnet */
subnet: number | null
): Promise<Phase0Result> {
// Do checks in this order:
// - do early checks (w/o indexed attestation)
// - > obtain indexed attestation and committes per slot
Expand All @@ -108,8 +256,13 @@ async function validateAttestation(
let attDataBase64: AttDataBase64 | null;
if (attestationOrBytes.serializedData) {
// gossip
attDataBase64 = getAttDataBase64FromAttestationSerialized(attestationOrBytes.serializedData);
const attSlot = attestationOrBytes.attSlot;
if (!attestationOrBytes.attDataBase64) {
throw new AttestationError(GossipAction.IGNORE, {
code: AttestationErrorCode.NO_INDEXED_DATA,
});
}
attDataBase64 = attestationOrBytes.attDataBase64;
const cachedAttData = attDataBase64 !== null ? chain.seenAttestationDatas.get(attSlot, attDataBase64) : null;
if (cachedAttData === null) {
const attestation = sszDeserializeAttestation(attestationOrBytes.serializedData);
Expand Down Expand Up @@ -263,7 +416,7 @@ async function validateAttestation(

// [REJECT] The signature of attestation is valid.
const attestingIndices = [validatorIndex];
let signatureSet: ISignatureSet;
let signatureSet: SingleSignatureSet;
let attDataRootHex: RootHex;
const signature = attestationOrCache.attestation
? attestationOrCache.attestation.signature
Expand Down Expand Up @@ -304,24 +457,7 @@ async function validateAttestation(
}
}

if (!(await chain.bls.verifySignatureSets([signatureSet], {batchable: true, priority: prioritizeBls}))) {
throw new AttestationError(GossipAction.REJECT, {code: AttestationErrorCode.INVALID_SIGNATURE});
}

// Now that the attestation has been fully verified, store that we have received a valid attestation from this validator.
//
// It's important to double check that the attestation still hasn't been observed, since
// there can be a race-condition if we receive two attestations at the same time and
// process them in different threads.
if (chain.seenAttesters.isKnown(targetEpoch, validatorIndex)) {
throw new AttestationError(GossipAction.IGNORE, {
code: AttestationErrorCode.ATTESTATION_ALREADY_KNOWN,
targetEpoch,
validatorIndex,
});
}

chain.seenAttesters.add(targetEpoch, validatorIndex);
// no signature check, leave that for phase1

const indexedAttestation: phase0.IndexedAttestation = {
attestingIndices,
Expand All @@ -336,7 +472,7 @@ async function validateAttestation(
data: attData,
signature,
};
return {attestation, indexedAttestation, subnet: expectedSubnet, attDataRootHex};
return {attestation, indexedAttestation, subnet: expectedSubnet, attDataRootHex, signatureSet, validatorIndex};
}

/**
Expand Down
22 changes: 19 additions & 3 deletions packages/beacon-node/src/metrics/metrics/lodestar.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ export function createLodestarMetrics(
help: "Count of total gossip validation queue length",
labelNames: ["topic"],
}),
dropRatio: register.gauge<"topic">({
name: "lodestar_gossip_validation_queue_current_drop_ratio",
help: "Current drop ratio of gossip validation queue",
keySize: register.gauge<"topic">({
name: "lodestar_gossip_validation_queue_key_size",
help: "Count of total gossip validation queue key size",
labelNames: ["topic"],
}),
droppedJobs: register.gauge<"topic">({
Expand Down Expand Up @@ -575,6 +575,22 @@ export function createLodestarMetrics(
labelNames: ["caller"],
buckets: [0, 1, 2, 4, 8, 16, 32, 64],
}),
attestationBatchCount: register.gauge({
name: "lodestar_gossip_attestation_verified_in_batch_count",
help: "Count of attestations verified in batch",
}),
attestationNonBatchCount: register.gauge({
name: "lodestar_gossip_attestation_verified_non_batch_count",
help: "Count of attestations NOT verified in batch",
}),
totalBatch: register.gauge({
name: "lodestar_gossip_attestation_total_batch_count",
help: "Total number of attestation batches",
}),
totalBatchFallbackBlsCheck: register.gauge({
name: "lodestar_gossip_attestation_total_batch_fallback_bls_check_count",
help: "Total number of attestation batches that fallback to checking each signature separately",
}),
},

// Gossip block
Expand Down
Loading

0 comments on commit 8966fbb

Please sign in to comment.