Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support el_offline in eth/v1/node/syncing #5723

Merged
merged 22 commits into from
Jul 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions packages/api/src/beacon/routes/node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ export type SyncingStatus = {
isSyncing: boolean;
/** Set to true if the node is optimistically tracking head. */
isOptimistic: boolean;
/** Set to true if the connected el client is offline */
elOffline: boolean;
};

export enum NodeHealth {
Expand Down
2 changes: 1 addition & 1 deletion packages/api/test/unit/beacon/testData/node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ export const testData: GenericServerTestCases<Api> = {
},
getSyncingStatus: {
args: [],
res: {data: {headSlot: "1", syncDistance: "2", isSyncing: false, isOptimistic: true}},
res: {data: {headSlot: "1", syncDistance: "2", isSyncing: false, isOptimistic: true, elOffline: false}},
},
getHealth: {
args: [],
Expand Down
10 changes: 10 additions & 0 deletions packages/beacon-node/src/eth1/provider/jsonRpcHttpClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,16 @@ import {encodeJwtToken} from "./jwt.js";
const maxStringLengthToPrint = 500;
const REQUEST_TIMEOUT = 30 * 1000;

/**
 * Minimal shape of the low-level network errors surfaced by `fetch`.
 *
 * We use `cross-fetch`, which does not ship type definitions for its errors, and we
 * can't depend on `node-fetch` types because of browser compatibility.
 */
export type FetchError = {
  errno: string;
  code: string;
};

/**
 * Type guard for `FetchError`.
 *
 * Returns `true` only for non-null objects that expose both a `code` and an `errno` property.
 * The `in` operator throws a `TypeError` when its right-hand side is `null` or a primitive
 * (e.g. a thrown string), so the value is checked to be an object first; such values
 * yield `false` instead of raising from inside an error handler.
 */
export const isFetchError = (error: unknown): error is FetchError =>
  typeof error === "object" && error !== null && "code" in error && "errno" in error;

interface RpcResponse<R> extends RpcResponseError {
result?: R;
}
Expand Down
6 changes: 5 additions & 1 deletion packages/beacon-node/src/execution/engine/disabled.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {IExecutionEngine, PayloadIdCache} from "./interface.js";
import {ExecutionEngineState, IExecutionEngine, PayloadIdCache} from "./interface.js";

export class ExecutionEngineDisabled implements IExecutionEngine {
readonly payloadIdCache = new PayloadIdCache();
Expand Down Expand Up @@ -26,4 +26,8 @@ export class ExecutionEngineDisabled implements IExecutionEngine {
/**
 * Not supported by the disabled engine: always throws "Execution engine disabled".
 * The method is not `async`, so the error is thrown synchronously rather than
 * delivered as a rejected promise.
 */
getPayloadBodiesByRange(): Promise<never> {
throw Error("Execution engine disabled");
}

/**
 * Not supported by the disabled engine: always throws "Execution engine disabled"
 * instead of returning a state.
 * NOTE(review): callers that read the engine state must be prepared for this
 * throw when the execution engine is disabled - confirm call sites guard against it.
 */
getState(): ExecutionEngineState {
throw Error("Execution engine disabled");
}
}
104 changes: 85 additions & 19 deletions packages/beacon-node/src/execution/engine/http.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import {Root, RootHex, allForks, Wei} from "@lodestar/types";
import {SLOTS_PER_EPOCH, ForkName, ForkSeq} from "@lodestar/params";

import {Logger} from "@lodestar/logger";
import {ErrorJsonRpcResponse, HttpRpcError} from "../../eth1/provider/jsonRpcHttpClient.js";
import {IJsonRpcHttpClient, ReqOpts} from "../../eth1/provider/jsonRpcHttpClient.js";
import {Metrics} from "../../metrics/index.js";
import {JobItemQueue} from "../../util/queue/index.js";
import {EPOCHS_PER_BATCH} from "../../sync/constants.js";
import {numToQuantity} from "../../eth1/provider/utils.js";
import {IJson, RpcPayload} from "../../eth1/interface.js";
import {
ExecutePayloadStatus,
ExecutePayloadResponse,
Expand All @@ -15,6 +16,7 @@ import {
PayloadAttributes,
BlobsBundle,
VersionedHashes,
ExecutionEngineState,
} from "./interface.js";
import {PayloadIdCache} from "./payloadIdCache.js";
import {
Expand All @@ -29,10 +31,12 @@ import {
assertReqSizeLimit,
deserializeExecutionPayloadBody,
} from "./types.js";
import {getExecutionEngineState} from "./utils.js";

/** Dependencies handed to the execution engine when it is constructed. */
export type ExecutionEngineModules = {
// Abort signal forwarded to the engine's RPC job queue
signal: AbortSignal;
// Optional metrics; when present, `engineHttpProcessorQueue` is handed to the RPC job queue
metrics?: Metrics | null;
// Logger used to report execution engine state transitions
logger: Logger;
};

export type ExecutionEngineHttpOpts = {
Expand Down Expand Up @@ -82,6 +86,13 @@ const getPayloadOpts: ReqOpts = {routeId: "getPayload"};
* https://github.com/ethereum/execution-apis/blob/v1.0.0-alpha.1/src/engine/interop/specification.md
*/
export class ExecutionEngineHttp implements IExecutionEngine {
private logger: Logger;

// The default state is ONLINE; it will be updated to SYNCING or SYNCED once we receive the first payload status.
// This assumption is better than starting in the OFFLINE state: we can't be sure the EL is offline, and being offline may trigger some notifications.
// It's safer to avoid false positives and assume that the EL is online until we receive the first payload.
private state: ExecutionEngineState = ExecutionEngineState.ONLINE;

readonly payloadIdCache = new PayloadIdCache();
/**
* A queue to serialize the fcUs and newPayloads calls:
Expand All @@ -95,21 +106,33 @@ export class ExecutionEngineHttp implements IExecutionEngine {
private readonly rpcFetchQueue: JobItemQueue<[EngineRequest], EngineResponse>;

private jobQueueProcessor = async ({method, params, methodOpts}: EngineRequest): Promise<EngineResponse> => {
return this.rpc.fetchWithRetries<EngineApiRpcReturnTypes[typeof method], EngineApiRpcParamTypes[typeof method]>(
return this.fetchWithRetries<EngineApiRpcReturnTypes[typeof method], EngineApiRpcParamTypes[typeof method]>(
{method, params},
methodOpts
);
};

constructor(
private readonly rpc: IJsonRpcHttpClient,
{metrics, signal}: ExecutionEngineModules
{metrics, signal, logger}: ExecutionEngineModules
) {
this.rpcFetchQueue = new JobItemQueue<[EngineRequest], EngineResponse>(
this.jobQueueProcessor,
{maxLength: QUEUE_MAX_LENGTH, maxConcurrency: 1, noYieldIfOneItem: true, signal},
metrics?.engineHttpProcessorQueue
);
this.logger = logger;
}

/**
 * Wraps `rpc.fetchWithRetries` so that every request also refreshes the tracked engine state.
 *
 * A successful response reports the engine as ONLINE. A failure derives the new state
 * from the thrown error via `getExecutionEngineState` and then re-throws the same error.
 *
 * @param payload JSON-RPC payload forwarded unchanged to the underlying client
 * @param opts optional request options forwarded unchanged to the underlying client
 * @returns the response returned by the underlying client
 * @throws whatever the underlying client throws
 */
protected async fetchWithRetries<R, P = IJson[]>(payload: RpcPayload<P>, opts?: ReqOpts): Promise<R> {
  let response: R;
  try {
    response = await this.rpc.fetchWithRetries<R, P>(payload, opts);
    // The request went through, so the engine is reachable
    this.updateEngineState(ExecutionEngineState.ONLINE);
  } catch (e) {
    // Record what the failure says about the engine, then let the caller handle the error
    this.updateEngineState(getExecutionEngineState({payloadError: e}));
    throw e;
  }
  return response;
}

/**
Expand Down Expand Up @@ -152,7 +175,7 @@ export class ExecutionEngineHttp implements IExecutionEngine {

const serializedExecutionPayload = serializeExecutionPayload(fork, executionPayload);

let engingRequest: EngineRequest;
let engineRequest: EngineRequest;
if (ForkSeq[fork] >= ForkSeq.deneb) {
if (versionedHashes === undefined) {
throw Error(`versionedHashes required in notifyNewPayload for fork=${fork}`);
Expand All @@ -165,32 +188,34 @@ export class ExecutionEngineHttp implements IExecutionEngine {
const parentBeaconBlockRoot = serializeBeaconBlockRoot(parentBlockRoot);

const method = "engine_newPayloadV3";
engingRequest = {
engineRequest = {
method,
params: [serializedExecutionPayload, serializedVersionedHashes, parentBeaconBlockRoot],
methodOpts: notifyNewPayloadOpts,
};
} else {
const method = ForkSeq[fork] >= ForkSeq.capella ? "engine_newPayloadV2" : "engine_newPayloadV1";
engingRequest = {
engineRequest = {
method,
params: [serializedExecutionPayload],
methodOpts: notifyNewPayloadOpts,
};
}

const {status, latestValidHash, validationError} = await (
this.rpcFetchQueue.push(engingRequest) as Promise<EngineApiRpcReturnTypes[typeof method]>
this.rpcFetchQueue.push(engineRequest) as Promise<EngineApiRpcReturnTypes[typeof method]>
)
// If there are errors by EL like connection refused, internal error, they need to be
// treated separate from being INVALID. For now, just pass the error upstream.
.catch((e: Error): EngineApiRpcReturnTypes[typeof method] => {
this.updateEngineState(getExecutionEngineState({payloadError: e}));
if (e instanceof HttpRpcError || e instanceof ErrorJsonRpcResponse) {
return {status: ExecutePayloadStatus.ELERROR, latestValidHash: null, validationError: e.message};
} else {
return {status: ExecutePayloadStatus.UNAVAILABLE, latestValidHash: null, validationError: e.message};
}
});
this.updateEngineState(getExecutionEngineState({payloadStatus: status}));

switch (status) {
case ExecutePayloadStatus.VALID:
Expand Down Expand Up @@ -277,12 +302,21 @@ export class ExecutionEngineHttp implements IExecutionEngine {
methodOpts: fcUReqOpts,
}) as Promise<EngineApiRpcReturnTypes[typeof method]>;

const response = await request;
const response = await request
// If there are errors by EL like connection refused, internal error, they need to be
// treated separate from being INVALID. For now, just pass the error upstream.
.catch((e: Error): EngineApiRpcReturnTypes[typeof method] => {
this.updateEngineState(getExecutionEngineState({payloadError: e}));
throw e;
});

const {
payloadStatus: {status, latestValidHash: _latestValidHash, validationError},
payloadId,
} = response;

this.updateEngineState(getExecutionEngineState({payloadStatus: status}));

switch (status) {
case ExecutePayloadStatus.VALID:
// if payloadAttributes are provided, a valid payloadId is expected
Expand Down Expand Up @@ -333,7 +367,7 @@ export class ExecutionEngineHttp implements IExecutionEngine {
: ForkSeq[fork] >= ForkSeq.capella
? "engine_getPayloadV2"
: "engine_getPayloadV1";
const payloadResponse = await this.rpc.fetchWithRetries<
const payloadResponse = await this.fetchWithRetries<
EngineApiRpcReturnTypes[typeof method],
EngineApiRpcParamTypes[typeof method]
>(
Expand All @@ -353,13 +387,10 @@ export class ExecutionEngineHttp implements IExecutionEngine {
async getPayloadBodiesByHash(blockHashes: RootHex[]): Promise<(ExecutionPayloadBody | null)[]> {
const method = "engine_getPayloadBodiesByHashV1";
assertReqSizeLimit(blockHashes.length, 32);
const response = await this.rpc.fetchWithRetries<
const response = await this.fetchWithRetries<
EngineApiRpcReturnTypes[typeof method],
EngineApiRpcParamTypes[typeof method]
>({
method,
params: blockHashes,
});
>({method, params: blockHashes});
return response.map(deserializeExecutionPayloadBody);
}

Expand All @@ -371,15 +402,50 @@ export class ExecutionEngineHttp implements IExecutionEngine {
assertReqSizeLimit(blockCount, 32);
const start = numToQuantity(startBlockNumber);
const count = numToQuantity(blockCount);
const response = await this.rpc.fetchWithRetries<
const response = await this.fetchWithRetries<
EngineApiRpcReturnTypes[typeof method],
EngineApiRpcParamTypes[typeof method]
>({
method,
params: [start, count],
});
>({method, params: [start, count]});
return response.map(deserializeExecutionPayloadBody);
}

/** Returns the most recently recorded execution engine state. */
getState(): ExecutionEngineState {
return this.state;
}

/**
 * Records a new engine state and logs the transition.
 *
 * Does nothing when the state is unchanged. A transition to ONLINE is only accepted
 * when the previous state was OFFLINE or AUTH_FAILED; from any other state it is ignored.
 * OFFLINE and AUTH_FAILED are logged as errors, all other transitions as info.
 *
 * @param newState the state the engine should move to
 */
private updateEngineState(newState: ExecutionEngineState): void {
  const previousState = this.state;

  // Same state as before: nothing to record or log
  if (newState === previousState) return;

  // ONLINE is the initial state and can otherwise only be reached again from OFFLINE or AUTH_FAILED
  const wasUnreachable =
    previousState === ExecutionEngineState.OFFLINE || previousState === ExecutionEngineState.AUTH_FAILED;
  if (newState === ExecutionEngineState.ONLINE && !wasUnreachable) return;

  if (newState === ExecutionEngineState.ONLINE) {
    this.logger.info("ExecutionEngine became online");
  } else if (newState === ExecutionEngineState.OFFLINE) {
    this.logger.error("ExecutionEngine went offline");
  } else if (newState === ExecutionEngineState.SYNCED) {
    this.logger.info("ExecutionEngine is synced");
  } else if (newState === ExecutionEngineState.SYNCING) {
    this.logger.info("ExecutionEngine is syncing");
  } else if (newState === ExecutionEngineState.AUTH_FAILED) {
    this.logger.error("ExecutionEngine authentication failed");
  }

  this.state = newState;
}
}

/** Union of the keys of `EngineApiRpcParamTypes`; the engine requests in this file use these keys as the `method` value. */
type EngineRequestKey = keyof EngineApiRpcParamTypes;
Expand Down
10 changes: 10 additions & 0 deletions packages/beacon-node/src/execution/engine/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,14 @@ export enum ExecutePayloadStatus {
UNSAFE_OPTIMISTIC_STATUS = "UNSAFE_OPTIMISTIC_STATUS",
}

/**
 * Coarse view of the execution engine as tracked by the beacon node.
 * The state is derived from payload statuses and request errors.
 */
export enum ExecutionEngineState {
// Engine is assumed reachable; also used when the state can't be determined
ONLINE = "ONLINE",
// Engine could not be reached
OFFLINE = "OFFLINE",
// Engine responded but is not yet able to fully validate payloads
SYNCING = "SYNCING",
// Engine responded with a status indicating it is in sync
SYNCED = "SYNCED",
// Connection to the engine failed in a way that is treated as an authentication failure
AUTH_FAILED = "AUTH_FAILED",
}

export type ExecutePayloadResponse =
| {status: ExecutePayloadStatus.SYNCING | ExecutePayloadStatus.ACCEPTED; latestValidHash: null; validationError: null}
| {status: ExecutePayloadStatus.VALID; latestValidHash: RootHex; validationError: null}
Expand Down Expand Up @@ -132,4 +140,6 @@ export interface IExecutionEngine {
getPayloadBodiesByHash(blockHash: DATA[]): Promise<(ExecutionPayloadBody | null)[]>;

getPayloadBodiesByRange(start: number, count: number): Promise<(ExecutionPayloadBody | null)[]>;

getState(): ExecutionEngineState;
}
41 changes: 40 additions & 1 deletion packages/beacon-node/src/execution/engine/utils.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import {IJson, RpcPayload} from "../../eth1/interface.js";
import {IJsonRpcHttpClient} from "../../eth1/provider/jsonRpcHttpClient.js";
import {IJsonRpcHttpClient, isFetchError} from "../../eth1/provider/jsonRpcHttpClient.js";
import {ExecutePayloadStatus, ExecutionEngineState} from "./interface.js";

export type JsonRpcBackend = {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
Expand Down Expand Up @@ -27,3 +28,41 @@ export class ExecutionEngineMockJsonRpcClient implements IJsonRpcHttpClient {
return Promise.all(rpcPayloadArr.map((payload) => this.fetch<R>(payload)));
}
}

// Fetch error codes that getExecutionEngineState maps to the OFFLINE state
const fatalErrorCodes = ["ECONNREFUSED", "ENOTFOUND", "EAI_AGAIN"];
// Fetch error codes that getExecutionEngineState maps to the AUTH_FAILED state
// NOTE(review): presumably the EL drops the connection when JWT auth fails - confirm this mapping is intended
const connectionErrorCodes = ["ECONNRESET", "ECONNABORTED"];

/**
 * Derives the execution engine state from either a payload status or a request error.
 *
 * Exactly one of the two inputs is expected:
 * - `payloadStatus`: VALID / ACCEPTED / UNSAFE_OPTIMISTIC_STATUS map to SYNCED;
 *   SYNCING / INVALID / INVALID_BLOCK_HASH / ELERROR map to SYNCING; UNAVAILABLE maps to OFFLINE.
 * - `payloadError`: a fetch error whose `code` is in `fatalErrorCodes` maps to OFFLINE,
 *   one whose `code` is in `connectionErrorCodes` maps to AUTH_FAILED.
 *
 * @returns the derived state, or ONLINE when nothing above matches
 */
export function getExecutionEngineState({
  payloadError,
  payloadStatus,
}:
  | {payloadStatus: ExecutePayloadStatus; payloadError?: never}
  | {payloadStatus?: never; payloadError: unknown}): ExecutionEngineState {
  // Statuses showing the engine could process the payload
  if (
    payloadStatus === ExecutePayloadStatus.VALID ||
    payloadStatus === ExecutePayloadStatus.ACCEPTED ||
    payloadStatus === ExecutePayloadStatus.UNSAFE_OPTIMISTIC_STATUS
  ) {
    return ExecutionEngineState.SYNCED;
  }

  // Statuses treated as "engine is up but still syncing"
  if (
    payloadStatus === ExecutePayloadStatus.SYNCING ||
    payloadStatus === ExecutePayloadStatus.INVALID ||
    payloadStatus === ExecutePayloadStatus.INVALID_BLOCK_HASH ||
    payloadStatus === ExecutePayloadStatus.ELERROR
  ) {
    return ExecutionEngineState.SYNCING;
  }

  if (payloadStatus === ExecutePayloadStatus.UNAVAILABLE) {
    return ExecutionEngineState.OFFLINE;
  }

  // No status matched: classify by the network error code, if there is one
  if (payloadError && isFetchError(payloadError)) {
    const errorCode = payloadError.code;
    if (fatalErrorCodes.includes(errorCode)) {
      return ExecutionEngineState.OFFLINE;
    }
    if (connectionErrorCodes.includes(errorCode)) {
      return ExecutionEngineState.AUTH_FAILED;
    }
  }

  // In case we can't determine the state, we assume it's online.
  // This is better than assuming offline, because the offline state may trigger some notifications.
  return ExecutionEngineState.ONLINE;
}
7 changes: 6 additions & 1 deletion packages/beacon-node/src/node/nodejs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ enum LoggerModule {
backfill = "backfill",
chain = "chain",
eth1 = "eth1",
execution = "execution",
metrics = "metrics",
monitoring = "monitoring",
network = "network",
Expand Down Expand Up @@ -209,7 +210,11 @@ export class BeaconNode {
logger: logger.child({module: LoggerModule.eth1}),
signal,
}),
executionEngine: initializeExecutionEngine(opts.executionEngine, {metrics, signal}),
executionEngine: initializeExecutionEngine(opts.executionEngine, {
metrics,
signal,
logger: logger.child({module: LoggerModule.execution}),
}),
executionBuilder: opts.executionBuilder.enabled
? initializeExecutionBuilder(opts.executionBuilder, config, metrics)
: undefined,
Expand Down
Loading
Loading