94 changes: 49 additions & 45 deletions beacon_node/lighthouse_network/src/service/mod.rs
@@ -14,9 +14,8 @@ use crate::rpc::{
     RequestType, ResponseTermination, RpcErrorResponse, RpcResponse, RpcSuccessResponse, RPC,
 };
 use crate::types::{
-    attestation_sync_committee_topics, fork_core_topics, subnet_from_topic_hash, GossipEncoding,
-    GossipKind, GossipTopic, SnappyTransform, Subnet, SubnetDiscovery, ALTAIR_CORE_TOPICS,
-    BASE_CORE_TOPICS, CAPELLA_CORE_TOPICS, LIGHT_CLIENT_GOSSIP_TOPICS,
+    all_topics_at_fork, core_topics_to_subscribe, is_fork_non_core_topic, subnet_from_topic_hash,
+    GossipEncoding, GossipKind, GossipTopic, SnappyTransform, Subnet, SubnetDiscovery,
 };
 use crate::EnrExt;
 use crate::Eth2Enr;
@@ -280,14 +279,39 @@ impl<E: EthSpec> Network<E> {
         // Set up a scoring update interval
         let update_gossipsub_scores = tokio::time::interval(params.decay_interval);
 
-        let max_topics = ctx.chain_spec.attestation_subnet_count as usize
-            + SYNC_COMMITTEE_SUBNET_COUNT as usize
-            + ctx.chain_spec.blob_sidecar_subnet_count_max() as usize
-            + ctx.chain_spec.data_column_sidecar_subnet_count as usize
-            + BASE_CORE_TOPICS.len()
-            + ALTAIR_CORE_TOPICS.len()
-            + CAPELLA_CORE_TOPICS.len() // 0 core deneb and electra topics
-            + LIGHT_CLIENT_GOSSIP_TOPICS.len();
+        let current_and_future_forks = ForkName::list_all().into_iter().filter_map(|fork| {
+            if fork >= ctx.fork_context.current_fork() {
+                ctx.fork_context
+                    .to_context_bytes(fork)
+                    .map(|fork_digest| (fork, fork_digest))
+            } else {
+                None
+            }
+        });
+
+        let all_topics_for_forks = current_and_future_forks
+            .map(|(fork, fork_digest)| {
+                all_topics_at_fork::<E>(fork, &ctx.chain_spec)
+                    .into_iter()
+                    .map(|topic| {
+                        Topic::new(GossipTopic::new(
+                            topic,
+                            GossipEncoding::default(),
+                            fork_digest,
+                        ))
+                        .into()
+                    })
+                    .collect::<Vec<TopicHash>>()
+            })
+            .collect::<Vec<_>>();
+
+        // For simplicity, find the fork with the most individual topics and assume all
+        // forks have that same topic count.
+        let max_topics_at_any_fork = all_topics_for_forks
+            .iter()
+            .map(|topics| topics.len())
+            .max()
+            .expect("each fork has at least 5 hardcoded core topics");
 
         let possible_fork_digests = ctx.fork_context.all_fork_digests();
         let filter = gossipsub::MaxCountSubscriptionFilter {
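As a reading aid, here is a self-contained toy model of the two steps above: enumerate (fork, digest) pairs for the current and future forks, then collect each fork's full topic list and take the maximum count. All names, digests, and topic sets below are illustrative stand-ins, not Lighthouse's real types:

```rust
// Toy model of the (fork, digest) enumeration and per-fork topic collection.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)]
enum Fork {
    Base,
    Altair,
    Capella,
}

/// Stand-in for `ForkName::list_all`.
fn list_all() -> Vec<Fork> {
    vec![Fork::Base, Fork::Altair, Fork::Capella]
}

/// Stand-in for `to_context_bytes`: only scheduled forks have a digest.
fn digest_of(fork: Fork) -> Option<[u8; 4]> {
    match fork {
        Fork::Base => Some([0, 0, 0, 1]),
        Fork::Altair => Some([0, 0, 0, 2]),
        Fork::Capella => None, // not yet scheduled in this toy config
    }
}

/// Stand-in for `all_topics_at_fork`: topic names valid at a given fork.
fn topics_at(fork: Fork) -> Vec<String> {
    let mut topics = vec!["beacon_block".to_string(), "beacon_attestation_0".to_string()];
    if fork >= Fork::Altair {
        topics.push("sync_committee_0".to_string());
    }
    topics
}

fn main() {
    let current_fork = Fork::Altair;
    // Keep the current fork and every future fork that has a digest.
    let all_topics_for_forks: Vec<Vec<String>> = list_all()
        .into_iter()
        .filter(|fork| *fork >= current_fork)
        .filter_map(|fork| digest_of(fork).map(|digest| (fork, digest)))
        .map(|(fork, digest)| {
            topics_at(fork)
                .into_iter()
                // The real code builds a gossipsub TopicHash here instead.
                .map(|name| format!("{name}/{digest:?}"))
                .collect()
        })
        .collect();

    let max_topics_at_any_fork = all_topics_for_forks
        .iter()
        .map(|topics| topics.len())
        .max()
        .expect("at least the current fork is present");
    println!("max topics at any fork: {max_topics_at_any_fork}");
}
```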
@@ -297,9 +321,9 @@ impl<E: EthSpec> Network<E> {
                 SYNC_COMMITTEE_SUBNET_COUNT,
             ),
             // during a fork we subscribe to both the old and new topics
-            max_subscribed_topics: max_topics * 4,
+            max_subscribed_topics: max_topics_at_any_fork * 4,
             // 424 in theory = (64 attestation + 4 sync committee + 7 core topics + 9 blob topics + 128 column topics) * 2
-            max_subscriptions_per_request: max_topics * 2,
+            max_subscriptions_per_request: max_topics_at_any_fork * 2,
         };
 
         // If metrics are enabled for libp2p build the configuration
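A quick check of the arithmetic in that comment, with the per-category counts taken from the comment itself rather than computed from the spec:

```rust
// Verify the "424 in theory" figure. The individual counts (64 attestation
// subnets, 4 sync committee subnets, 7 core topics, 9 blob topics, 128 column
// topics) are quoted from the comment above, and the factor of 2 accounts for
// subscribing to both the old and new fork's topics during a transition.
const ATTESTATION_SUBNETS: usize = 64;
const SYNC_COMMITTEE_SUBNETS: usize = 4;
const CORE_TOPICS: usize = 7;
const BLOB_TOPICS: usize = 9;
const COLUMN_TOPICS: usize = 128;

const TOPICS_PER_FORK: usize =
    ATTESTATION_SUBNETS + SYNC_COMMITTEE_SUBNETS + CORE_TOPICS + BLOB_TOPICS + COLUMN_TOPICS;

fn main() {
    // (64 + 4 + 7 + 9 + 128) * 2 = 212 * 2 = 424
    assert_eq!(TOPICS_PER_FORK * 2, 424);
    println!("max_subscriptions_per_request (in theory) = {}", TOPICS_PER_FORK * 2);
}
```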
@@ -332,17 +356,9 @@ impl<E: EthSpec> Network<E> {
         // If we are using metrics, then register which topics we want to make sure to keep
         // track of
         if ctx.libp2p_registry.is_some() {
-            let topics_to_keep_metrics_for = attestation_sync_committee_topics::<E>()
-                .map(|gossip_kind| {
-                    Topic::from(GossipTopic::new(
-                        gossip_kind,
-                        GossipEncoding::default(),
-                        enr_fork_id.fork_digest,
-                    ))
-                    .into()
-                })
-                .collect::<Vec<TopicHash>>();
-            gossipsub.register_topics_for_metrics(topics_to_keep_metrics_for);
+            for topics in all_topics_for_forks {
+                gossipsub.register_topics_for_metrics(topics);
+            }
         }
 
         (gossipsub, update_gossipsub_scores)
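This hunk now registers metrics for every fork's topics at startup (reusing `all_topics_for_forks` from above) rather than only the attestation and sync-committee topics at the current digest. A toy illustration, with stand-in types, of why the fork-transition path in the next hunk no longer needs to register anything:

```rust
// If the union of every fork's topic hashes is registered once at startup,
// a fork activation introduces no topics the metrics registry hasn't seen.
use std::collections::HashSet;

fn main() {
    // Stand-in for one Vec<TopicHash> per (fork, digest) pair.
    let all_topics_for_forks: Vec<Vec<&str>> = vec![
        vec!["beacon_block/digest1", "beacon_attestation_0/digest1"],
        vec!["beacon_block/digest2", "beacon_attestation_0/digest2"],
    ];

    // Startup: register everything once (mirrors the loop in the diff).
    let mut registered: HashSet<&str> = HashSet::new();
    for topics in &all_topics_for_forks {
        registered.extend(topics.iter().copied());
    }

    // Fork transition: the new fork's topics are already registered.
    let new_fork_topics = &all_topics_for_forks[1];
    assert!(new_fork_topics.iter().all(|t| registered.contains(t)));
    println!("all {} new-fork topics pre-registered", new_fork_topics.len());
}
```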
@@ -700,38 +716,26 @@ impl<E: EthSpec> Network<E> {
 
     /// Subscribe to all required topics for the `new_fork` with the given `new_fork_digest`.
     pub fn subscribe_new_fork_topics(&mut self, new_fork: ForkName, new_fork_digest: [u8; 4]) {
-        // Subscribe to existing topics with new fork digest
+        // Re-subscribe to non-core topics with the new fork digest
         let subscriptions = self.network_globals.gossipsub_subscriptions.read().clone();
         for mut topic in subscriptions.into_iter() {
-            topic.fork_digest = new_fork_digest;
-            self.subscribe(topic);
+            if is_fork_non_core_topic(&topic, new_fork) {
+                topic.fork_digest = new_fork_digest;
+                self.subscribe(topic);
+            }
         }
 
         // Subscribe to core topics for the new fork
-        for kind in fork_core_topics::<E>(
-            &new_fork,
-            &self.fork_context.spec,
+        for kind in core_topics_to_subscribe::<E>(
+            new_fork,
+            &self.network_globals.as_topic_config(),
+            &self.fork_context.spec,
         ) {
             let topic = GossipTopic::new(kind, GossipEncoding::default(), new_fork_digest);
             self.subscribe(topic);
         }
 
-        // TODO(das): unsubscribe from blob topics at the Fulu fork
-
-        // Register the new topics for metrics
-        let topics_to_keep_metrics_for = attestation_sync_committee_topics::<E>()
-            .map(|gossip_kind| {
-                Topic::from(GossipTopic::new(
-                    gossip_kind,
-                    GossipEncoding::default(),
-                    new_fork_digest,
-                ))
-                .into()
-            })
-            .collect::<Vec<TopicHash>>();
-        self.gossipsub_mut()
-            .register_topics_for_metrics(topics_to_keep_metrics_for);
+        // All possible gossipsub topics were already registered for metrics at startup
     }
 
     /// Unsubscribe from all topics that don't have the given fork_digest
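To make the new fork-boundary behavior concrete, a toy sketch of the subscribe flow as I read this diff: non-core topics (e.g. dynamically selected subnet topics) are carried across with the new digest, while core topics are recomputed for the new fork. The predicate and topic type below are illustrative assumptions, not the real `is_fork_non_core_topic`:

```rust
// Toy sketch of the new subscribe_new_fork_topics behavior: carry non-core
// topics across the fork under the new digest, then subscribe the new fork's
// core topics from scratch. All types and names here are illustrative.
use std::collections::HashSet;

#[derive(Clone, PartialEq, Eq, Hash, Debug)]
struct Topic {
    kind: String,
    fork_digest: [u8; 4],
}

/// Illustrative stand-in for `is_fork_non_core_topic`.
fn is_non_core(topic: &Topic, core_kinds: &HashSet<String>) -> bool {
    !core_kinds.contains(&topic.kind)
}

fn subscribe_new_fork(
    subscriptions: &HashSet<Topic>,
    core_kinds: &HashSet<String>,
    new_digest: [u8; 4],
) -> HashSet<Topic> {
    let mut new_subs = HashSet::new();
    // Re-subscribe to non-core topics (e.g. dynamically selected subnet
    // topics) with the new fork digest.
    for topic in subscriptions {
        if is_non_core(topic, core_kinds) {
            new_subs.insert(Topic { kind: topic.kind.clone(), fork_digest: new_digest });
        }
    }
    // Subscribe to the new fork's core topics.
    for kind in core_kinds {
        new_subs.insert(Topic { kind: kind.clone(), fork_digest: new_digest });
    }
    new_subs
}

fn main() {
    let core_kinds = HashSet::from(["beacon_block".to_string()]);
    let old_digest = [0, 0, 0, 1];
    let old_subs = HashSet::from([
        Topic { kind: "beacon_block".to_string(), fork_digest: old_digest },
        Topic { kind: "beacon_attestation_3".to_string(), fork_digest: old_digest },
    ]);
    let new_subs = subscribe_new_fork(&old_subs, &core_kinds, [0, 0, 0, 2]);
    // The subnet topic was carried over; the core topic was re-derived.
    assert_eq!(new_subs.len(), 2);
    assert!(new_subs.iter().all(|t| t.fork_digest == [0, 0, 0, 2]));
}
```

Since only the new fork's core set is re-derived, topics that stop being core at a fork simply fall out, which is presumably why the `TODO(das)` about unsubscribing from blob topics at Fulu could be dropped.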
2 changes: 2 additions & 0 deletions beacon_node/lighthouse_network/src/types/globals.rs
@@ -187,6 +187,8 @@ impl<E: EthSpec> NetworkGlobals<E> {
     /// Returns the TopicConfig to compute the set of Gossip topics for a given fork
     pub fn as_topic_config(&self) -> TopicConfig {
         TopicConfig {
+            enable_light_client_server: self.config.enable_light_client_server,
+            subscribe_all_subnets: self.config.subscribe_all_subnets,
             subscribe_all_data_column_subnets: self.config.subscribe_all_data_column_subnets,
             sampling_subnets: &self.sampling_subnets,
         }
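The two new fields surface the `enable_light_client_server` and `subscribe_all_subnets` config flags to the topic computation, so `core_topics_to_subscribe` can account for them per fork. For orientation, a guessed shape of `TopicConfig` based only on the fields visible in this diff; the real definition lives in the `types` module and may differ, and the element type of `sampling_subnets` in particular is an assumption:

```rust
// Assumed shape of TopicConfig, inferred from the four fields in this diff.
pub type DataColumnSubnetId = u64; // stand-in for the real subnet id type

pub struct TopicConfig<'a> {
    pub enable_light_client_server: bool,
    pub subscribe_all_subnets: bool,
    pub subscribe_all_data_column_subnets: bool,
    // Borrowed from NetworkGlobals (`&self.sampling_subnets`), hence the lifetime.
    pub sampling_subnets: &'a [DataColumnSubnetId],
}

fn main() {
    let sampling_subnets = vec![1, 5, 9];
    let config = TopicConfig {
        enable_light_client_server: false,
        subscribe_all_subnets: false,
        subscribe_all_data_column_subnets: false,
        sampling_subnets: &sampling_subnets,
    };
    // A topic-computation function can branch on these flags to decide which
    // gossip topics this node needs at each fork.
    println!("light client topics enabled: {}", config.enable_light_client_server);
}
```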
5 changes: 2 additions & 3 deletions beacon_node/lighthouse_network/src/types/mod.rs
@@ -16,7 +16,6 @@ pub use pubsub::{PubsubMessage, SnappyTransform};
 pub use subnet::{Subnet, SubnetDiscovery};
 pub use sync_state::{BackFillState, SyncState};
 pub use topics::{
-    attestation_sync_committee_topics, core_topics_to_subscribe, fork_core_topics,
-    subnet_from_topic_hash, GossipEncoding, GossipKind, GossipTopic, TopicConfig,
-    ALTAIR_CORE_TOPICS, BASE_CORE_TOPICS, CAPELLA_CORE_TOPICS, LIGHT_CLIENT_GOSSIP_TOPICS,
+    all_topics_at_fork, core_topics_to_subscribe, is_fork_non_core_topic, subnet_from_topic_hash,
+    GossipEncoding, GossipKind, GossipTopic, TopicConfig,
 };