Skip to content

Commit

Permalink
Merge 32983cd into 1a8476b
Browse files Browse the repository at this point in the history
  • Loading branch information
nflaig authored May 24, 2023
2 parents 1a8476b + 32983cd commit 677c848
Show file tree
Hide file tree
Showing 14 changed files with 86 additions and 23 deletions.
2 changes: 1 addition & 1 deletion packages/beacon-node/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@
"@chainsafe/persistent-merkle-tree": "^0.5.0",
"@chainsafe/snappy-stream": "^5.1.2",
"@chainsafe/ssz": "^0.10.2",
"@chainsafe/threads": "^1.10.0",
"@chainsafe/threads": "^1.11.0",
"@ethersproject/abi": "^5.7.0",
"@fastify/bearer-auth": "^9.0.0",
"@fastify/cors": "^8.2.1",
Expand Down
17 changes: 16 additions & 1 deletion packages/beacon-node/src/api/rest/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ export type RestApiServerMetrics = SocketMetrics & {
errors: IGauge<"operationId">;
};

enum Status {
Listening = "listening",
Closed = "closed",
}

/**
* REST API powered by `fastify` server.
*/
Expand All @@ -36,6 +41,8 @@ export class RestApiServer {
protected readonly logger: Logger;
private readonly activeSockets: HttpActiveSocketsTracker;

private status = Status.Closed;

constructor(private readonly opts: RestApiServerOpts, modules: RestApiServerModules) {
// Apply opts defaults
const {logger, metrics} = modules;
Expand Down Expand Up @@ -94,7 +101,8 @@ export class RestApiServer {
server.addHook("onError", async (req, _res, err) => {
// Don't log ErrorAborted errors, they happen on node shutdown and are not useful
// Don't log NodeISSyncing errors, they happen very frequently while syncing and the validator polls duties
if (err instanceof ErrorAborted || err instanceof NodeIsSyncing) return;
// Don't log eventstream aborted errors if server instance is being closed on node shutdown
if (err instanceof ErrorAborted || err instanceof NodeIsSyncing || this.status === Status.Closed) return;

const {operationId} = req.routeConfig as RouteConfig;

Expand All @@ -114,6 +122,9 @@ export class RestApiServer {
* Start the REST API server.
*/
async listen(): Promise<void> {
if (this.status === Status.Listening) return;
this.status = Status.Listening;

try {
const host = this.opts.address;
const address = await this.server.listen({port: this.opts.port, host});
Expand All @@ -123,6 +134,7 @@ export class RestApiServer {
}
} catch (e) {
this.logger.error("Error starting REST api server", this.opts, e as Error);
this.status = Status.Closed;
throw e;
}
}
Expand All @@ -131,6 +143,9 @@ export class RestApiServer {
* Close the server instance and terminate all existing connections.
*/
async close(): Promise<void> {
if (this.status === Status.Closed) return;
this.status = Status.Closed;

// In NodeJS land calling close() only causes new connections to be rejected.
// Existing connections can prevent .close() from resolving for potentially forever.
// In Lodestar case when the BeaconNode wants to close we will just abruptly terminate
Expand Down
6 changes: 5 additions & 1 deletion packages/beacon-node/src/chain/blocks/importBlock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@ import {
} from "@lodestar/state-transition";
import {routes} from "@lodestar/api";
import {ForkChoiceError, ForkChoiceErrorCode, EpochDifference, AncestorStatus} from "@lodestar/fork-choice";
import {isErrorAborted} from "@lodestar/utils";
import {ZERO_HASH_HEX} from "../../constants/index.js";
import {toCheckpointHex} from "../stateCache/index.js";
import {isOptimisticBlock} from "../../util/forkChoice.js";
import {isQueueErrorAborted} from "../../util/queue/index.js";
import {ChainEvent, ReorgEventData} from "../emitter.js";
import {REPROCESS_MIN_TIME_TO_NEXT_SLOT_SEC} from "../reprocess.js";
import {RegenCaller} from "../regen/interface.js";
Expand Down Expand Up @@ -317,7 +319,9 @@ export async function importBlock(
finalizedBlockHash
)
.catch((e) => {
this.logger.error("Error pushing notifyForkchoiceUpdate()", {headBlockHash, finalizedBlockHash}, e);
if (!isErrorAborted(e) && !isQueueErrorAborted(e)) {
this.logger.error("Error pushing notifyForkchoiceUpdate()", {headBlockHash, finalizedBlockHash}, e);
}
});
}
}
Expand Down
10 changes: 7 additions & 3 deletions packages/beacon-node/src/chain/blocks/index.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import {WithOptionalBytes, allForks} from "@lodestar/types";
import {toHex} from "@lodestar/utils";
import {JobItemQueue} from "../../util/queue/index.js";
import {toHex, isErrorAborted} from "@lodestar/utils";
import {JobItemQueue, isQueueErrorAborted} from "../../util/queue/index.js";
import {Metrics} from "../../metrics/metrics.js";
import {BlockError, BlockErrorCode} from "../errors/index.js";
import {BlockError, BlockErrorCode, isBlockErrorAborted} from "../errors/index.js";
import {BlockProcessOpts} from "../options.js";
import type {BeaconChain} from "../chain.js";
import {verifyBlocksInEpoch} from "./verifyBlock.js";
Expand Down Expand Up @@ -111,6 +111,10 @@ export async function processBlocks(
await importBlock.call(this, fullyVerifiedBlock, opts);
}
} catch (e) {
if (isErrorAborted(e) || isQueueErrorAborted(e) || isBlockErrorAborted(e)) {
return; // Ignore
}

// above functions should only throw BlockError
const err = getBlockError(e, blocks[0].block);

Expand Down
9 changes: 9 additions & 0 deletions packages/beacon-node/src/chain/errors/blockError.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {LodestarError} from "@lodestar/utils";
import {toHexString} from "@chainsafe/ssz";
import {CachedBeaconStateAllForks} from "@lodestar/state-transition";
import {ExecutePayloadStatus} from "../../execution/engine/interface.js";
import {QueueErrorCode} from "../../util/queue/index.js";
import {GossipActionError} from "./gossipValidation.js";

export enum BlockErrorCode {
Expand Down Expand Up @@ -116,6 +117,14 @@ export class BlockError extends LodestarError<BlockErrorType> {
}
}

export function isBlockErrorAborted(e: unknown): e is BlockError {
return (
e instanceof BlockError &&
e.type.code === BlockErrorCode.EXECUTION_ENGINE_ERROR &&
e.type.errorMessage === QueueErrorCode.QUEUE_ABORTED
);
}

export function renderBlockErrorType(type: BlockErrorType): Record<string, string | number | null> {
switch (type.code) {
case BlockErrorCode.PRESTATE_MISSING:
Expand Down
8 changes: 5 additions & 3 deletions packages/beacon-node/src/chain/prepareNextSlot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import {computeEpochAtSlot, isExecutionStateType, computeTimeAtSlot} from "@lode
import {ChainForkConfig} from "@lodestar/config";
import {ForkSeq, SLOTS_PER_EPOCH, ForkExecution} from "@lodestar/params";
import {Slot} from "@lodestar/types";
import {Logger, sleep, fromHex} from "@lodestar/utils";
import {Logger, sleep, fromHex, isErrorAborted} from "@lodestar/utils";
import {routes} from "@lodestar/api";
import {GENESIS_SLOT, ZERO_HASH_HEX} from "../constants/constants.js";
import {Metrics} from "../metrics/index.js";
Expand Down Expand Up @@ -172,8 +172,10 @@ export class PrepareNextSlotScheduler {
}
}
} catch (e) {
this.metrics?.precomputeNextEpochTransition.count.inc({result: "error"}, 1);
this.logger.error("Failed to run prepareForNextSlot", {nextEpoch, isEpochTransition, prepareSlot}, e as Error);
if (!isErrorAborted(e)) {
this.metrics?.precomputeNextEpochTransition.count.inc({result: "error"}, 1);
this.logger.error("Failed to run prepareForNextSlot", {nextEpoch, isEpochTransition, prepareSlot}, e as Error);
}
}
};
}
3 changes: 1 addition & 2 deletions packages/beacon-node/src/eth1/eth1DepositDataTracker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,7 @@ export class Eth1DepositDataTracker {
private async runAutoUpdate(): Promise<void> {
let lastRunMs = 0;

// eslint-disable-next-line no-constant-condition
while (true) {
while (!this.signal.aborted) {
lastRunMs = Date.now();

try {
Expand Down
11 changes: 9 additions & 2 deletions packages/beacon-node/src/node/nodejs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {Registry} from "prom-client";
import {PeerId} from "@libp2p/interface-peer-id";
import {BeaconConfig} from "@lodestar/config";
import {phase0} from "@lodestar/types";
import {sleep} from "@lodestar/utils";
import {LoggerNode} from "@lodestar/logger/node";
import {Api, ServerApi} from "@lodestar/api";
import {BeaconStateAllForks} from "@lodestar/state-transition";
Expand Down Expand Up @@ -74,6 +75,12 @@ enum LoggerModule {
sync = "sync",
}

/**
* Short delay before closing db to give async operations sufficient time to complete
* and prevent "Database is not open" errors when shutting down beacon node.
*/
const DELAY_BEFORE_CLOSING_DB_MS = 500;

/**
* The main Beacon Node class. Contains various components for getting and processing data from the
* Ethereum Consensus ecosystem as well as systems for getting beacon node metadata.
Expand Down Expand Up @@ -316,11 +323,11 @@ export class BeaconNode {
if (this.metricsServer) await this.metricsServer.stop();
if (this.monitoring) this.monitoring.stop();
if (this.restApi) await this.restApi.close();

await this.chain.persistToDisk();
await this.chain.close();
await this.db.stop();
if (this.controller) this.controller.abort();
await sleep(DELAY_BEFORE_CLOSING_DB_MS);
await this.db.stop();
this.status = BeaconNodeStatus.closed;
}
}
Expand Down
9 changes: 6 additions & 3 deletions packages/beacon-node/src/util/clock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ export class Clock extends EventEmitter implements IClock {
private onNextSlot = (slot?: Slot): void => {
const clockSlot = slot ?? getCurrentSlot(this.config, this.genesisTime);
// process multiple clock slots in the case the main thread has been saturated for > SECONDS_PER_SLOT
while (this._currentSlot < clockSlot) {
while (this._currentSlot < clockSlot && !this.signal.aborted) {
const previousSlot = this._currentSlot;
this._currentSlot++;

Expand All @@ -189,8 +189,11 @@ export class Clock extends EventEmitter implements IClock {
this.emit(ClockEvent.epoch, currentEpoch);
}
}
//recursively invoke onNextSlot
this.timeoutId = setTimeout(this.onNextSlot, this.msUntilNextSlot());

if (!this.signal.aborted) {
//recursively invoke onNextSlot
this.timeoutId = setTimeout(this.onNextSlot, this.msUntilNextSlot());
}
};

private msUntilNextSlot(): number {
Expand Down
4 changes: 4 additions & 0 deletions packages/beacon-node/src/util/queue/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,7 @@ export class QueueError extends LodestarError<QueueErrorCodeType> {
super(type);
}
}

export function isQueueErrorAborted(e: unknown): e is QueueError {
return e instanceof QueueError && e.type.code === QueueErrorCode.QUEUE_ABORTED;
}
2 changes: 1 addition & 1 deletion packages/beacon-node/src/util/queue/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
export * from "./fnQueue.js";
export * from "./itemQueue.js";
export * from "./options.js";
export {QueueError, QueueErrorCode} from "./errors.js";
export {QueueError, QueueErrorCode, isQueueErrorAborted} from "./errors.js";
17 changes: 16 additions & 1 deletion packages/cli/src/cmds/beacon/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,22 @@ export async function beaconHandler(args: BeaconArgs & GlobalArgs): Promise<void
abortController.abort();
}, logger.info.bind(logger));

abortController.signal.addEventListener("abort", () => node.close(), {once: true});
abortController.signal.addEventListener(
"abort",
async () => {
try {
await node.close();
logger.debug("Beacon node closed");
} catch (e) {
logger.error("Error closing beacon node", {}, e as Error);
// Make sure db is always closed gracefully
await db.stop();
// Must explicitly exit process due to potential active handles
process.exit(1);
}
},
{once: true}
);
} catch (e) {
await db.stop();

Expand Down
1 change: 1 addition & 0 deletions packages/reqresp/src/ReqResp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ export class ReqResp {
}

async stop(): Promise<void> {
this.rateLimiter.stop();
this.controller.abort();
}

Expand Down
10 changes: 5 additions & 5 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -654,10 +654,10 @@
"@chainsafe/as-sha256" "^0.4.1"
"@chainsafe/persistent-merkle-tree" "^0.6.1"

"@chainsafe/threads@^1.10.0":
version "1.10.0"
resolved "https://registry.npmjs.org/@chainsafe/threads/-/threads-1.10.0.tgz"
integrity sha512-1PRtW3s5welk2WUAMN0bjEGtQ3PSqLWAxlMTSeOYJ2NV+bgNgi8PsALX/xEiB/CHd8XG5gS2BlmYZ+Q3b4wR7Q==
"@chainsafe/threads@^1.11.0":
version "1.11.0"
resolved "https://registry.yarnpkg.com/@chainsafe/threads/-/threads-1.11.0.tgz#4845f452d30901053991cf050696746a442a8933"
integrity sha512-l36K9eXqpE0PTnQThDBrxmkx0DX4UV2Qt0l9ASRHRbCUZW3SlSAS+b0//DfTQ2/OXeDVPuE7/72991lTOaKmdQ==
dependencies:
callsites "^3.1.0"
debug "^4.2.0"
Expand Down Expand Up @@ -13472,7 +13472,7 @@ rfdc@^1.2.0, rfdc@^1.3.0:

rimraf@^2.6.1:
version "2.7.1"
resolved "https://registry.npmjs.org/rimraf/-/rimraf-2.7.1.tgz"
resolved "https://registry.yarnpkg.com/rimraf/-/rimraf-2.7.1.tgz#35797f13a7fdadc566142c29d4f07ccad483e3ec"
integrity sha512-uWjbaKIK3T1OSVptzX7Nl6PvQ3qAGtKEtVRjRuazjfL3Bx5eI409VZSqgND+4UNnmzLVdPj9FqFJNPqBZFve4w==
dependencies:
glob "^7.1.3"
Expand Down

0 comments on commit 677c848

Please sign in to comment.