Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions packages/beacon-node/src/network/core/networkCore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,14 @@ import {ReqRespBeaconNode} from "../reqresp/ReqRespBeaconNode.js";
import {GetReqRespHandlerFn, OutgoingRequestArgs} from "../reqresp/types.js";
import {LocalStatusCache} from "../statusCache.js";
import {AttnetsService} from "../subnets/attnetsService.js";
import {CommitteeSubscription, IAttnetsService, computeNodeId} from "../subnets/interface.js";
import {CommitteeSubscription, IAttnetsService, NodeId, computeNodeId} from "../subnets/interface.js";
import {SyncnetsService} from "../subnets/syncnetsService.js";
import {getConnectionsMap} from "../util.js";
import {NetworkCoreMetrics, createNetworkCoreMetrics} from "./metrics.js";
import {INetworkCore, MultiaddrStr, PeerIdStr} from "./types.js";

type Mods = {
nodeId: NodeId;
libp2p: Libp2p;
gossip: Eth2Gossipsub;
reqResp: ReqRespBeaconNode;
Expand Down Expand Up @@ -98,13 +99,15 @@ export class NetworkCore implements INetworkCore {
private readonly clock: IClock;
private readonly statusCache: LocalStatusCache;
private readonly metrics: NetworkCoreMetrics | null;
private readonly nodeId: NodeId;
private readonly opts: NetworkOptions;

// Internal state
private readonly subscribedForks = new Set<ForkName>();
private closed = false;

constructor(modules: Mods) {
this.nodeId = modules.nodeId;
this.libp2p = modules.libp2p;
this.gossip = modules.gossip;
this.reqResp = modules.reqResp;
Expand Down Expand Up @@ -172,8 +175,11 @@ export class NetworkCore implements INetworkCore {
opts
);

const nodeId = computeNodeId(peerId);

const gossip = new Eth2Gossipsub(opts, {
config,
nodeId,
libp2p,
logger,
metricsRegister: metricsRegistry,
Expand All @@ -193,7 +199,6 @@ export class NetworkCore implements INetworkCore {
// should be called before AttnetsService constructor so that node subscribe to deterministic attnet topics
await gossip.start();

const nodeId = computeNodeId(peerId);
const attnetsService = new AttnetsService(config, clock, gossip, metadata, logger, metrics, nodeId, opts);
const syncnetsService = new SyncnetsService(config, clock, gossip, metadata, logger, metrics, opts);

Expand Down Expand Up @@ -230,6 +235,7 @@ export class NetworkCore implements INetworkCore {
metadata.upstreamValues(clock.currentEpoch);

return new NetworkCore({
nodeId,
libp2p,
reqResp,
gossip,
Expand Down Expand Up @@ -506,7 +512,7 @@ export class NetworkCore implements INetworkCore {
this.subscribedForks.add(fork);
const {subscribeAllSubnets, disableLightClientServer} = this.opts;

for (const topic of getCoreTopicsAtFork(config, fork, {
for (const topic of getCoreTopicsAtFork(config, fork, this.nodeId, {
subscribeAllSubnets,
disableLightClientServer,
})) {
Expand All @@ -519,7 +525,7 @@ export class NetworkCore implements INetworkCore {
this.subscribedForks.delete(fork);
const {subscribeAllSubnets, disableLightClientServer} = this.opts;

for (const topic of getCoreTopicsAtFork(config, fork, {
for (const topic of getCoreTopicsAtFork(config, fork, this.nodeId, {
subscribeAllSubnets,
disableLightClientServer,
})) {
Expand Down
14 changes: 10 additions & 4 deletions packages/beacon-node/src/network/gossip/gossipsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import {GossipTopic, GossipType} from "./interface.js";
import {Eth2GossipsubMetrics, createEth2GossipsubMetrics} from "./metrics.js";
import {GossipTopicCache, getCoreTopicsAtFork, stringifyGossipTopic} from "./topic.js";

import {NodeId} from "../subnets/interface.js";
import {
GOSSIP_D,
GOSSIP_D_HIGH,
Expand All @@ -40,6 +41,7 @@ export type Eth2Context = {

export type Eth2GossipsubModules = {
config: BeaconConfig;
nodeId: NodeId;
libp2p: Libp2p;
logger: Logger;
metricsRegister: RegistryMetricCreator | null;
Expand Down Expand Up @@ -87,7 +89,7 @@ export class Eth2Gossipsub extends GossipSub {
const gossipTopicCache = new GossipTopicCache(modules.config);

const scoreParams = computeGossipPeerScoreParams(modules);
const {config, logger, metricsRegister, peersData, events} = modules;
const {config, logger, metricsRegister, peersData, events, nodeId} = modules;

// Gossipsub parameters defined here:
// https://github.com/ethereum/consensus-specs/blob/v1.1.10/specs/phase0/p2p-interface.md#the-gossip-domain-gossipsub
Expand Down Expand Up @@ -126,7 +128,7 @@ export class Eth2Gossipsub extends GossipSub {
),
metricsRegister: metricsRegister as MetricsRegister | null,
metricsTopicStrToLabel: metricsRegister
? getMetricsTopicStrToLabel(config, {disableLightClientServer: opts.disableLightClientServer ?? false})
? getMetricsTopicStrToLabel(config, nodeId, {disableLightClientServer: opts.disableLightClientServer ?? false})
: undefined,
asyncValidation: true,

Expand Down Expand Up @@ -326,11 +328,15 @@ function attSubnetLabel(subnet: SubnetID): string {
return `0${subnet}`;
}

function getMetricsTopicStrToLabel(config: BeaconConfig, opts: {disableLightClientServer: boolean}): TopicStrToLabel {
function getMetricsTopicStrToLabel(
config: BeaconConfig,
nodeId: NodeId,
opts: {disableLightClientServer: boolean}
): TopicStrToLabel {
const metricsTopicStrToLabel = new Map<TopicStr, TopicLabel>();

for (const {name: fork} of config.forksAscendingEpochOrder) {
const topics = getCoreTopicsAtFork(config, fork, {
const topics = getCoreTopicsAtFork(config, fork, nodeId, {
subscribeAllSubnets: true,
disableLightClientServer: opts.disableLightClientServer,
});
Expand Down
10 changes: 8 additions & 2 deletions packages/beacon-node/src/network/gossip/topic.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import {
import {Attestation, SingleAttestation, ssz, sszTypesFor} from "@lodestar/types";

import {GossipAction, GossipActionError, GossipErrorCode} from "../../chain/errors/gossipValidation.js";
import {getDataColumns} from "../../util/dataColumns.js";
import {NodeId} from "../subnets/interface.js";
import {DEFAULT_ENCODING} from "./constants.js";
import {GossipEncoding, GossipTopic, GossipTopicTypeMap, GossipType, SSZTypeOfGossipTopic} from "./interface.js";

Expand Down Expand Up @@ -229,6 +231,7 @@ export function parseGossipTopic(forkDigestContext: ForkDigestContext, topicStr:
export function getCoreTopicsAtFork(
config: ChainConfig,
fork: ForkName,
nodeId: NodeId,
opts: {subscribeAllSubnets?: boolean; disableLightClientServer?: boolean}
): GossipTopicTypeMap[keyof GossipTopicTypeMap][] {
// Common topics for all forks
Expand All @@ -242,8 +245,11 @@ export function getCoreTopicsAtFork(

// After fulu also track data_column_sidecar_{index}
if (ForkSeq[fork] >= ForkSeq.fulu) {
// TODO: @matthewkeil check if this needs to be updated for custody groups
for (let index = 0; index < DATA_COLUMN_SIDECAR_SUBNET_COUNT; index++) {
const sampleSubnets = getDataColumns(
nodeId,
Math.max(config.CUSTODY_REQUIREMENT, config.NODE_CUSTODY_REQUIREMENT, config.SAMPLES_PER_SLOT)
);
for (const index of sampleSubnets) {
topics.push({type: GossipType.data_column_sidecar, index});
}
}
Expand Down
Loading