Skip to content

Commit

Permalink
Merge 2db8d02 into 1fa3f37
Browse files Browse the repository at this point in the history
  • Loading branch information
g11tech authored Oct 8, 2024
2 parents 1fa3f37 + 2db8d02 commit 57dccd7
Show file tree
Hide file tree
Showing 9 changed files with 125 additions and 28 deletions.
2 changes: 1 addition & 1 deletion packages/beacon-node/src/chain/blocks/verifyBlock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ export async function verifyBlocksInEpoch(
} as SegmentExecStatus),

// data availability for the blobs
verifyBlocksDataAvailability(this, blocksInput, opts),
verifyBlocksDataAvailability(this, blocksInput, abortController.signal, opts),

// Run state transition only
// TODO: Ensure it yields to allow flushing to workers and engine API
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import {computeTimeAtSlot} from "@lodestar/state-transition";
import {DataAvailabilityStatus} from "@lodestar/fork-choice";
import {ChainForkConfig} from "@lodestar/config";
import {deneb, UintNum64} from "@lodestar/types";
import {Logger} from "@lodestar/utils";
import {Logger, ErrorAborted} from "@lodestar/utils";
import {BlockError, BlockErrorCode} from "../errors/index.js";
import {validateBlobSidecars} from "../validation/blobSidecar.js";
import {Metrics} from "../../metrics/metrics.js";
Expand All @@ -27,6 +27,7 @@ const BLOB_AVAILABILITY_TIMEOUT = 12_000;
export async function verifyBlocksDataAvailability(
chain: {config: ChainForkConfig; genesisTime: UintNum64; logger: Logger; metrics: Metrics | null},
blocks: BlockInput[],
signal: AbortSignal,
opts: ImportBlockOpts
): Promise<{
dataAvailabilityStatuses: DataAvailabilityStatus[];
Expand All @@ -43,9 +44,12 @@ export async function verifyBlocksDataAvailability(
const availableBlockInputs: BlockInput[] = [];

for (const blockInput of blocks) {
if (signal.aborted) {
throw new ErrorAborted("verifyBlockStateTransitionOnly");
}
// Validate status of only not-yet-finalized blocks; we don't yet need to propagate the status
// as it is not used upstream anywhere
const {dataAvailabilityStatus, availableBlockInput} = await maybeValidateBlobs(chain, blockInput, opts);
const {dataAvailabilityStatus, availableBlockInput} = await maybeValidateBlobs(chain, blockInput, signal, opts);
dataAvailabilityStatuses.push(dataAvailabilityStatus);
availableBlockInputs.push(availableBlockInput);
}
Expand All @@ -69,6 +73,7 @@ export async function verifyBlocksDataAvailability(
async function maybeValidateBlobs(
chain: {config: ChainForkConfig; genesisTime: UintNum64; logger: Logger},
blockInput: BlockInput,
signal: AbortSignal,
opts: ImportBlockOpts
): Promise<{dataAvailabilityStatus: DataAvailabilityStatus; availableBlockInput: BlockInput}> {
switch (blockInput.type) {
Expand All @@ -92,7 +97,7 @@ async function maybeValidateBlobs(
const blobsData =
blockInput.type === BlockInputType.availableData
? blockInput.blockData
: await raceWithCutoff(chain, blockInput, blockInput.cachedData.availabilityPromise);
: await raceWithCutoff(chain, blockInput, blockInput.cachedData.availabilityPromise, signal);
const {blobs} = blobsData;

const {blobKzgCommitments} = (block as deneb.SignedBeaconBlock).message.body;
Expand Down Expand Up @@ -122,16 +127,21 @@ async function maybeValidateBlobs(
async function raceWithCutoff<T>(
chain: {config: ChainForkConfig; genesisTime: UintNum64; logger: Logger},
blockInput: BlockInput,
availabilityPromise: Promise<T>
availabilityPromise: Promise<T>,
signal: AbortSignal
): Promise<T> {
const {block} = blockInput;
const blockSlot = block.message.slot;

const cutoffTime = Math.max(
computeTimeAtSlot(chain.config, blockSlot, chain.genesisTime) * 1000 + BLOB_AVAILABILITY_TIMEOUT - Date.now(),
0
);
const cutoffTimeout = new Promise((_resolve, reject) => setTimeout(reject, cutoffTime));
const cutoffTime =
computeTimeAtSlot(chain.config, blockSlot, chain.genesisTime) * 1000 + BLOB_AVAILABILITY_TIMEOUT - Date.now();
const cutoffTimeout =
cutoffTime > 0
? new Promise((_resolve, reject) => {
setTimeout(reject, cutoffTime);
signal.addEventListener("abort", () => reject(signal.reason));
})
: Promise.reject();
chain.logger.debug("Racing for blob availabilityPromise", {blockSlot, cutoffTime});

try {
Expand Down
4 changes: 4 additions & 0 deletions packages/beacon-node/src/execution/engine/disabled.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,8 @@ export class ExecutionEngineDisabled implements IExecutionEngine {
/** Not supported: the execution engine is disabled in this configuration. */
getPayloadBodiesByRange(): Promise<never> {
  throw new Error("Execution engine disabled");
}

/** Not supported: the execution engine is disabled in this configuration. */
getBlobs(): Promise<never> {
  throw new Error("Execution engine disabled");
}
}
15 changes: 14 additions & 1 deletion packages/beacon-node/src/execution/engine/http.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import {
import {Metrics} from "../../metrics/index.js";
import {JobItemQueue} from "../../util/queue/index.js";
import {EPOCHS_PER_BATCH} from "../../sync/constants.js";
import {numToQuantity} from "../../eth1/provider/utils.js";
import {bytesToData, numToQuantity} from "../../eth1/provider/utils.js";
import {getLodestarClientVersion} from "../../util/metadata.js";
import {
ExecutionPayloadStatus,
Expand All @@ -24,6 +24,7 @@ import {
ExecutionEngineState,
ClientVersion,
ClientCode,
BlobAndProof,
} from "./interface.js";
import {PayloadIdCache} from "./payloadIdCache.js";
import {
Expand All @@ -38,6 +39,7 @@ import {
assertReqSizeLimit,
deserializeExecutionPayloadBody,
serializeExecutionRequests,
deserializeBlobAndProofs,
} from "./types.js";
import {getExecutionEngineState} from "./utils.js";

Expand Down Expand Up @@ -463,6 +465,17 @@ export class ExecutionEngineHttp implements IExecutionEngine {
return response.map(deserializeExecutionPayloadBody);
}

/**
 * Fetch blobs and their KZG proofs from the execution layer's blob pool via
 * `engine_getBlobsV1`. Entries are null for versioned hashes the EL does not hold.
 */
async getBlobs(_fork: ForkName, versionedHashes: VersionedHashes): Promise<(BlobAndProof | null)[]> {
  const method = "engine_getBlobsV1";
  // engine_getBlobsV1 caps a single request at 128 versioned hashes
  assertReqSizeLimit(versionedHashes.length, 128);
  const hashesHex = versionedHashes.map(bytesToData);
  const rpcResponse = await this.rpc.fetchWithRetries<
    EngineApiRpcReturnTypes[typeof method],
    EngineApiRpcParamTypes[typeof method]
  >({method, params: [hashesHex]});
  return rpcResponse.map(deserializeBlobAndProofs);
}

private async getClientVersion(clientVersion: ClientVersion): Promise<ClientVersion[]> {
const method = "engine_getClientVersionV1";

Expand Down
7 changes: 7 additions & 0 deletions packages/beacon-node/src/execution/engine/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,11 @@ export type BlobsBundle = {
proofs: KZGProof[];
};

/** A single blob with its KZG proof, as returned by `engine_getBlobsV1`. */
export type BlobAndProof = {
blob: Blob;
proof: KZGProof;
};

export type ClientVersion = {
code: ClientCode;
name: string;
Expand Down Expand Up @@ -179,4 +184,6 @@ export interface IExecutionEngine {
getPayloadBodiesByHash(fork: ForkName, blockHash: DATA[]): Promise<(ExecutionPayloadBody | null)[]>;

getPayloadBodiesByRange(fork: ForkName, start: number, count: number): Promise<(ExecutionPayloadBody | null)[]>;

getBlobs(fork: ForkName, versionedHashes: VersionedHashes): Promise<(BlobAndProof | null)[]>;
}
7 changes: 7 additions & 0 deletions packages/beacon-node/src/execution/engine/mock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ export class ExecutionEngineMockBackend implements JsonRpcBackend {
engine_getPayloadBodiesByHashV1: this.getPayloadBodiesByHash.bind(this),
engine_getPayloadBodiesByRangeV1: this.getPayloadBodiesByRange.bind(this),
engine_getClientVersionV1: this.getClientVersionV1.bind(this),
engine_getBlobsV1: this.getBlobs.bind(this),
};
}

Expand Down Expand Up @@ -396,6 +397,12 @@ export class ExecutionEngineMockBackend implements JsonRpcBackend {
return [{code: ClientCode.XX, name: "mock", version: "", commit: ""}];
}

/** Mock handler for `engine_getBlobsV1`: reports every requested blob as unavailable. */
private getBlobs(
  versionedHashes: EngineApiRpcParamTypes["engine_newPayloadV3"][1]
): EngineApiRpcReturnTypes["engine_getBlobsV1"] {
  // The mock tracks no blob pool, so every hash resolves to null
  return versionedHashes.map(() => null);
}

private timestampToFork(timestamp: number): ForkExecution {
if (timestamp > (this.opts.electraForkTimestamp ?? Infinity)) return ForkName.electra;
if (timestamp > (this.opts.denebForkTimestamp ?? Infinity)) return ForkName.deneb;
Expand Down
20 changes: 19 additions & 1 deletion packages/beacon-node/src/execution/engine/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import {
QUANTITY,
quantityToBigint,
} from "../../eth1/provider/utils.js";
import {ExecutionPayloadStatus, BlobsBundle, PayloadAttributes, VersionedHashes} from "./interface.js";
import {ExecutionPayloadStatus, BlobsBundle, PayloadAttributes, VersionedHashes, BlobAndProof} from "./interface.js";
import {WithdrawalV1} from "./payloadIdCache.js";

/* eslint-disable @typescript-eslint/naming-convention */
Expand Down Expand Up @@ -69,6 +69,8 @@ export type EngineApiRpcParamTypes = {
* Object - Instance of ClientVersion
*/
engine_getClientVersionV1: [ClientVersionRpc];

engine_getBlobsV1: [DATA[]];
};

export type PayloadStatus = {
Expand Down Expand Up @@ -111,6 +113,8 @@ export type EngineApiRpcReturnTypes = {
engine_getPayloadBodiesByRangeV1: (ExecutionPayloadBodyRpc | null)[];

engine_getClientVersionV1: ClientVersionRpc[];

engine_getBlobsV1: (BlobAndProofRpc | null)[];
};

type ExecutionPayloadRpcWithValue = {
Expand Down Expand Up @@ -185,6 +189,11 @@ export type ConsolidationRequestRpc = {
targetPubkey: DATA;
};

/** JSON-RPC wire representation of a blob + proof pair (`engine_getBlobsV1` response item). */
export type BlobAndProofRpc = {
blob: DATA;
proof: DATA;
};

export type VersionedHashesRpc = DATA[];

export type PayloadAttributesRpc = {
Expand Down Expand Up @@ -494,6 +503,15 @@ export function serializeExecutionPayloadBody(data: ExecutionPayloadBody | null)
: null;
}

/**
 * Convert an `engine_getBlobsV1` response item from hex-encoded RPC form into
 * byte representation. Null entries (blob not found by the EL) pass through as null.
 */
export function deserializeBlobAndProofs(data: BlobAndProofRpc | null): BlobAndProof | null {
  if (!data) {
    return null;
  }
  return {
    blob: dataToBytes(data.blob, BYTES_PER_FIELD_ELEMENT * FIELD_ELEMENTS_PER_BLOB),
    // KZG proofs are fixed-size 48-byte BLS12-381 G1 points
    proof: dataToBytes(data.proof, 48),
  };
}

export function assertReqSizeLimit(blockHashesReqCount: number, count: number): void {
if (blockHashesReqCount > count) {
throw new Error(`Requested blocks must not be > ${count}`);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import {ChainForkConfig} from "@lodestar/config";
import {phase0, deneb} from "@lodestar/types";
import {ForkSeq} from "@lodestar/params";
import {ForkSeq, ForkName} from "@lodestar/params";
import {fromHex} from "@lodestar/utils";
import {signedBlockToSignedHeader} from "@lodestar/state-transition";
import {
BlockInput,
BlockInputType,
Expand All @@ -16,6 +17,8 @@ import {PeerIdStr} from "../../util/peerId.js";
import {INetwork} from "../interface.js";
import {BlockInputAvailabilitySource} from "../../chain/seenCache/seenGossipBlockInput.js";
import {Metrics} from "../../metrics/index.js";
import {IExecutionEngine} from "../../execution/index.js";
import {computeInclusionProof, kzgCommitmentToVersionedHash} from "../../util/blobs.js";
import {matchBlockWithBlobs} from "./beaconBlocksMaybeBlobsByRange.js";

export async function beaconBlocksMaybeBlobsByRoot(
Expand All @@ -35,6 +38,7 @@ export async function beaconBlocksMaybeBlobsByRoot(
if (ForkSeq[fork] >= ForkSeq.deneb) {
const blobKzgCommitmentsLen = (block.data.message.body as deneb.BeaconBlockBody).blobKzgCommitments.length;
for (let index = 0; index < blobKzgCommitmentsLen; index++) {
// try to see if the blob is available locally
blobIdentifiers.push({blockRoot, index});
}
}
Expand All @@ -57,8 +61,9 @@ export async function unavailableBeaconBlobsByRoot(
network: INetwork,
peerId: PeerIdStr,
unavailableBlockInput: BlockInput | NullBlockInput,
metrics: Metrics | null
opts: {executionEngine?: IExecutionEngine; metrics: Metrics | null}
): Promise<BlockInput> {
const {executionEngine, metrics} = opts;
if (unavailableBlockInput.block !== null && unavailableBlockInput.type !== BlockInputType.dataPromise) {
return unavailableBlockInput;
}
Expand All @@ -77,26 +82,62 @@ export async function unavailableBeaconBlobsByRoot(
}

// resolve missing blobs
const blobIdentifiers: deneb.BlobIdentifier[] = [];
const blobIdentifiersWithCommitments: (deneb.BlobIdentifier & {kzgCommitment: deneb.KZGCommitment})[] = [];
const slot = block.message.slot;
const fork = config.getForkName(slot);
const blockRoot = config.getForkTypes(slot).BeaconBlock.hashTreeRoot(block.message);

const blobKzgCommitmentsLen = (block.message.body as deneb.BeaconBlockBody).blobKzgCommitments.length;
for (let index = 0; index < blobKzgCommitmentsLen; index++) {
if (blobsCache.has(index) === false) blobIdentifiers.push({blockRoot, index});
if (blobsCache.has(index) === false) {
blobIdentifiersWithCommitments.push({
blockRoot,
index,
kzgCommitment: (block.message.body as deneb.BeaconBlockBody).blobKzgCommitments[index],
});
}
}

let allBlobSidecars: deneb.BlobSidecar[];
if (blobIdentifiers.length > 0) {
allBlobSidecars = await network.sendBlobSidecarsByRoot(peerId, blobIdentifiers);
const networkReqIdentifiers: deneb.BlobIdentifier[] = [];
if (executionEngine !== undefined) {
const signedBlockHeader = signedBlockToSignedHeader(config, block);
const versionedHashes = blobIdentifiersWithCommitments.map((bi) => kzgCommitmentToVersionedHash(bi.kzgCommitment));
const blobAndProofs = await executionEngine
.getBlobs(ForkName.deneb, versionedHashes)
.catch((_e) => versionedHashes.map((_vh) => null));

for (let j = 0; j < versionedHashes.length; j++) {
const blobAndProof = blobAndProofs[j];
if (blobAndProof !== null) {
const blob = blobAndProof.blob;
const kzgProof = blobAndProof.proof;
const {kzgCommitment, index} = blobIdentifiersWithCommitments[j];
const kzgCommitmentInclusionProof = computeInclusionProof(fork, block.message.body, index);
const blobSidecar = {index, blob, kzgCommitment, kzgProof, signedBlockHeader, kzgCommitmentInclusionProof};
// add them in cache so that it's reflected in all the blockInputs that carry this,
// e.g. a blockInput that might be awaiting blob promise fulfillment in
// verifyBlocksDataAvailability
blobsCache.set(blobSidecar.index, {blobSidecar, blobBytes: null});
}
// the blob sidecar may have arrived in the time span of making the request
else if (blobsCache.has(blobIdentifiersWithCommitments[j].index) === false) {
const {blockRoot, index} = blobIdentifiersWithCommitments[j];
networkReqIdentifiers.push({blockRoot, index});
}
}
}

let networkResBlobSidecars: deneb.BlobSidecar[];
if (networkReqIdentifiers.length > 0) {
networkResBlobSidecars = await network.sendBlobSidecarsByRoot(peerId, networkReqIdentifiers);
} else {
allBlobSidecars = [];
networkResBlobSidecars = [];
}

// add them in cache so that it's reflected in all the blockInputs that carry this,
// e.g. a blockInput that might be awaiting blob promise fulfillment in
// verifyBlocksDataAvailability
for (const blobSidecar of allBlobSidecars) {
for (const blobSidecar of networkResBlobSidecars) {
blobsCache.set(blobSidecar.index, {blobSidecar, blobBytes: null});
}

Expand Down
11 changes: 4 additions & 7 deletions packages/beacon-node/src/sync/unknownBlock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -533,13 +533,10 @@ export class UnknownBlockSync {
for (let i = 0; i < MAX_ATTEMPTS_PER_BLOCK; i++) {
const peer = shuffledPeers[i % shuffledPeers.length];
try {
const blockInput = await unavailableBeaconBlobsByRoot(
this.config,
this.network,
peer,
unavailableBlockInput,
this.metrics
);
const blockInput = await unavailableBeaconBlobsByRoot(this.config, this.network, peer, unavailableBlockInput, {
metrics: this.metrics,
executionEngine: this.chain.executionEngine,
});

// Peer does not have the block, try with next peer
if (blockInput === undefined) {
Expand Down

0 comments on commit 57dccd7

Please sign in to comment.