Skip to content

Commit

Permalink
Update Ethereum Node Record When We Pass Network Upgrade Blocks (hype…
Browse files Browse the repository at this point in the history
…rledger#1998)


Signed-off-by: Ratan Rai Sur <ratan.r.sur@gmail.com>
  • Loading branch information
RatanRSur authored Mar 16, 2021
1 parent 9a138ad commit 30ef443
Show file tree
Hide file tree
Showing 12 changed files with 169 additions and 52 deletions.
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
# Changelog

## 21.1.3

### Additions and Improvements

### Bug Fixes
* Ethereum Node Records are now dynamically recalculated when we pass network upgrade blocks. This allows for better peering through transitions without needing to restart the node. [\#1998](https://github.com/hyperledger/besu/pull/1998)

### Early Access Features

#### Previously identified known issues

## 21.1.2

### Berlin Network Upgrade
Expand Down
1 change: 1 addition & 0 deletions besu/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ dependencies {
testImplementation project(path: ':crypto', configuration: 'testSupportArtifacts')
testImplementation project(':testutil')

testImplementation 'tech.pegasys.discovery:discovery'
testImplementation 'com.google.auto.service:auto-service'
testImplementation 'com.squareup.okhttp3:okhttp'
testImplementation 'junit:junit'
Expand Down
13 changes: 13 additions & 0 deletions besu/src/main/java/org/hyperledger/besu/RunnerBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,19 @@ public Runner build() {
.build();

final P2PNetwork network = networkRunner.getNetwork();
// ForkId in Ethereum Node Record needs updating when we transition to a new protocol spec
context
.getBlockchain()
.observeBlockAdded(
blockAddedEvent -> {
if (protocolSchedule
.streamMilestoneBlocks()
.anyMatch(
blockNumber ->
blockNumber == blockAddedEvent.getBlock().getHeader().getNumber())) {
network.updateNodeRecord();
}
});
nodePermissioningController.ifPresent(
n ->
n.setInsufficientPeersPermissioningProvider(
Expand Down
83 changes: 74 additions & 9 deletions besu/src/test/java/org/hyperledger/besu/RunnerBuilderTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package org.hyperledger.besu;

import static org.assertj.core.api.Assertions.assertThat;
import static org.hyperledger.besu.ethereum.core.InMemoryKeyValueStorageProvider.createInMemoryBlockchain;
import static org.hyperledger.besu.ethereum.storage.keyvalue.KeyValueSegmentIdentifier.BLOCKCHAIN;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand All @@ -24,31 +26,45 @@
import org.hyperledger.besu.consensus.common.bft.protocol.BftProtocolManager;
import org.hyperledger.besu.consensus.ibft.protocol.IbftSubProtocol;
import org.hyperledger.besu.controller.BesuController;
import org.hyperledger.besu.crypto.KeyPairSecurityModule;
import org.hyperledger.besu.crypto.NodeKey;
import org.hyperledger.besu.crypto.SECPPublicKey;
import org.hyperledger.besu.crypto.SECP256K1;
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.api.graphql.GraphQLConfiguration;
import org.hyperledger.besu.ethereum.api.jsonrpc.JsonRpcConfiguration;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.WebSocketConfiguration;
import org.hyperledger.besu.ethereum.blockcreation.MiningCoordinator;
import org.hyperledger.besu.ethereum.chain.DefaultBlockchain;
import org.hyperledger.besu.ethereum.chain.MutableBlockchain;
import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.core.BlockDataGenerator;
import org.hyperledger.besu.ethereum.core.InMemoryKeyValueStorageProvider;
import org.hyperledger.besu.ethereum.core.MiningParameters;
import org.hyperledger.besu.ethereum.core.PrivacyParameters;
import org.hyperledger.besu.ethereum.core.Synchronizer;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool;
import org.hyperledger.besu.ethereum.mainnet.MainnetBlockHeaderFunctions;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.p2p.config.SubProtocolConfiguration;
import org.hyperledger.besu.ethereum.p2p.peers.EnodeURL;
import org.hyperledger.besu.ethereum.storage.StorageProvider;
import org.hyperledger.besu.ethereum.storage.keyvalue.KeyValueStorageProvider;
import org.hyperledger.besu.metrics.ObservableMetricsSystem;
import org.hyperledger.besu.metrics.prometheus.MetricsConfiguration;
import org.hyperledger.besu.nat.NatMethod;

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.stream.Stream;

import io.vertx.core.Vertx;
import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.units.bigints.UInt64;
import org.ethereum.beacon.discovery.schema.NodeRecord;
import org.ethereum.beacon.discovery.schema.NodeRecordFactory;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
Expand All @@ -71,8 +87,8 @@ public void setup() {
final EthProtocolManager ethProtocolManager = mock(EthProtocolManager.class);
final EthContext ethContext = mock(EthContext.class);
final ProtocolContext protocolContext = mock(ProtocolContext.class);
final NodeKey nodeKey = mock(NodeKey.class);
final SECPPublicKey publicKey = mock(SECPPublicKey.class);
final NodeKey nodeKey =
new NodeKey(new KeyPairSecurityModule(new SECP256K1().generateKeyPair()));

when(subProtocolConfiguration.getProtocolManagers())
.thenReturn(
Expand All @@ -86,13 +102,12 @@ public void setup() {
when(ethProtocolManager.ethContext()).thenReturn(ethContext);
when(subProtocolConfiguration.getSubProtocols())
.thenReturn(Collections.singletonList(new IbftSubProtocol()));
when(protocolContext.getBlockchain()).thenReturn(mock(MutableBlockchain.class));
when(publicKey.getEncodedBytes()).thenReturn(Bytes.of(new byte[64]));
when(nodeKey.getPublicKey()).thenReturn(publicKey);
when(protocolContext.getBlockchain()).thenReturn(mock(DefaultBlockchain.class));

when(besuController.getProtocolManager()).thenReturn(ethProtocolManager);
when(besuController.getSubProtocolConfiguration()).thenReturn(subProtocolConfiguration);
when(besuController.getProtocolContext()).thenReturn(protocolContext);
when(besuController.getProtocolSchedule()).thenReturn(mock(ProtocolSchedule.class));
when(besuController.getNodeKey()).thenReturn(nodeKey);
when(besuController.getMiningParameters()).thenReturn(mock(MiningParameters.class));
when(besuController.getPrivacyParameters()).thenReturn(mock(PrivacyParameters.class));
Expand All @@ -104,11 +119,10 @@ public void setup() {
@Test
public void enodeUrlShouldHaveAdvertisedHostWhenDiscoveryDisabled() {
final String p2pAdvertisedHost = "172.0.0.1";
final int p2pListenPort = 30301;
final int p2pListenPort = 30302;

final Runner runner =
new RunnerBuilder()
.discovery(true)
.p2pListenInterface("0.0.0.0")
.p2pListenPort(p2pListenPort)
.p2pAdvertisedHost(p2pAdvertisedHost)
Expand All @@ -133,8 +147,59 @@ public void enodeUrlShouldHaveAdvertisedHostWhenDiscoveryDisabled() {
.ipAddress(p2pAdvertisedHost)
.discoveryPort(0)
.listeningPort(p2pListenPort)
.nodeId(new byte[64])
.nodeId(besuController.getNodeKey().getPublicKey().getEncoded())
.build();
assertThat(runner.getLocalEnode().orElseThrow()).isEqualTo(expectedEodeURL);
}

@Test
public void movingAcrossProtocolSpecsUpdatesNodeRecord() {
final BlockDataGenerator gen = new BlockDataGenerator();
final String p2pAdvertisedHost = "172.0.0.1";
final int p2pListenPort = 30301;
final StorageProvider storageProvider = new InMemoryKeyValueStorageProvider();
final Block genesisBlock = gen.genesisBlock();
final MutableBlockchain blockchain =
createInMemoryBlockchain(genesisBlock, new MainnetBlockHeaderFunctions());
when(besuController.getProtocolContext().getBlockchain()).thenReturn(blockchain);
final Runner runner =
new RunnerBuilder()
.discovery(true)
.p2pListenInterface("0.0.0.0")
.p2pListenPort(p2pListenPort)
.p2pAdvertisedHost(p2pAdvertisedHost)
.p2pEnabled(true)
.natMethod(NatMethod.NONE)
.besuController(besuController)
.ethNetworkConfig(mock(EthNetworkConfig.class))
.metricsSystem(mock(ObservableMetricsSystem.class))
.jsonRpcConfiguration(mock(JsonRpcConfiguration.class))
.graphQLConfiguration(mock(GraphQLConfiguration.class))
.webSocketConfiguration(mock(WebSocketConfiguration.class))
.metricsConfiguration(mock(MetricsConfiguration.class))
.vertx(Vertx.vertx())
.dataDir(dataDir.getRoot().toPath())
.storageProvider(storageProvider)
.forkIdSupplier(() -> Collections.singletonList(Bytes.EMPTY))
.build();
runner.start();
when(besuController.getProtocolSchedule().streamMilestoneBlocks())
.thenAnswer(__ -> Stream.of(1L, 2L));
for (int i = 0; i < 2; ++i) {
final Block block =
gen.block(
BlockDataGenerator.BlockOptions.create()
.setBlockNumber(1 + i)
.setParentHash(blockchain.getChainHeadHash()));
blockchain.appendBlock(block, gen.receipts(block));
assertThat(
storageProvider
.getStorageBySegmentIdentifier(BLOCKCHAIN)
.get("local-enr-seqno".getBytes(StandardCharsets.UTF_8))
.map(Bytes::of)
.map(NodeRecordFactory.DEFAULT::fromBytes)
.map(NodeRecord::getSeq))
.contains(UInt64.valueOf(2 + i));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public String listMilestones() {
.collect(Collectors.joining(", ", "[", "]"));
}

@Override
public Stream<Long> streamMilestoneBlocks() {
return protocolSpecs.stream()
.sorted(Comparator.comparing(ScheduledProtocolSpec::getBlock))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@

import java.math.BigInteger;
import java.util.Optional;
import java.util.stream.Stream;

public interface ProtocolSchedule {

ProtocolSpec getByBlockNumber(long number);

public Stream<Long> streamMilestoneBlocks();

Optional<BigInteger> getChainId();

void setTransactionFilter(TransactionFilter transactionFilter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,9 @@ public boolean isDiscoveryEnabled() {
public Optional<EnodeURL> getLocalEnode() {
return Optional.empty();
}

@Override
public void updateNodeRecord() {}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ public abstract class PeerDiscoveryAgent {

private final StorageProvider storageProvider;
private final Supplier<List<Bytes>> forkIdSupplier;
private String advertisedAddress;

protected PeerDiscoveryAgent(
final NodeKey nodeKey,
Expand All @@ -123,7 +124,7 @@ protected PeerDiscoveryAgent(
this.config = config;
this.nodeKey = nodeKey;

id = nodeKey.getPublicKey().getEncodedBytes();
this.id = nodeKey.getPublicKey().getEncodedBytes();

this.storageProvider = storageProvider;
this.forkIdSupplier = forkIdSupplier;
Expand All @@ -147,8 +148,7 @@ public CompletableFuture<Integer> start(final int tcpPort) {
LOG.info("Starting peer discovery agent on host={}, port={}", host, port);

// override advertised host if we detect an external IP address via NAT manager
final String advertisedAddress =
natService.queryExternalIPAddress(config.getAdvertisedHost());
this.advertisedAddress = natService.queryExternalIPAddress(config.getAdvertisedHost());

return listenForConnections()
.thenApply(
Expand All @@ -166,9 +166,7 @@ public CompletableFuture<Integer> start(final int tcpPort) {
this.localNode = Optional.of(ourNode);
isActive = true;
LOG.info("P2P peer discovery agent started and listening on {}", localAddress);
addLocalNodeRecord(id, advertisedAddress, tcpPort, discoveryPort)
.ifPresent(
nodeRecord -> localNode.ifPresent(peer -> peer.setNodeRecord(nodeRecord)));
ourNode.setNodeRecord(updateNodeRecord());
startController(ourNode);
return discoveryPort;
});
Expand All @@ -178,52 +176,58 @@ public CompletableFuture<Integer> start(final int tcpPort) {
}
}

private Optional<NodeRecord> addLocalNodeRecord(
final Bytes nodeId,
final String advertisedAddress,
final Integer tcpPort,
final Integer udpPort) {
public NodeRecord updateNodeRecord() {
final KeyValueStorage keyValueStorage =
storageProvider.getStorageBySegmentIdentifier(KeyValueSegmentIdentifier.BLOCKCHAIN);
final NodeRecordFactory nodeRecordFactory = NodeRecordFactory.DEFAULT;
final Optional<NodeRecord> existingNodeRecord =
keyValueStorage
.get(Bytes.of(SEQ_NO_STORE_KEY.getBytes(UTF_8)).toArray())
.map(Bytes::of)
.flatMap(b -> Optional.of(nodeRecordFactory.fromBytes(b)));
.map(nodeRecordFactory::fromBytes);

final Bytes addressBytes = Bytes.of(InetAddresses.forString(advertisedAddress).getAddress());
if (existingNodeRecord.isEmpty()
|| !existingNodeRecord.get().get(EnrField.PKEY_SECP256K1).equals(nodeId)
|| !addressBytes.equals(existingNodeRecord.get().get(EnrField.IP_V4))
|| !tcpPort.equals(existingNodeRecord.get().get(EnrField.TCP))
|| !udpPort.equals(existingNodeRecord.get().get(EnrField.UDP))) {
final UInt64 sequenceNumber =
existingNodeRecord.map(NodeRecord::getSeq).orElse(UInt64.ZERO).add(1);
final NodeRecord nodeRecord =
nodeRecordFactory.createFromValues(
sequenceNumber,
new EnrField(EnrField.ID, IdentitySchema.V4),
new EnrField(EnrField.PKEY_SECP256K1, Functions.compressPublicKey(nodeId)),
new EnrField(EnrField.IP_V4, addressBytes),
new EnrField(EnrField.TCP, tcpPort),
new EnrField(EnrField.UDP, udpPort),
new EnrField("eth", Collections.singletonList(forkIdSupplier.get())));
nodeRecord.setSignature(
nodeKey
.sign(Hash.keccak256(nodeRecord.serializeNoSignature()))
.encodedBytes()
.slice(0, 64));

final KeyValueStorageTransaction keyValueStorageTransaction =
keyValueStorage.startTransaction();
keyValueStorageTransaction.put(
Bytes.wrap(SEQ_NO_STORE_KEY.getBytes(UTF_8)).toArray(), nodeRecord.serialize().toArray());
keyValueStorageTransaction.commit();
return Optional.of(nodeRecord);
}

return existingNodeRecord;
final Optional<EnodeURL> maybeEnodeURL = localNode.map(DiscoveryPeer::getEnodeURL);
final Integer discoveryPort = maybeEnodeURL.flatMap(EnodeURL::getDiscoveryPort).orElse(0);
final Integer listeningPort = maybeEnodeURL.flatMap(EnodeURL::getListeningPort).orElse(0);
final String forkIdEnrField = "eth";
return existingNodeRecord
.filter(
nodeRecord ->
id.equals(nodeRecord.get(EnrField.PKEY_SECP256K1))
&& addressBytes.equals(nodeRecord.get(EnrField.IP_V4))
&& discoveryPort.equals(nodeRecord.get(EnrField.UDP))
&& listeningPort.equals(nodeRecord.get(EnrField.TCP))
&& forkIdSupplier.get().equals(nodeRecord.get(forkIdEnrField)))
.orElseGet(
() -> {
final UInt64 sequenceNumber =
existingNodeRecord.map(NodeRecord::getSeq).orElse(UInt64.ZERO).add(1);
final NodeRecord nodeRecord =
nodeRecordFactory.createFromValues(
sequenceNumber,
new EnrField(EnrField.ID, IdentitySchema.V4),
new EnrField(EnrField.PKEY_SECP256K1, Functions.compressPublicKey(id)),
new EnrField(EnrField.IP_V4, addressBytes),
new EnrField(EnrField.TCP, listeningPort),
new EnrField(EnrField.UDP, discoveryPort),
new EnrField(
forkIdEnrField, Collections.singletonList(forkIdSupplier.get())));
nodeRecord.setSignature(
nodeKey
.sign(Hash.keccak256(nodeRecord.serializeNoSignature()))
.encodedBytes()
.slice(0, 64));

LOG.info("Writing node record to disk. {}", nodeRecord);
final KeyValueStorageTransaction keyValueStorageTransaction =
keyValueStorage.startTransaction();
keyValueStorageTransaction.put(
Bytes.wrap(SEQ_NO_STORE_KEY.getBytes(UTF_8)).toArray(),
nodeRecord.serialize().toArray());
keyValueStorageTransaction.commit();
return nodeRecord;
});
}

public void addPeerRequirement(final PeerRequirement peerRequirement) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,11 @@ private void setLocalNode(
localNode.setEnode(localEnode);
}

@Override
public void updateNodeRecord() {
peerDiscoveryAgent.updateNodeRecord();
}

public static class Builder {

private Vertx vertx;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ public Optional<EnodeURL> getLocalEnode() {
return Optional.empty();
}

@Override
public void updateNodeRecord() {}

@Override
public void close() throws IOException {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,4 +147,6 @@ default int getPeerCount() {
* otherwise.
*/
Optional<EnodeURL> getLocalEnode();

void updateNodeRecord();
}
Loading

0 comments on commit 30ef443

Please sign in to comment.