Skip to content

Commit

Permalink
refactor to a separate event and handler for unknownblock input
Browse files Browse the repository at this point in the history
  • Loading branch information
g11tech committed Mar 4, 2024
1 parent bfd1a9a commit 606e091
Show file tree
Hide file tree
Showing 8 changed files with 57 additions and 9 deletions.
5 changes: 4 additions & 1 deletion packages/beacon-node/src/network/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ export enum NetworkEvent {
// TODO remove this event, this is not a network-level concern, rather a chain / sync concern
unknownBlockParent = "unknownBlockParent",
unknownBlock = "unknownBlock",
unknownBlockInput = "unknownBlockInput",

// Network processor events
/** (Network -> App) A gossip message is ready for validation */
Expand All @@ -30,7 +31,8 @@ export type NetworkEventData = {
[NetworkEvent.peerDisconnected]: {peer: PeerIdStr};
[NetworkEvent.reqRespRequest]: {request: RequestTypedContainer; peer: PeerId};
[NetworkEvent.unknownBlockParent]: {blockInput: BlockInput; peer: PeerIdStr};
[NetworkEvent.unknownBlock]: {blockInputOrRootHex: BlockInput | RootHex; peer?: PeerIdStr};
[NetworkEvent.unknownBlock]: {rootHex: RootHex; peer?: PeerIdStr};
[NetworkEvent.unknownBlockInput]: {blockInput: BlockInput; peer?: PeerIdStr};
[NetworkEvent.pendingGossipsubMessage]: PendingGossipsubMessage;
[NetworkEvent.gossipMessageValidationResult]: {
msgId: string;
Expand All @@ -45,6 +47,7 @@ export const networkEventDirection: Record<NetworkEvent, EventDirection> = {
[NetworkEvent.reqRespRequest]: EventDirection.none, // Only used internally in NetworkCore
[NetworkEvent.unknownBlockParent]: EventDirection.workerToMain,
[NetworkEvent.unknownBlock]: EventDirection.workerToMain,
[NetworkEvent.unknownBlockInput]: EventDirection.workerToMain,
[NetworkEvent.pendingGossipsubMessage]: EventDirection.workerToMain,
[NetworkEvent.gossipMessageValidationResult]: EventDirection.mainToWorker,
};
Expand Down
4 changes: 2 additions & 2 deletions packages/beacon-node/src/network/processor/gossipHandlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ function getDefaultHandlers(modules: ValidatorFnsModules, options: GossipHandler
metrics?.registerBeaconBlock(OpSource.gossip, seenTimestampSec, signedBlock.message);
// if blobs are not yet fully available start an aggressive blob pull
if (blockInput.type === BlockInputType.blobsPromise) {
events.emit(NetworkEvent.unknownBlock, {blockInputOrRootHex: blockInput, peer: peerIdStr});
events.emit(NetworkEvent.unknownBlockInput, {blockInput: blockInput, peer: peerIdStr});
}

chain
Expand Down Expand Up @@ -284,7 +284,7 @@ function getDefaultHandlers(modules: ValidatorFnsModules, options: GossipHandler
const forkTypes = config.getForkTypes(slot);
const rootHex = toHexString(forkTypes.BeaconBlock.hashTreeRoot(signedBlock.message));

events.emit(NetworkEvent.unknownBlock, {blockInputOrRootHex: rootHex, peer: peerIdStr});
events.emit(NetworkEvent.unknownBlock, {rootHex: rootHex, peer: peerIdStr});

// Error is quite frequent and not critical
logLevel = LogLevel.debug;
Expand Down
2 changes: 1 addition & 1 deletion packages/beacon-node/src/network/processor/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ export class NetworkProcessor {
}
// Search for the unknown block
this.unknownRootsBySlot.getOrDefault(slot).add(root);
this.events.emit(NetworkEvent.unknownBlock, {blockInputOrRootHex: root, peer});
this.events.emit(NetworkEvent.unknownBlock, {rootHex: root, peer});
}

private onPendingGossipsubMessage(message: PendingGossipsubMessage): void {
Expand Down
2 changes: 2 additions & 0 deletions packages/beacon-node/src/sync/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,4 +111,6 @@ export enum PendingBlockType {
* During gossip time, we may get a block but the parent root is unknown (not in forkchoice).
*/
UNKNOWN_PARENT = "unknown_parent",

UNKNOWN_BLOCKINPUT = "unknown_blockinput",
}
17 changes: 16 additions & 1 deletion packages/beacon-node/src/sync/unknownBlock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ export class UnknownBlockSync {
if (!this.subscribedToNetworkEvents) {
this.logger.verbose("UnknownBlockSync enabled.");
this.network.events.on(NetworkEvent.unknownBlock, this.onUnknownBlock);
this.network.events.on(NetworkEvent.unknownBlockInput, this.onUnknownBlockInput);
this.network.events.on(NetworkEvent.unknownBlockParent, this.onUnknownParent);
this.network.events.on(NetworkEvent.peerConnected, this.triggerUnknownBlockSearch);
this.subscribedToNetworkEvents = true;
Expand All @@ -74,6 +75,7 @@ export class UnknownBlockSync {
unsubscribeFromNetwork(): void {
this.logger.verbose("UnknownBlockSync disabled.");
this.network.events.off(NetworkEvent.unknownBlock, this.onUnknownBlock);
this.network.events.off(NetworkEvent.unknownBlockInput, this.onUnknownBlockInput);
this.network.events.off(NetworkEvent.unknownBlockParent, this.onUnknownParent);
this.network.events.off(NetworkEvent.peerConnected, this.triggerUnknownBlockSearch);
this.subscribedToNetworkEvents = false;
Expand All @@ -93,14 +95,27 @@ export class UnknownBlockSync {
*/
private onUnknownBlock = (data: NetworkEventData[NetworkEvent.unknownBlock]): void => {
try {
this.addUnknownBlock(data.blockInputOrRootHex, data.peer);
this.addUnknownBlock(data.rootHex, data.peer);
this.triggerUnknownBlockSearch();
this.metrics?.syncUnknownBlock.requests.inc({type: PendingBlockType.UNKNOWN_BLOCK});
} catch (e) {
this.logger.debug("Error handling unknownBlock event", {}, e as Error);
}
};

/**
* Process an unknownBlockInput event and register the block in `pendingBlocks` Map.
*/
private onUnknownBlockInput = (data: NetworkEventData[NetworkEvent.unknownBlockInput]): void => {
try {
this.addUnknownBlock(data.blockInput, data.peer);
this.triggerUnknownBlockSearch();
this.metrics?.syncUnknownBlock.requests.inc({type: PendingBlockType.UNKNOWN_BLOCKINPUT});
} catch (e) {
this.logger.debug("Error handling unknownBlockInput event", {}, e as Error);
}
};

/**
* Process an unknownBlockParent event and register the block in `pendingBlocks` Map.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import {
ReqRespMethod,
networkEventDirection,
} from "../../../../src/network/index.js";
import {BlockInputType, BlockSource} from "../../../../src/chain/blocks/types.js";
import {BlockInputType, BlockSource, BlockInputBlobs, BlockInput} from "../../../../src/chain/blocks/types.js";
import {ZERO_HASH, ZERO_HASH_HEX} from "../../../../src/constants/constants.js";
import {IteratorEventType} from "../../../../src/util/asyncIterableToEvents.js";
import {NetworkWorkerApi} from "../../../../src/network/core/index.js";
Expand Down Expand Up @@ -89,7 +89,11 @@ describe.skip("data serialization through worker boundary", function () {
peer,
},
[NetworkEvent.unknownBlock]: {
blockInputOrRootHex: ZERO_HASH_HEX,
rootHex: ZERO_HASH_HEX,
peer,
},
[NetworkEvent.unknownBlockInput]: {
blockInput: getEmptyBlockInput(),
peer,
},
[NetworkEvent.pendingGossipsubMessage]: {
Expand Down Expand Up @@ -240,3 +244,23 @@ describe.skip("data serialization through worker boundary", function () {
});

type Resolves<T extends Promise<unknown>> = T extends Promise<infer U> ? (U extends void ? null : U) : never;

function getEmptyBlockInput(): BlockInput {
let resolveAvailability: ((blobs: BlockInputBlobs) => void) | null = null;
const availabilityPromise = new Promise<BlockInputBlobs>((resolveCB) => {
resolveAvailability = resolveCB;
});
if (resolveAvailability === null) {
throw Error("Promise Constructor was not executed immediately");
}
const blobsCache = new Map();
return {
type: BlockInputType.blobsPromise,
block: ssz.deneb.SignedBeaconBlock.defaultValue(),
source: BlockSource.gossip,
blockBytes: ZERO_HASH,
blobsCache,
availabilityPromise,
resolveAvailability,
};
}
6 changes: 5 additions & 1 deletion packages/beacon-node/test/e2e/sync/unknownBlockSync.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ describe("sync / unknown block sync", function () {
id: "should do an unknown block sync from another BN",
event: NetworkEvent.unknownBlock,
},
{
id: "should do an unknown input sync from another BN",
event: NetworkEvent.unknownBlockInput,
},
];

for (const {id, event} of testCases) {
Expand Down Expand Up @@ -140,7 +144,7 @@ describe("sync / unknown block sync", function () {
break;
case NetworkEvent.unknownBlock:
bn2.network.events.emit(NetworkEvent.unknownBlock, {
blockInputOrRootHex: headSummary.blockRoot,
rootHex: headSummary.blockRoot,
peer: bn2.network.peerId.toString(),
});
break;
Expand Down
2 changes: 1 addition & 1 deletion packages/beacon-node/test/unit/sync/unknownBlock.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ describe("sync by UnknownBlockSync", () => {
peer,
});
} else {
network.events?.emit(NetworkEvent.unknownBlock, {blockInputOrRootHex: blockRootHexC, peer});
network.events?.emit(NetworkEvent.unknownBlock, {rootHex: blockRootHexC, peer});
}

if (wrongBlockRoot) {
Expand Down

0 comments on commit 606e091

Please sign in to comment.