Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement peerDAS on electra #6353

Draft
wants to merge 57 commits into
base: unstable
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
4805a2e
feat: placeholder PR for electra
g11tech Jan 24, 2024
499d93c
feat: implement peerDAS on electra
g11tech Jan 24, 2024
156ef53
fix: docker build issue for c-kzg
matthewkeil Jun 21, 2024
47eedae
feat: get various sync mechanisms working with/without sharded data
g11tech Jul 14, 2024
d423004
feat: add the modifications to work with devnet2
g11tech Jul 14, 2024
a0c5d27
fix: refactor to add and use nodeid computation and clear out nodeid …
g11tech Jul 16, 2024
c7f6341
fix the types/test
g11tech Aug 9, 2024
81aaeb5
feat: add and use metadatav3 for peer custody subnet
g11tech Aug 12, 2024
e6c613f
rename electra fork to peerdas for rebase and make csc in metadata uint8
g11tech Aug 27, 2024
a3533f8
add supernode flag to configure node custody requirement and make it …
g11tech Aug 27, 2024
54579b0
add more info for debugging
g11tech Aug 27, 2024
180f7d8
fix log
g11tech Aug 27, 2024
4b6f167
fix bug
g11tech Aug 27, 2024
ae7678e
fx
g11tech Aug 27, 2024
a33a72f
subnet count 128
g11tech Aug 27, 2024
585165e
remove banning unknown block, addmore log
g11tech Aug 28, 2024
2833ac0
make the csc encoding updates as per latest spec
g11tech Sep 5, 2024
bf08852
resolve availability when datacolumns are downloaded and matched
g11tech Sep 7, 2024
006e781
add debug log
g11tech Sep 7, 2024
aece0ab
fix add missing data availability resolutions
g11tech Sep 10, 2024
387da88
add more log
g11tech Sep 10, 2024
2bc1a0d
add cache tracking
g11tech Sep 10, 2024
5e1de6f
trying some fix
g11tech Sep 10, 2024
d7721f8
fix bug
g11tech Sep 10, 2024
bd84892
more log
g11tech Sep 10, 2024
de341b5
add send more log
g11tech Sep 10, 2024
8c21168
make pull a little less agressive
g11tech Sep 10, 2024
d35873e
further wait till cutoff for all data to be available
g11tech Sep 11, 2024
f7571f4
add some more loggig and availaibility tracking
g11tech Sep 11, 2024
74d8122
add some log for debugging inbound data columns request
g11tech Sep 11, 2024
af933fb
some fixes
g11tech Sep 11, 2024
56c8c6e
custodied column fetch debugging log
g11tech Sep 12, 2024
2b10e4d
datacolumns retrival fix
g11tech Sep 12, 2024
cdd9bae
update compute spec tests
g11tech Sep 12, 2024
c4d04ee
fix the column id compute
g11tech Sep 13, 2024
3470076
more debug log
g11tech Sep 13, 2024
4ec7aff
edge case optimization
g11tech Sep 13, 2024
a33303f
feat: refactor and unit test getDataColumnSidecars (#7072)
matthewkeil Sep 16, 2024
bd4f7f9
feat: update ckzg to final DAS version (#7050)
matthewkeil Sep 16, 2024
b1940ee
fix: remove ckzg build script (#7089)
matthewkeil Sep 17, 2024
20ef4c6
feat: validate data column sidecars (#7073)
matthewkeil Sep 17, 2024
fee7c08
validate inclusion proof
g11tech Sep 17, 2024
574837a
use sample subnets for data availability
g11tech Sep 21, 2024
6a77828
add debug console log
g11tech Sep 21, 2024
cec27d6
handle edge case
g11tech Sep 21, 2024
a3de70f
turn persisting network identity to default true
g11tech Sep 26, 2024
cce193b
improve logging for debugging
g11tech Oct 1, 2024
2736b8c
add enhance datacolumn serving logs
g11tech Oct 1, 2024
a0e0087
more log
g11tech Oct 1, 2024
b04aaef
migrate datacolumns to finalized
g11tech Oct 2, 2024
7c9a01c
add debug and fix datacolumns migration and improve log
g11tech Oct 2, 2024
1c08ab3
some fixing of beacon params
g11tech Oct 2, 2024
513bccc
fix
g11tech Oct 2, 2024
fccf9a2
add prevdownload tracker
g11tech Oct 8, 2024
8689c76
feat: check for no commitments on block or column in sidecar validati…
matthewkeil Oct 22, 2024
c8075d0
feat: log peer disconnect info (#7231)
matthewkeil Nov 25, 2024
e8bc729
refactor: peerdas types (#7243)
matthewkeil Dec 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
improve logging for debugging
  • Loading branch information
g11tech committed Oct 1, 2024
commit cce193bbca331a5b995fbf39cc463c063ac91679
7 changes: 6 additions & 1 deletion packages/beacon-node/src/network/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,12 @@ export enum NetworkEvent {
}

export type NetworkEventData = {
[NetworkEvent.peerConnected]: {peer: PeerIdStr; status: phase0.Status; dataColumns: ColumnIndex[]};
[NetworkEvent.peerConnected]: {
peer: PeerIdStr;
status: phase0.Status;
dataColumns: ColumnIndex[];
clientAgent: string;
};
[NetworkEvent.peerDisconnected]: {peer: PeerIdStr};
[NetworkEvent.reqRespRequest]: {request: RequestTypedContainer; peer: PeerId};
[NetworkEvent.unknownBlockParent]: {blockInput: BlockInput; peer: PeerIdStr};
Expand Down
1 change: 1 addition & 0 deletions packages/beacon-node/src/network/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ export interface INetwork extends INetworkCorePublic {

getConnectedPeers(): PeerIdStr[];
getConnectedPeerCustody(peerId: PeerIdStr): number[];
getConnectedPeerClientAgent(peerId: PeerIdStr): string;
getConnectedPeerCount(): number;
isSubscribedToGossipCoreTopics(): boolean;
reportPeer(peer: PeerIdStr, action: PeerAction, actionName: string): void;
Expand Down
10 changes: 10 additions & 0 deletions packages/beacon-node/src/network/network.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ export class Network implements INetwork {

private subscribedToCoreTopics = false;
private connectedPeers = new Map<PeerIdStr, ColumnIndex[]>();
private connectedPeerClients = new Map<PeerIdStr, string>();
private regossipBlsChangesPromise: Promise<void> | null = null;

constructor(modules: NetworkModules) {
Expand Down Expand Up @@ -278,6 +279,14 @@ export class Network implements INetwork {

return columns;
}
getConnectedPeerClientAgent(peerId: PeerIdStr): string {
const clientAgent = this.connectedPeerClients.get(peerId);
if (clientAgent === undefined) {
throw Error("clientAgent not in connectedPeerClients");
}

return clientAgent;
}
getConnectedPeerCount(): number {
return this.connectedPeers.size;
}
Expand Down Expand Up @@ -686,6 +695,7 @@ export class Network implements INetwork {
private onPeerConnected = (data: NetworkEventData[NetworkEvent.peerConnected]): void => {
this.logger.warn("onPeerConnected", {peer: data.peer, dataColumns: data.dataColumns.join(",")});
this.connectedPeers.set(data.peer, data.dataColumns);
this.connectedPeerClients.set(data.peer, data.clientAgent);
};

private onPeerDisconnected = (data: NetworkEventData[NetworkEvent.peerDisconnected]): void => {
Expand Down
19 changes: 14 additions & 5 deletions packages/beacon-node/src/network/peers/peerManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -325,8 +325,11 @@ export class PeerManager {
// Store metadata always in case the peer updates attnets but not the sequence number
// Trust that the peer always sends the latest metadata (From Lighthouse)
const peerData = this.connectedPeers.get(peer.toString());
this.logger.warn("onMetadata", {peer: peer.toString(), peerData: peerData !== undefined});
console.log("onMetadata", metadata);
this.logger.warn("onMetadata", {
peer: peer.toString(),
peerData: peerData !== undefined,
csc: (metadata as Partial<peerdas.Metadata>).csc,
});
if (peerData) {
const oldMetadata = peerData.metadata;
peerData.metadata = {
Expand Down Expand Up @@ -407,10 +410,9 @@ export class PeerManager {
}
if (getConnection(this.libp2p, peer.toString())) {
const nodeId = peerData?.nodeId ?? computeNodeId(peer);
console.log("onStatus", peerData?.metadata);
const custodySubnetCount = peerData?.metadata?.csc;

const peerCustodySubnetCount = custodySubnetCount ?? 4;
const peerCustodySubnetCount = custodySubnetCount ?? this.config.CUSTODY_REQUIREMENT;
const peerCustodySubnets = getDataColumnSubnets(nodeId, peerCustodySubnetCount);

const matchingSubnetsNum = this.sampleSubnets.reduce(
Expand All @@ -419,6 +421,7 @@ export class PeerManager {
);
const hasAllColumns = matchingSubnetsNum === this.sampleSubnets.length;
const hasMinCustodyMatchingColumns = matchingSubnetsNum >= this.config.CUSTODY_REQUIREMENT;
const clientAgent = peerData?.agentClient ?? ClientKind.Unknown;

this.logger.warn(`onStatus ${custodySubnetCount == undefined ? "undefined custody count assuming 4" : ""}`, {
nodeId: toHexString(nodeId),
Expand All @@ -428,6 +431,7 @@ export class PeerManager {
hasAllColumns,
peerCustodySubnets: peerCustodySubnets.join(","),
mySampleSubnets: this.sampleSubnets.join(","),
clientAgent,
});

if (this.opts.onlyConnectToBiggerDataNodes && !hasAllColumns) {
Expand All @@ -448,7 +452,12 @@ export class PeerManager {

// coule be optimized by directly using the previously calculated subnet
const dataColumns = getDataColumns(nodeId, peerCustodySubnetCount);
this.networkEventBus.emit(NetworkEvent.peerConnected, {peer: peer.toString(), status, dataColumns});
this.networkEventBus.emit(NetworkEvent.peerConnected, {
peer: peer.toString(),
status,
dataColumns,
clientAgent: `${clientAgent}-csc:${peerCustodySubnets.length}`,
});
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {ChainForkConfig} from "@lodestar/config";
import {deneb, Epoch, phase0, SignedBeaconBlock, Slot, peerdas, ssz} from "@lodestar/types";
import {ForkSeq, NUMBER_OF_COLUMNS, ForkName} from "@lodestar/params";
import {computeEpochAtSlot} from "@lodestar/state-transition";
import {Logger} from "@lodestar/utils";

import {
BlobsSource,
Expand All @@ -28,7 +29,9 @@ export async function beaconBlocksMaybeBlobsByRange(
peerId: PeerIdStr,
request: phase0.BeaconBlocksByRangeRequest,
currentEpoch: Epoch,
partialDownload: PartialDownload
partialDownload: PartialDownload,
peerClient: string,
logger?: Logger
): Promise<{blocks: BlockInput[]; pendingDataColumns: null | number[]}> {
// Code below assumes the request is in the same epoch
// Range sync satisfies this condition, but double check here for sanity
Expand Down Expand Up @@ -108,9 +111,12 @@ export async function beaconBlocksMaybeBlobsByRange(
: network.sendBeaconBlocksByRange(peerId, request),
columns.length === 0 ? [] : network.sendDataColumnSidecarsByRange(peerId, dataColumnRequest),
]);
console.log({
beaconBlocksRequest: ssz.phase0.BeaconBlocksByRangeRequest.toJson(request),
dataColumnRequest: ssz.peerdas.DataColumnSidecarsByRangeRequest.toJson(dataColumnRequest),
logger?.debug("ByRange requests", {
beaconBlocksRequest: JSON.stringify(ssz.phase0.BeaconBlocksByRangeRequest.toJson(request)),
dataColumnRequest: JSON.stringify(ssz.peerdas.DataColumnSidecarsByRangeRequest.toJson(dataColumnRequest)),
peerColumns: peerColumns.join(","),
peerId,
peerClient,
});

const blocks = matchBlockWithDataColumns(
Expand All @@ -124,7 +130,9 @@ export async function beaconBlocksMaybeBlobsByRange(
endSlot,
BlockSource.byRange,
DataColumnsSource.byRange,
partialDownload
partialDownload,
peerClient,
logger
);

return {blocks, pendingDataColumns: pendingDataColumns.length > 0 ? pendingDataColumns : null};
Expand Down Expand Up @@ -222,7 +230,9 @@ export function matchBlockWithDataColumns(
endSlot: Slot,
blockSource: BlockSource,
dataColumnsSource: DataColumnsSource,
prevPartialDownload: null | PartialDownload
prevPartialDownload: null | PartialDownload,
peerClient: string,
logger?: Logger
): BlockInput[] {
const blockInputs: BlockInput[] = [];
let dataColumnSideCarIndex = 0;
Expand Down Expand Up @@ -256,12 +266,15 @@ export function matchBlockWithDataColumns(
}

const blobKzgCommitmentsLen = (block.data.message.body as deneb.BeaconBlockBody).blobKzgCommitments.length;
console.log("matchBlockWithDataColumns", {
logger?.debug("processing matchBlockWithDataColumns", {
blobKzgCommitmentsLen,
dataColumnSidecars: dataColumnSidecars.length,
shouldHaveAllData,
neededColumns,
requestedColumns,
neededColumns: neededColumns.join(","),
requestedColumns: requestedColumns.join(","),
slot: block.data.message.slot,
dataColumnsSlots: dataColumnSidecars.map((dcm) => dcm.signedBlockHeader.message.slot).join(","),
peerClient,
});
if (blobKzgCommitmentsLen === 0) {
if (dataColumnSidecars.length > 0) {
Expand All @@ -285,18 +298,23 @@ export function matchBlockWithDataColumns(
true
);

console.log("matchBlockWithDataColumns", {dataColumnIndexes, requestedColumnsPresent});
logger?.debug("matchBlockWithDataColumns2", {
dataColumnIndexes: dataColumnIndexes.join(","),
requestedColumnsPresent,
slot: block.data.message.slot,
peerClient,
});

if (dataColumnSidecars.length !== requestedColumns.length || !requestedColumnsPresent) {
console.log(
"matchBlockWithDataColumns",
logger?.debug(
`Missing or mismatching dataColumnSidecars from peerId=${peerId} for blockSlot=${block.data.message.slot} with numColumns=${sampledColumns.length} dataColumnSidecars=${dataColumnSidecars.length} requestedColumnsPresent=${requestedColumnsPresent} received dataColumnIndexes=${dataColumnIndexes.join(",")} requested=${requestedColumns.join(",")}`,
{
allBlocks: allBlocks.length,
allDataColumnSidecars: allDataColumnSidecars.length,
peerId,
nodeId: toHexString(computeNodeId(peerId)),
blobKzgCommitmentsLen,
peerClient,
}
);
throw Error(
Expand Down Expand Up @@ -361,6 +379,9 @@ export function matchBlockWithDataColumns(
.join(",")}`
);
}
console.log("matchedBlockWithDataColumns", blockInputs);
logger?.debug("matched BlockWithDataColumns", {
peerClient,
blockInputs: blockInputs.map((bInpt) => `${bInpt.block.message.slot}=${bInpt.type}`).join(" "),
});
return blockInputs;
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import {fromHexString, toHexString} from "@chainsafe/ssz";
import {ChainForkConfig} from "@lodestar/config";
import {phase0, deneb, peerdas, ssz} from "@lodestar/types";
import {ForkName, ForkSeq, NUMBER_OF_COLUMNS} from "@lodestar/params";
import {Logger} from "@lodestar/utils";
import {
BlockInput,
BlockInputType,
Expand All @@ -25,14 +26,16 @@ export async function beaconBlocksMaybeBlobsByRoot(
network: INetwork,
peerId: PeerIdStr,
request: phase0.BeaconBlocksByRootRequest,
partialDownload: null | PartialDownload
partialDownload: null | PartialDownload,
peerClient: string,
logger?: Logger
): Promise<{blocks: BlockInput[]; pendingDataColumns: null | number[]}> {
console.log("beaconBlocksMaybeBlobsByRoot", request);
const allBlocks = partialDownload
? partialDownload.blocks.map((blockInput) => ({data: blockInput.block, bytes: blockInput.blockBytes!}))
: await network.sendBeaconBlocksByRoot(peerId, request);

console.log("beaconBlocksMaybeBlobsByRoot response", {allBlocks: allBlocks.length});
logger?.debug("beaconBlocksMaybeBlobsByRoot response", {allBlocks: allBlocks.length, peerClient});

const preDataBlocks = [];
const blobsDataBlocks = [];
Expand Down Expand Up @@ -69,7 +72,7 @@ export async function beaconBlocksMaybeBlobsByRoot(
} else if (fork === ForkName.deneb) {
blobsDataBlocks.push(block);
const blobKzgCommitmentsLen = (block.data.message.body as deneb.BeaconBlockBody).blobKzgCommitments.length;
console.log("beaconBlocksMaybeBlobsByRoot", {blobKzgCommitmentsLen});
logger?.debug("beaconBlocksMaybeBlobsByRoot", {blobKzgCommitmentsLen, peerClient});
for (let index = 0; index < blobKzgCommitmentsLen; index++) {
blobIdentifiers.push({blockRoot, index});
}
Expand Down Expand Up @@ -119,7 +122,13 @@ export async function beaconBlocksMaybeBlobsByRoot(
}, [] as number[]);

let allDataColumnsSidecars: peerdas.DataColumnSidecar[];
console.log("allDataColumnsSidecars", partialDownload, dataColumnIdentifiers);
logger?.debug("allDataColumnsSidecars partialDownload", {
...(partialDownload
? {blocks: partialDownload.blocks.length, pendingDataColumns: partialDownload.pendingDataColumns.join(",")}
: {blocks: null, pendingDataColumns: null}),
dataColumnIdentifiers: dataColumnIdentifiers.map((did) => did.index).join(","),
peerClient,
});
if (dataColumnIdentifiers.length > 0) {
allDataColumnsSidecars = await network.sendDataColumnSidecarsByRoot(peerId, dataColumnIdentifiers);
} else {
Expand All @@ -142,7 +151,9 @@ export async function beaconBlocksMaybeBlobsByRoot(
Infinity,
BlockSource.byRoot,
DataColumnsSource.byRoot,
partialDownload
partialDownload,
peerClient,
logger
);
blockInputs = [...blockInputs, ...blockInputWithBlobs];
}
Expand All @@ -158,7 +169,9 @@ export async function unavailableBeaconBlobsByRoot(
network: INetwork,
peerId: PeerIdStr,
unavailableBlockInput: BlockInput | NullBlockInput,
metrics: Metrics | null
metrics: Metrics | null,
peerClient: string,
logger?: Logger
): Promise<BlockInput> {
if (unavailableBlockInput.block !== null && unavailableBlockInput.type !== BlockInputType.dataPromise) {
return unavailableBlockInput;
Expand Down Expand Up @@ -308,7 +321,9 @@ export async function unavailableBeaconBlobsByRoot(
DataColumnsSource.byRoot,
unavailableBlockInput.block !== null
? {blocks: [unavailableBlockInput], pendingDataColumns: neededColumns}
: null
: null,
peerClient,
logger
);

// don't forget to resolve availability as the block may be stuck in availability wait
Expand Down
12 changes: 7 additions & 5 deletions packages/beacon-node/src/sync/range/chain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ export type SyncChainFns = {
downloadBeaconBlocksByRange: (
peer: PeerIdStr,
request: phase0.BeaconBlocksByRangeRequest,
partialDownload: PartialDownload
partialDownload: PartialDownload,
peerClient: string
) => Promise<{blocks: BlockInput[]; pendingDataColumns: null | number[]}>;
/** Report peer for negative actions. Decouples from the full network instance */
reportPeer: (peer: PeerIdStr, action: PeerAction, actionName: string) => void;
Expand Down Expand Up @@ -113,7 +114,7 @@ export class SyncChain {
/** Sorted map of batches undergoing some kind of processing. */
private readonly batches = new Map<Epoch, Batch>();
private readonly peerset = new Map<PeerIdStr, ChainTarget>();
private readonly peersetCustody = new Map<PeerIdStr, {custodyColumns: number[]}>();
private readonly peersetCustody = new Map<PeerIdStr, {custodyColumns: number[]; clientAgent: string}>();

private readonly logger: Logger;
private readonly config: ChainForkConfig;
Expand Down Expand Up @@ -195,9 +196,9 @@ export class SyncChain {
/**
* Add peer to the chain and request batches if active
*/
addPeer(peer: PeerIdStr, target: ChainTarget, custodyColumns: number[]): void {
addPeer(peer: PeerIdStr, target: ChainTarget, custodyColumns: number[], clientAgent: string): void {
this.peerset.set(peer, target);
this.peersetCustody.set(peer, {custodyColumns});
this.peersetCustody.set(peer, {custodyColumns, clientAgent});
this.computeTarget();
this.triggerBatchDownloader();
}
Expand Down Expand Up @@ -403,9 +404,10 @@ export class SyncChain {
private async sendBatch(batch: Batch, peer: PeerIdStr): Promise<void> {
try {
const partialDownload = batch.startDownloading(peer);
const peerClient = this.peersetCustody.get(peer)?.clientAgent ?? "unknown";

// wrapError ensures to never call both batch success() and batch error()
const res = await wrapError(this.downloadBeaconBlocksByRange(peer, batch.request, partialDownload));
const res = await wrapError(this.downloadBeaconBlocksByRange(peer, batch.request, partialDownload, peerClient));

if (!res.err) {
const blocks = batch.downloadingSuccess(res.result);
Expand Down
14 changes: 11 additions & 3 deletions packages/beacon-node/src/sync/range/range.ts
Original file line number Diff line number Diff line change
Expand Up @@ -204,15 +204,18 @@ export class RangeSync extends (EventEmitter as {new (): RangeSyncEmitter}) {
private downloadBeaconBlocksByRange: SyncChainFns["downloadBeaconBlocksByRange"] = async (
peerId,
request,
partialDownload
partialDownload,
peerClient
) => {
return beaconBlocksMaybeBlobsByRange(
this.config,
this.network,
peerId,
request,
this.chain.clock.currentEpoch,
partialDownload
partialDownload,
peerClient,
this.logger
);
};

Expand Down Expand Up @@ -257,7 +260,12 @@ export class RangeSync extends (EventEmitter as {new (): RangeSyncEmitter}) {
});
}

syncChain.addPeer(peer, target, this.network.getConnectedPeerCustody(peer));
syncChain.addPeer(
peer,
target,
this.network.getConnectedPeerCustody(peer),
this.network.getConnectedPeerClientAgent(peer)
);
}

private update(localFinalizedEpoch: Epoch): void {
Expand Down
Loading