Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: refactor event loop interactions #6806

Merged
merged 5 commits into from
May 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions packages/beacon-node/src/chain/blocks/importBlock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import {kzgCommitmentToVersionedHash} from "../../util/blobs.js";
import {ChainEvent, ReorgEventData} from "../emitter.js";
import {REPROCESS_MIN_TIME_TO_NEXT_SLOT_SEC} from "../reprocess.js";
import type {BeaconChain} from "../chain.js";
import {callInNextEventLoop} from "../../util/eventLoop.js";
import {FullyVerifiedBlock, ImportBlockOpts, AttestationImportOpt, BlockInputType} from "./types.js";
import {getCheckpointFromState} from "./utils/checkpoint.js";
import {writeBlockInputToDb} from "./writeBlockInputToDb.js";
Expand Down Expand Up @@ -100,7 +101,7 @@ export async function importBlock(
this.logger.verbose("Added block to forkchoice and state cache", {slot: blockSlot, root: blockRootHex});

// We want to import block asap so call all event handler in the next event loop
setTimeout(async () => {
callInNextEventLoop(async () => {
this.emitter.emit(routes.events.EventType.block, {
block: blockRootHex,
slot: blockSlot,
Expand All @@ -125,7 +126,7 @@ export async function importBlock(
});
}
}
}, 0);
});

