@@ -14,9 +14,8 @@ use crate::rpc::{
1414 RequestType , ResponseTermination , RpcErrorResponse , RpcResponse , RpcSuccessResponse , RPC ,
1515} ;
1616use crate :: types:: {
17- attestation_sync_committee_topics, fork_core_topics, subnet_from_topic_hash, GossipEncoding ,
18- GossipKind , GossipTopic , SnappyTransform , Subnet , SubnetDiscovery , ALTAIR_CORE_TOPICS ,
19- BASE_CORE_TOPICS , CAPELLA_CORE_TOPICS , LIGHT_CLIENT_GOSSIP_TOPICS ,
17+ all_topics_at_fork, core_topics_to_subscribe, is_fork_non_core_topic, subnet_from_topic_hash,
18+ GossipEncoding , GossipKind , GossipTopic , SnappyTransform , Subnet , SubnetDiscovery ,
2019} ;
2120use crate :: EnrExt ;
2221use crate :: Eth2Enr ;
@@ -280,14 +279,39 @@ impl<E: EthSpec> Network<E> {
280279 // Set up a scoring update interval
281280 let update_gossipsub_scores = tokio:: time:: interval ( params. decay_interval ) ;
282281
283- let max_topics = ctx. chain_spec . attestation_subnet_count as usize
284- + SYNC_COMMITTEE_SUBNET_COUNT as usize
285- + ctx. chain_spec . blob_sidecar_subnet_count_max ( ) as usize
286- + ctx. chain_spec . data_column_sidecar_subnet_count as usize
287- + BASE_CORE_TOPICS . len ( )
288- + ALTAIR_CORE_TOPICS . len ( )
289- + CAPELLA_CORE_TOPICS . len ( ) // 0 core deneb and electra topics
290- + LIGHT_CLIENT_GOSSIP_TOPICS . len ( ) ;
282+ let current_and_future_forks = ForkName :: list_all ( ) . into_iter ( ) . filter_map ( |fork| {
283+ if fork >= ctx. fork_context . current_fork ( ) {
284+ ctx. fork_context
285+ . to_context_bytes ( fork)
286+ . map ( |fork_digest| ( fork, fork_digest) )
287+ } else {
288+ None
289+ }
290+ } ) ;
291+
292+ let all_topics_for_forks = current_and_future_forks
293+ . map ( |( fork, fork_digest) | {
294+ all_topics_at_fork :: < E > ( fork, & ctx. chain_spec )
295+ . into_iter ( )
296+ . map ( |topic| {
297+ Topic :: new ( GossipTopic :: new (
298+ topic,
299+ GossipEncoding :: default ( ) ,
300+ fork_digest,
301+ ) )
302+ . into ( )
303+ } )
304+ . collect :: < Vec < TopicHash > > ( )
305+ } )
306+ . collect :: < Vec < _ > > ( ) ;
307+
308+ // For simplicity find the fork with the most individual topics and assume all forks
309+ // have the same topic count
310+ let max_topics_at_any_fork = all_topics_for_forks
311+ . iter ( )
312+ . map ( |topics| topics. len ( ) )
313+ . max ( )
314+ . expect ( "each fork has at least 5 hardcoded core topics" ) ;
291315
292316 let possible_fork_digests = ctx. fork_context . all_fork_digests ( ) ;
293317 let filter = gossipsub:: MaxCountSubscriptionFilter {
@@ -297,9 +321,9 @@ impl<E: EthSpec> Network<E> {
297321 SYNC_COMMITTEE_SUBNET_COUNT ,
298322 ) ,
299323 // during a fork we subscribe to both the old and new topics
300- max_subscribed_topics : max_topics * 4 ,
324+ max_subscribed_topics : max_topics_at_any_fork * 4 ,
301325 // 424 in theory = (64 attestation + 4 sync committee + 7 core topics + 9 blob topics + 128 column topics) * 2
302- max_subscriptions_per_request : max_topics * 2 ,
326+ max_subscriptions_per_request : max_topics_at_any_fork * 2 ,
303327 } ;
304328
305329 // If metrics are enabled for libp2p build the configuration
@@ -332,17 +356,9 @@ impl<E: EthSpec> Network<E> {
332356 // If we are using metrics, then register which topics we want to make sure to keep
333357 // track of
334358 if ctx. libp2p_registry . is_some ( ) {
335- let topics_to_keep_metrics_for = attestation_sync_committee_topics :: < E > ( )
336- . map ( |gossip_kind| {
337- Topic :: from ( GossipTopic :: new (
338- gossip_kind,
339- GossipEncoding :: default ( ) ,
340- enr_fork_id. fork_digest ,
341- ) )
342- . into ( )
343- } )
344- . collect :: < Vec < TopicHash > > ( ) ;
345- gossipsub. register_topics_for_metrics ( topics_to_keep_metrics_for) ;
359+ for topics in all_topics_for_forks {
360+ gossipsub. register_topics_for_metrics ( topics) ;
361+ }
346362 }
347363
348364 ( gossipsub, update_gossipsub_scores)
@@ -700,38 +716,26 @@ impl<E: EthSpec> Network<E> {
700716
701717 /// Subscribe to all required topics for the `new_fork` with the given `new_fork_digest`.
702718 pub fn subscribe_new_fork_topics ( & mut self , new_fork : ForkName , new_fork_digest : [ u8 ; 4 ] ) {
703- // Subscribe to existing topics with new fork digest
719+ // Re-subscribe to non-core topics with the new fork digest
704720 let subscriptions = self . network_globals . gossipsub_subscriptions . read ( ) . clone ( ) ;
705721 for mut topic in subscriptions. into_iter ( ) {
706- topic. fork_digest = new_fork_digest;
707- self . subscribe ( topic) ;
722+ if is_fork_non_core_topic ( & topic, new_fork) {
723+ topic. fork_digest = new_fork_digest;
724+ self . subscribe ( topic) ;
725+ }
708726 }
709727
710728 // Subscribe to core topics for the new fork
711- for kind in fork_core_topics :: < E > (
712- & new_fork,
713- & self . fork_context . spec ,
729+ for kind in core_topics_to_subscribe :: < E > (
730+ new_fork,
714731 & self . network_globals . as_topic_config ( ) ,
732+ & self . fork_context . spec ,
715733 ) {
716734 let topic = GossipTopic :: new ( kind, GossipEncoding :: default ( ) , new_fork_digest) ;
717735 self . subscribe ( topic) ;
718736 }
719737
720- // TODO(das): unsubscribe from blob topics at the Fulu fork
721-
722- // Register the new topics for metrics
723- let topics_to_keep_metrics_for = attestation_sync_committee_topics :: < E > ( )
724- . map ( |gossip_kind| {
725- Topic :: from ( GossipTopic :: new (
726- gossip_kind,
727- GossipEncoding :: default ( ) ,
728- new_fork_digest,
729- ) )
730- . into ( )
731- } )
732- . collect :: < Vec < TopicHash > > ( ) ;
733- self . gossipsub_mut ( )
734- . register_topics_for_metrics ( topics_to_keep_metrics_for) ;
738+ // Already registered all possible gossipsub topics for metrics
735739 }
736740
737741 /// Unsubscribe from all topics that doesn't have the given fork_digest
0 commit comments