Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose discovery NodeId to DiscoveryPeer and LibP2PPeer #38

Merged
merged 4 commits into from
May 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@
import java.util.Collections;
import java.util.List;
import java.util.Optional;

import io.libp2p.core.crypto.PubKey;
import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.units.bigints.UInt256;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import tech.pegasys.teku.infrastructure.async.AsyncRunner;
import tech.pegasys.teku.infrastructure.events.EventChannels;
Expand All @@ -48,6 +51,7 @@
import tech.pegasys.teku.networking.eth2.gossip.topics.Eth2GossipTopicFilter;
import tech.pegasys.teku.networking.eth2.gossip.topics.OperationProcessor;
import tech.pegasys.teku.networking.eth2.gossip.topics.ProcessedAttestationSubscriptionProvider;
import tech.pegasys.teku.networking.eth2.peers.DiscoveryNodeIdExtractor;
import tech.pegasys.teku.networking.eth2.peers.Eth2PeerManager;
import tech.pegasys.teku.networking.eth2.peers.Eth2PeerSelectionStrategy;
import tech.pegasys.teku.networking.eth2.rpc.beaconchain.methods.StatusMessageFactory;
Expand All @@ -57,8 +61,10 @@
import tech.pegasys.teku.networking.p2p.discovery.DiscoveryConfig;
import tech.pegasys.teku.networking.p2p.discovery.DiscoveryNetwork;
import tech.pegasys.teku.networking.p2p.discovery.DiscoveryNetworkBuilder;
import tech.pegasys.teku.networking.p2p.discovery.discv5.DiscV5Service;
import tech.pegasys.teku.networking.p2p.gossip.PreparedGossipMessageFactory;
import tech.pegasys.teku.networking.p2p.libp2p.LibP2PNetworkBuilder;
import tech.pegasys.teku.networking.p2p.libp2p.LibP2PPeer;
import tech.pegasys.teku.networking.p2p.libp2p.LibP2PPrivateKeyLoader;
import tech.pegasys.teku.networking.p2p.libp2p.gossip.GossipTopicFilter;
import tech.pegasys.teku.networking.p2p.network.P2PNetwork;
Expand Down Expand Up @@ -143,6 +149,17 @@ public Eth2P2PNetwork build() {
final SubnetSubscriptionService syncCommitteeSubnetService = new SubnetSubscriptionService();
final SubnetSubscriptionService dataColumnSidecarSubnetService =
new SubnetSubscriptionService();

// TODO a bit hacky solution, subject to be refactored
DiscoveryNodeIdExtractor discoveryNodeIdExtractor =
peer -> {
LibP2PPeer libP2PPeer = (LibP2PPeer) peer;
PubKey libP2PPubKey = libP2PPeer.getPubKey();
Bytes discoveryNodeIdBytes = DiscV5Service.DEFAULT_NODE_RECORD_CONVERTER.convertPublicKeyToNodeId(
Bytes.wrap(libP2PPubKey.raw()));
return UInt256.fromBytes(discoveryNodeIdBytes);
};

final RpcEncoding rpcEncoding =
RpcEncoding.createSszSnappyEncoding(spec.getNetworkingConfig().getMaxChunkSize());
if (statusMessageFactory == null) {
Expand All @@ -165,7 +182,8 @@ public Eth2P2PNetwork build() {
config.getPeerRateLimit(),
config.getPeerRequestLimit(),
spec,
kzg);
kzg,
discoveryNodeIdExtractor);
final Collection<RpcMethod<?, ?, ?>> eth2RpcMethods =
eth2PeerManager.getBeaconChainMethods().all();
rpcMethods.addAll(eth2RpcMethods);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
@FunctionalInterface
public interface NodeIdToDataColumnSidecarSubnetsCalculator {

Optional<SszBitvector> calculateSubnets(NodeId nodeId, int extraSubnetCount);
Optional<SszBitvector> calculateSubnets(UInt256 nodeId, int extraSubnetCount);

NodeIdToDataColumnSidecarSubnetsCalculator NOOP = (nodeId, extraSubnetCount) -> Optional.empty();

Expand All @@ -43,7 +43,7 @@ private static NodeIdToDataColumnSidecarSubnetsCalculator createAtSlot(
return (nodeId, extraSubnetCount) -> {
List<UInt64> nodeSubnets =
miscHelpers.computeDataColumnSidecarBackboneSubnets(
UInt256.fromBytes(nodeId.toBytes()),
nodeId,
currentEpoch,
config.getCustodyRequirement() + extraSubnetCount);
return Optional.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import java.util.OptionalInt;
import java.util.function.Consumer;
import java.util.stream.IntStream;

import org.apache.tuweni.units.bigints.UInt256;
import tech.pegasys.teku.infrastructure.exceptions.InvalidConfigurationException;
import tech.pegasys.teku.infrastructure.metrics.SettableLabelledGauge;
import tech.pegasys.teku.infrastructure.ssz.collections.SszBitvector;
Expand Down Expand Up @@ -205,7 +207,7 @@ public SszBitvector getDataColumnSidecarSubnetSubscriptions(final NodeId peerId)
}

public SszBitvector getDataColumnSidecarSubnetSubscriptionsByNodeId(
final NodeId peerId, final int extraSubnetCount) {
final UInt256 peerId, final int extraSubnetCount) {
return nodeIdToDataColumnSidecarSubnetsCalculator
.calculateSubnets(peerId, extraSubnetCount)
.orElse(dataColumnSidecarSubnetSubscriptions.getSubscriptionSchema().getDefault());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@
package tech.pegasys.teku.networking.eth2.gossip.subnets;

import java.util.function.IntUnaryOperator;

import org.apache.tuweni.units.bigints.UInt256;
import tech.pegasys.teku.infrastructure.ssz.collections.SszBitvector;
import tech.pegasys.teku.networking.eth2.peers.PeerScorer;
import tech.pegasys.teku.networking.p2p.discovery.DiscoveryPeer;
import tech.pegasys.teku.networking.p2p.libp2p.MultiaddrUtil;
import tech.pegasys.teku.networking.p2p.peer.NodeId;

/** Scores peers higher if they are tracking subnets that are not tracked by other peers. */
Expand Down Expand Up @@ -54,7 +55,7 @@ public int scoreCandidatePeer(DiscoveryPeer candidate) {
candidate.getPersistentAttestationSubnets(),
candidate.getSyncCommitteeSubnets(),
peerSubnetSubscriptions.getDataColumnSidecarSubnetSubscriptionsByNodeId(
MultiaddrUtil.getNodeId(candidate), candidate.getDasExtraCustodySubnetCount()));
UInt256.fromBytes(candidate.getNodeId()), candidate.getDasExtraCustodySubnetCount()));
}

// @Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.tuweni.bytes.Bytes32;
import org.apache.tuweni.units.bigints.UInt256;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.ssz.SszData;
import tech.pegasys.teku.infrastructure.ssz.collections.SszBitvector;
Expand Down Expand Up @@ -78,6 +79,7 @@ class DefaultEth2Peer extends DelegatingPeer implements Eth2Peer {
private static final Logger LOG = LogManager.getLogger();

private final Spec spec;
private final UInt256 discoveryNodeId;
private final BeaconChainMethods rpcMethods;
private final StatusMessageFactory statusMessageFactory;
private final MetadataMessagesFactory metadataMessagesFactory;
Expand All @@ -104,6 +106,7 @@ class DefaultEth2Peer extends DelegatingPeer implements Eth2Peer {
DefaultEth2Peer(
final Spec spec,
final Peer peer,
final UInt256 discoveryNodeId,
final BeaconChainMethods rpcMethods,
final StatusMessageFactory statusMessageFactory,
final MetadataMessagesFactory metadataMessagesFactory,
Expand All @@ -115,6 +118,7 @@ class DefaultEth2Peer extends DelegatingPeer implements Eth2Peer {
final KZG kzg) {
super(peer);
this.spec = spec;
this.discoveryNodeId = discoveryNodeId;
this.rpcMethods = rpcMethods;
this.statusMessageFactory = statusMessageFactory;
this.metadataMessagesFactory = metadataMessagesFactory;
Expand Down Expand Up @@ -146,6 +150,11 @@ class DefaultEth2Peer extends DelegatingPeer implements Eth2Peer {
this.maxBlobsPerBlock = Suppliers.memoize(() -> getSpecConfigDeneb().getMaxBlobsPerBlock());
}

@Override
public UInt256 getDiscoveryNodeId() {
return discoveryNodeId;
}

@Override
public void updateStatus(final PeerStatus status) {
peerChainValidator
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package tech.pegasys.teku.networking.eth2.peers;

import org.apache.tuweni.units.bigints.UInt256;
import tech.pegasys.teku.networking.p2p.peer.Peer;

public interface DiscoveryNodeIdExtractor {
UInt256 calculateDiscoveryNodeId(Peer peer);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.List;
import java.util.Optional;
import org.apache.tuweni.bytes.Bytes32;
import org.apache.tuweni.units.bigints.UInt256;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.ssz.SszData;
import tech.pegasys.teku.infrastructure.ssz.collections.SszBitvector;
Expand Down Expand Up @@ -43,6 +44,7 @@ public interface Eth2Peer extends Peer, SyncSource {
static Eth2Peer create(
final Spec spec,
final Peer peer,
final UInt256 discoveryNodeId,
final BeaconChainMethods rpcMethods,
final StatusMessageFactory statusMessageFactory,
final MetadataMessagesFactory metadataMessagesFactory,
Expand All @@ -55,6 +57,7 @@ static Eth2Peer create(
return new DefaultEth2Peer(
spec,
peer,
discoveryNodeId,
rpcMethods,
statusMessageFactory,
metadataMessagesFactory,
Expand Down Expand Up @@ -135,6 +138,8 @@ void adjustDataColumnSidecarsRequest(

int getUnansweredPingCount();

UInt256 getDiscoveryNodeId();

interface PeerStatusSubscriber {
void onPeerStatus(final PeerStatus initialStatus);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package tech.pegasys.teku.networking.eth2.peers;

import java.util.Optional;

import org.hyperledger.besu.plugin.services.MetricsSystem;
import tech.pegasys.teku.infrastructure.time.TimeProvider;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
Expand All @@ -39,6 +40,7 @@ public class Eth2PeerFactory {
private final int peerRateLimit;
private final int peerRequestLimit;
private final KZG kzg;
private final DiscoveryNodeIdExtractor discoveryNodeIdExtractor;

public Eth2PeerFactory(
final Spec spec,
Expand All @@ -50,7 +52,8 @@ public Eth2PeerFactory(
final Optional<Checkpoint> requiredCheckpoint,
final int peerRateLimit,
final int peerRequestLimit,
final KZG kzg) {
final KZG kzg,
final DiscoveryNodeIdExtractor discoveryNodeIdExtractor) {
this.spec = spec;
this.metricsSystem = metricsSystem;
this.chainDataClient = chainDataClient;
Expand All @@ -61,12 +64,14 @@ public Eth2PeerFactory(
this.peerRateLimit = peerRateLimit;
this.peerRequestLimit = peerRequestLimit;
this.kzg = kzg;
this.discoveryNodeIdExtractor = discoveryNodeIdExtractor;
}

public Eth2Peer create(final Peer peer, final BeaconChainMethods rpcMethods) {
return Eth2Peer.create(
spec,
peer,
discoveryNodeIdExtractor.calculateDiscoveryNodeId(peer),
rpcMethods,
statusMessageFactory,
metadataMessagesFactory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ public static Eth2PeerManager create(
final int peerRateLimit,
final int peerRequestLimit,
final Spec spec,
final KZG kzg) {
final KZG kzg,
final DiscoveryNodeIdExtractor discoveryNodeIdExtractor) {

final MetadataMessagesFactory metadataMessagesFactory = new MetadataMessagesFactory();
attestationSubnetService.subscribeToUpdates(
Expand All @@ -142,7 +143,8 @@ public static Eth2PeerManager create(
requiredCheckpoint,
peerRateLimit,
peerRequestLimit,
kzg),
kzg,
discoveryNodeIdExtractor),
statusMessageFactory,
metadataMessagesFactory,
rpcEncoding,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

public class DiscoveryPeer {
private final Bytes publicKey;
private final Bytes nodeId;
private final InetSocketAddress nodeAddress;
private final Optional<EnrForkId> enrForkId;
private final SszBitvector persistentAttestationSubnets;
Expand All @@ -31,12 +32,14 @@ public class DiscoveryPeer {

public DiscoveryPeer(
final Bytes publicKey,
final Bytes nodeId,
final InetSocketAddress nodeAddress,
final Optional<EnrForkId> enrForkId,
final SszBitvector persistentAttestationSubnets,
final SszBitvector syncCommitteeSubnets,
final Optional<Integer> dasExtraCustodySubnetCount) {
this.publicKey = publicKey;
this.nodeId = nodeId;
this.nodeAddress = nodeAddress;
this.enrForkId = enrForkId;
this.persistentAttestationSubnets = persistentAttestationSubnets;
Expand All @@ -48,6 +51,10 @@ public Bytes getPublicKey() {
return publicKey;
}

public Bytes getNodeId() {
return nodeId;
}

public InetSocketAddress getNodeAddress() {
return nodeAddress;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ public Optional<String> getDiscoveryAddress() {
final DiscoveryPeer discoveryPeer =
new DiscoveryPeer(
(Bytes) nodeRecord.get(EnrField.PKEY_SECP256K1),
nodeRecord.getNodeId(),
nodeRecord.getUdpAddress().get(),
Optional.empty(),
currentSchemaDefinitionsSupplier.getAttnetsENRFieldSchema().getDefault(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.tuweni.bytes.Bytes;
import org.ethereum.beacon.discovery.schema.EnrField;
import org.ethereum.beacon.discovery.schema.NodeRecord;
import org.ethereum.beacon.discovery.schema.NodeRecordBuilder;
import tech.pegasys.teku.infrastructure.ssz.collections.SszBitvector;
import tech.pegasys.teku.infrastructure.ssz.schema.SszPrimitiveSchemas;
import tech.pegasys.teku.infrastructure.ssz.schema.collections.SszBitvectorSchema;
Expand All @@ -36,6 +37,10 @@
public class NodeRecordConverter {
private static final Logger LOG = LogManager.getLogger();

public Bytes convertPublicKeyToNodeId(Bytes publicKey) {
return new NodeRecordBuilder().publicKey(publicKey).build().getNodeId();
}

public Optional<DiscoveryPeer> convertToDiscoveryPeer(
final NodeRecord nodeRecord, final SchemaDefinitions schemaDefinitions) {
return nodeRecord
Expand Down Expand Up @@ -71,6 +76,7 @@ private static DiscoveryPeer socketAddressToDiscoveryPeer(

return new DiscoveryPeer(
((Bytes) nodeRecord.get(EnrField.PKEY_SECP256K1)),
nodeRecord.getNodeId(),
address,
enrForkId,
persistentAttestationSubnets,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import identify.pb.IdentifyOuterClass;
import io.libp2p.core.Connection;
import io.libp2p.core.PeerId;
import io.libp2p.core.crypto.PubKey;
import io.libp2p.protocol.Identify;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -51,6 +52,7 @@ public class LibP2PPeer implements Peer {
private final AtomicBoolean connected = new AtomicBoolean(true);
private final MultiaddrPeerAddress peerAddress;
private final PeerId peerId;
private final PubKey pubKey;
private volatile PeerClientType peerClientType = PeerClientType.UNKNOWN;
private volatile Optional<String> maybeAgentString = Optional.empty();

Expand All @@ -73,6 +75,7 @@ public LibP2PPeer(
this.reputationManager = reputationManager;
this.peerScoreFunction = peerScoreFunction;
this.peerId = connection.secureSession().getRemoteId();
this.pubKey = connection.secureSession().getRemotePubKey();

final NodeId nodeId = new LibP2PNodeId(peerId);
peerAddress = new MultiaddrPeerAddress(nodeId, connection.remoteAddress());
Expand Down Expand Up @@ -110,6 +113,10 @@ public Optional<String> getMaybeAgentString() {
return maybeAgentString;
}

public PubKey getPubKey() {
return pubKey;
}

@Override
public PeerAddress getAddress() {
return peerAddress;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ private static Multiaddr addPeerId(final Multiaddr addr, final NodeId nodeId) {
return addr.withP2P(PeerId.fromBase58(nodeId.toBase58()));
}

public static NodeId getNodeId(final DiscoveryPeer peer) {
private static NodeId getNodeId(final DiscoveryPeer peer) {
final PubKey pubKey = unmarshalSecp256k1PublicKey(peer.getPublicKey().toArrayUnsafe());
return new LibP2PNodeId(PeerId.fromPubKey(pubKey));
}
Expand Down