Skip to content

Commit

Permalink
Fix p2p PeerInfo handling (PegaSysEng#1428)
Browse files Browse the repository at this point in the history
  • Loading branch information
mbaxter authored May 10, 2019
1 parent 0afb17c commit 141b9c5
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -135,56 +135,44 @@ public class DefaultP2PNetwork implements P2PNetwork {
private static final Logger LOG = LogManager.getLogger();
private static final int TIMEOUT_SECONDS = 30;

private ChannelFuture server;
private final EventLoopGroup boss = new NioEventLoopGroup(1);
private final EventLoopGroup workers = new NioEventLoopGroup(1);
private final ScheduledExecutorService peerConnectionScheduler =
Executors.newSingleThreadScheduledExecutor();

final Map<Capability, Subscribers<Consumer<Message>>> protocolCallbacks =
new ConcurrentHashMap<>();

private final Subscribers<Consumer<PeerConnection>> connectCallbacks = new Subscribers<>();

private final Subscribers<DisconnectCallback> disconnectCallbacks = new Subscribers<>();

private final Callbacks callbacks = new Callbacks(protocolCallbacks, disconnectCallbacks);

private final PeerDiscoveryAgent peerDiscoveryAgent;
private final PeerBlacklist peerBlacklist;

private final NetworkingConfiguration config;
private final List<Capability> supportedCapabilities;
private OptionalLong peerBondedObserverId = OptionalLong.empty();
private OptionalLong peerDroppedObserverId = OptionalLong.empty();

@VisibleForTesting public final Collection<Peer> peerMaintainConnectionList;

@VisibleForTesting public final PeerConnectionRegistry connections;

@VisibleForTesting
public final Map<Peer, CompletableFuture<PeerConnection>> pendingConnections =
new ConcurrentHashMap<>();

private final EventLoopGroup boss = new NioEventLoopGroup(1);

private final EventLoopGroup workers = new NioEventLoopGroup(1);

private volatile PeerInfo ourPeerInfo;

private final SECP256K1.KeyPair keyPair;

private ChannelFuture server;

private final int maxPeers;

private final List<SubProtocol> subProtocols;

private final LabelledMetric<Counter> outboundMessagesCounter;

private final String advertisedHost;

private final SECP256K1.KeyPair keyPair;
private final BytesValue nodeId;
private volatile OptionalInt listeningPort = OptionalInt.empty();
private volatile Optional<EnodeURL> localEnode = Optional.empty();
private volatile Optional<PeerInfo> ourPeerInfo = Optional.empty();

private final PeerBlacklist peerBlacklist;
private final Optional<NodePermissioningController> nodePermissioningController;
private final Optional<Blockchain> blockchain;

@VisibleForTesting final Collection<Peer> peerMaintainConnectionList;
@VisibleForTesting final PeerConnectionRegistry connections;

@VisibleForTesting
final Map<Peer, CompletableFuture<PeerConnection>> pendingConnections = new ConcurrentHashMap<>();

final Map<Capability, Subscribers<Consumer<Message>>> protocolCallbacks =
new ConcurrentHashMap<>();
private final Subscribers<Consumer<PeerConnection>> connectCallbacks = new Subscribers<>();
private final Subscribers<DisconnectCallback> disconnectCallbacks = new Subscribers<>();
private final Callbacks callbacks = new Callbacks(protocolCallbacks, disconnectCallbacks);

private final LabelledMetric<Counter> outboundMessagesCounter;
private OptionalLong blockAddedObserverId = OptionalLong.empty();
private OptionalLong peerBondedObserverId = OptionalLong.empty();
private OptionalLong peerDroppedObserverId = OptionalLong.empty();

private final AtomicBoolean started = new AtomicBoolean(false);
private final AtomicBoolean stopped = new AtomicBoolean(false);
Expand Down Expand Up @@ -225,8 +213,8 @@ public class DefaultP2PNetwork implements P2PNetwork {
this.peerMaintainConnectionList = new HashSet<>();
this.connections = new PeerConnectionRegistry(metricsSystem);

this.nodeId = this.keyPair.getPublicKey().getEncodedBytes();
this.subProtocols = config.getSupportedProtocols();
this.advertisedHost = config.getDiscovery().getAdvertisedHost();
this.maxPeers = config.getRlpx().getMaxPeers();

peerDiscoveryAgent.addPeerRequirement(() -> connections.size() >= maxPeers);
Expand Down Expand Up @@ -271,8 +259,12 @@ private Supplier<Integer> pendingTaskCounter(final EventLoopGroup eventLoopGroup
.sum();
}

/** Start listening for incoming connections */
private void startListening() {
/**
* Start listening for incoming connections.
*
* @return The port on which we're listening for incoming connections.
*/
private int startListening() {
server =
new ServerBootstrap()
.group(boss, workers)
Expand All @@ -293,13 +285,15 @@ private void startListening() {
LOG.error(message, future.cause());
}
checkState(socketAddress != null, message);
listeningPort = OptionalInt.of(socketAddress.getPort());
ourPeerInfo =
new PeerInfo(
5,
config.getClientId(),
supportedCapabilities,
socketAddress.getPort(),
this.keyPair.getPublicKey().getEncodedBytes());
Optional.of(
new PeerInfo(
5,
config.getClientId(),
supportedCapabilities,
listeningPort.getAsInt(),
nodeId));
LOG.info("P2PNetwork started and listening on {}", socketAddress);
latch.countDown();
});
Expand All @@ -309,6 +303,7 @@ private void startListening() {
if (!latch.await(1, TimeUnit.MINUTES)) {
throw new RuntimeException("Timed out while waiting for network startup");
}
return listeningPort.getAsInt();
} catch (final InterruptedException e) {
throw new RuntimeException("Interrupted before startup completed", e);
}
Expand All @@ -332,7 +327,7 @@ protected void initChannel(final SocketChannel ch) {
new HandshakeHandlerInbound(
keyPair,
subProtocols,
ourPeerInfo,
ourPeerInfo.get(),
connectionFuture,
callbacks,
connections,
Expand Down Expand Up @@ -437,8 +432,14 @@ public Stream<DiscoveryPeer> streamDiscoveredPeers() {

@Override
public CompletableFuture<PeerConnection> connect(final Peer peer) {
LOG.trace("Initiating connection to peer: {}", peer.getId());
final CompletableFuture<PeerConnection> connectionFuture = new CompletableFuture<>();
if (!localEnode.isPresent()) {
connectionFuture.completeExceptionally(
new IllegalStateException("Attempt to connect to peer before network is ready"));
return connectionFuture;
}

LOG.trace("Initiating connection to peer: {}", peer.getId());
final EnodeURL enode = peer.getEnodeURL();
final CompletableFuture<PeerConnection> existingPendingConnection =
pendingConnections.putIfAbsent(peer, connectionFuture);
Expand Down Expand Up @@ -471,7 +472,7 @@ protected void initChannel(final SocketChannel ch) {
keyPair,
peer,
subProtocols,
ourPeerInfo,
ourPeerInfo.get(),
connectionFuture,
callbacks,
connections,
Expand Down Expand Up @@ -528,8 +529,8 @@ public void start() {
LOG.warn("Attempted to start an already started " + getClass().getSimpleName());
}

startListening();
peerDiscoveryAgent.start(ourPeerInfo.getPort()).join();
final int listeningPort = startListening();
peerDiscoveryAgent.start(listeningPort).join();
peerBondedObserverId =
OptionalLong.of(peerDiscoveryAgent.observePeerBondedEvents(handlePeerBondedEvent()));
peerDroppedObserverId =
Expand Down Expand Up @@ -609,22 +610,22 @@ private boolean isPeerAllowed(final Peer peer) {
}

private boolean isPeerAllowed(final EnodeURL enode) {
final Optional<EnodeURL> maybeEnode = getLocalEnode();
if (!maybeEnode.isPresent()) {
// If the network isn't ready yet, deny connections
return false;
}
final EnodeURL localEnode = maybeEnode.get();

if (peerBlacklist.contains(enode.getNodeId())) {
return false;
}
if (enode.getNodeId().equals(ourPeerInfo.getNodeId())) {
if (enode.getNodeId().equals(nodeId)) {
// Peer matches our node id
return false;
}

Optional<EnodeURL> maybeEnode = getLocalEnode();
if (!maybeEnode.isPresent()) {
// If local enode isn't yet available we can't evaluate permissions
return false;
}
return nodePermissioningController
.map(c -> c.isPermitted(maybeEnode.get(), enode))
.orElse(true);
return nodePermissioningController.map(c -> c.isPermitted(localEnode, enode)).orElse(true);
}

@VisibleForTesting
Expand Down Expand Up @@ -701,12 +702,10 @@ public Optional<EnodeURL> getLocalEnode() {
}

private void createLocalEnode() {
if (localEnode.isPresent()) {
if (localEnode.isPresent() || !listeningPort.isPresent()) {
return;
}

final BytesValue nodeId = ourPeerInfo.getNodeId();
final int listeningPort = ourPeerInfo.getPort();
final OptionalInt discoveryPort =
peerDiscoveryAgent
.getAdvertisedPeer()
Expand All @@ -718,8 +717,8 @@ private void createLocalEnode() {
final EnodeURL localEnode =
EnodeURL.builder()
.nodeId(nodeId)
.ipAddress(advertisedHost)
.listeningPort(listeningPort)
.ipAddress(config.getDiscovery().getAdvertisedHost())
.listeningPort(listeningPort.getAsInt())
.discoveryPort(discoveryPort)
.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
package tech.pegasys.pantheon.ethereum.p2p.network;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Java6Assertions.assertThatThrownBy;
import static org.assertj.core.api.Java6Assertions.catchThrowable;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat;
Expand Down Expand Up @@ -112,6 +113,17 @@ public void addingMaintainedNetworkPeerStartsConnection() {
verify(network, times(1)).connect(peer);
}

@Test
public void addMaintainConnectionPeer_beforeStartingNetwork() {
final DefaultP2PNetwork network = mockNetwork();
final Peer peer = mockPeer();

assertThat(network.addMaintainConnectionPeer(peer)).isTrue();

assertThat(network.peerMaintainConnectionList).contains(peer);
verify(network, never()).connect(peer);
}

@Test
public void addingRepeatMaintainedPeersReturnsFalse() {
final P2PNetwork network = network();
Expand Down Expand Up @@ -194,6 +206,7 @@ public void shouldSendClientQuittingWhenNetworkStops() {
@Test
public void shouldntAttemptNewConnectionToPendingPeer() {
final P2PNetwork network = network();
network.start();
final Peer peer = mockPeer();

final CompletableFuture<PeerConnection> connectingFuture = network.connect(peer);
Expand Down Expand Up @@ -477,6 +490,18 @@ public void attemptPeerConnections_withNoSlotsAvailable() {
verify(network, times(0)).connect(any());
}

@Test
public void connect_beforeStartingNetwork() {
final DefaultP2PNetwork network = network();
final Peer peer = mockPeer();

final CompletableFuture<PeerConnection> connectionResult = network.connect(peer);
assertThat(connectionResult).isCompletedExceptionally();
assertThatThrownBy(connectionResult::get)
.hasCauseInstanceOf(IllegalStateException.class)
.hasMessageContaining("Attempt to connect to peer before network is ready");
}

private DiscoveryPeer createDiscoveryPeer() {
return createDiscoveryPeer(Peer.randomId(), 999);
}
Expand Down

0 comments on commit 141b9c5

Please sign in to comment.