From 8777ab1af7361edfca4c80c19a28f65e6e043019 Mon Sep 17 00:00:00 2001 From: Danno Ferrin Date: Wed, 27 May 2020 22:20:44 -0600 Subject: [PATCH] fine, we'll make our own timers Signed-off-by: Danno Ferrin --- .../discovery/VertxPeerDiscoveryAgent.java | 15 ++-- .../internal/IndirectVertxTimerUtil.java | 76 +++++++++++++++++++ gradle/versions.gradle | 10 +-- 3 files changed, 88 insertions(+), 13 deletions(-) create mode 100644 ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/IndirectVertxTimerUtil.java diff --git a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/VertxPeerDiscoveryAgent.java b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/VertxPeerDiscoveryAgent.java index 9753a90cf8c..7733b9959e8 100644 --- a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/VertxPeerDiscoveryAgent.java +++ b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/VertxPeerDiscoveryAgent.java @@ -18,11 +18,11 @@ import org.hyperledger.besu.crypto.NodeKey; import org.hyperledger.besu.ethereum.p2p.config.DiscoveryConfiguration; +import org.hyperledger.besu.ethereum.p2p.discovery.internal.IndirectVertxTimerUtil; import org.hyperledger.besu.ethereum.p2p.discovery.internal.Packet; import org.hyperledger.besu.ethereum.p2p.discovery.internal.PeerDiscoveryController; import org.hyperledger.besu.ethereum.p2p.discovery.internal.PeerDiscoveryController.AsyncExecutor; import org.hyperledger.besu.ethereum.p2p.discovery.internal.TimerUtil; -import org.hyperledger.besu.ethereum.p2p.discovery.internal.VertxTimerUtil; import org.hyperledger.besu.ethereum.p2p.permissions.PeerPermissions; import org.hyperledger.besu.metrics.BesuMetricCategory; import org.hyperledger.besu.nat.NatService; @@ -30,7 +30,6 @@ import org.hyperledger.besu.util.NetworkUtility; import java.io.IOException; -import java.net.BindException; import java.net.InetSocketAddress; import java.net.SocketException; import java.util.OptionalInt; @@ -84,7 +83,7 @@ private IntSupplier pendingTaskCounter(final EventLoopGroup eventLoopGroup) { @Override protected TimerUtil createTimer() { - return new VertxTimerUtil(vertx); + return new IndirectVertxTimerUtil(vertx); } @Override @@ -94,7 +93,7 @@ protected AsyncExecutor createWorkerExecutor() { @Override protected CompletableFuture listenForConnections() { - CompletableFuture future = new CompletableFuture<>(); + final CompletableFuture future = new CompletableFuture<>(); vertx .createDatagramSocket(new DatagramSocketOptions().setIpV6(NetworkUtility.isIPv6Available())) .listen( @@ -102,14 +101,14 @@ protected CompletableFuture listenForConnections() { return future; } - protected void handleListenerSetup( + private void handleListenerSetup( final AsyncResult listenResult, final CompletableFuture addressFuture) { if (listenResult.failed()) { Throwable cause = listenResult.cause(); LOG.error("An exception occurred when starting the peer discovery agent", cause); - if (cause instanceof BindException || cause instanceof SocketException) { + if (cause instanceof SocketException) { cause = new PeerDiscoveryServiceException( String.format( @@ -136,7 +135,7 @@ protected void handleListenerSetup( socket.exceptionHandler(this::handleException); socket.handler(this::handlePacket); - InetSocketAddress address = + final InetSocketAddress address = new InetSocketAddress(socket.localAddress().host(), socket.localAddress().port()); addressFuture.complete(address); } @@ -144,7 +143,7 @@ protected void handleListenerSetup( @Override protected CompletableFuture sendOutgoingPacket( final DiscoveryPeer peer, final Packet packet) { - CompletableFuture result = new CompletableFuture<>(); + final CompletableFuture result = new CompletableFuture<>(); socket.send( packet.encode(), peer.getEndpoint().getUdpPort(), diff --git a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/IndirectVertxTimerUtil.java b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/IndirectVertxTimerUtil.java new file mode 100644 index 00000000000..d6324584f50 --- /dev/null +++ b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/internal/IndirectVertxTimerUtil.java @@ -0,0 +1,76 @@ +/* + * Copyright ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.p2p.discovery.internal; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import io.vertx.core.Vertx; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class IndirectVertxTimerUtil implements TimerUtil { + + private static final Logger LOG = LogManager.getLogger(); + + private final ScheduledExecutorService secheduledExecutor = + Executors.newSingleThreadScheduledExecutor(); + private final AtomicLong nextId = new AtomicLong(0); + private final Map> timers = new HashMap<>(); + private final Vertx vertx; + + public IndirectVertxTimerUtil(final Vertx vertx) { + this.vertx = vertx; + } + + @Override + public long setPeriodic(final long delayInMs, final TimerHandler handler) { + final long id = nextId.get(); + timers.put( + id, + secheduledExecutor.scheduleAtFixedRate( + () -> vertx.executeBlocking(e -> handler.handle(), r -> {}), + delayInMs, + delayInMs, + TimeUnit.MILLISECONDS)); + return id; + } + + @Override + public long setTimer(final long delayInMs, final TimerHandler handler) { + LOG.debug("calling VertxTimerUtil.setTimer {} delayInMs {} handler", delayInMs, handler); + final long id = nextId.get(); + timers.put( + id, + secheduledExecutor.schedule( + () -> vertx.executeBlocking(e -> handler.handle(), r -> timers.remove(id)), + delayInMs, + TimeUnit.MILLISECONDS)); + return id; + } + + @Override + public void cancelTimer(final long timerId) { + final ScheduledFuture timer = timers.remove(timerId); + if (timer != null) { + timer.cancel(true); + } + } +} diff --git a/gradle/versions.gradle b/gradle/versions.gradle index 41f616b0e83..04329b1be31 100644 --- a/gradle/versions.gradle +++ b/gradle/versions.gradle @@ -56,11 +56,11 @@ dependencyManagement { dependency 'io.reactivex.rxjava2:rxjava:2.2.16' - dependency 'io.vertx:vertx-auth-jwt:3.8.4' - dependency 'io.vertx:vertx-codegen:3.8.4' - dependency 'io.vertx:vertx-core:3.8.4' - dependency 'io.vertx:vertx-unit:3.8.4' - dependency 'io.vertx:vertx-web:3.8.4' + dependency 'io.vertx:vertx-auth-jwt:3.8.5' + dependency 'io.vertx:vertx-codegen:3.8.5' + dependency 'io.vertx:vertx-core:3.8.5' + dependency 'io.vertx:vertx-unit:3.8.5' + dependency 'io.vertx:vertx-web:3.8.5' dependency 'junit:junit:4.13'