// 3. Import attestations to fork choice
//
Expand Down Expand Up @@ -301,7 +302,7 @@ export async function importBlock(
// - Use block's syncAggregate
if (blockEpoch >= this.config.ALTAIR_FORK_EPOCH) {
// we want to import block asap so do this in the next event loop
setTimeout(() => {
callInNextEventLoop(() => {
try {
this.lightClientServer.onImportBlockHead(
block.message as allForks.AllForksLightClient["BeaconBlock"],
Expand All @@ -311,7 +312,7 @@ export async function importBlock(
} catch (e) {
this.logger.verbose("Error lightClientServer.onImportBlock", {slot: blockSlot}, e as Error);
}
}, 0);
});
}
}

Expand Down Expand Up @@ -455,9 +456,9 @@ export async function importBlock(

// Gossip blocks need to be imported as soon as possible, waiting attestations could be processed
// in the next event loop. See https://github.com/ChainSafe/lodestar/issues/4789
setTimeout(() => {
callInNextEventLoop(() => {
this.reprocessController.onBlockImported({slot: blockSlot, root: blockRootHex}, advancedSlot);
}, 0);
});

if (opts.seenTimestampSec !== undefined) {
const recvToValidation = Date.now() / 1000 - opts.seenTimestampSec;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import {CachedBeaconStateAllForks, getBlockSignatureSets} from "@lodestar/state-transition";
import {allForks} from "@lodestar/types";
import {Logger, sleep} from "@lodestar/utils";
import {Logger} from "@lodestar/utils";
import {Metrics} from "../../metrics/metrics.js";
import {IBlsVerifier} from "../bls/index.js";
import {BlockError, BlockErrorCode} from "../errors/blockError.js";
import {nextEventLoop} from "../../util/eventLoop.js";
import {ImportBlockOpts} from "./types.js";

/**
Expand Down Expand Up @@ -40,10 +41,10 @@ export async function verifyBlocksSignatures(
);

// getBlockSignatureSets() takes 45ms in benchmarks for 2022Q2 mainnet blocks (100 sigs). When syncing a 32 blocks
// segments it will block the event loop for 1400 ms, which is too much. This sleep will allow the event loop to
// segments it will block the event loop for 1400 ms, which is too much. This call will allow the event loop to
// yield, which will cause one block's state transition to run. However, the tradeoff is okay and doesn't slow sync
if ((i + 1) % 8 === 0) {
await sleep(0);
await nextEventLoop();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ import {
DataAvailableStatus,
StateHashTreeRootSource,
} from "@lodestar/state-transition";
import {ErrorAborted, Logger, sleep} from "@lodestar/utils";
import {ErrorAborted, Logger} from "@lodestar/utils";
import {Metrics} from "../../metrics/index.js";
import {BlockError, BlockErrorCode} from "../errors/index.js";
import {BlockProcessOpts} from "../options.js";
import {byteArrayEquals} from "../../util/bytes.js";
import {nextEventLoop} from "../../util/eventLoop.js";
import {BlockInput, ImportBlockOpts} from "./types.js";

/**
Expand Down Expand Up @@ -90,7 +91,7 @@ export async function verifyBlocksStateTransitionOnly(

// this avoids keeping our node busy processing blocks
if (i < blocks.length - 1) {
await sleep(0);
await nextEventLoop();
}
}

Expand Down
7 changes: 4 additions & 3 deletions packages/beacon-node/src/chain/bls/multiThread.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {ISignatureSet} from "@lodestar/state-transition";
import {QueueError, QueueErrorCode} from "../../util/queue/index.js";
import {Metrics} from "../../metrics/index.js";
import {LinkedList} from "../../util/array.js";
import {callInNextEventLoop} from "../../util/eventLoop.js";
import {IBlsVerifier, VerifySignatureOpts} from "./interface.js";
import {getAggregatedPubkey, getAggregatedPubkeysCount, getJobResultError} from "./utils.js";
import {verifySets} from "./verifySets.js";
Expand Down Expand Up @@ -262,7 +263,7 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier {
} else {
this.jobs.push(job);
}
setTimeout(this.runJob, 0);
callInNextEventLoop(this.runJob);
}
}

Expand Down Expand Up @@ -413,7 +414,7 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier {
this.workersBusy--;

// Potentially run a new job
setTimeout(this.runJob, 0);
callInNextEventLoop(this.runJob);
};

/**
Expand Down Expand Up @@ -448,7 +449,7 @@ export class BlsMultiThreadWorkerPool implements IBlsVerifier {
this.jobs.unshift(job);
}
this.bufferedJobs = null;
setTimeout(this.runJob, 0);
callInNextEventLoop(this.runJob);
}
};

Expand Down
7 changes: 4 additions & 3 deletions packages/beacon-node/src/chain/regen/regen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,15 @@ import {
stateTransition,
} from "@lodestar/state-transition";
import {IForkChoice, ProtoBlock} from "@lodestar/fork-choice";
import {Logger, sleep} from "@lodestar/utils";
import {Logger} from "@lodestar/utils";
import {SLOTS_PER_EPOCH} from "@lodestar/params";
import {ChainForkConfig} from "@lodestar/config";
import {Metrics} from "../../metrics/index.js";
import {IBeaconDb} from "../../db/index.js";
import {getCheckpointFromState} from "../blocks/utils/checkpoint.js";
import {ChainEvent, ChainEventEmitter} from "../emitter.js";
import {CheckpointStateCache, BlockStateCache} from "../stateCache/types.js";
import {nextEventLoop} from "../../util/eventLoop.js";
import {IStateRegeneratorInternal, RegenCaller, StateCloneOpts} from "./interface.js";
import {RegenError, RegenErrorCode} from "./errors.js";

Expand Down Expand Up @@ -239,7 +240,7 @@ export class StateRegenerator implements IStateRegeneratorInternal {
}

// this avoids keeping our node busy processing blocks
await sleep(0);
await nextEventLoop();
} catch (e) {
throw new RegenError({
code: RegenErrorCode.STATE_TRANSITION_ERROR,
Expand Down Expand Up @@ -325,7 +326,7 @@ async function processSlotsToNearestCheckpoint(
emitter.emit(ChainEvent.checkpoint, cp, checkpointState.clone(true));

// this avoids keeping our node busy processing blocks
await sleep(0);
await nextEventLoop();
}
return postState;
}
9 changes: 5 additions & 4 deletions packages/beacon-node/src/network/gossip/gossipsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {ClientKind} from "../peers/client.js";
import {GOSSIP_MAX_SIZE, GOSSIP_MAX_SIZE_BELLATRIX} from "../../constants/network.js";
import {Libp2p} from "../interface.js";
import {NetworkEvent, NetworkEventBus, NetworkEventData} from "../events.js";
import {callInNextEventLoop} from "../../util/eventLoop.js";
import {GossipTopic, GossipType} from "./interface.js";
import {GossipTopicCache, stringifyGossipTopic, getCoreTopicsAtFork} from "./topic.js";
import {DataTransformSnappy, fastMsgIdFn, msgIdFn, msgIdToStrFn} from "./encoding.js";
Expand Down Expand Up @@ -284,7 +285,7 @@ export class Eth2Gossipsub extends GossipSub {
// Use setTimeout to yield to the macro queue
// Without this we'll have huge event loop lag
// See https://github.com/ChainSafe/lodestar/issues/5604
setTimeout(() => {
callInNextEventLoop(() => {
this.events.emit(NetworkEvent.pendingGossipsubMessage, {
topic,
msg,
Expand All @@ -294,16 +295,16 @@ export class Eth2Gossipsub extends GossipSub {
seenTimestampSec,
startProcessUnixSec: null,
});
}, 0);
});
}

private onValidationResult(data: NetworkEventData[NetworkEvent.gossipMessageValidationResult]): void {
// Use setTimeout to yield to the macro queue
// Without this we'll have huge event loop lag
// See https://github.com/ChainSafe/lodestar/issues/5604
setTimeout(() => {
callInNextEventLoop(() => {
this.reportMessageValidationResult(data.msgId, data.propagationSource, data.acceptance);
}, 0);
});
}
}

Expand Down
9 changes: 5 additions & 4 deletions packages/beacon-node/src/network/processor/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {
GossipValidatorFn,
} from "../gossip/interface.js";
import {PeerIdStr} from "../peers/index.js";
import {callInNextEventLoop} from "../../util/eventLoop.js";
import {createGossipQueues} from "./gossipQueues/index.js";
import {PendingGossipsubMessage} from "./types.js";
import {ValidatorFnsModules, GossipHandlerOpts, getGossipHandlers} from "./gossipHandlers.js";
Expand Down Expand Up @@ -452,22 +453,22 @@ export class NetworkProcessor {

if (Array.isArray(messageOrArray)) {
for (const [i, msg] of messageOrArray.entries()) {
setTimeout(() => {
callInNextEventLoop(() => {
this.events.emit(NetworkEvent.gossipMessageValidationResult, {
msgId: msg.msgId,
propagationSource: msg.propagationSource,
acceptance: acceptanceArr[i],
});
}, 0);
});
}
} else {
setTimeout(() => {
callInNextEventLoop(() => {
this.events.emit(NetworkEvent.gossipMessageValidationResult, {
msgId: messageOrArray.msgId,
propagationSource: messageOrArray.propagationSource,
acceptance: acceptanceArr[0],
});
}, 0);
});
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import {PeersData} from "../peers/peersData.js";
import {IPeerRpcScoreStore, PeerAction} from "../peers/score/index.js";
import {NetworkCoreMetrics} from "../core/metrics.js";
import {StatusCache} from "../statusCache.js";
import {callInNextEventLoop} from "../../util/eventLoop.js";
import {onOutgoingReqRespError} from "./score.js";
import {
GetReqRespHandlerFn,
Expand Down Expand Up @@ -259,7 +260,7 @@ export class ReqRespBeaconNode extends ReqResp {
// Allow onRequest to return and close the stream
// For Goodbye there may be a race condition where the listener of `receivedGoodbye`
// disconnects in the same synchronous call, preventing the stream from ending cleanly
setTimeout(() => this.networkEventBus.emit(NetworkEvent.reqRespRequest, {request, peer}), 0);
callInNextEventLoop(() => this.networkEventBus.emit(NetworkEvent.reqRespRequest, {request, peer}));
}

protected onIncomingRequest(peerId: PeerId, protocol: ProtocolDescriptor): void {
Expand Down
22 changes: 22 additions & 0 deletions packages/beacon-node/src/util/eventLoop.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import {sleep} from "@lodestar/utils";

/**
* Schedules in 1ms a Promise to be resolved during the `timers` phase.
* Awaiting this Promise will force the whole event queue to be executed.
*
* Caution: as the execution of the event queue might lead to new enqueuing, this might take significant time.
*/
export function nextEventLoop(): Promise<void> {
// `setTimeout` delay is at least 1ms
// Say https://nodejs.org/api/timers.html#settimeoutcallback-delay-args
return sleep(0);
}

/**
* Schedules in 1ms a callback for execution during the next `timers` phase.
*/
export function callInNextEventLoop(callback: () => void): void {
// `setTimeout` delay is at least 1ms
// Say https://nodejs.org/api/timers.html#settimeoutcallback-delay-args
setTimeout(callback, 0);
}
6 changes: 3 additions & 3 deletions packages/beacon-node/src/util/queue/itemQueue.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import {sleep} from "@lodestar/utils";
import {LinkedList} from "../array.js";
import {callInNextEventLoop, nextEventLoop} from "../../util/eventLoop.js";
import {QueueError, QueueErrorCode} from "./errors.js";
import {defaultQueueOpts, QueueMetrics, JobQueueOpts, QueueType} from "./options.js";

Expand Down Expand Up @@ -66,7 +66,7 @@ export class JobItemQueue<Args extends any[], R> {
if (this.jobs.length === 1 && this.opts.noYieldIfOneItem) {
void this.runJob();
} else if (this.runningJobs < this.opts.maxConcurrency) {
setTimeout(this.runJob, 0);
callInNextEventLoop(this.runJob);
}
});
}
Expand Down Expand Up @@ -106,7 +106,7 @@ export class JobItemQueue<Args extends any[], R> {
// Yield to the macro queue
if (Date.now() - this.lastYield > this.opts.yieldEveryMs) {
this.lastYield = Date.now();
await sleep(0);
await nextEventLoop();
}
} catch (e) {
job.reject(e as Error);
Expand Down
Loading