Skip to content

Commit

Permalink
add bit of gossip, data availability handling, fix build and run
Browse files Browse the repository at this point in the history
  • Loading branch information
g11tech committed Mar 16, 2024
1 parent c7f3ec6 commit 40bd276
Show file tree
Hide file tree
Showing 17 changed files with 212 additions and 40 deletions.
8 changes: 3 additions & 5 deletions packages/beacon-node/src/api/impl/beacon/blocks/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import {fromHexString, toHexString} from "@chainsafe/ssz";
import {routes, ServerApi, ResponseFormat} from "@lodestar/api";
import {computeTimeAtSlot, reconstructFullBlockOrContents} from "@lodestar/state-transition";
import {SLOTS_PER_HISTORICAL_ROOT} from "@lodestar/params";
import {SLOTS_PER_HISTORICAL_ROOT, ForkName} from "@lodestar/params";
import {sleep, toHex} from "@lodestar/utils";
import {allForks, deneb, isSignedBlockContents, ProducedBlockSource} from "@lodestar/types";
import {BlockSource, getBlockInput, ImportBlockOpts, BlockInput} from "../../../../chain/blocks/types.js";
Expand Down Expand Up @@ -50,11 +50,9 @@ export function getBeaconBlockApi({
blockForImport = getBlockInput.postDeneb(
config,
signedBlock,
BlockSource.api,
blobSidecars,
// don't bundle any bytes for block and blobs
null,
blobSidecars.map(() => null)
{fork: ForkName.deneb, blobs: blobSidecars, blobsBytes: [null]},
BlockSource.api
);
} else {
signedBlock = signedBlockOrContents;
Expand Down
2 changes: 1 addition & 1 deletion packages/beacon-node/src/chain/blocks/importBlock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ export async function importBlock(
});

if (blockInput.type === BlockInputType.postDeneb) {
for (const blobSidecar of blockInput.blobs) {
for (const blobSidecar of blockInput.blockData.blobs) {
const {index, kzgCommitment} = blobSidecar;
this.emitter.emit(routes.events.EventType.blobSidecar, {
blockRoot: blockRootHex,
Expand Down
3 changes: 2 additions & 1 deletion packages/beacon-node/src/chain/blocks/types.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import {CachedBeaconStateAllForks, computeEpochAtSlot, DataAvailableStatus} from "@lodestar/state-transition";
import {MaybeValidExecutionStatus} from "@lodestar/fork-choice";
import {allForks, deneb, electra, Slot, RootHex} from "@lodestar/types";
import {ForkSeq, ForkName, ForkPreBlobs, ForkILs} from "@lodestar/params";
import {ForkSeq, ForkName, ForkILs} from "@lodestar/params";
import {ChainForkConfig} from "@lodestar/config";

export enum BlockInputType {
Expand All @@ -21,6 +21,7 @@ export enum BlockSource {
export enum GossipedInputType {
block = "block",
blob = "blob",
ilist = "ilist",
}

type ForkBlobsInfo = {fork: ForkName.deneb};
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import {computeTimeAtSlot, DataAvailableStatus} from "@lodestar/state-transition";
import {ChainForkConfig} from "@lodestar/config";
import {deneb, UintNum64} from "@lodestar/types";
import {ForkName} from "@lodestar/params";
import {Logger} from "@lodestar/utils";
import {BlockError, BlockErrorCode} from "../errors/index.js";
import {validateBlobSidecars} from "../validation/blobSidecar.js";
Expand Down Expand Up @@ -78,12 +79,21 @@ async function maybeValidateBlobs(
const {block} = blockInput;
const blockSlot = block.message.slot;

const blobsData =
blockInput.type === BlockInputType.postDeneb
? blockInput
: await raceWithCutoff(chain, blockInput, blockInput.availabilityPromise);
const {blobs} = blobsData;
let blockData;
if (blockInput.type === BlockInputType.postDeneb) {
blockData = blockInput.blockData;
} else {
const {cachedData} = blockInput;
// weird that typescript is getting confused doing the same thing but with
// differing promise types, need to separate the case out
if (cachedData.fork === ForkName.deneb) {
blockData = await raceWithCutoff(chain, blockInput, cachedData.availabilityPromise);
} else {
blockData = await raceWithCutoff(chain, blockInput, cachedData.availabilityPromise);
}
}

const {blobs} = blockData;
const {blobKzgCommitments} = (block as deneb.SignedBeaconBlock).message.body;
const beaconBlockRoot = chain.config.getForkTypes(blockSlot).BeaconBlock.hashTreeRoot(block.message);

Expand Down
6 changes: 3 additions & 3 deletions packages/beacon-node/src/chain/blocks/writeBlockInputToDb.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ export async function writeBlockInputToDb(this: BeaconChain, blocksInput: BlockI
if (blockInput.type === BlockInputType.postDeneb || blockInput.type === BlockInputType.blobsPromise) {
const blobSidecars =
blockInput.type == BlockInputType.postDeneb
? blockInput.blobs
? blockInput.blockData.blobs
: // At this point of import blobs are available and can be safely awaited
(await blockInput.availabilityPromise).blobs;
(await blockInput.cachedData.availabilityPromise).blobs;

// NOTE: Old blobs are pruned on archive
fnPromises.push(this.db.blobSidecars.add({blockRoot, slot: block.message.slot, blobSidecars}));
Expand Down Expand Up @@ -64,7 +64,7 @@ export async function removeEagerlyPersistedBlockInputs(this: BeaconChain, block
blockToRemove.push(block);

if (type === BlockInputType.postDeneb) {
const blobSidecars = blockInput.blobs;
const blobSidecars = blockInput.blockData.blobs;
blobsToRemove.push({blockRoot, slot: block.message.slot, blobSidecars});
}
}
Expand Down
34 changes: 26 additions & 8 deletions packages/beacon-node/src/chain/seenCache/seenGossipBlockInput.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import {toHexString} from "@chainsafe/ssz";
import {deneb, RootHex, ssz, allForks} from "@lodestar/types";
import {deneb, electra, RootHex, ssz, allForks} from "@lodestar/types";
import {ChainForkConfig} from "@lodestar/config";
import {pruneSetToMax} from "@lodestar/utils";
import {BLOBSIDECAR_FIXED_SIZE, ForkSeq, ForkName} from "@lodestar/params";
import {BLOBSIDECAR_FIXED_SIZE, ForkSeq, ForkName, isForkBlobs, isForkILs} from "@lodestar/params";

import {
BlockInput,
Expand All @@ -13,6 +13,7 @@ import {
CachedData,
GossipedInputType,
getBlockInputBlobs,
BlockInputDataIls,
} from "../blocks/types.js";
import {Metrics} from "../../metrics/index.js";

Expand All @@ -23,7 +24,12 @@ export enum BlockInputAvailabilitySource {

type GossipedBlockInput =
| {type: GossipedInputType.block; signedBlock: allForks.SignedBeaconBlock; blockBytes: Uint8Array | null}
| {type: GossipedInputType.blob; blobSidecar: deneb.BlobSidecar; blobBytes: Uint8Array | null};
| {type: GossipedInputType.blob; blobSidecar: deneb.BlobSidecar; blobBytes: Uint8Array | null}
| {
type: GossipedInputType.ilist;
inclusionList: electra.NewInclusionListRequest;
inclusionListBytes: Uint8Array | null;
};

type BlockInputCacheType = {
block?: allForks.SignedBeaconBlock;
Expand Down Expand Up @@ -75,7 +81,9 @@ export class SeenGossipBlockInput {
let blockCache;
let fork;

if (gossipedInput.type === GossipedInputType.block) {
if (gossipedInput.type === GossipedInputType.ilist) {
throw Error("Inclusion list gossip handling not implemented");
} else if (gossipedInput.type === GossipedInputType.block) {
const {signedBlock, blockBytes} = gossipedInput;
fork = config.getForkName(signedBlock.message.slot);

Expand Down Expand Up @@ -205,13 +213,13 @@ function getEmptyBlockInputCacheEntry(fork: ForkName): BlockInputCacheType {
throw Error("Promise Constructor was not executed immediately");
}

if (ForkSeq[fork] < ForkSeq.deneb) {
if (!isForkBlobs(fork)) {
return {blockInputPromise, resolveBlockInput};
}

const blobsCache = new Map();

if (fork === ForkName.deneb) {
if (!isForkILs(fork)) {
// blobs availability
let resolveAvailability: ((blobs: BlockInputDataBlobs) => void) | null = null;
const availabilityPromise = new Promise<BlockInputDataBlobs>((resolveCB) => {
resolveAvailability = resolveCB;
Expand All @@ -223,6 +231,16 @@ function getEmptyBlockInputCacheEntry(fork: ForkName): BlockInputCacheType {
const cachedData: CachedData = {fork, blobsCache, availabilityPromise, resolveAvailability};
return {blockInputPromise, resolveBlockInput, cachedData};
} else {
throw Error("il cache not implemented");
// il availability (with blobs)
let resolveAvailability: ((blobs: BlockInputDataIls) => void) | null = null;
const availabilityPromise = new Promise<BlockInputDataIls>((resolveCB) => {
resolveAvailability = resolveCB;
});

if (resolveAvailability === null) {
throw Error("Promise Constructor was not executed immediately");
}
const cachedData: CachedData = {fork, blobsCache, availabilityPromise, resolveAvailability, inclusionLists: []};
return {blockInputPromise, resolveBlockInput, cachedData};
}
}
7 changes: 7 additions & 0 deletions packages/beacon-node/src/chain/validation/inclusionList.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import {electra} from "@lodestar/types";
import {IBeaconChain} from "../interface.js";

export async function validateGossipInclusionList(
_chain: IBeaconChain,
_inclusionList: electra.NewInclusionListRequest
): Promise<void> {}
6 changes: 5 additions & 1 deletion packages/beacon-node/src/network/gossip/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import {Libp2p} from "libp2p";
import {Message, TopicValidatorResult} from "@libp2p/interface";
import {PeerIdStr} from "@chainsafe/libp2p-gossipsub/types";
import {ForkName} from "@lodestar/params";
import {allForks, altair, capella, deneb, phase0, Slot} from "@lodestar/types";
import {allForks, altair, capella, deneb, electra, phase0, Slot} from "@lodestar/types";
import {BeaconConfig} from "@lodestar/config";
import {Logger} from "@lodestar/utils";
import {IBeaconChain} from "../../chain/index.js";
Expand All @@ -13,6 +13,7 @@ import {GossipActionError} from "../../chain/errors/gossipValidation.js";
export enum GossipType {
beacon_block = "beacon_block",
blob_sidecar = "blob_sidecar",
inclusion_list = "inclusion_list",
beacon_aggregate_and_proof = "beacon_aggregate_and_proof",
beacon_attestation = "beacon_attestation",
voluntary_exit = "voluntary_exit",
Expand Down Expand Up @@ -41,6 +42,7 @@ export interface IGossipTopic {
export type GossipTopicTypeMap = {
[GossipType.beacon_block]: {type: GossipType.beacon_block};
[GossipType.blob_sidecar]: {type: GossipType.blob_sidecar; index: number};
[GossipType.inclusion_list]: {type: GossipType.inclusion_list};
[GossipType.beacon_aggregate_and_proof]: {type: GossipType.beacon_aggregate_and_proof};
[GossipType.beacon_attestation]: {type: GossipType.beacon_attestation; subnet: number};
[GossipType.voluntary_exit]: {type: GossipType.voluntary_exit};
Expand Down Expand Up @@ -71,6 +73,7 @@ export type SSZTypeOfGossipTopic<T extends GossipTopic> = T extends {type: infer
export type GossipTypeMap = {
[GossipType.beacon_block]: allForks.SignedBeaconBlock;
[GossipType.blob_sidecar]: deneb.BlobSidecar;
[GossipType.inclusion_list]: electra.NewInclusionListRequest;
[GossipType.beacon_aggregate_and_proof]: phase0.SignedAggregateAndProof;
[GossipType.beacon_attestation]: phase0.Attestation;
[GossipType.voluntary_exit]: phase0.SignedVoluntaryExit;
Expand All @@ -86,6 +89,7 @@ export type GossipTypeMap = {
export type GossipFnByType = {
[GossipType.beacon_block]: (signedBlock: allForks.SignedBeaconBlock) => Promise<void> | void;
[GossipType.blob_sidecar]: (blobSidecar: deneb.BlobSidecar) => Promise<void> | void;
[GossipType.inclusion_list]: (inclusionList: electra.NewInclusionListRequest) => Promise<void> | void;
[GossipType.beacon_aggregate_and_proof]: (aggregateAndProof: phase0.SignedAggregateAndProof) => Promise<void> | void;
[GossipType.beacon_attestation]: (attestation: phase0.Attestation) => Promise<void> | void;
[GossipType.voluntary_exit]: (voluntaryExit: phase0.SignedVoluntaryExit) => Promise<void> | void;
Expand Down
10 changes: 10 additions & 0 deletions packages/beacon-node/src/network/gossip/topic.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ export function stringifyGossipTopic(forkDigestContext: ForkDigestContext, topic
function stringifyGossipTopicType(topic: GossipTopic): string {
switch (topic.type) {
case GossipType.beacon_block:
case GossipType.inclusion_list:
case GossipType.beacon_aggregate_and_proof:
case GossipType.voluntary_exit:
case GossipType.proposer_slashing:
Expand All @@ -86,6 +87,8 @@ export function getGossipSSZType(topic: GossipTopic) {
return ssz[topic.fork].SignedBeaconBlock;
case GossipType.blob_sidecar:
return ssz.deneb.BlobSidecar;
case GossipType.inclusion_list:
return ssz.electra.NewInclusionListRequest;
case GossipType.beacon_aggregate_and_proof:
return ssz.phase0.SignedAggregateAndProof;
case GossipType.beacon_attestation:
Expand Down Expand Up @@ -162,6 +165,7 @@ export function parseGossipTopic(forkDigestContext: ForkDigestContext, topicStr:
// Inline-d the parseGossipTopicType() function since spreading the resulting object x4 the time to parse a topicStr
switch (gossipTypeStr) {
case GossipType.beacon_block:
case GossipType.inclusion_list:
case GossipType.beacon_aggregate_and_proof:
case GossipType.voluntary_exit:
case GossipType.proposer_slashing:
Expand Down Expand Up @@ -212,6 +216,11 @@ export function getCoreTopicsAtFork(
{type: GossipType.attester_slashing},
];

// electra
if (ForkSeq[fork] >= ForkSeq.electra) {
topics.push({type: GossipType.inclusion_list});
}

// After Deneb also track blob_sidecar_{index}
if (ForkSeq[fork] >= ForkSeq.deneb) {
for (let index = 0; index < MAX_BLOBS_PER_BLOCK; index++) {
Expand Down Expand Up @@ -261,6 +270,7 @@ function parseEncodingStr(encodingStr: string): GossipEncoding {
// TODO: Review which yes, and which not
export const gossipTopicIgnoreDuplicatePublishError: Record<GossipType, boolean> = {
[GossipType.beacon_block]: true,
[GossipType.inclusion_list]: true,
[GossipType.blob_sidecar]: true,
[GossipType.beacon_aggregate_and_proof]: true,
[GossipType.beacon_attestation]: true,
Expand Down
Loading

0 comments on commit 40bd276

Please sign in to comment.