Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Pan 3238] Fix rlpx startup #114

Merged
merged 3 commits into from
Oct 17, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,19 @@ public CompletableFuture<Integer> start() {
}

setupListeners();
return connectionInitializer.start();
return connectionInitializer
.start()
.thenApply(
(socketAddress) -> {
LOG.info("P2P RLPx agent started and listening on {}.", socketAddress);
return socketAddress.getPort();
})
.whenComplete(
(res, err) -> {
if (err != null) {
LOG.error("Failed to start P2P RLPx agent.", err);
}
});
}

public CompletableFuture<Void> stop() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.hyperledger.besu.ethereum.p2p.peers.Peer;
import org.hyperledger.besu.ethereum.p2p.rlpx.ConnectCallback;

import java.net.InetSocketAddress;
import java.util.concurrent.CompletableFuture;

public interface ConnectionInitializer {
Expand All @@ -25,9 +26,9 @@ public interface ConnectionInitializer {
* Start the connection initializer. Begins listening for incoming connections. Start allowing
* outbound connections.
*
* @return The port on which we're listening for incoming connections.
* @return The address on which we're listening for incoming connections.
*/
CompletableFuture<Integer> start();
CompletableFuture<InetSocketAddress> start();

/**
* Shutdown the connection initializer. Stop listening for incoming connections and stop
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@
*/
package org.hyperledger.besu.ethereum.p2p.rlpx.connections.netty;

import static com.google.common.base.Preconditions.checkState;

import org.hyperledger.besu.crypto.SECP256K1.KeyPair;
import org.hyperledger.besu.ethereum.p2p.config.RlpxConfiguration;
import org.hyperledger.besu.ethereum.p2p.discovery.DiscoveryPeer;
Expand Down Expand Up @@ -48,12 +46,9 @@
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.SingleThreadEventExecutor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class NettyConnectionInitializer implements ConnectionInitializer {

private static final Logger LOG = LogManager.getLogger();
private static final int TIMEOUT_SECONDS = 10;

private final KeyPair keyPair;
Expand Down Expand Up @@ -95,8 +90,8 @@ public NettyConnectionInitializer(
}

@Override
public CompletableFuture<Integer> start() {
final CompletableFuture<Integer> listeningPortFuture = new CompletableFuture<>();
public CompletableFuture<InetSocketAddress> start() {
final CompletableFuture<InetSocketAddress> listeningPortFuture = new CompletableFuture<>();
if (!started.compareAndSet(false, true)) {
listeningPortFuture.completeExceptionally(
new IllegalStateException(
Expand All @@ -114,19 +109,17 @@ public CompletableFuture<Integer> start() {
future -> {
final InetSocketAddress socketAddress =
(InetSocketAddress) server.channel().localAddress();
final String message =
String.format(
"Unable start up P2P network on %s:%s. Check for port conflicts.",
config.getBindHost(), config.getBindPort());

if (!future.isSuccess()) {
LOG.error(message, future.cause());
if (!future.isSuccess() || socketAddress == null) {
final String message =
String.format(
"Unable start listening on %s:%s. Check for port conflicts.",
config.getBindHost(), config.getBindPort());
listeningPortFuture.completeExceptionally(
new IllegalStateException(message, future.cause()));
return;
}
checkState(socketAddress != null, message);

LOG.info("P2P network started and listening on {}", socketAddress);
final int listeningPort = socketAddress.getPort();
listeningPortFuture.complete(listeningPort);
listeningPortFuture.complete(socketAddress);
});

return listeningPortFuture;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.hyperledger.besu.ethereum.p2p.rlpx.ConnectCallback;
import org.hyperledger.besu.util.Subscribers;

import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
Expand Down Expand Up @@ -52,8 +53,10 @@ public void simulateIncomingConnection(final PeerConnection incomingConnection)
}

@Override
public CompletableFuture<Integer> start() {
return CompletableFuture.completedFuture(NEXT_PORT.incrementAndGet());
public CompletableFuture<InetSocketAddress> start() {
InetSocketAddress socketAddress =
new InetSocketAddress("127.0.0.1", NEXT_PORT.incrementAndGet());
return CompletableFuture.completedFuture(socketAddress);
}

@Override
Expand Down