diff --git a/packages/beacon-node/package.json b/packages/beacon-node/package.json index b1104d2db75f..c47afa96c908 100644 --- a/packages/beacon-node/package.json +++ b/packages/beacon-node/package.json @@ -103,7 +103,7 @@ "@chainsafe/persistent-merkle-tree": "^0.5.0", "@chainsafe/snappy-stream": "^5.1.2", "@chainsafe/ssz": "^0.10.2", - "@chainsafe/threads": "^1.10.0", + "@chainsafe/threads": "^1.11.0", "@ethersproject/abi": "^5.7.0", "@fastify/bearer-auth": "^9.0.0", "@fastify/cors": "^8.2.1", diff --git a/packages/beacon-node/src/api/rest/base.ts b/packages/beacon-node/src/api/rest/base.ts index b65e52fd8b01..7f7c03773a9b 100644 --- a/packages/beacon-node/src/api/rest/base.ts +++ b/packages/beacon-node/src/api/rest/base.ts @@ -28,6 +28,11 @@ export type RestApiServerMetrics = SocketMetrics & { errors: IGauge<"operationId">; }; +enum Status { + Listening = "listening", + Closed = "closed", +} + /** * REST API powered by `fastify` server. */ @@ -36,6 +41,8 @@ export class RestApiServer { protected readonly logger: Logger; private readonly activeSockets: HttpActiveSocketsTracker; + private status = Status.Closed; + constructor(private readonly opts: RestApiServerOpts, modules: RestApiServerModules) { // Apply opts defaults const {logger, metrics} = modules; @@ -94,7 +101,8 @@ export class RestApiServer { server.addHook("onError", async (req, _res, err) => { // Don't log ErrorAborted errors, they happen on node shutdown and are not useful // Don't log NodeISSyncing errors, they happen very frequently while syncing and the validator polls duties - if (err instanceof ErrorAborted || err instanceof NodeIsSyncing) return; + // Don't log eventstream aborted errors if server instance is being closed on node shutdown + if (err instanceof ErrorAborted || err instanceof NodeIsSyncing || this.status === Status.Closed) return; const {operationId} = req.routeConfig as RouteConfig; @@ -114,6 +122,9 @@ export class RestApiServer { * Start the REST API server. */ async listen(): Promise { + if (this.status === Status.Listening) return; + this.status = Status.Listening; + try { const host = this.opts.address; const address = await this.server.listen({port: this.opts.port, host}); @@ -123,6 +134,7 @@ export class RestApiServer { } } catch (e) { this.logger.error("Error starting REST api server", this.opts, e as Error); + this.status = Status.Closed; throw e; } } @@ -131,6 +143,9 @@ export class RestApiServer { * Close the server instance and terminate all existing connections. */ async close(): Promise { + if (this.status === Status.Closed) return; + this.status = Status.Closed; + // In NodeJS land calling close() only causes new connections to be rejected. // Existing connections can prevent .close() from resolving for potentially forever. // In Lodestar case when the BeaconNode wants to close we will just abruptly terminate diff --git a/packages/beacon-node/src/chain/blocks/importBlock.ts b/packages/beacon-node/src/chain/blocks/importBlock.ts index b06da6dfcdc5..7e091028891b 100644 --- a/packages/beacon-node/src/chain/blocks/importBlock.ts +++ b/packages/beacon-node/src/chain/blocks/importBlock.ts @@ -10,9 +10,11 @@ import { } from "@lodestar/state-transition"; import {routes} from "@lodestar/api"; import {ForkChoiceError, ForkChoiceErrorCode, EpochDifference, AncestorStatus} from "@lodestar/fork-choice"; +import {isErrorAborted} from "@lodestar/utils"; import {ZERO_HASH_HEX} from "../../constants/index.js"; import {toCheckpointHex} from "../stateCache/index.js"; import {isOptimisticBlock} from "../../util/forkChoice.js"; +import {isQueueErrorAborted} from "../../util/queue/index.js"; import {ChainEvent, ReorgEventData} from "../emitter.js"; import {REPROCESS_MIN_TIME_TO_NEXT_SLOT_SEC} from "../reprocess.js"; import {RegenCaller} from "../regen/interface.js"; @@ -317,7 +319,9 @@ export async function importBlock( finalizedBlockHash ) .catch((e) => { - this.logger.error("Error pushing notifyForkchoiceUpdate()", {headBlockHash, finalizedBlockHash}, e); + if (!isErrorAborted(e) && !isQueueErrorAborted(e)) { + this.logger.error("Error pushing notifyForkchoiceUpdate()", {headBlockHash, finalizedBlockHash}, e); + } }); } } diff --git a/packages/beacon-node/src/chain/blocks/index.ts b/packages/beacon-node/src/chain/blocks/index.ts index 60e3342385bb..c65f15a5476a 100644 --- a/packages/beacon-node/src/chain/blocks/index.ts +++ b/packages/beacon-node/src/chain/blocks/index.ts @@ -1,8 +1,8 @@ import {WithOptionalBytes, allForks} from "@lodestar/types"; -import {toHex} from "@lodestar/utils"; -import {JobItemQueue} from "../../util/queue/index.js"; +import {toHex, isErrorAborted} from "@lodestar/utils"; +import {JobItemQueue, isQueueErrorAborted} from "../../util/queue/index.js"; import {Metrics} from "../../metrics/metrics.js"; -import {BlockError, BlockErrorCode} from "../errors/index.js"; +import {BlockError, BlockErrorCode, isBlockErrorAborted} from "../errors/index.js"; import {BlockProcessOpts} from "../options.js"; import type {BeaconChain} from "../chain.js"; import {verifyBlocksInEpoch} from "./verifyBlock.js"; @@ -111,6 +111,10 @@ export async function processBlocks( await importBlock.call(this, fullyVerifiedBlock, opts); } } catch (e) { + if (isErrorAborted(e) || isQueueErrorAborted(e) || isBlockErrorAborted(e)) { + return; // Ignore + } + // above functions should only throw BlockError const err = getBlockError(e, blocks[0].block); diff --git a/packages/beacon-node/src/chain/errors/blockError.ts b/packages/beacon-node/src/chain/errors/blockError.ts index 21fee9dd5511..d9d42fdbc221 100644 --- a/packages/beacon-node/src/chain/errors/blockError.ts +++ b/packages/beacon-node/src/chain/errors/blockError.ts @@ -3,6 +3,7 @@ import {LodestarError} from "@lodestar/utils"; import {toHexString} from "@chainsafe/ssz"; import {CachedBeaconStateAllForks} from "@lodestar/state-transition"; import {ExecutePayloadStatus} from "../../execution/engine/interface.js"; +import {QueueErrorCode} from "../../util/queue/index.js"; import {GossipActionError} from "./gossipValidation.js"; export enum BlockErrorCode { @@ -116,6 +117,14 @@ export class BlockError extends LodestarError { } } +export function isBlockErrorAborted(e: unknown): e is BlockError { + return ( + e instanceof BlockError && + e.type.code === BlockErrorCode.EXECUTION_ENGINE_ERROR && + e.type.errorMessage === QueueErrorCode.QUEUE_ABORTED + ); +} + export function renderBlockErrorType(type: BlockErrorType): Record { switch (type.code) { case BlockErrorCode.PRESTATE_MISSING: diff --git a/packages/beacon-node/src/chain/prepareNextSlot.ts b/packages/beacon-node/src/chain/prepareNextSlot.ts index 1c7e63e75811..c6f25b1bfc1b 100644 --- a/packages/beacon-node/src/chain/prepareNextSlot.ts +++ b/packages/beacon-node/src/chain/prepareNextSlot.ts @@ -2,7 +2,7 @@ import {computeEpochAtSlot, isExecutionStateType, computeTimeAtSlot} from "@lode import {ChainForkConfig} from "@lodestar/config"; import {ForkSeq, SLOTS_PER_EPOCH, ForkExecution} from "@lodestar/params"; import {Slot} from "@lodestar/types"; -import {Logger, sleep, fromHex} from "@lodestar/utils"; +import {Logger, sleep, fromHex, isErrorAborted} from "@lodestar/utils"; import {routes} from "@lodestar/api"; import {GENESIS_SLOT, ZERO_HASH_HEX} from "../constants/constants.js"; import {Metrics} from "../metrics/index.js"; @@ -172,8 +172,10 @@ export class PrepareNextSlotScheduler { } } } catch (e) { - this.metrics?.precomputeNextEpochTransition.count.inc({result: "error"}, 1); - this.logger.error("Failed to run prepareForNextSlot", {nextEpoch, isEpochTransition, prepareSlot}, e as Error); + if (!isErrorAborted(e)) { + this.metrics?.precomputeNextEpochTransition.count.inc({result: "error"}, 1); + this.logger.error("Failed to run prepareForNextSlot", {nextEpoch, isEpochTransition, prepareSlot}, e as Error); + } } }; } diff --git a/packages/beacon-node/src/eth1/eth1DepositDataTracker.ts b/packages/beacon-node/src/eth1/eth1DepositDataTracker.ts index c5e8605e83cd..a7fc91fafb06 100644 --- a/packages/beacon-node/src/eth1/eth1DepositDataTracker.ts +++ b/packages/beacon-node/src/eth1/eth1DepositDataTracker.ts @@ -162,8 +162,7 @@ export class Eth1DepositDataTracker { private async runAutoUpdate(): Promise { let lastRunMs = 0; - // eslint-disable-next-line no-constant-condition - while (true) { + while (!this.signal.aborted) { lastRunMs = Date.now(); try { diff --git a/packages/beacon-node/src/node/nodejs.ts b/packages/beacon-node/src/node/nodejs.ts index 7ea931dd8220..14243be2efa1 100644 --- a/packages/beacon-node/src/node/nodejs.ts +++ b/packages/beacon-node/src/node/nodejs.ts @@ -4,6 +4,7 @@ import {Registry} from "prom-client"; import {PeerId} from "@libp2p/interface-peer-id"; import {BeaconConfig} from "@lodestar/config"; import {phase0} from "@lodestar/types"; +import {sleep} from "@lodestar/utils"; import {LoggerNode} from "@lodestar/logger/node"; import {Api, ServerApi} from "@lodestar/api"; import {BeaconStateAllForks} from "@lodestar/state-transition"; @@ -74,6 +75,12 @@ enum LoggerModule { sync = "sync", } +/** + * Short delay before closing db to give async operations sufficient time to complete + * and prevent "Database is not open" errors when shutting down beacon node. + */ +const DELAY_BEFORE_CLOSING_DB_MS = 500; + /** * The main Beacon Node class. Contains various components for getting and processing data from the * Ethereum Consensus ecosystem as well as systems for getting beacon node metadata. @@ -316,11 +323,11 @@ export class BeaconNode { if (this.metricsServer) await this.metricsServer.stop(); if (this.monitoring) this.monitoring.stop(); if (this.restApi) await this.restApi.close(); - await this.chain.persistToDisk(); await this.chain.close(); - await this.db.stop(); if (this.controller) this.controller.abort(); + await sleep(DELAY_BEFORE_CLOSING_DB_MS); + await this.db.stop(); this.status = BeaconNodeStatus.closed; } } diff --git a/packages/beacon-node/src/util/clock.ts b/packages/beacon-node/src/util/clock.ts index b7741790ab33..2f19a5a8ca6b 100644 --- a/packages/beacon-node/src/util/clock.ts +++ b/packages/beacon-node/src/util/clock.ts @@ -176,7 +176,7 @@ export class Clock extends EventEmitter implements IClock { private onNextSlot = (slot?: Slot): void => { const clockSlot = slot ?? getCurrentSlot(this.config, this.genesisTime); // process multiple clock slots in the case the main thread has been saturated for > SECONDS_PER_SLOT - while (this._currentSlot < clockSlot) { + while (this._currentSlot < clockSlot && !this.signal.aborted) { const previousSlot = this._currentSlot; this._currentSlot++; @@ -189,8 +189,11 @@ export class Clock extends EventEmitter implements IClock { this.emit(ClockEvent.epoch, currentEpoch); } } - //recursively invoke onNextSlot - this.timeoutId = setTimeout(this.onNextSlot, this.msUntilNextSlot()); + + if (!this.signal.aborted) { + //recursively invoke onNextSlot + this.timeoutId = setTimeout(this.onNextSlot, this.msUntilNextSlot()); + } }; private msUntilNextSlot(): number { diff --git a/packages/beacon-node/src/util/queue/errors.ts b/packages/beacon-node/src/util/queue/errors.ts index d910f781a64c..edb0b3b87374 100644 --- a/packages/beacon-node/src/util/queue/errors.ts +++ b/packages/beacon-node/src/util/queue/errors.ts @@ -12,3 +12,7 @@ export class QueueError extends LodestarError { super(type); } } + +export function isQueueErrorAborted(e: unknown): e is QueueError { + return e instanceof QueueError && e.type.code === QueueErrorCode.QUEUE_ABORTED; +} diff --git a/packages/beacon-node/src/util/queue/index.ts b/packages/beacon-node/src/util/queue/index.ts index 5a375c73d4f4..b55979b1f5ba 100644 --- a/packages/beacon-node/src/util/queue/index.ts +++ b/packages/beacon-node/src/util/queue/index.ts @@ -1,4 +1,4 @@ export * from "./fnQueue.js"; export * from "./itemQueue.js"; export * from "./options.js"; -export {QueueError, QueueErrorCode} from "./errors.js"; +export {QueueError, QueueErrorCode, isQueueErrorAborted} from "./errors.js"; diff --git a/packages/cli/src/cmds/beacon/handler.ts b/packages/cli/src/cmds/beacon/handler.ts index eedc5d3a334d..b2b0dd3fde4a 100644 --- a/packages/cli/src/cmds/beacon/handler.ts +++ b/packages/cli/src/cmds/beacon/handler.ts @@ -95,7 +95,22 @@ export async function beaconHandler(args: BeaconArgs & GlobalArgs): Promise node.close(), {once: true}); + abortController.signal.addEventListener( + "abort", + async () => { + try { + await node.close(); + logger.debug("Beacon node closed"); + } catch (e) { + logger.error("Error closing beacon node", {}, e as Error); + // Make sure db is always closed gracefully + await db.stop(); + // Must explicitly exit process due to potential active handles + process.exit(1); + } + }, + {once: true} + ); } catch (e) { await db.stop(); diff --git a/packages/reqresp/src/ReqResp.ts b/packages/reqresp/src/ReqResp.ts index 267003fa7e50..3fa9a43c3200 100644 --- a/packages/reqresp/src/ReqResp.ts +++ b/packages/reqresp/src/ReqResp.ts @@ -143,6 +143,7 @@ export class ReqResp { } async stop(): Promise { + this.rateLimiter.stop(); this.controller.abort(); } diff --git a/yarn.lock b/yarn.lock index 0fe06c29c7a1..3bb4bfb1bdad 100644 --- a/yarn.lock +++ b/yarn.lock @@ -654,10 +654,10 @@ "@chainsafe/as-sha256" "^0.4.1" "@chainsafe/persistent-merkle-tree" "^0.6.1" -"@chainsafe/threads@^1.10.0": - version "1.10.0" - resolved "https://registry.npmjs.org/@chainsafe/threads/-/threads-1.10.0.tgz" - integrity sha512-1PRtW3s5welk2WUAMN0bjEGtQ3PSqLWAxlMTSeOYJ2NV+bgNgi8PsALX/xEiB/CHd8XG5gS2BlmYZ+Q3b4wR7Q== +"@chainsafe/threads@^1.11.0": + version "1.11.0" + resolved "https://registry.yarnpkg.com/@chainsafe/threads/-/threads-1.11.0.tgz#4845f452d30901053991cf050696746a442a8933" + integrity sha512-l36K9eXqpE0PTnQThDBrxmkx0DX4UV2Qt0l9ASRHRbCUZW3SlSAS+b0//DfTQ2/OXeDVPuE7/72991lTOaKmdQ== dependencies: callsites "^3.1.0" debug "^4.2.0" @@ -13472,7 +13472,7 @@ rfdc@^1.2.0, rfdc@^1.3.0: rimraf@^2.6.1: version "2.7.1" - resolved "https://registry.npmjs.org/rimraf/-/rimraf-2.7.1.tgz" + resolved "https://registry.yarnpkg.com/rimraf/-/rimraf-2.7.1.tgz#35797f13a7fdadc566142c29d4f07ccad483e3ec" integrity sha512-uWjbaKIK3T1OSVptzX7Nl6PvQ3qAGtKEtVRjRuazjfL3Bx5eI409VZSqgND+4UNnmzLVdPj9FqFJNPqBZFve4w== dependencies: glob "^7.1.3"