Skip to content

Commit

Permalink
refactor(reqresp)!: support byte based handlers (#5417)
Browse files Browse the repository at this point in the history
* Update reqresp to support only binary handler

* Convert beacon-node reqresp to support only binary handlers

* Update the beacon-reqresp and tests

* Update the readme

* Fix lint errors

* Increase the genesis delay to fix slow startup of nodes

* Fix the table reporter

* Fix a type issue

* Update the table reporter

* Fix el log file name

* Update the metadata response to support fork digest

* Update context bytes for v1,v2 protocols

* Update beacon metadata response

* Add more test fixtures

* Add protocol version to log for debugging

* Update the metadata response format

* Remove unncessary comments

* Debug logger

* Fix the sim log reporter

* Revert the genesis slot delay

* Fix e2e test types

* Fix e2e tests

* Update structure of handler

* Update the beacon-node handlers

* Fix linter errors

* Fix readme file

* Fix prettier warnings

* Review PR

* Fix lib import

* Update readme example

* Use Version enum

* Simplify MetadataController

* Add ReqResp interop test

* Add INVALID_RESPONSE_SSZ

* Review PR

---------

Co-authored-by: dapplion <35266934+dapplion@users.noreply.github.com>
  • Loading branch information
nazarhussain and dapplion authored May 2, 2023
1 parent 342eb17 commit 6a6669f
Show file tree
Hide file tree
Showing 76 changed files with 1,423 additions and 1,551 deletions.
257 changes: 123 additions & 134 deletions packages/beacon-node/src/network/reqresp/ReqRespBeaconNode.ts

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {ContextBytesType, EncodedPayload, EncodedPayloadType} from "@lodestar/reqresp";
import {ResponseOutgoing} from "@lodestar/reqresp";
import {deneb} from "@lodestar/types";
import {toHex} from "@lodestar/utils";
import {IBeaconChain} from "../../../chain/index.js";
Expand All @@ -9,7 +9,7 @@ export async function* onBeaconBlockAndBlobsSidecarByRoot(
requestBody: deneb.BeaconBlockAndBlobsSidecarByRootRequest,
chain: IBeaconChain,
db: IBeaconDb
): AsyncIterable<EncodedPayload<deneb.SignedBeaconBlockAndBlobsSidecar>> {
): AsyncIterable<ResponseOutgoing> {
const finalizedSlot = chain.forkChoice.getFinalizedBlock().slot;

for (const blockRoot of requestBody) {
Expand Down Expand Up @@ -41,12 +41,8 @@ export async function* onBeaconBlockAndBlobsSidecarByRoot(
}

yield {
type: EncodedPayloadType.bytes,
bytes: signedBeaconBlockAndBlobsSidecarFromBytes(blockBytes, blobsSidecarBytes),
contextBytes: {
type: ContextBytesType.ForkDigest,
forkSlot,
},
data: signedBeaconBlockAndBlobsSidecarFromBytes(blockBytes, blobsSidecarBytes),
fork: chain.config.getForkName(forkSlot),
};
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import {GENESIS_SLOT, MAX_REQUEST_BLOCKS} from "@lodestar/params";
import {ContextBytesType, EncodedPayloadBytes, EncodedPayloadType, ResponseError, RespStatus} from "@lodestar/reqresp";
import {ResponseError, ResponseOutgoing, RespStatus} from "@lodestar/reqresp";
import {deneb, phase0} from "@lodestar/types";
import {fromHex} from "@lodestar/utils";
import {IBeaconChain} from "../../../chain/index.js";
Expand All @@ -11,7 +11,7 @@ export function onBeaconBlocksByRange(
request: phase0.BeaconBlocksByRangeRequest,
chain: IBeaconChain,
db: IBeaconDb
): AsyncIterable<EncodedPayloadBytes> {
): AsyncIterable<ResponseOutgoing> {
return onBlocksOrBlobsSidecarsByRange(request, chain, {
finalized: db.blockArchive,
unfinalized: db.block,
Expand All @@ -25,7 +25,7 @@ export async function* onBlocksOrBlobsSidecarsByRange(
finalized: Pick<IBeaconDb["blockArchive"], "binaryEntriesStream" | "decodeKey">;
unfinalized: Pick<IBeaconDb["block"], "getBinary">;
}
): AsyncIterable<EncodedPayloadBytes> {
): AsyncIterable<ResponseOutgoing> {
const {startSlot, count} = validateBeaconBlocksByRangeRequest(request);
const endSlot = startSlot + count;

Expand All @@ -43,12 +43,8 @@ export async function* onBlocksOrBlobsSidecarsByRange(
// Chain of blobs won't change
for await (const {key, value} of db.finalized.binaryEntriesStream({gte: startSlot, lt: endSlot})) {
yield {
type: EncodedPayloadType.bytes,
bytes: value,
contextBytes: {
type: ContextBytesType.ForkDigest,
forkSlot: db.finalized.decodeKey(key),
},
data: value,
fork: chain.config.getForkName(db.finalized.decodeKey(key)),
};
}
}
Expand Down Expand Up @@ -78,12 +74,8 @@ export async function* onBlocksOrBlobsSidecarsByRange(
}

yield {
type: EncodedPayloadType.bytes,
bytes: blockBytes,
contextBytes: {
type: ContextBytesType.ForkDigest,
forkSlot: block.slot,
},
data: blockBytes,
fork: chain.config.getForkName(block.slot),
};
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import {toHexString} from "@chainsafe/ssz";
import {EncodedPayload, EncodedPayloadType, ContextBytesType} from "@lodestar/reqresp";
import {allForks, phase0, Slot} from "@lodestar/types";
import {ResponseOutgoing} from "@lodestar/reqresp";
import {Slot, phase0} from "@lodestar/types";
import {IBeaconChain} from "../../../chain/index.js";
import {IBeaconDb} from "../../../db/index.js";
import {getSlotFromSignedBeaconBlockSerialized} from "../../../util/sszBytes.js";
Expand All @@ -9,7 +9,7 @@ export async function* onBeaconBlocksByRoot(
requestBody: phase0.BeaconBlocksByRootRequest,
chain: IBeaconChain,
db: IBeaconDb
): AsyncIterable<EncodedPayload<allForks.SignedBeaconBlock>> {
): AsyncIterable<ResponseOutgoing> {
for (const blockRoot of requestBody) {
const root = blockRoot;
const summary = chain.forkChoice.getBlock(root);
Expand Down Expand Up @@ -39,12 +39,8 @@ export async function* onBeaconBlocksByRoot(
}

yield {
type: EncodedPayloadType.bytes,
bytes: blockBytes,
contextBytes: {
type: ContextBytesType.ForkDigest,
forkSlot: slot,
},
data: blockBytes,
fork: chain.config.getForkName(slot),
};
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import {deneb} from "@lodestar/types";
import {EncodedPayloadBytes} from "@lodestar/reqresp";
import {ResponseOutgoing} from "@lodestar/reqresp";
import {IBeaconChain} from "../../../chain/index.js";
import {IBeaconDb} from "../../../db/index.js";
import {onBlocksOrBlobsSidecarsByRange} from "./beaconBlocksByRange.js";
Expand All @@ -10,7 +10,7 @@ export function onBlobsSidecarsByRange(
request: deneb.BlobsSidecarsByRangeRequest,
chain: IBeaconChain,
db: IBeaconDb
): AsyncIterable<EncodedPayloadBytes> {
): AsyncIterable<ResponseOutgoing> {
return onBlocksOrBlobsSidecarsByRange(request, chain, {
finalized: db.blobsSidecarArchive,
unfinalized: db.blobsSidecar,
Expand Down
41 changes: 24 additions & 17 deletions packages/beacon-node/src/network/reqresp/handlers/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import {HandlerTypeFromMessage} from "@lodestar/reqresp";
import * as protocols from "@lodestar/reqresp/protocols";
import {PeerId} from "@libp2p/interface-peer-id";
import {phase0, ssz} from "@lodestar/types";
import {ProtocolHandler, ResponseOutgoing} from "@lodestar/reqresp";
import {IBeaconChain} from "../../../chain/index.js";
import {IBeaconDb} from "../../../db/index.js";
import {onBeaconBlocksByRange} from "./beaconBlocksByRange.js";
Expand All @@ -13,15 +14,15 @@ import {onLightClientUpdatesByRange} from "./lightClientUpdatesByRange.js";
import {onStatus} from "./status.js";

export interface ReqRespHandlers {
onStatus: HandlerTypeFromMessage<typeof protocols.Status>;
onBeaconBlocksByRange: HandlerTypeFromMessage<typeof protocols.BeaconBlocksByRange>;
onBeaconBlocksByRoot: HandlerTypeFromMessage<typeof protocols.BeaconBlocksByRoot>;
onBeaconBlockAndBlobsSidecarByRoot: HandlerTypeFromMessage<typeof protocols.BeaconBlockAndBlobsSidecarByRoot>;
onBlobsSidecarsByRange: HandlerTypeFromMessage<typeof protocols.BlobsSidecarsByRange>;
onLightClientBootstrap: HandlerTypeFromMessage<typeof protocols.LightClientBootstrap>;
onLightClientUpdatesByRange: HandlerTypeFromMessage<typeof protocols.LightClientUpdatesByRange>;
onLightClientFinalityUpdate: HandlerTypeFromMessage<typeof protocols.LightClientFinalityUpdate>;
onLightClientOptimisticUpdate: HandlerTypeFromMessage<typeof protocols.LightClientOptimisticUpdate>;
onStatus: (req: phase0.Status, peerId: PeerId) => AsyncIterable<ResponseOutgoing>;
onBeaconBlocksByRange: ProtocolHandler;
onBeaconBlocksByRoot: ProtocolHandler;
onBeaconBlockAndBlobsSidecarByRoot: ProtocolHandler;
onBlobsSidecarsByRange: ProtocolHandler;
onLightClientBootstrap: ProtocolHandler;
onLightClientUpdatesByRange: ProtocolHandler;
onLightClientFinalityUpdate: ProtocolHandler;
onLightClientOptimisticUpdate: ProtocolHandler;
}
/**
* The ReqRespHandler module handles app-level requests / responses from other peers,
Expand All @@ -33,22 +34,28 @@ export function getReqRespHandlers({db, chain}: {db: IBeaconDb; chain: IBeaconCh
yield* onStatus(chain);
},
async *onBeaconBlocksByRange(req) {
yield* onBeaconBlocksByRange(req, chain, db);
const body = ssz.phase0.BeaconBlocksByRangeRequest.deserialize(req.data);
yield* onBeaconBlocksByRange(body, chain, db);
},
async *onBeaconBlocksByRoot(req) {
yield* onBeaconBlocksByRoot(req, chain, db);
const body = ssz.phase0.BeaconBlocksByRootRequest.deserialize(req.data);
yield* onBeaconBlocksByRoot(body, chain, db);
},
async *onBeaconBlockAndBlobsSidecarByRoot(req) {
yield* onBeaconBlockAndBlobsSidecarByRoot(req, chain, db);
const body = ssz.deneb.BeaconBlockAndBlobsSidecarByRootRequest.deserialize(req.data);
yield* onBeaconBlockAndBlobsSidecarByRoot(body, chain, db);
},
async *onBlobsSidecarsByRange(req) {
yield* onBlobsSidecarsByRange(req, chain, db);
const body = ssz.deneb.BlobsSidecarsByRangeRequest.deserialize(req.data);
yield* onBlobsSidecarsByRange(body, chain, db);
},
async *onLightClientBootstrap(req) {
yield* onLightClientBootstrap(req, chain);
const body = ssz.Root.deserialize(req.data);
yield* onLightClientBootstrap(body, chain);
},
async *onLightClientUpdatesByRange(req) {
yield* onLightClientUpdatesByRange(req, chain);
const body = ssz.altair.LightClientUpdatesByRange.deserialize(req.data);
yield* onLightClientUpdatesByRange(body, chain);
},
async *onLightClientFinalityUpdate() {
yield* onLightClientFinalityUpdate(chain);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
import {
EncodedPayload,
EncodedPayloadType,
RespStatus,
ResponseError,
LightClientServerError,
LightClientServerErrorCode,
ResponseOutgoing,
} from "@lodestar/reqresp";
import {Root, allForks} from "@lodestar/types";
import {Root} from "@lodestar/types";
import {IBeaconChain} from "../../../chain/index.js";
import {ReqRespMethod, responseSszTypeByMethod} from "../types.js";

export async function* onLightClientBootstrap(
requestBody: Root,
chain: IBeaconChain
): AsyncIterable<EncodedPayload<allForks.LightClientBootstrap>> {
export async function* onLightClientBootstrap(requestBody: Root, chain: IBeaconChain): AsyncIterable<ResponseOutgoing> {
try {
const bootstrap = await chain.lightClientServer.getBootstrap(requestBody);
const fork = chain.config.getForkName(bootstrap.header.beacon.slot);
const type = responseSszTypeByMethod[ReqRespMethod.LightClientBootstrap](fork, 0);
yield {
type: EncodedPayloadType.ssz,
data: await chain.lightClientServer.getBootstrap(requestBody),
data: type.serialize(bootstrap),
fork,
};
} catch (e) {
if ((e as LightClientServerError).type?.code === LightClientServerErrorCode.RESOURCE_UNAVAILABLE) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
import {EncodedPayload, ResponseError, RespStatus, EncodedPayloadType} from "@lodestar/reqresp";
import {allForks} from "@lodestar/types";
import {ResponseOutgoing, RespStatus, ResponseError} from "@lodestar/reqresp";
import {IBeaconChain} from "../../../chain/index.js";
import {ReqRespMethod, responseSszTypeByMethod} from "../types.js";

export async function* onLightClientFinalityUpdate(
chain: IBeaconChain
): AsyncIterable<EncodedPayload<allForks.LightClientFinalityUpdate>> {
const finalityUpdate = chain.lightClientServer.getFinalityUpdate();
if (finalityUpdate === null) {
export async function* onLightClientFinalityUpdate(chain: IBeaconChain): AsyncIterable<ResponseOutgoing> {
const update = chain.lightClientServer.getFinalityUpdate();
if (update === null) {
throw new ResponseError(RespStatus.RESOURCE_UNAVAILABLE, "No latest finality update available");
} else {
const fork = chain.config.getForkName(update.signatureSlot);
const type = responseSszTypeByMethod[ReqRespMethod.LightClientFinalityUpdate](fork, 0);
yield {
type: EncodedPayloadType.ssz,
data: finalityUpdate,
data: type.serialize(update),
fork,
};
}
}
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
import {EncodedPayload, EncodedPayloadType, ResponseError, RespStatus} from "@lodestar/reqresp";
import {allForks} from "@lodestar/types";
import {ResponseOutgoing, ResponseError, RespStatus} from "@lodestar/reqresp";
import {IBeaconChain} from "../../../chain/index.js";
import {ReqRespMethod, responseSszTypeByMethod} from "../types.js";

export async function* onLightClientOptimisticUpdate(
chain: IBeaconChain
): AsyncIterable<EncodedPayload<allForks.LightClientOptimisticUpdate>> {
const optimisticUpdate = chain.lightClientServer.getOptimisticUpdate();
if (optimisticUpdate === null) {
export async function* onLightClientOptimisticUpdate(chain: IBeaconChain): AsyncIterable<ResponseOutgoing> {
const update = chain.lightClientServer.getOptimisticUpdate();
if (update === null) {
throw new ResponseError(RespStatus.RESOURCE_UNAVAILABLE, "No latest optimistic update available");
} else {
const fork = chain.config.getForkName(update.signatureSlot);
const type = responseSszTypeByMethod[ReqRespMethod.LightClientOptimisticUpdate](fork, 0);
yield {
type: EncodedPayloadType.ssz,
data: optimisticUpdate,
data: type.serialize(update),
fork,
};
}
}
Original file line number Diff line number Diff line change
@@ -1,25 +1,29 @@
import {altair, allForks} from "@lodestar/types";
import {altair} from "@lodestar/types";
import {MAX_REQUEST_LIGHT_CLIENT_UPDATES} from "@lodestar/params";
import {
EncodedPayload,
EncodedPayloadType,
ResponseOutgoing,
LightClientServerError,
LightClientServerErrorCode,
ResponseError,
RespStatus,
} from "@lodestar/reqresp";
import {IBeaconChain} from "../../../chain/index.js";
import {ReqRespMethod, responseSszTypeByMethod} from "../types.js";

export async function* onLightClientUpdatesByRange(
requestBody: altair.LightClientUpdatesByRange,
chain: IBeaconChain
): AsyncIterable<EncodedPayload<allForks.LightClientUpdate>> {
): AsyncIterable<ResponseOutgoing> {
const count = Math.min(MAX_REQUEST_LIGHT_CLIENT_UPDATES, requestBody.count);
for (let period = requestBody.startPeriod; period < requestBody.startPeriod + count; period++) {
try {
const update = await chain.lightClientServer.getUpdate(period);
const fork = chain.config.getForkName(update.signatureSlot);
const type = responseSszTypeByMethod[ReqRespMethod.LightClientUpdatesByRange](fork, 0);

yield {
type: EncodedPayloadType.ssz,
data: await chain.lightClientServer.getUpdate(period),
data: type.serialize(update),
fork,
};
} catch (e) {
if ((e as LightClientServerError).type?.code === LightClientServerErrorCode.RESOURCE_UNAVAILABLE) {
Expand Down
14 changes: 10 additions & 4 deletions packages/beacon-node/src/network/reqresp/handlers/status.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
import {EncodedPayload, EncodedPayloadType} from "@lodestar/reqresp";
import {phase0} from "@lodestar/types";
import {ResponseOutgoing} from "@lodestar/reqresp";
import {ssz} from "@lodestar/types";
import {ForkName} from "@lodestar/params";
import {IBeaconChain} from "../../../chain/index.js";

export async function* onStatus(chain: IBeaconChain): AsyncIterable<EncodedPayload<phase0.Status>> {
yield {type: EncodedPayloadType.ssz, data: chain.getStatus()};
export async function* onStatus(chain: IBeaconChain): AsyncIterable<ResponseOutgoing> {
const status = chain.getStatus();
yield {
data: ssz.phase0.Status.serialize(status),
// Status topic is fork-agnostic
fork: ForkName.phase0,
};
}
Loading

0 comments on commit 6a6669f

Please sign in to comment.