Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion packages/beacon-node/src/network/options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ export const defaultNetworkOptions: NetworkOptions = {
slotsToSubscribeBeforeAggregatorDuty: 2,
// This will enable the light client server by default
disableLightClientServer: false,
// for PeerDAS, this is the same as TARGET_SUBNET_PEERS; should re-evaluate after devnets
// specific option for fulu
// - this is the same as TARGET_SUBNET_PEERS
// - for fusaka-devnets, we have 25-30 peers per subnet
// - for public testnets or mainnet, average number of peers per group is SAMPLES_PER_SLOT * targetPeers / NUMBER_OF_CUSTODY_GROUPS = 6.25 so this should not be an issue
targetGroupPeers: 6,
};
6 changes: 3 additions & 3 deletions packages/beacon-node/src/network/peers/discover.ts
Original file line number Diff line number Diff line change
Expand Up @@ -486,9 +486,9 @@ export class PeerDiscovery {
if (forkSeq >= ForkSeq.fulu && peer.peerCustodyGroups !== null) {
// pre-fulu `this.groupRequests` is empty
// starting from fulu, we need to make sure we have stable subnet sampling peers first
// given CUSTODY_REQUIREMENT = 4 and 100 peers, we have 400 custody columns from peers
// with NUMBER_OF_CUSTODY_GROUPS = 128, we have 400 / 128 = 3.125 peers per column in average
// it would not be hard to find TARGET_SUBNET_PEERS(6) peers per SAMPLES_PER_SLOT(8) columns
// given SAMPLES_PER_SLOT = 8 and 100 peers, we have 800 custody columns from peers
// with NUMBER_OF_CUSTODY_GROUPS = 128, we have 800 / 128 = 6.25 peers per column in average
// it would not be hard to find TARGET_SUBNET_PEERS(6) peers per sampling column and TARGET_GROUP_PEERS_PER_SUBNET(4) peers per non-sampling column
// after some first heartbeats, we should have no more column requested, then go with conditions of prior forks
let hasMatchingGroup = false;
let groupRequestCount = 0;
Expand Down
10 changes: 9 additions & 1 deletion packages/beacon-node/src/network/peers/peerManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -342,12 +342,18 @@ export class PeerManager {
const oldMetadata = peerData.metadata;
const custodyGroupCount =
(metadata as Partial<fulu.Metadata>).custodyGroupCount ?? this.config.CUSTODY_REQUIREMENT;
const samplingGroupCount = Math.max(this.config.SAMPLES_PER_SLOT, custodyGroupCount);
const nodeId = peerData?.nodeId ?? computeNodeId(peer);
// TODO(fulu): this should be columns not groups. need to change everywhere
const custodyGroups =
oldMetadata == null || oldMetadata.custodyGroups == null || custodyGroupCount !== oldMetadata.custodyGroupCount
? getCustodyGroups(nodeId, custodyGroupCount)
: oldMetadata.custodyGroups;
const oldSamplingGroupCount = Math.max(this.config.SAMPLES_PER_SLOT, oldMetadata?.custodyGroupCount ?? 0);
const samplingGroups =
oldMetadata == null || oldMetadata.samplingGroups == null || samplingGroupCount !== oldSamplingGroupCount
? getCustodyGroups(nodeId, samplingGroupCount)
: oldMetadata.samplingGroups;
peerData.metadata = {
seqNumber: metadata.seqNumber,
attnets: metadata.attnets,
Expand All @@ -357,6 +363,7 @@ export class PeerManager {
// TODO: spec says that Clients MAY reject peers with a value less than CUSTODY_REQUIREMENT
this.config.CUSTODY_REQUIREMENT,
custodyGroups,
samplingGroups,
};
if (oldMetadata === null || oldMetadata.custodyGroupCount !== peerData.metadata.custodyGroupCount) {
void this.requestStatus(peer, this.statusCache.get());
Expand Down Expand Up @@ -560,7 +567,8 @@ export class PeerManager {
status: peerData?.status ?? null,
attnets: peerData?.metadata?.attnets ?? null,
syncnets: peerData?.metadata?.syncnets ?? null,
custodyGroups: peerData?.metadata?.custodyGroups ?? null,
// here we care about samplingGroups, not custodyGroups, in order to know which column subnets peers subscribe to
samplingGroups: peerData?.metadata?.samplingGroups ?? null,
score: this.peerRpcScores.getScore(peer),
};
}),
Expand Down
2 changes: 1 addition & 1 deletion packages/beacon-node/src/network/peers/peersData.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import {NodeId} from "../subnets/interface.js";
import {ClientKind} from "./client.js";

type PeerIdStr = string;
type Metadata = fulu.Metadata & {custodyGroups: CustodyIndex[]};
type Metadata = fulu.Metadata & {custodyGroups: CustodyIndex[]; samplingGroups: CustodyIndex[]};
export type PeerSyncMeta = {
peerId: PeerIdStr;
client: string;
Expand Down
43 changes: 28 additions & 15 deletions packages/beacon-node/src/network/peers/utils/prioritizePeers.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import {BitArray} from "@chainsafe/ssz";
import {Direction, PeerId} from "@libp2p/interface";
import {ATTESTATION_SUBNET_COUNT, SYNC_COMMITTEE_SUBNET_COUNT} from "@lodestar/params";
import {ATTESTATION_SUBNET_COUNT, NUMBER_OF_CUSTODY_GROUPS, SYNC_COMMITTEE_SUBNET_COUNT} from "@lodestar/params";
import {CustodyIndex, Status, SubnetID, altair, phase0} from "@lodestar/types";
import {MapDef} from "@lodestar/utils";
import {shuffle} from "../../../util/shuffle.js";
Expand All @@ -11,6 +11,12 @@ import {RequestedSubnet} from "./subnetMap.js";
/** Target number of peers we'd like to have connected to a given long-lived subnet */
const TARGET_SUBNET_PEERS = 6;

/**
 * This is for non-sampling groups only. This is a very easy number to achieve given an average of 6.25 peers per column subnet on public networks.
 * This is needed to always maintain some minimum peers on all subnets so that when we publish a block, we're sure we publish to all column subnets.
*/
const TARGET_GROUP_PEERS_PER_SUBNET = 4;

/**
* This is used in the pruning logic. We avoid pruning peers on sync-committees if doing so would
* lower our peer count below this number. Instead we favour a non-uniform distribution of subnet
Expand Down Expand Up @@ -91,7 +97,7 @@ type PeerInfo = {
statusScore: StatusScore;
attnets: phase0.AttestationSubnets;
syncnets: altair.SyncSubnets;
custodyGroups: CustodyIndex[];
samplingGroups: CustodyIndex[];
attnetsTrueBitIndices: number[];
syncnetsTrueBitIndices: number[];
score: number;
Expand Down Expand Up @@ -121,7 +127,7 @@ export enum ExcessPeerDisconnectReason {
* - Reach `targetPeers`
* - If we're starved for data, prune additional peers
* - Don't exceed `maxPeers`
* - Ensure there are enough peers per active subnet
* - Ensure there are enough peers per column subnets, attestation subnets and sync committee subnets
* - Prioritize peers with good score
*
* pre-fulu samplingGroups is not used and this function returns empty groupQueries
Expand All @@ -133,7 +139,7 @@ export function prioritizePeers(
status: Status | null;
attnets: phase0.AttestationSubnets | null;
syncnets: altair.SyncSubnets | null;
custodyGroups: CustodyIndex[] | null;
samplingGroups: CustodyIndex[] | null;
score: number;
}[],
activeAttnets: RequestedSubnet[],
Expand Down Expand Up @@ -161,7 +167,7 @@ export function prioritizePeers(
statusScore: computeStatusScore(opts.status, peer.status, opts),
attnets: peer.attnets ?? attnetsZero,
syncnets: peer.syncnets ?? syncnetsZero,
custodyGroups: peer.custodyGroups ?? [],
samplingGroups: peer.samplingGroups ?? [],
attnetsTrueBitIndices: peer.attnets?.getTrueBitIndexes() ?? [],
syncnetsTrueBitIndices: peer.syncnets?.getTrueBitIndexes() ?? [],
score: peer.score,
Expand Down Expand Up @@ -210,7 +216,7 @@ function requestSubnetPeers(
connectedPeers: PeerInfo[],
activeAttnets: RequestedSubnet[],
activeSyncnets: RequestedSubnet[],
samplingGroups: CustodyIndex[] | undefined,
ourSamplingGroups: CustodyIndex[] | undefined,
opts: PrioritizePeersOpts,
metrics: NetworkCoreMetrics | null
): {
Expand Down Expand Up @@ -278,26 +284,33 @@ function requestSubnetPeers(
}
}

// column subnets, do we need queries for more peers
const targetGroupPeers = opts.targetGroupPeers;
const groupQueries: GroupQueries = new Map();
// pre-fulu
if (ourSamplingGroups == null) {
return {attnetQueries, syncnetQueries, groupQueries, dutiesByPeer};
}

// column subnets, do we need queries for more peers
const targetGroupPeersPerSamplingGroup = opts.targetGroupPeers;
const peersPerGroup = new Map<CustodyIndex, number>();
for (const peer of connectedPeers) {
const {custodyGroups} = peer;
for (const group of custodyGroups) {
const peerSamplingGroups = peer.samplingGroups;
for (const group of peerSamplingGroups) {
peersPerGroup.set(group, 1 + (peersPerGroup.get(group) ?? 0));
}
}

let groupIndex = 0;
for (const group of samplingGroups ?? []) {
const peersInGroup = peersPerGroup.get(group) ?? 0;
const ourSamplingGroupSet = new Set(ourSamplingGroups);
for (let groupIndex = 0; groupIndex < NUMBER_OF_CUSTODY_GROUPS; groupIndex++) {
const peersInGroup = peersPerGroup.get(groupIndex) ?? 0;
metrics?.peerCountPerSamplingGroup.set({groupIndex}, peersInGroup);
const targetGroupPeers = ourSamplingGroupSet.has(groupIndex)
? targetGroupPeersPerSamplingGroup
: TARGET_GROUP_PEERS_PER_SUBNET;
if (peersInGroup < targetGroupPeers) {
// We need more peers
groupQueries.set(group, targetGroupPeers - peersInGroup);
groupQueries.set(groupIndex, targetGroupPeers - peersInGroup);
}
groupIndex++;
}

return {attnetQueries, syncnetQueries, groupQueries, dutiesByPeer};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,12 +191,14 @@ describe("network / peers / PeerManager", () => {
// Simulate peer1 returning a PING and STATUS message
const remoteStatus = statusCache.get();
const custodyGroupCount = config.CUSTODY_REQUIREMENT;
const samplingGroupCount = config.SAMPLES_PER_SLOT;
const remoteMetadata: NonNullable<ReturnType<PeerManager["connectedPeers"]["get"]>>["metadata"] = {
seqNumber: BigInt(1),
attnets: getAttnets(),
syncnets: getSyncnets(),
custodyGroupCount,
custodyGroups: getCustodyGroups(computeNodeId(peerId1), custodyGroupCount),
samplingGroups: getCustodyGroups(computeNodeId(peerId1), samplingGroupCount),
};
reqResp.sendPing.mockResolvedValue(remoteMetadata.seqNumber);
reqResp.sendStatus.mockResolvedValue(remoteStatus);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ describe("prioritizePeers", () => {
Array.from({length: Math.floor(syncnetPercentage * SYNC_COMMITTEE_SUBNET_COUNT)}, (_, i) => i)
),
score: lowestScore + ((highestScore - lowestScore) * i) / defaultNetworkOptions.maxPeers,
custodyGroups: [],
samplingGroups: [],
status: ssz.phase0.Status.defaultValue(),
}));

Expand Down
Loading
Loading