Optimize backfill sync to efficiently use reqresp fetched block data #3669

Closed
wants to merge 5 commits
Changes from 3 commits
10 changes: 10 additions & 0 deletions packages/cli/src/options/beaconNodeOptions/sync.ts
@@ -4,12 +4,14 @@ import {ICliCommandOptions} from "../../util";
export interface ISyncArgs {
"sync.isSingleNode": boolean;
"sync.disableProcessAsChainSegment": boolean;
"sync.backfillBatchSize": number;
}

export function parseArgs(args: ISyncArgs): IBeaconNodeOptions["sync"] {
return {
isSingleNode: args["sync.isSingleNode"],
disableProcessAsChainSegment: args["sync.disableProcessAsChainSegment"],
backfillBatchSize: args["sync.backfillBatchSize"],
};
}

@@ -32,4 +34,12 @@ Use only for local networks with a single node, can be dangerous in regular netw
defaultDescription: String(defaultOptions.sync.disableProcessAsChainSegment),
group: "sync",
},

"sync.backfillBatchSize": {
hidden: true,
type: "number",
description: "Batch size for backfill sync to sync/process blocks",
defaultDescription: String(defaultOptions.sync.backfillBatchSize),
group: "sync",
},
};
2 changes: 2 additions & 0 deletions packages/cli/test/unit/options/beaconNodeOptions.test.ts
@@ -54,6 +54,7 @@ describe("options / beaconNodeOptions", () => {
"network.dontSendGossipAttestationsToForkchoice": true,
"sync.isSingleNode": true,
"sync.disableProcessAsChainSegment": true,
"sync.backfillBatchSize": 34,
} as IBeaconNodeArgs;

const expectedOptions: RecursivePartial<IBeaconNodeOptions> = {
@@ -117,6 +118,7 @@ describe("options / beaconNodeOptions", () => {
sync: {
isSingleNode: true,
disableProcessAsChainSegment: true,
backfillBatchSize: 34,
},
};

2 changes: 1 addition & 1 deletion packages/lodestar/src/node/nodejs.ts
@@ -169,7 +169,7 @@ export class BeaconNode {
logger: logger.child(opts.logger.sync),
});

const backfillSync = await BackfillSync.init({
const backfillSync = await BackfillSync.init(opts.sync, {
config,
db,
chain,
77 changes: 48 additions & 29 deletions packages/lodestar/src/sync/backfill/backfill.ts
@@ -6,7 +6,9 @@ import {blockToHeader} from "@chainsafe/lodestar-beacon-state-transition";
import {IBeaconConfig, IChainForkConfig} from "@chainsafe/lodestar-config";
import {phase0, Root, Slot, allForks, ssz} from "@chainsafe/lodestar-types";
import {ErrorAborted, ILogger} from "@chainsafe/lodestar-utils";
import {List, toHexString} from "@chainsafe/ssz";
import {List, toHexString, TreeBacked} from "@chainsafe/ssz";
import {SLOTS_PER_EPOCH} from "@chainsafe/lodestar-params";

import {IBeaconChain} from "../../chain";
import {GENESIS_SLOT, ZERO_HASH} from "../../constants";
import {IBeaconDb} from "../../db";
@@ -16,13 +18,10 @@ import {PeerSet} from "../../util/peerMap";
import {shuffleOne} from "../../util/shuffle";
import {BackfillSyncError, BackfillSyncErrorCode} from "./errors";
import {verifyBlockProposerSignature, verifyBlockSequence, BackfillBlockHeader, BackfillBlock} from "./verify";
import {SLOTS_PER_EPOCH} from "@chainsafe/lodestar-params";
import {byteArrayEquals} from "../../util/bytes";
import {TreeBacked} from "@chainsafe/ssz";
import {computeAnchorCheckpoint} from "../../chain/initState";

/** Default batch size. Same as range sync (2 epochs) */
const BATCH_SIZE = 64;
import {BlockArchiveBatchPutBinaryItem} from "../../db/repositories";
import {EPOCHS_PER_BATCH} from "../constants";

export type BackfillSyncModules = {
chain: IBeaconChain;
@@ -44,7 +43,7 @@ type BackfillModules = BackfillSyncModules & {
};

export type BackfillSyncOpts = {
batchSize: number;
backfillBatchSize?: number;
};

export enum BackfillSyncEvent {
@@ -114,7 +113,7 @@ export class BackfillSync extends (EventEmitter as {new (): BackfillSyncEmitter}
/**
* Process in blocks of at max batchSize
*/
private opts: BackfillSyncOpts;
private batchSize: number;
/**
* If wsCheckpoint provided was in past then the (db) state from which beacon node started,
* needs to be validated as per spec.
@@ -151,7 +150,7 @@ export class BackfillSync extends (EventEmitter as {new (): BackfillSyncEmitter}
private peers = new PeerSet();
private status: BackfillSyncStatus = BackfillSyncStatus.pending;

constructor(modules: BackfillModules, opts?: BackfillSyncOpts) {
constructor(opts: BackfillSyncOpts, modules: BackfillModules) {
super();

this.syncAnchor = modules.syncAnchor;
@@ -167,7 +166,7 @@ export class BackfillSync extends (EventEmitter as {new (): BackfillSyncEmitter}
this.logger = modules.logger;
this.metrics = modules.metrics;

this.opts = opts ?? {batchSize: BATCH_SIZE};
this.batchSize = opts.backfillBatchSize ?? EPOCHS_PER_BATCH * SLOTS_PER_EPOCH;
this.network.events.on(NetworkEvent.peerConnected, this.addPeer);
this.network.events.on(NetworkEvent.peerDisconnected, this.removePeer);

@@ -225,8 +224,8 @@ export class BackfillSync extends (EventEmitter as {new (): BackfillSyncEmitter}
* or wsCheckpoints identifiable as the keys of backfill sync.
*/
static async init<T extends BackfillSync = BackfillSync>(
modules: BackfillSyncModules,
opts?: BackfillSyncOpts
opts: BackfillSyncOpts,
modules: BackfillSyncModules
): Promise<T> {
const {config, anchorState, db, wsCheckpoint, logger} = modules;

@@ -256,17 +255,14 @@ export class BackfillSync extends (EventEmitter as {new (): BackfillSyncEmitter}
// Load a previous finalized or wsCheckpoint slot from DB below anchorSlot
const prevFinalizedCheckpointBlock = await extractPreviousFinOrWsCheckpoint(config, db, anchorSlot, logger);

return new this(
{
syncAnchor,
backfillStartFromSlot,
backfillRangeWrittenSlot,
wsCheckpointHeader,
prevFinalizedCheckpointBlock,
...modules,
},
opts
) as T;
return new this(opts, {
syncAnchor,
backfillStartFromSlot,
backfillRangeWrittenSlot,
wsCheckpointHeader,
prevFinalizedCheckpointBlock,
...modules,
}) as T;
}

/** Throw / return all AsyncGenerators */
@@ -518,7 +514,8 @@ export class BackfillSync extends (EventEmitter as {new (): BackfillSyncEmitter}

// TODO: one can verify the child of wsDbCheckpointBlock is at
// slot > wsCheckpointHeader
wsDbCheckpointBlock.message.slot > this.wsCheckpointHeader.slot
// Note: next epoch is at wsCheckpointHeader.slot + SLOTS_PER_EPOCH
wsDbCheckpointBlock.message.slot >= this.wsCheckpointHeader.slot + SLOTS_PER_EPOCH
)
// TODO: explode and stop the entire node
throw new Error(
@@ -667,7 +664,7 @@ export class BackfillSync extends (EventEmitter as {new (): BackfillSyncEmitter}

let isPrevFinWsConfirmedAnchorParent = false;
while (
backCount !== this.opts.batchSize &&
backCount !== this.batchSize &&
(parentBlock = await this.db.blockArchive.getByRoot(anchorBlock.message.parentRoot))
) {
// Before moving anchorBlock back, we need check for prevFinalizedCheckpointBlock
@@ -738,7 +735,8 @@ export class BackfillSync extends (EventEmitter as {new (): BackfillSyncEmitter}
// we will need to go make checks on the top of sync loop before writing as it might
// override prevFinalizedCheckpointBlock
if (this.prevFinalizedCheckpointBlock.slot < anchorBlock.message.slot)
await this.db.blockArchive.put(anchorBlock.message.slot, anchorBlock);
// Block fetched from reqResp.beaconBlocksByRoot is treebacked
await this.archiveBlocksTreeBacked([anchorBlock] as TreeBacked<allForks.SignedBeaconBlock>[]);

this.syncAnchor = {
anchorBlock,
@@ -763,7 +761,7 @@ export class BackfillSync extends (EventEmitter as {new (): BackfillSyncEmitter}
}

const toSlot = this.syncAnchor.anchorBlock.message.slot;
const fromSlot = Math.max(toSlot - this.opts.batchSize, this.prevFinalizedCheckpointBlock.slot, GENESIS_SLOT);
const fromSlot = Math.max(toSlot - this.batchSize, this.prevFinalizedCheckpointBlock.slot, GENESIS_SLOT);
const blocks = await this.network.reqResp.beaconBlocksByRange(peer, {
startSlot: fromSlot,
count: toSlot - fromSlot,
@@ -783,7 +781,12 @@ export class BackfillSync extends (EventEmitter as {new (): BackfillSyncEmitter}
return;
}

const {nextAnchor, verifiedBlocks, error} = verifyBlockSequence(this.config, blocks, anchorParentRoot);
const {nextAnchor, verifiedBlocks, error} = verifyBlockSequence(
this.config,
// Blocks fetched from reqResp.beaconBlocksByRange are treebacked.
blocks as TreeBacked<allForks.SignedBeaconBlock>[],
anchorParentRoot
);

// If any of the block's proposer signature fail, we can't trust this peer at all
if (verifiedBlocks.length > 0) {
@@ -799,7 +802,8 @@ export class BackfillSync extends (EventEmitter as {new (): BackfillSyncEmitter}
// Verified blocks are in reverse order with the nextAnchor being the smallest slot
// if nextAnchor is on the same slot as prevFinalizedCheckpointBlock, we can't save
// it before returning to top of sync loop for validation
await this.db.blockArchive.batchAdd(
// Blocks fetched from reqResp.beaconBlocksByRange are treebacked.
await this.archiveBlocksTreeBacked(
nextAnchor.slot > this.prevFinalizedCheckpointBlock.slot
? verifiedBlocks
: verifiedBlocks.slice(0, verifiedBlocks.length - 1)
@@ -821,6 +825,21 @@ export class BackfillSync extends (EventEmitter as {new (): BackfillSyncEmitter}
}
if (error) throw new BackfillSyncError({code: error});
}

private async archiveBlocksTreeBacked(blocks: TreeBacked<allForks.SignedBeaconBlock>[]): Promise<void> {
const blockEntries: BlockArchiveBatchPutBinaryItem[] = blocks.map((signedBlock) => {
const block = signedBlock.message as TreeBacked<allForks.BeaconBlock>;
return {
key: block.slot,
value: signedBlock.serialize(),

Contributor:
What I mean in the issue is that we should return both the binary and the signed block from beaconBlocksByRange and beaconBlocksByRoot. So instead of returning Promise<allForks.SignedBeaconBlock[]> from the network req/resp API, return something like

type CachedBytes<T> = {bytes: Uint8Array} & T;

async beaconBlocksByRange(
    peerId: PeerId,
    request: phase0.BeaconBlocksByRangeRequest
  ): Promise<CachedBytes<allForks.SignedBeaconBlock>[]>

and consume those cached bytes without having to call serialize() again in backfill sync and similar places

@dapplion do we have this type in ssz v2?

Contributor Author:
Yes, I was wondering about the same 🙂
This format should currently save on hashTreeRoot calcs, which would amount to 9% of hashTreeRoot in verifyBlockSequence (#3657 (comment)), and maybe some 2-3% on the serialize.

CachedBytes<allForks.SignedBeaconBlock> would be awesome ❤️ awaiting @dapplion's input regarding ssz v2.

Member:
Right now, ssz v2 only (optionally) caches the hashTreeRoot (of non-tree-backed values); it doesn't cache the serialized bytes.

Contributor Author:
Added caching capability, as that was discussed to be out of scope for ssz v2 👍

slot: block.slot,
blockRoot: block.hashTreeRoot(),
// TODO: Benchmark if faster to slice Buffer or fromHexString()
parentRoot: block.parentRoot,
};
});
await this.db.blockArchive.batchPutBinary(blockEntries);
}
}

async function extractPreviousFinOrWsCheckpoint(
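To illustrate the CachedBytes idea discussed in the review thread above, here is a minimal sketch, not part of this diff, of how such a shape could be consumed when archiving backfilled blocks, assuming the reqresp layer attaches the wire bytes to each tree-backed block. CachedBytes, ArchiveEntry, and toArchiveEntries are hypothetical names; ArchiveEntry simply mirrors the fields written by archiveBlocksTreeBacked.

import {allForks, Root, Slot} from "@chainsafe/lodestar-types";
import {TreeBacked} from "@chainsafe/ssz";

// Hypothetical shape from the review thread: the deserialized block plus the
// exact wire bytes it was decoded from, so consumers never re-serialize.
type CachedBytes<T> = {bytes: Uint8Array} & T;

// Local stand-in mirroring the fields written by archiveBlocksTreeBacked above.
type ArchiveEntry = {
  key: Slot;
  value: Uint8Array;
  slot: Slot;
  blockRoot: Root;
  parentRoot: Root;
};

function toArchiveEntries(blocks: CachedBytes<TreeBacked<allForks.SignedBeaconBlock>>[]): ArchiveEntry[] {
  return blocks.map((signedBlock) => {
    const block = signedBlock.message as TreeBacked<allForks.BeaconBlock>;
    return {
      key: block.slot,
      // Reuse the bytes received over reqresp instead of calling signedBlock.serialize() again
      value: signedBlock.bytes,
      slot: block.slot,
      // hashTreeRoot() on a tree-backed value reuses the cached tree, so this stays cheap
      blockRoot: block.hashTreeRoot(),
      parentRoot: block.parentRoot,
    };
  });
}

Under that assumption, the serialize() call drops out of the archive write path while block roots still come from the tree-backed cache.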
10 changes: 6 additions & 4 deletions packages/lodestar/src/sync/backfill/verify.ts
@@ -2,6 +2,8 @@ import {allForks, CachedBeaconStateAllForks, ISignatureSet} from "@chainsafe/lod
import {IBeaconConfig} from "@chainsafe/lodestar-config";
import {Root, allForks as allForkTypes, ssz, Slot} from "@chainsafe/lodestar-types";
import {GENESIS_SLOT} from "@chainsafe/lodestar-params";
import {TreeBacked} from "@chainsafe/ssz";

import {IBlsVerifier} from "../../chain/bls";
import {BackfillSyncError, BackfillSyncErrorCode} from "./errors";

@@ -14,19 +16,19 @@ export type BackfillBlock = BackfillBlockHeader & {block: allForks.SignedBeaconB

export function verifyBlockSequence(
config: IBeaconConfig,
blocks: allForkTypes.SignedBeaconBlock[],
blocks: TreeBacked<allForkTypes.SignedBeaconBlock>[],
anchorRoot: Root
): {
nextAnchor: BackfillBlock | null;
verifiedBlocks: allForkTypes.SignedBeaconBlock[];
verifiedBlocks: TreeBacked<allForkTypes.SignedBeaconBlock>[];
error?: BackfillSyncErrorCode.NOT_LINEAR;
} {
let nextRoot: Root = anchorRoot;
let nextAnchor: BackfillBlock | null = null;

const verifiedBlocks: allForkTypes.SignedBeaconBlock[] = [];
const verifiedBlocks: TreeBacked<allForkTypes.SignedBeaconBlock>[] = [];
for (const block of blocks.reverse()) {
const blockRoot = config.getForkTypes(block.message.slot).BeaconBlock.hashTreeRoot(block.message);
const blockRoot = (block.message as TreeBacked<allForkTypes.BeaconBlock>).hashTreeRoot();
if (!ssz.Root.equals(blockRoot, nextRoot)) {
if (ssz.Root.equals(nextRoot, anchorRoot)) {
throw new BackfillSyncError({code: BackfillSyncErrorCode.NOT_ANCHORED});
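Because verifyBlockSequence now expects TreeBacked blocks, callers holding struct-backed blocks have to convert them first. A minimal sketch using the same ssz helper the updated test at the bottom of this diff uses; the toTreeBacked wrapper name is illustrative only.

import {phase0, ssz} from "@chainsafe/lodestar-types";
import {TreeBacked} from "@chainsafe/ssz";

// Convert a struct block (e.g. parsed from JSON) into a tree-backed value so that
// hashTreeRoot() inside verifyBlockSequence can reuse the cached tree.
function toTreeBacked(block: phase0.SignedBeaconBlock): TreeBacked<phase0.SignedBeaconBlock> {
  return ssz.phase0.SignedBeaconBlock.createTreeBackedFromStruct(block);
}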
11 changes: 10 additions & 1 deletion packages/lodestar/src/sync/options.ts
@@ -1,3 +1,6 @@
import {SLOTS_PER_EPOCH} from "@chainsafe/lodestar-params";
import {EPOCHS_PER_BATCH} from "./constants";

export type SyncOptions = {
/**
* Allow node to consider itself synced without being connected to a peer.
@@ -9,7 +12,12 @@ export type SyncOptions = {
* Should only be used for debugging or testing.
*/
disableProcessAsChainSegment?: boolean;

/**
* The batch size, in slots, that backfill sync will attempt to sync/process before yielding
* to the sync loop. This number can be increased or decreased to tune the resource
* allocation for backfill sync.
*/
backfillBatchSize?: number;
/** USE FOR TESTING ONLY. Disable range sync completely */
disableRangeSync?: boolean;
/** USE FOR TESTING ONLY. Disable range sync completely */
@@ -19,4 +27,5 @@ export type SyncOptions = {
export const defaultSyncOptions: SyncOptions = {
isSingleNode: false,
disableProcessAsChainSegment: false,
backfillBatchSize: EPOCHS_PER_BATCH * SLOTS_PER_EPOCH,
};
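For reference, this default agrees with the BATCH_SIZE constant removed from backfill.ts ("Default batch size. Same as range sync (2 epochs)"): assuming EPOCHS_PER_BATCH is the range-sync value of 2 and SLOTS_PER_EPOCH is the mainnet 32, the product is the same 64 slots.

// Assumed preset values, not part of this diff: EPOCHS_PER_BATCH = 2, SLOTS_PER_EPOCH = 32 (mainnet)
const backfillBatchSize = 2 * 32; // 64 slots per batch, same as the removed BATCH_SIZE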
8 changes: 5 additions & 3 deletions packages/lodestar/test/unit/sync/backfill/verify.test.ts
@@ -1,5 +1,5 @@
import {BackfillSyncErrorCode, BackfillSyncError} from "./../../../../src/sync/backfill/errors";
import {Json} from "@chainsafe/ssz";
import {Json, TreeBacked} from "@chainsafe/ssz";
import {createIBeaconConfig} from "@chainsafe/lodestar-config";
import {config} from "@chainsafe/lodestar-config/default";
import {phase0, ssz} from "@chainsafe/lodestar-types";
@@ -44,10 +44,12 @@ describe("backfill sync - verify block sequence", function () {
});

//first 4 mainnet blocks
function getBlocks(): phase0.SignedBeaconBlock[] {
function getBlocks(): TreeBacked<phase0.SignedBeaconBlock>[] {
const json = JSON.parse(readFileSync(path.join(__dirname, "./blocks.json"), "utf-8")) as Json[];
return json.map((b) => {
return ssz.phase0.SignedBeaconBlock.fromJson(b, {case: "snake"});
return ssz.phase0.SignedBeaconBlock.createTreeBackedFromStruct(
ssz.phase0.SignedBeaconBlock.fromJson(b, {case: "snake"})
);
});
}
});