Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
https://eips.ethereum.org/EIPS/eip-2481

Signed-off-by: Ratan Rai Sur <ratan.r.sur@gmail.com>
  • Loading branch information
RatanRSur authored Aug 25, 2021
1 parent 68f222b commit f888983
Show file tree
Hide file tree
Showing 28 changed files with 1,056 additions and 234 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
- A native library was added for the alternative signature algorithm secp256r1, which will be used by default [#2630](https://github.com/hyperledger/besu/pull/2630)
- The command line option --Xsecp-native-enabled was added as an alias for --Xsecp256k1-native-enabled [#2630](https://github.com/hyperledger/besu/pull/2630)
- Added Labelled gauges for metrics [#2646](https://github.com/hyperledger/besu/pull/2646)
- support for `eth/66` networking protocol [#2365](https://github.com/hyperledger/besu/pull/2365)

### Bug Fixes
- Consider effective price and effective priority fee in transaction replacement rules [\#2529](https://github.com/hyperledger/besu/issues/2529)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,33 @@ public Transaction(
v);
}

public Transaction(
final long nonce,
final Wei gasPrice,
final long gasLimit,
final Address to,
final Wei value,
final SECPSignature signature,
final Bytes payload,
final Optional<BigInteger> chainId,
final Optional<BigInteger> v) {
this(
TransactionType.FRONTIER,
nonce,
Optional.of(gasPrice),
Optional.empty(),
Optional.empty(),
gasLimit,
Optional.of(to),
value,
signature,
payload,
Optional.empty(),
null,
chainId,
v);
}

/**
* Instantiates a transaction instance.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public TransactionReceipt(
revertReason);
}

private TransactionReceipt(
public TransactionReceipt(
final TransactionType transactionType,
final int status,
final long cumulativeGasUsed,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,17 @@
*/
package org.hyperledger.besu.ethereum.eth;

import static java.util.stream.Collectors.toUnmodifiableList;

import org.hyperledger.besu.ethereum.eth.messages.EthPV62;
import org.hyperledger.besu.ethereum.eth.messages.EthPV63;
import org.hyperledger.besu.ethereum.eth.messages.EthPV65;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.Capability;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.SubProtocol;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.stream.Stream;

/**
* Eth protocol messages as defined in
Expand All @@ -34,11 +36,12 @@ public class EthProtocol implements SubProtocol {
public static final Capability ETH63 = Capability.create(NAME, EthVersion.V63);
public static final Capability ETH64 = Capability.create(NAME, EthVersion.V64);
public static final Capability ETH65 = Capability.create(NAME, EthVersion.V65);
public static final Capability ETH66 = Capability.create(NAME, EthVersion.V66);

private static final EthProtocol INSTANCE = new EthProtocol();

private static final List<Integer> eth62Messages =
Arrays.asList(
List.of(
EthPV62.STATUS,
EthPV62.NEW_BLOCK_HASHES,
EthPV62.TRANSACTIONS,
Expand All @@ -48,22 +51,35 @@ public class EthProtocol implements SubProtocol {
EthPV62.BLOCK_BODIES,
EthPV62.NEW_BLOCK);

private static final List<Integer> eth63Messages = new ArrayList<>(eth62Messages);

static {
eth63Messages.addAll(
Arrays.asList(
EthPV63.GET_NODE_DATA, EthPV63.NODE_DATA, EthPV63.GET_RECEIPTS, EthPV63.RECEIPTS));
}
private static final List<Integer> eth63Messages =
Stream.concat(
eth62Messages.stream(),
Stream.of(
EthPV63.GET_NODE_DATA, EthPV63.NODE_DATA, EthPV63.GET_RECEIPTS, EthPV63.RECEIPTS))
.collect(toUnmodifiableList());

private static final List<Integer> eth65Messages = new ArrayList<>(eth63Messages);
private static final List<Integer> eth65Messages =
Stream.concat(
eth63Messages.stream(),
Stream.of(
EthPV65.NEW_POOLED_TRANSACTION_HASHES,
EthPV65.GET_POOLED_TRANSACTIONS,
EthPV65.POOLED_TRANSACTIONS))
.collect(toUnmodifiableList());

static {
eth65Messages.addAll(
Arrays.asList(
EthPV65.NEW_POOLED_TRANSACTION_HASHES,
public static boolean requestIdCompatible(final int code) {
return Set.of(
EthPV62.GET_BLOCK_HEADERS,
EthPV62.BLOCK_HEADERS,
EthPV62.GET_BLOCK_BODIES,
EthPV62.BLOCK_BODIES,
EthPV65.GET_POOLED_TRANSACTIONS,
EthPV65.POOLED_TRANSACTIONS));
EthPV65.POOLED_TRANSACTIONS,
EthPV63.GET_NODE_DATA,
EthPV63.NODE_DATA,
EthPV63.GET_RECEIPTS,
EthPV63.RECEIPTS)
.contains(code);
}

@Override
Expand All @@ -79,6 +95,7 @@ public int messageSpace(final int protocolVersion) {
case EthVersion.V63:
case EthVersion.V64:
case EthVersion.V65:
case EthVersion.V66:
// same number of messages in each range, eth65 defines messages in the middle of the
// range defined by eth63 and eth64 defines no new ranges.
return 17;
Expand All @@ -96,6 +113,7 @@ public boolean isValidMessageCode(final int protocolVersion, final int code) {
case EthVersion.V64:
return eth63Messages.contains(code);
case EthVersion.V65:
case EthVersion.V66:
return eth65Messages.contains(code);
default:
return false;
Expand Down Expand Up @@ -149,5 +167,6 @@ public static class EthVersion {
public static final int V63 = 63;
public static final int V64 = 64;
public static final int V65 = 65;
public static final int V66 = 66;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,53 +14,31 @@
*/
package org.hyperledger.besu.ethereum.eth.manager;

import org.hyperledger.besu.ethereum.p2p.rlpx.connections.PeerConnection;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage;
import org.hyperledger.besu.ethereum.rlp.RLPException;
import org.hyperledger.besu.util.Subscribers;

import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class EthMessages {
private static final Logger LOG = LogManager.getLogger();

private final Map<Integer, Subscribers<MessageCallback>> listenersByCode =
new ConcurrentHashMap<>();
private final Map<Integer, MessageResponseConstructor> messageResponseConstructorsByCode =
new ConcurrentHashMap<>();

void dispatch(final EthMessage message) {
final int code = message.getData().getCode();
Optional<MessageData> dispatch(final EthMessage ethMessage) {
final int code = ethMessage.getData().getCode();

// trigger arbitrary side-effecting listeners
Optional.ofNullable(listenersByCode.get(code))
.ifPresent(
listeners -> listeners.forEach(messageCallback -> messageCallback.exec(message)));

try {
Optional.ofNullable(messageResponseConstructorsByCode.get(code))
.map(messageResponseConstructor -> messageResponseConstructor.response(message.getData()))
.ifPresent(
messageData -> {
try {
message.getPeer().send(messageData);
} catch (final PeerConnection.PeerNotConnected __) {
// Peer disconnected before we could respond - nothing to do
}
});
listeners -> listeners.forEach(messageCallback -> messageCallback.exec(ethMessage)));

} catch (final RLPException e) {
LOG.debug(
"Received malformed message {} , disconnecting: {}",
message.getData().getData(),
message.getPeer(),
e);
message.getPeer().disconnect(DisconnectMessage.DisconnectReason.BREACH_OF_PROTOCOL);
}
return Optional.ofNullable(messageResponseConstructorsByCode.get(code))
.map(
messageResponseConstructor ->
messageResponseConstructor.response(ethMessage.getData()));
}

public void subscribe(final int messageCode, final MessageCallback callback) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import org.hyperledger.besu.ethereum.core.Difficulty;
import org.hyperledger.besu.ethereum.core.Hash;
import org.hyperledger.besu.ethereum.eth.EthProtocol;
import org.hyperledger.besu.ethereum.eth.messages.EthPV62;
import org.hyperledger.besu.ethereum.eth.messages.EthPV63;
import org.hyperledger.besu.ethereum.eth.messages.EthPV65;
Expand Down Expand Up @@ -60,22 +61,30 @@ public class EthPeer {

private final int maxTrackedSeenBlocks = 300;

private final Set<Hash> knownBlocks;
private final Set<Hash> knownBlocks =
Collections.newSetFromMap(
Collections.synchronizedMap(
new LinkedHashMap<>(16, 0.75f, true) {
@Override
protected boolean removeEldestEntry(final Map.Entry<Hash, Boolean> eldest) {
return size() > maxTrackedSeenBlocks;
}
}));
private final String protocolName;
private final Clock clock;
private final List<NodeMessagePermissioningProvider> permissioningProviders;
private final ChainState chainHeadState;
private final ChainState chainHeadState = new ChainState();
private final AtomicBoolean statusHasBeenSentToPeer = new AtomicBoolean(false);
private final AtomicBoolean statusHasBeenReceivedFromPeer = new AtomicBoolean(false);
private final AtomicBoolean fullyValidated = new AtomicBoolean(false);
private final AtomicInteger lastProtocolVersion = new AtomicInteger(0);

private volatile long lastRequestTimestamp = 0;
private final RequestManager headersRequestManager = new RequestManager(this);
private final RequestManager bodiesRequestManager = new RequestManager(this);
private final RequestManager receiptsRequestManager = new RequestManager(this);
private final RequestManager nodeDataRequestManager = new RequestManager(this);
private final RequestManager pooledTransactionsRequestManager = new RequestManager(this);
private final RequestManager headersRequestManager;
private final RequestManager bodiesRequestManager;
private final RequestManager receiptsRequestManager;
private final RequestManager nodeDataRequestManager;
private final RequestManager pooledTransactionsRequestManager;

private final AtomicReference<Consumer<EthPeer>> onStatusesExchanged = new AtomicReference<>();
private final PeerReputation reputation = new PeerReputation();
Expand All @@ -93,21 +102,18 @@ public EthPeer(
this.protocolName = protocolName;
this.clock = clock;
this.permissioningProviders = permissioningProviders;
knownBlocks =
Collections.newSetFromMap(
Collections.synchronizedMap(
new LinkedHashMap<>(16, 0.75f, true) {
@Override
protected boolean removeEldestEntry(final Map.Entry<Hash, Boolean> eldest) {
return size() > maxTrackedSeenBlocks;
}
}));
this.chainHeadState = new ChainState();
this.onStatusesExchanged.set(onStatusesExchanged);
for (final PeerValidator peerValidator : peerValidators) {
validationStatus.put(peerValidator, false);
}
peerValidators.forEach(peerValidator -> validationStatus.put(peerValidator, false));
fullyValidated.set(peerValidators.isEmpty());

final boolean supportsRequestId =
getAgreedCapabilities().stream()
.anyMatch(capability -> capability.compareTo(EthProtocol.ETH66) >= 0);
this.headersRequestManager = new RequestManager(this, supportsRequestId);
this.bodiesRequestManager = new RequestManager(this, supportsRequestId);
this.receiptsRequestManager = new RequestManager(this, supportsRequestId);
this.nodeDataRequestManager = new RequestManager(this, supportsRequestId);
this.pooledTransactionsRequestManager = new RequestManager(this, supportsRequestId);
}

public void markValidated(final PeerValidator validator) {
Expand Down Expand Up @@ -204,13 +210,6 @@ public RequestManager.ResponseStream getHeadersByNumber(
return sendRequest(headersRequestManager, message);
}

private RequestManager.ResponseStream sendRequest(
final RequestManager requestManager, final MessageData messageData) throws PeerNotConnected {
lastRequestTimestamp = clock.millis();
return requestManager.dispatchRequest(
() -> connection.sendForProtocol(protocolName, messageData));
}

public RequestManager.ResponseStream getBodies(final List<Hash> blockHashes)
throws PeerNotConnected {
final GetBlockBodiesMessage message = GetBlockBodiesMessage.create(blockHashes);
Expand All @@ -235,6 +234,13 @@ public RequestManager.ResponseStream getPooledTransactions(final List<Hash> hash
return sendRequest(pooledTransactionsRequestManager, message);
}

private RequestManager.ResponseStream sendRequest(
final RequestManager requestManager, final MessageData messageData) throws PeerNotConnected {
lastRequestTimestamp = clock.millis();
return requestManager.dispatchRequest(
msgData -> connection.sendForProtocol(protocolName, msgData), messageData);
}

boolean validateReceivedMessage(final EthMessage message) {
checkArgument(message.getPeer().equals(this), "Mismatched message sent to peer for dispatch");
switch (message.getData().getCode()) {
Expand Down Expand Up @@ -277,30 +283,28 @@ boolean validateReceivedMessage(final EthMessage message) {
/**
* Routes messages originating from this peer to listeners.
*
* @param message the message to dispatch
* @param ethMessage the Eth message to dispatch
*/
void dispatch(final EthMessage message) {
checkArgument(message.getPeer().equals(this), "Mismatched message sent to peer for dispatch");
switch (message.getData().getCode()) {
void dispatch(final EthMessage ethMessage) {
checkArgument(
ethMessage.getPeer().equals(this), "Mismatched Eth message sent to peer for dispatch");
final int messageCode = ethMessage.getData().getCode();
reputation.resetTimeoutCount(messageCode);
switch (messageCode) {
case EthPV62.BLOCK_HEADERS:
reputation.resetTimeoutCount(EthPV62.GET_BLOCK_HEADERS);
headersRequestManager.dispatchResponse(message);
headersRequestManager.dispatchResponse(ethMessage);
break;
case EthPV62.BLOCK_BODIES:
reputation.resetTimeoutCount(EthPV62.GET_BLOCK_BODIES);
bodiesRequestManager.dispatchResponse(message);
bodiesRequestManager.dispatchResponse(ethMessage);
break;
case EthPV63.RECEIPTS:
reputation.resetTimeoutCount(EthPV63.GET_RECEIPTS);
receiptsRequestManager.dispatchResponse(message);
receiptsRequestManager.dispatchResponse(ethMessage);
break;
case EthPV63.NODE_DATA:
reputation.resetTimeoutCount(EthPV63.GET_NODE_DATA);
nodeDataRequestManager.dispatchResponse(message);
nodeDataRequestManager.dispatchResponse(ethMessage);
break;
case EthPV65.POOLED_TRANSACTIONS:
reputation.resetTimeoutCount(EthPV65.GET_POOLED_TRANSACTIONS);
pooledTransactionsRequestManager.dispatchResponse(message);
pooledTransactionsRequestManager.dispatchResponse(ethMessage);
break;
default:
// Nothing to do
Expand Down
Loading

0 comments on commit f888983

Please sign in to comment.