Skip to content

Commit

Permalink
fine, we'll make our own timers
Browse files Browse the repository at this point in the history
Signed-off-by: Danno Ferrin <danno.ferrin@gmail.com>
  • Loading branch information
shemnon committed May 28, 2020
1 parent ce9e619 commit 8777ab1
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,18 @@

import org.hyperledger.besu.crypto.NodeKey;
import org.hyperledger.besu.ethereum.p2p.config.DiscoveryConfiguration;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.IndirectVertxTimerUtil;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.Packet;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.PeerDiscoveryController;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.PeerDiscoveryController.AsyncExecutor;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.TimerUtil;
import org.hyperledger.besu.ethereum.p2p.discovery.internal.VertxTimerUtil;
import org.hyperledger.besu.ethereum.p2p.permissions.PeerPermissions;
import org.hyperledger.besu.metrics.BesuMetricCategory;
import org.hyperledger.besu.nat.NatService;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.util.NetworkUtility;

import java.io.IOException;
import java.net.BindException;
import java.net.InetSocketAddress;
import java.net.SocketException;
import java.util.OptionalInt;
Expand Down Expand Up @@ -84,7 +83,7 @@ private IntSupplier pendingTaskCounter(final EventLoopGroup eventLoopGroup) {

@Override
protected TimerUtil createTimer() {
return new VertxTimerUtil(vertx);
return new IndirectVertxTimerUtil(vertx);
}

@Override
Expand All @@ -94,22 +93,22 @@ protected AsyncExecutor createWorkerExecutor() {

@Override
protected CompletableFuture<InetSocketAddress> listenForConnections() {
CompletableFuture<InetSocketAddress> future = new CompletableFuture<>();
final CompletableFuture<InetSocketAddress> future = new CompletableFuture<>();
vertx
.createDatagramSocket(new DatagramSocketOptions().setIpV6(NetworkUtility.isIPv6Available()))
.listen(
config.getBindPort(), config.getBindHost(), res -> handleListenerSetup(res, future));
return future;
}

protected void handleListenerSetup(
private void handleListenerSetup(
final AsyncResult<DatagramSocket> listenResult,
final CompletableFuture<InetSocketAddress> addressFuture) {
if (listenResult.failed()) {
Throwable cause = listenResult.cause();
LOG.error("An exception occurred when starting the peer discovery agent", cause);

if (cause instanceof BindException || cause instanceof SocketException) {
if (cause instanceof SocketException) {
cause =
new PeerDiscoveryServiceException(
String.format(
Expand All @@ -136,15 +135,15 @@ protected void handleListenerSetup(
socket.exceptionHandler(this::handleException);
socket.handler(this::handlePacket);

InetSocketAddress address =
final InetSocketAddress address =
new InetSocketAddress(socket.localAddress().host(), socket.localAddress().port());
addressFuture.complete(address);
}

@Override
protected CompletableFuture<Void> sendOutgoingPacket(
final DiscoveryPeer peer, final Packet packet) {
CompletableFuture<Void> result = new CompletableFuture<>();
final CompletableFuture<Void> result = new CompletableFuture<>();
socket.send(
packet.encode(),
peer.getEndpoint().getUdpPort(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.p2p.discovery.internal;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import io.vertx.core.Vertx;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class IndirectVertxTimerUtil implements TimerUtil {

private static final Logger LOG = LogManager.getLogger();

private final ScheduledExecutorService secheduledExecutor =
Executors.newSingleThreadScheduledExecutor();
private final AtomicLong nextId = new AtomicLong(0);
private final Map<Long, ScheduledFuture<?>> timers = new HashMap<>();
private final Vertx vertx;

public IndirectVertxTimerUtil(final Vertx vertx) {
this.vertx = vertx;
}

@Override
public long setPeriodic(final long delayInMs, final TimerHandler handler) {
final long id = nextId.get();
timers.put(
id,
secheduledExecutor.scheduleAtFixedRate(
() -> vertx.executeBlocking(e -> handler.handle(), r -> {}),
delayInMs,
delayInMs,
TimeUnit.MILLISECONDS));
return id;
}

@Override
public long setTimer(final long delayInMs, final TimerHandler handler) {
LOG.debug("calling VertxTimerUtil.setTimer {} delayInMs {} handler", delayInMs, handler);
final long id = nextId.get();
timers.put(
id,
secheduledExecutor.schedule(
() -> vertx.executeBlocking(e -> handler.handle(), r -> timers.remove(id)),
delayInMs,
TimeUnit.MILLISECONDS));
return id;
}

@Override
public void cancelTimer(final long timerId) {
final ScheduledFuture<?> timer = timers.remove(timerId);
if (timer != null) {
timer.cancel(true);
}
}
}
10 changes: 5 additions & 5 deletions gradle/versions.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,11 @@ dependencyManagement {

dependency 'io.reactivex.rxjava2:rxjava:2.2.16'

dependency 'io.vertx:vertx-auth-jwt:3.8.4'
dependency 'io.vertx:vertx-codegen:3.8.4'
dependency 'io.vertx:vertx-core:3.8.4'
dependency 'io.vertx:vertx-unit:3.8.4'
dependency 'io.vertx:vertx-web:3.8.4'
dependency 'io.vertx:vertx-auth-jwt:3.8.5'
dependency 'io.vertx:vertx-codegen:3.8.5'
dependency 'io.vertx:vertx-core:3.8.5'
dependency 'io.vertx:vertx-unit:3.8.5'
dependency 'io.vertx:vertx-web:3.8.5'

dependency 'junit:junit:4.13'

Expand Down

0 comments on commit 8777ab1

Please sign in to comment.