Skip to content

Commit

Permalink
fix: make SLOTS_TO_SUBSCRIBE_IN_ADVANCE as hidden cli param (#5819)
Browse files Browse the repository at this point in the history
  • Loading branch information
twoeths authored Jul 31, 2023
1 parent 19b723c commit 487aef9
Show file tree
Hide file tree
Showing 8 changed files with 34 additions and 21 deletions.
2 changes: 2 additions & 0 deletions packages/beacon-node/src/network/options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,6 @@ export const defaultNetworkOptions: NetworkOptions = {
gossipsubDHigh: 9,
// TODO set to false in order to release 1.9.0 in a timely manner
useWorker: false,
// subscribe 2 slots before aggregator dutied slot to get stable mesh peers as monitored on goerli
slotsToSubscribeBeforeAggregatorDuty: 2,
};
19 changes: 8 additions & 11 deletions packages/beacon-node/src/network/subnets/attnetsService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,6 @@ enum SubnetSource {
random = "random",
}

/** As monitored on goerli, we only need to subscribe 2 slots before aggregator dutied slot to get stable mesh peers */
const SLOTS_TO_SUBSCRIBE_IN_ADVANCE = 2;

/**
* Manage random (long lived) subnets and committee (short lived) subnets.
* - PeerManager uses attnetsService to know which peers are requried for duties
Expand Down Expand Up @@ -82,18 +79,18 @@ export class AttnetsService implements IAttnetsService {
private readonly metadata: MetadataController,
private readonly logger: Logger,
private readonly metrics: NetworkCoreMetrics | null,
private readonly opts?: SubnetsServiceOpts & SubnetsServiceTestOpts
private readonly opts: SubnetsServiceOpts & SubnetsServiceTestOpts
) {
// if subscribeAllSubnets, we act like we have >= ATTESTATION_SUBNET_COUNT validators connecting to this node
// so that we have enough subnet topic peers, see https://github.com/ChainSafe/lodestar/issues/4921
if (this.opts?.subscribeAllSubnets) {
if (this.opts.subscribeAllSubnets) {
for (let subnet = 0; subnet < ATTESTATION_SUBNET_COUNT; subnet++) {
this.committeeSubnets.request({subnet, toSlot: Infinity});
}
}

this.randBetweenFn = this.opts?.randBetweenFn ?? randBetween;
this.shuffleFn = this.opts?.shuffleFn ?? shuffle;
this.randBetweenFn = this.opts.randBetweenFn ?? randBetween;
this.shuffleFn = this.opts.shuffleFn ?? shuffle;
if (metrics) {
metrics.attnetsService.subscriptionsRandom.addCollect(() => this.onScrapeLodestarMetrics(metrics));
}
Expand Down Expand Up @@ -160,21 +157,21 @@ export class AttnetsService implements IAttnetsService {
unsubscribeSubnetsFromPrevFork(prevFork: ForkName): void {
this.logger.info("Unsuscribing to random attnets from prev fork", {prevFork});
for (let subnet = 0; subnet < ATTESTATION_SUBNET_COUNT; subnet++) {
if (!this.opts?.subscribeAllSubnets) {
if (!this.opts.subscribeAllSubnets) {
this.gossip.unsubscribeTopic({type: gossipType, fork: prevFork, subnet});
}
}
}

/**
* Run per slot.
* - Subscribe to gossip subnets `${SLOTS_TO_SUBSCRIBE_IN_ADVANCE}` slots in advance
* - Subscribe to gossip subnets 2 slots in advance
* - Unsubscribe from expired subnets
*/
private onSlot = (clockSlot: Slot): void => {
try {
for (const [dutiedSlot, subnets] of this.aggregatorSlotSubnet.entries()) {
if (dutiedSlot === clockSlot + SLOTS_TO_SUBSCRIBE_IN_ADVANCE) {
if (dutiedSlot === clockSlot + this.opts.slotsToSubscribeBeforeAggregatorDuty) {
// Trigger gossip subscription first, in batch
if (subnets.size > 0) {
this.subscribeToSubnets(Array.from(subnets), SubnetSource.committee);
Expand Down Expand Up @@ -353,7 +350,7 @@ export class AttnetsService implements IAttnetsService {
/** Trigger a gossip un-subscrition only if no-one is still subscribed */
private unsubscribeSubnets(subnets: number[], slot: Slot, src: SubnetSource): void {
// No need to unsubscribeTopic(). Return early to prevent repetitive extra work
if (this.opts?.subscribeAllSubnets) return;
if (this.opts.subscribeAllSubnets) return;

const forks = getActiveForks(this.config, this.clock.currentEpoch);
for (const subnet of subnets) {
Expand Down
15 changes: 6 additions & 9 deletions packages/beacon-node/src/network/subnets/dllAttnetsService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@ enum SubnetSource {
longLived = "long_lived",
}

/** As monitored on goerli, we only need to subscribe 2 slots before aggregator dutied slot to get stable mesh peers */
const SLOTS_TO_SUBSCRIBE_IN_ADVANCE = 2;

/**
* Manage deleterministic long lived (DLL) subnets and short lived subnets.
* - PeerManager uses attnetsService to know which peers are required for duties and long lived subscriptions
Expand Down Expand Up @@ -58,11 +55,11 @@ export class DLLAttnetsService implements IAttnetsService {
private readonly logger: Logger,
private readonly metrics: NetworkCoreMetrics | null,
private readonly nodeId: NodeId | null,
private readonly opts?: SubnetsServiceOpts
private readonly opts: SubnetsServiceOpts
) {
// if subscribeAllSubnets, we act like we have >= ATTESTATION_SUBNET_COUNT validators connecting to this node
// so that we have enough subnet topic peers, see https://github.com/ChainSafe/lodestar/issues/4921
if (this.opts?.subscribeAllSubnets) {
if (this.opts.subscribeAllSubnets) {
for (let subnet = 0; subnet < ATTESTATION_SUBNET_COUNT; subnet++) {
this.committeeSubnets.request({subnet, toSlot: Infinity});
}
Expand Down Expand Up @@ -147,21 +144,21 @@ export class DLLAttnetsService implements IAttnetsService {
unsubscribeSubnetsFromPrevFork(prevFork: ForkName): void {
this.logger.info("Unsuscribing to long lived attnets from prev fork", {prevFork});
for (let subnet = 0; subnet < ATTESTATION_SUBNET_COUNT; subnet++) {
if (!this.opts?.subscribeAllSubnets) {
if (!this.opts.subscribeAllSubnets) {
this.gossip.unsubscribeTopic({type: gossipType, fork: prevFork, subnet});
}
}
}

/**
* Run per slot.
* - Subscribe to gossip subnets `${SLOTS_TO_SUBSCRIBE_IN_ADVANCE}` slots in advance
* - Subscribe to gossip subnets 2 slots in advance
* - Unsubscribe from expired subnets
*/
private onSlot = (clockSlot: Slot): void => {
try {
for (const [dutiedSlot, subnets] of this.aggregatorSlotSubnet.entries()) {
if (dutiedSlot === clockSlot + SLOTS_TO_SUBSCRIBE_IN_ADVANCE) {
if (dutiedSlot === clockSlot + this.opts.slotsToSubscribeBeforeAggregatorDuty) {
// Trigger gossip subscription first, in batch
if (subnets.size > 0) {
this.subscribeToSubnets(Array.from(subnets), SubnetSource.committee);
Expand Down Expand Up @@ -290,7 +287,7 @@ export class DLLAttnetsService implements IAttnetsService {
**/
private unsubscribeSubnets(subnets: number[], slot: Slot, src: SubnetSource): void {
// No need to unsubscribeTopic(). Return early to prevent repetitive extra work
if (this.opts?.subscribeAllSubnets) return;
if (this.opts.subscribeAllSubnets) return;

const forks = getActiveForks(this.config, this.clock.currentEpoch);
for (const subnet of subnets) {
Expand Down
1 change: 1 addition & 0 deletions packages/beacon-node/src/network/subnets/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ export type ShuffleFn = <T>(arr: T[]) => T[];
export type SubnetsServiceOpts = {
deterministicLongLivedAttnets?: boolean;
subscribeAllSubnets?: boolean;
slotsToSubscribeBeforeAggregatorDuty: number;
};

export type SubnetsServiceTestOpts = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ describe("AttnetsService", function () {
getCurrentSlot(config, Math.floor(Date.now() / 1000));
metadata = new MetadataController({}, {config, onSetValue: () => null});
service = new AttnetsService(config, clock, gossipStub, metadata, logger, null, {
slotsToSubscribeBeforeAggregatorDuty: 2,
randBetweenFn,
shuffleFn: shuffleFn as ShuffleFn,
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ describe("DLLAttnetsService", () => {
// load getCurrentSlot first, vscode not able to debug without this
getCurrentSlot(config, Math.floor(Date.now() / 1000));
metadata = new MetadataController({}, {config, onSetValue: () => null});
service = new DLLAttnetsService(config, clock, gossipStub, metadata, logger, null, nodeId);
service = new DLLAttnetsService(config, clock, gossipStub, metadata, logger, null, nodeId, {
slotsToSubscribeBeforeAggregatorDuty: 2,
});
});

afterEach(() => {
Expand Down
11 changes: 11 additions & 0 deletions packages/cli/src/options/beaconNodeOptions/network.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ export type NetworkArgs = {
targetPeers?: number;
deterministicLongLivedAttnets?: boolean;
subscribeAllSubnets?: boolean;
slotsToSubscribeBeforeAggregatorDuty?: number;
disablePeerScoring?: boolean;
mdns?: boolean;
"network.maxPeers"?: number;
Expand Down Expand Up @@ -135,6 +136,8 @@ export function parseArgs(args: NetworkArgs): IBeaconNodeOptions["network"] {
localMultiaddrs: [localMu, localMu6].filter(Boolean) as string[],
deterministicLongLivedAttnets: args["deterministicLongLivedAttnets"],
subscribeAllSubnets: args["subscribeAllSubnets"],
slotsToSubscribeBeforeAggregatorDuty:
args["slotsToSubscribeBeforeAggregatorDuty"] ?? defaultOptions.network.slotsToSubscribeBeforeAggregatorDuty,
disablePeerScoring: args["disablePeerScoring"],
connectToDiscv5Bootnodes: args["network.connectToDiscv5Bootnodes"],
discv5FirstQueryDelayMs: args["network.discv5FirstQueryDelayMs"],
Expand Down Expand Up @@ -235,6 +238,14 @@ export const options: CliCommandOptions<NetworkArgs> = {
group: "network",
},

slotsToSubscribeBeforeAggregatorDuty: {
hidden: true,
type: "number",
description: "Number of slots before an aggregator duty to subscribe to subnets",
defaultDescription: String(defaultOptions.network.slotsToSubscribeBeforeAggregatorDuty),
group: "network",
},

disablePeerScoring: {
type: "boolean",
description: "Disable peer scoring, used for testing on devnets",
Expand Down
2 changes: 2 additions & 0 deletions packages/cli/test/unit/options/beaconNodeOptions.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ describe("options / beaconNodeOptions", () => {
targetPeers: 25,
deterministicLongLivedAttnets: true,
subscribeAllSubnets: true,
slotsToSubscribeBeforeAggregatorDuty: 1,
disablePeerScoring: true,
mdns: false,
"network.maxPeers": 30,
Expand Down Expand Up @@ -180,6 +181,7 @@ describe("options / beaconNodeOptions", () => {
localMultiaddrs: ["/ip4/127.0.0.1/tcp/9001"],
deterministicLongLivedAttnets: true,
subscribeAllSubnets: true,
slotsToSubscribeBeforeAggregatorDuty: 1,
disablePeerScoring: true,
connectToDiscv5Bootnodes: true,
discv5FirstQueryDelayMs: 1000,
Expand Down

0 comments on commit 487aef9

Please sign in to comment.