From 606e091c2cb72718ee48c093d610731df0732ca1 Mon Sep 17 00:00:00 2001 From: harkamal Date: Mon, 4 Mar 2024 22:00:50 +0530 Subject: [PATCH] refactor to a separate event and handler for unknownblock input --- packages/beacon-node/src/network/events.ts | 5 +++- .../src/network/processor/gossipHandlers.ts | 4 +-- .../src/network/processor/index.ts | 2 +- packages/beacon-node/src/sync/interface.ts | 2 ++ packages/beacon-node/src/sync/unknownBlock.ts | 17 ++++++++++- .../onWorker/dataSerialization.test.ts | 28 +++++++++++++++++-- .../test/e2e/sync/unknownBlockSync.test.ts | 6 +++- .../test/unit/sync/unknownBlock.test.ts | 2 +- 8 files changed, 57 insertions(+), 9 deletions(-) diff --git a/packages/beacon-node/src/network/events.ts b/packages/beacon-node/src/network/events.ts index f5ba9bae504b..65c5d56fb808 100644 --- a/packages/beacon-node/src/network/events.ts +++ b/packages/beacon-node/src/network/events.ts @@ -17,6 +17,7 @@ export enum NetworkEvent { // TODO remove this event, this is not a network-level concern, rather a chain / sync concern unknownBlockParent = "unknownBlockParent", unknownBlock = "unknownBlock", + unknownBlockInput = "unknownBlockInput", // Network processor events /** (Network -> App) A gossip message is ready for validation */ @@ -30,7 +31,8 @@ export type NetworkEventData = { [NetworkEvent.peerDisconnected]: {peer: PeerIdStr}; [NetworkEvent.reqRespRequest]: {request: RequestTypedContainer; peer: PeerId}; [NetworkEvent.unknownBlockParent]: {blockInput: BlockInput; peer: PeerIdStr}; - [NetworkEvent.unknownBlock]: {blockInputOrRootHex: BlockInput | RootHex; peer?: PeerIdStr}; + [NetworkEvent.unknownBlock]: {rootHex: RootHex; peer?: PeerIdStr}; + [NetworkEvent.unknownBlockInput]: {blockInput: BlockInput; peer?: PeerIdStr}; [NetworkEvent.pendingGossipsubMessage]: PendingGossipsubMessage; [NetworkEvent.gossipMessageValidationResult]: { msgId: string; @@ -45,6 +47,7 @@ export const networkEventDirection: Record = { [NetworkEvent.reqRespRequest]: EventDirection.none, // Only used internally in NetworkCore [NetworkEvent.unknownBlockParent]: EventDirection.workerToMain, [NetworkEvent.unknownBlock]: EventDirection.workerToMain, + [NetworkEvent.unknownBlockInput]: EventDirection.workerToMain, [NetworkEvent.pendingGossipsubMessage]: EventDirection.workerToMain, [NetworkEvent.gossipMessageValidationResult]: EventDirection.mainToWorker, }; diff --git a/packages/beacon-node/src/network/processor/gossipHandlers.ts b/packages/beacon-node/src/network/processor/gossipHandlers.ts index fb653dcaa79b..95022e663158 100644 --- a/packages/beacon-node/src/network/processor/gossipHandlers.ts +++ b/packages/beacon-node/src/network/processor/gossipHandlers.ts @@ -244,7 +244,7 @@ function getDefaultHandlers(modules: ValidatorFnsModules, options: GossipHandler metrics?.registerBeaconBlock(OpSource.gossip, seenTimestampSec, signedBlock.message); // if blobs are not yet fully available start an aggressive blob pull if (blockInput.type === BlockInputType.blobsPromise) { - events.emit(NetworkEvent.unknownBlock, {blockInputOrRootHex: blockInput, peer: peerIdStr}); + events.emit(NetworkEvent.unknownBlockInput, {blockInput: blockInput, peer: peerIdStr}); } chain @@ -284,7 +284,7 @@ function getDefaultHandlers(modules: ValidatorFnsModules, options: GossipHandler const forkTypes = config.getForkTypes(slot); const rootHex = toHexString(forkTypes.BeaconBlock.hashTreeRoot(signedBlock.message)); - events.emit(NetworkEvent.unknownBlock, {blockInputOrRootHex: rootHex, peer: peerIdStr}); + events.emit(NetworkEvent.unknownBlock, {rootHex: rootHex, peer: peerIdStr}); // Error is quite frequent and not critical logLevel = LogLevel.debug; diff --git a/packages/beacon-node/src/network/processor/index.ts b/packages/beacon-node/src/network/processor/index.ts index 775bb1568d98..420360920518 100644 --- a/packages/beacon-node/src/network/processor/index.ts +++ b/packages/beacon-node/src/network/processor/index.ts @@ -238,7 +238,7 @@ export class NetworkProcessor { } // Search for the unknown block this.unknownRootsBySlot.getOrDefault(slot).add(root); - this.events.emit(NetworkEvent.unknownBlock, {blockInputOrRootHex: root, peer}); + this.events.emit(NetworkEvent.unknownBlock, {rootHex: root, peer}); } private onPendingGossipsubMessage(message: PendingGossipsubMessage): void { diff --git a/packages/beacon-node/src/sync/interface.ts b/packages/beacon-node/src/sync/interface.ts index c133391511e3..1c1ae1ceedf7 100644 --- a/packages/beacon-node/src/sync/interface.ts +++ b/packages/beacon-node/src/sync/interface.ts @@ -111,4 +111,6 @@ export enum PendingBlockType { * During gossip time, we may get a block but the parent root is unknown (not in forkchoice). */ UNKNOWN_PARENT = "unknown_parent", + + UNKNOWN_BLOCKINPUT = "unknown_blockinput", } diff --git a/packages/beacon-node/src/sync/unknownBlock.ts b/packages/beacon-node/src/sync/unknownBlock.ts index 8cdc13cdb93c..fa1a2b0f228a 100644 --- a/packages/beacon-node/src/sync/unknownBlock.ts +++ b/packages/beacon-node/src/sync/unknownBlock.ts @@ -62,6 +62,7 @@ export class UnknownBlockSync { if (!this.subscribedToNetworkEvents) { this.logger.verbose("UnknownBlockSync enabled."); this.network.events.on(NetworkEvent.unknownBlock, this.onUnknownBlock); + this.network.events.on(NetworkEvent.unknownBlockInput, this.onUnknownBlockInput); this.network.events.on(NetworkEvent.unknownBlockParent, this.onUnknownParent); this.network.events.on(NetworkEvent.peerConnected, this.triggerUnknownBlockSearch); this.subscribedToNetworkEvents = true; @@ -74,6 +75,7 @@ export class UnknownBlockSync { unsubscribeFromNetwork(): void { this.logger.verbose("UnknownBlockSync disabled."); this.network.events.off(NetworkEvent.unknownBlock, this.onUnknownBlock); + this.network.events.off(NetworkEvent.unknownBlockInput, this.onUnknownBlockInput); this.network.events.off(NetworkEvent.unknownBlockParent, this.onUnknownParent); this.network.events.off(NetworkEvent.peerConnected, this.triggerUnknownBlockSearch); this.subscribedToNetworkEvents = false; @@ -93,7 +95,7 @@ export class UnknownBlockSync { */ private onUnknownBlock = (data: NetworkEventData[NetworkEvent.unknownBlock]): void => { try { - this.addUnknownBlock(data.blockInputOrRootHex, data.peer); + this.addUnknownBlock(data.rootHex, data.peer); this.triggerUnknownBlockSearch(); this.metrics?.syncUnknownBlock.requests.inc({type: PendingBlockType.UNKNOWN_BLOCK}); } catch (e) { @@ -101,6 +103,19 @@ export class UnknownBlockSync { } }; + /** + * Process an unknownBlockInput event and register the block in `pendingBlocks` Map. + */ + private onUnknownBlockInput = (data: NetworkEventData[NetworkEvent.unknownBlockInput]): void => { + try { + this.addUnknownBlock(data.blockInput, data.peer); + this.triggerUnknownBlockSearch(); + this.metrics?.syncUnknownBlock.requests.inc({type: PendingBlockType.UNKNOWN_BLOCKINPUT}); + } catch (e) { + this.logger.debug("Error handling unknownBlockInput event", {}, e as Error); + } + }; + /** * Process an unknownBlockParent event and register the block in `pendingBlocks` Map. */ diff --git a/packages/beacon-node/test/e2e/network/onWorker/dataSerialization.test.ts b/packages/beacon-node/test/e2e/network/onWorker/dataSerialization.test.ts index 89834bc194e6..976df5d2ae4d 100644 --- a/packages/beacon-node/test/e2e/network/onWorker/dataSerialization.test.ts +++ b/packages/beacon-node/test/e2e/network/onWorker/dataSerialization.test.ts @@ -15,7 +15,7 @@ import { ReqRespMethod, networkEventDirection, } from "../../../../src/network/index.js"; -import {BlockInputType, BlockSource} from "../../../../src/chain/blocks/types.js"; +import {BlockInputType, BlockSource, BlockInputBlobs, BlockInput} from "../../../../src/chain/blocks/types.js"; import {ZERO_HASH, ZERO_HASH_HEX} from "../../../../src/constants/constants.js"; import {IteratorEventType} from "../../../../src/util/asyncIterableToEvents.js"; import {NetworkWorkerApi} from "../../../../src/network/core/index.js"; @@ -89,7 +89,11 @@ describe.skip("data serialization through worker boundary", function () { peer, }, [NetworkEvent.unknownBlock]: { - blockInputOrRootHex: ZERO_HASH_HEX, + rootHex: ZERO_HASH_HEX, + peer, + }, + [NetworkEvent.unknownBlockInput]: { + blockInput: getEmptyBlockInput(), peer, }, [NetworkEvent.pendingGossipsubMessage]: { @@ -240,3 +244,23 @@ describe.skip("data serialization through worker boundary", function () { }); type Resolves> = T extends Promise ? (U extends void ? null : U) : never; + +function getEmptyBlockInput(): BlockInput { + let resolveAvailability: ((blobs: BlockInputBlobs) => void) | null = null; + const availabilityPromise = new Promise((resolveCB) => { + resolveAvailability = resolveCB; + }); + if (resolveAvailability === null) { + throw Error("Promise Constructor was not executed immediately"); + } + const blobsCache = new Map(); + return { + type: BlockInputType.blobsPromise, + block: ssz.deneb.SignedBeaconBlock.defaultValue(), + source: BlockSource.gossip, + blockBytes: ZERO_HASH, + blobsCache, + availabilityPromise, + resolveAvailability, + }; +} diff --git a/packages/beacon-node/test/e2e/sync/unknownBlockSync.test.ts b/packages/beacon-node/test/e2e/sync/unknownBlockSync.test.ts index 7507bd9e41b2..9df5cb079f87 100644 --- a/packages/beacon-node/test/e2e/sync/unknownBlockSync.test.ts +++ b/packages/beacon-node/test/e2e/sync/unknownBlockSync.test.ts @@ -43,6 +43,10 @@ describe("sync / unknown block sync", function () { id: "should do an unknown block sync from another BN", event: NetworkEvent.unknownBlock, }, + { + id: "should do an unknown input sync from another BN", + event: NetworkEvent.unknownBlockInput, + }, ]; for (const {id, event} of testCases) { @@ -140,7 +144,7 @@ describe("sync / unknown block sync", function () { break; case NetworkEvent.unknownBlock: bn2.network.events.emit(NetworkEvent.unknownBlock, { - blockInputOrRootHex: headSummary.blockRoot, + rootHex: headSummary.blockRoot, peer: bn2.network.peerId.toString(), }); break; diff --git a/packages/beacon-node/test/unit/sync/unknownBlock.test.ts b/packages/beacon-node/test/unit/sync/unknownBlock.test.ts index 736347d690e2..a0a8be65dd28 100644 --- a/packages/beacon-node/test/unit/sync/unknownBlock.test.ts +++ b/packages/beacon-node/test/unit/sync/unknownBlock.test.ts @@ -192,7 +192,7 @@ describe("sync by UnknownBlockSync", () => { peer, }); } else { - network.events?.emit(NetworkEvent.unknownBlock, {blockInputOrRootHex: blockRootHexC, peer}); + network.events?.emit(NetworkEvent.unknownBlock, {rootHex: blockRootHexC, peer}); } if (wrongBlockRoot) {