Skip to content

Commit

Permalink
fix: handle only downloaded blocks in unknown block sync (#5656)
Browse files Browse the repository at this point in the history
* fix: handle downloaded blocks only

* chore: add unit test

* chore: fix comment
  • Loading branch information
twoeths authored Jun 16, 2023
1 parent 2eddf46 commit 7eec2e5
Show file tree
Hide file tree
Showing 6 changed files with 122 additions and 35 deletions.
34 changes: 21 additions & 13 deletions packages/beacon-node/src/sync/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,28 +55,36 @@ export interface SyncModules {
wsCheckpoint?: phase0.Checkpoint;
}

export type UnknownAndAncestorBlocks = {
unknowns: UnknownBlock[];
ancestors: DownloadedBlock[];
};

/**
* onUnknownBlock: store 1 record with undefined parentBlockRootHex & blockInput, blockRootHex as key, status pending
* onUnknownBlockParent:
* - store 1 record with known parentBlockRootHex & blockInput, blockRootHex as key, status downloaded
* - store 1 record with undefined parentBlockRootHex & blockInput, parentBlockRootHex as key, status pending
*/
export type PendingBlock = {
export type PendingBlock = UnknownBlock | DownloadedBlock;

type PendingBlockCommon = {
blockRootHex: RootHex;
peerIdStrs: Set<string>;
downloadAttempts: number;
} & (
| {
status: PendingBlockStatus.pending | PendingBlockStatus.fetching;
parentBlockRootHex: null;
blockInput: null;
}
| {
status: PendingBlockStatus.downloaded | PendingBlockStatus.processing;
parentBlockRootHex: RootHex;
blockInput: BlockInput;
}
);
};

export type UnknownBlock = PendingBlockCommon & {
status: PendingBlockStatus.pending | PendingBlockStatus.fetching;
parentBlockRootHex: null;
blockInput: null;
};

export type DownloadedBlock = PendingBlockCommon & {
status: PendingBlockStatus.downloaded | PendingBlockStatus.processing;
parentBlockRootHex: RootHex;
blockInput: BlockInput;
};

export enum PendingBlockStatus {
pending = "pending",
Expand Down
2 changes: 2 additions & 0 deletions packages/beacon-node/src/sync/options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ export type SyncOptions = {
* allocation to backfill sync. The default of 0 would mean backfill sync will be skipped
*/
backfillBatchSize: number;
/** For testing only, MAX_PENDING_BLOCKS by default */
maxPendingBlocks?: number;
};

export const defaultSyncOptions: SyncOptions = {
Expand Down
37 changes: 29 additions & 8 deletions packages/beacon-node/src/sync/unknownBlock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import {BlockError, BlockErrorCode} from "../chain/errors/index.js";
import {beaconBlocksMaybeBlobsByRoot} from "../network/reqresp/beaconBlocksMaybeBlobsByRoot.js";
import {wrapError} from "../util/wrapError.js";
import {PendingBlock, PendingBlockStatus, PendingBlockType} from "./interface.js";
import {getDescendantBlocks, getAllDescendantBlocks, getUnknownBlocks} from "./utils/pendingBlocksTree.js";
import {getDescendantBlocks, getAllDescendantBlocks, getUnknownAndAncestorBlocks} from "./utils/pendingBlocksTree.js";
import {SyncOptions} from "./options.js";

const MAX_ATTEMPTS_PER_BLOCK = 5;
Expand All @@ -29,6 +29,7 @@ export class UnknownBlockSync {
private readonly pendingBlocks = new Map<RootHex, PendingBlock>();
private readonly knownBadBlocks = new Set<RootHex>();
private readonly proposerBoostSecWindow: number;
private readonly maxPendingBlocks;

constructor(
private readonly config: ChainForkConfig,
Expand All @@ -46,6 +47,7 @@ export class UnknownBlockSync {
} else {
this.logger.debug("UnknownBlockSync disabled.");
}
this.maxPendingBlocks = opts?.maxPendingBlocks ?? MAX_PENDING_BLOCKS;

this.proposerBoostSecWindow = this.config.SECONDS_PER_SLOT / INTERVALS_PER_SLOT;

Expand Down Expand Up @@ -147,7 +149,7 @@ export class UnknownBlockSync {
}

// Limit pending blocks to prevent DOS attacks that cause OOM
const prunedItemCount = pruneSetToMax(this.pendingBlocks, MAX_PENDING_BLOCKS);
const prunedItemCount = pruneSetToMax(this.pendingBlocks, this.maxPendingBlocks);
if (prunedItemCount > 0) {
this.logger.warn(`Pruned ${prunedItemCount} pending blocks from UnknownBlockSync`);
}
Expand All @@ -169,13 +171,32 @@ export class UnknownBlockSync {
return;
}

const unknownBlocks = getUnknownBlocks(this.pendingBlocks);
if (unknownBlocks.length === 0) {
this.logger.debug("No unknown block to download", {pendingBlocks: this.pendingBlocks.size});
const {unknowns, ancestors} = getUnknownAndAncestorBlocks(this.pendingBlocks);
// it's rare when there is no unknown block
// see https://github.com/ChainSafe/lodestar/issues/5649#issuecomment-1594213550
if (unknowns.length === 0) {
let processedBlocks = 0;

for (const block of ancestors) {
// when this happens, it's likely the block and parent block are processed by head sync
if (this.chain.forkChoice.hasBlockHex(block.parentBlockRootHex)) {
processedBlocks++;
this.processBlock(block).catch((e) => {
this.logger.debug("Unexpected error - process old downloaded block", {}, e);
});
}
}

this.logger.verbose("No unknown block, process ancestor downloaded blocks", {
pendingBlocks: this.pendingBlocks.size,
ancestorBlocks: ancestors.length,
processedBlocks,
});
return;
}

for (const block of unknownBlocks) {
// most of the time there is exactly 1 unknown block
for (const block of unknowns) {
this.downloadBlock(block, connectedPeers).catch((e) => {
this.logger.debug("Unexpected error - downloadBlock", {root: block.blockRootHex}, e);
});
Expand Down Expand Up @@ -222,7 +243,7 @@ export class UnknownBlockSync {
if (parentInForkchoice) {
// Bingo! Process block. Add to pending blocks anyway for recycle the cache that prevents duplicate processing
this.processBlock(block).catch((e) => {
this.logger.debug("Unexpected error - processBlock", {}, e);
this.logger.debug("Unexpected error - process newly downloaded block", {}, e);
});
} else if (blockSlot <= finalizedSlot) {
// the common ancestor of the downloading chain and canonical chain should be at least the finalized slot and
Expand Down Expand Up @@ -314,7 +335,7 @@ export class UnknownBlockSync {
// Send child blocks to the processor
for (const descendantBlock of getDescendantBlocks(pendingBlock.blockRootHex, this.pendingBlocks)) {
this.processBlock(descendantBlock).catch((e) => {
this.logger.debug("Unexpected error - processBlock", {}, e);
this.logger.debug("Unexpected error - process descendant block", {}, e);
});
}
} else {
Expand Down
31 changes: 25 additions & 6 deletions packages/beacon-node/src/sync/utils/pendingBlocksTree.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
import {RootHex} from "@lodestar/types";
import {MapDef} from "@lodestar/utils";
import {PendingBlock, PendingBlockStatus} from "../interface.js";
import {
DownloadedBlock,
PendingBlock,
PendingBlockStatus,
UnknownAndAncestorBlocks,
UnknownBlock,
} from "../interface.js";

export function getAllDescendantBlocks(blockRootHex: RootHex, blocks: Map<RootHex, PendingBlock>): PendingBlock[] {
// Do one pass over all blocks to index by parent
Expand Down Expand Up @@ -43,14 +49,27 @@ export function getDescendantBlocks(blockRootHex: RootHex, blocks: Map<RootHex,
return descendantBlocks;
}

export function getUnknownBlocks(blocks: Map<RootHex, PendingBlock>): PendingBlock[] {
const blocksToFetch: PendingBlock[] = [];
/**
* Given this chain segment unknown block n => downloaded block n + 1 => downloaded block n + 2
* return `{unknowns: [n], ancestors: []}`
*
* Given this chain segment: downloaded block n => downloaded block n + 1 => downloaded block n + 2
* return {unknowns: [], ancestors: [n]}
*/
export function getUnknownAndAncestorBlocks(blocks: Map<RootHex, PendingBlock>): UnknownAndAncestorBlocks {
const unknowns: UnknownBlock[] = [];
const ancestors: DownloadedBlock[] = [];

for (const block of blocks.values()) {
if (block.status === PendingBlockStatus.pending && block.blockInput == null && block.parentBlockRootHex == null) {
blocksToFetch.push(block);
const parentHex = block.parentBlockRootHex;
if (block.status === PendingBlockStatus.pending && block.blockInput == null && parentHex == null) {
unknowns.push(block);
}

if (parentHex && !blocks.has(parentHex)) {
ancestors.push(block);
}
}

return blocksToFetch;
return {unknowns, ancestors};
}
34 changes: 32 additions & 2 deletions packages/beacon-node/test/unit/sync/unknownBlock.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import {BlockSource, getBlockInput} from "../../../src/chain/blocks/types.js";
import {ClockStopped} from "../../utils/mocks/clock.js";
import {SeenBlockProposers} from "../../../src/chain/seenCache/seenBlockProposers.js";
import {BlockError, BlockErrorCode} from "../../../src/chain/errors/blockError.js";
import {defaultSyncOptions} from "../../../src/sync/options.js";

describe("sync / UnknownBlockSync", () => {
const logger = testLogger();
Expand All @@ -38,6 +39,7 @@ describe("sync / UnknownBlockSync", () => {
reportPeer?: boolean;
seenBlock?: boolean;
wrongBlockRoot?: boolean;
maxPendingBlocks?: number;
}[] = [
{
id: "fetch and process multiple unknown blocks",
Expand Down Expand Up @@ -72,9 +74,23 @@ describe("sync / UnknownBlockSync", () => {
event: NetworkEvent.unknownBlock,
finalizedSlot: 1,
},
{
id: "downloaded blocks only",
event: NetworkEvent.unknownBlockParent,
finalizedSlot: 0,
maxPendingBlocks: 1,
},
];

for (const {id, event, finalizedSlot, reportPeer = false, seenBlock = false, wrongBlockRoot = false} of testCases) {
for (const {
id,
event,
finalizedSlot,
reportPeer = false,
seenBlock = false,
wrongBlockRoot = false,
maxPendingBlocks,
} of testCases) {
it(id, async () => {
const peer = await getRandPeerIdStr();
const blockA = ssz.phase0.SignedBeaconBlock.defaultValue();
Expand Down Expand Up @@ -130,7 +146,10 @@ describe("sync / UnknownBlockSync", () => {
// only return seenBlock for blockC
isKnown: (blockSlot) => (blockSlot === blockC.message.slot ? seenBlock : false),
};

let blockAResolver: () => void;
let blockCResolver: () => void;
const blockAProcessed = new Promise<void>((resolve) => (blockAResolver = resolve));
const blockCProcessed = new Promise<void>((resolve) => (blockCResolver = resolve));

const chain: Partial<IBeaconChain> = {
Expand All @@ -147,13 +166,17 @@ describe("sync / UnknownBlockSync", () => {
const blockRootHex = toHexString(ssz.phase0.BeaconBlock.hashTreeRoot(block.message));
forkChoiceKnownRoots.add(blockRootHex);
if (blockRootHex === blockRootHexC) blockCResolver();
if (blockRootHex === blockRootHexA) blockAResolver();
},
seenBlockProposers: seenBlockProposers as SeenBlockProposers,
};

const setTimeoutSpy = sandbox.spy(global, "setTimeout");
const processBlockSpy = sandbox.spy(chain, "processBlock");
const syncService = new UnknownBlockSync(config, network as INetwork, chain as IBeaconChain, logger, null);
const syncService = new UnknownBlockSync(config, network as INetwork, chain as IBeaconChain, logger, null, {
...defaultSyncOptions,
maxPendingBlocks,
});
if (event === NetworkEvent.unknownBlockParent) {
network.events?.emit(NetworkEvent.unknownBlockParent, {
blockInput: getBlockInput.preDeneb(config, blockC, BlockSource.gossip),
Expand All @@ -175,6 +198,13 @@ describe("sync / UnknownBlockSync", () => {
const err = await reportPeerPromise;
expect(err[0]).equal(peer);
expect([err[1], err[2]]).to.be.deep.equal([PeerAction.LowToleranceError, "BadBlockByRoot"]);
} else if (maxPendingBlocks === 1) {
await blockAProcessed;
// not able to process blockB and blockC because maxPendingBlocks is 1
expect(Array.from(forkChoiceKnownRoots.values())).to.deep.equal(
[blockRootHex0, blockRootHexA],
"Wrong blocks in mock ForkChoice"
);
} else {
// Wait for all blocks to be in ForkChoice store
await blockCProcessed;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import {expect} from "chai";
import {RootHex} from "@lodestar/types";
import {PendingBlock, PendingBlockStatus} from "../../../../src/sync/index.js";
import {PendingBlock, PendingBlockStatus, UnknownAndAncestorBlocks} from "../../../../src/sync/index.js";
import {
getAllDescendantBlocks,
getDescendantBlocks,
getUnknownBlocks,
getUnknownAndAncestorBlocks,
} from "../../../../src/sync/utils/pendingBlocksTree.js";

describe("sync / pendingBlocksTree", () => {
Expand All @@ -13,14 +13,14 @@ describe("sync / pendingBlocksTree", () => {
blocks: {block: string; parent: string | null}[];
getAllDescendantBlocks: {block: string; res: string[]}[];
getDescendantBlocks: {block: string; res: string[]}[];
getUnknownBlocks: string[];
getUnknownOrAncestorBlocks: {unknowns: string[]; ancestors: string[]};
}[] = [
{
id: "empty case",
blocks: [],
getAllDescendantBlocks: [{block: "0A", res: []}],
getDescendantBlocks: [{block: "0A", res: []}],
getUnknownBlocks: [],
getUnknownOrAncestorBlocks: {unknowns: [], ancestors: []},
},
{
id: "two branches with multiple blocks",
Expand All @@ -44,7 +44,7 @@ describe("sync / pendingBlocksTree", () => {
{block: "3C", res: ["4C"]},
{block: "3B", res: []},
],
getUnknownBlocks: ["0A"],
getUnknownOrAncestorBlocks: {unknowns: ["0A"], ancestors: ["4C"]},
},
];

Expand Down Expand Up @@ -72,7 +72,7 @@ describe("sync / pendingBlocksTree", () => {
}

it("getUnknownBlocks", () => {
expect(toRes(getUnknownBlocks(blocks))).to.deep.equal(testCase.getUnknownBlocks);
expect(toRes2(getUnknownAndAncestorBlocks(blocks))).to.deep.equal(testCase.getUnknownOrAncestorBlocks);
});
});
}
Expand All @@ -81,3 +81,10 @@ describe("sync / pendingBlocksTree", () => {
function toRes(blocks: PendingBlock[]): string[] {
return blocks.map((block) => block.blockRootHex);
}

function toRes2(blocks: UnknownAndAncestorBlocks): {unknowns: string[]; ancestors: string[]} {
return {
unknowns: blocks.unknowns.map((block) => block.blockRootHex),
ancestors: blocks.ancestors.map((block) => block.blockRootHex),
};
}

0 comments on commit 7eec2e5

Please sign in to comment.