From 54a3e6a2871635cf7b46b2ffd88bbfdb508cccc9 Mon Sep 17 00:00:00 2001 From: Usman Saleem Date: Sat, 8 Jun 2024 20:18:55 +1000 Subject: [PATCH] feat: Refactor Node DNS Resolver to use vertx virtual threads (#7189) Signed-off-by: Usman Saleem --- .../ethereum/p2p/discovery/dns/DNSDaemon.java | 1 - .../p2p/discovery/dns/DNSResolver.java | 84 ++++++------------- .../p2p/network/DefaultP2PNetwork.java | 16 ++-- .../p2p/discovery/dns/DNSDaemonTest.java | 12 +-- 4 files changed, 38 insertions(+), 75 deletions(-) diff --git a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/dns/DNSDaemon.java b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/dns/DNSDaemon.java index d17c846be79..ef794ae2047 100644 --- a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/dns/DNSDaemon.java +++ b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/dns/DNSDaemon.java @@ -88,7 +88,6 @@ public void start() { public void stop() { LOG.info("Stopping DNSDaemon for {}", enrLink); periodicTaskId.ifPresent(vertx::cancelTimer); - dnsResolver.close(); } /** diff --git a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/dns/DNSResolver.java b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/dns/DNSResolver.java index d42304ed043..0be4ca619d1 100644 --- a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/dns/DNSResolver.java +++ b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/discovery/dns/DNSResolver.java @@ -24,12 +24,9 @@ import java.util.Collections; import java.util.List; import java.util.Optional; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicReference; import com.google.common.base.Splitter; +import io.vertx.core.Future; import io.vertx.core.Vertx; import io.vertx.core.dns.DnsClient; import io.vertx.core.dns.DnsClientOptions; @@ -42,9 +39,8 @@ // Adapted from https://github.com/tmio/tuweni and licensed under Apache 2.0 /** Resolves a set of ENR nodes from a host name. */ -public class DNSResolver implements AutoCloseable { +public class DNSResolver { private static final Logger LOG = LoggerFactory.getLogger(DNSResolver.class); - private final ExecutorService rawTxtRecordsExecutor = Executors.newSingleThreadExecutor(); private final String enrLink; private long seq; private final DnsClient dnsClient; @@ -118,7 +114,7 @@ public long sequence() { private void visitTree(final ENRTreeLink link, final DNSVisitor visitor) { Optional optionalEntry = resolveRecord(link.domainName()); if (optionalEntry.isEmpty()) { - LOG.debug("No DNS record found for {}", link.domainName()); + LOG.trace("No DNS record found for {}", link.domainName()); return; } @@ -146,32 +142,30 @@ private boolean internalVisit( final String entryName, final String domainName, final DNSVisitor visitor) { final Optional optionalDNSEntry = resolveRecord(entryName + "." + domainName); if (optionalDNSEntry.isEmpty()) { - LOG.debug("No DNS record found for {}", entryName + "." + domainName); return true; } final DNSEntry entry = optionalDNSEntry.get(); - if (entry instanceof ENRNode node) { - // TODO: this always return true because the visitor is reference to list.add - return visitor.visit(node.nodeRecord()); - } else if (entry instanceof DNSEntry.ENRTree tree) { - for (String e : tree.entries()) { - // TODO: When would this ever return false? - boolean keepGoing = internalVisit(e, domainName, visitor); - if (!keepGoing) { - return false; + switch (entry) { + case ENRNode node -> { + return visitor.visit(node.nodeRecord()); + } + case DNSEntry.ENRTree tree -> { + for (String e : tree.entries()) { + boolean keepGoing = internalVisit(e, domainName, visitor); + if (!keepGoing) { + return false; + } } } - } else if (entry instanceof ENRTreeLink link) { - visitTree(link, visitor); - } else { - LOG.debug("Unsupported type of node {}", entry); + case ENRTreeLink link -> visitTree(link, visitor); + default -> LOG.debug("Unsupported type of node {}", entry); } return true; } /** - * Resolves one DNS record associated with the given domain name. + * Maps TXT DNS record to DNSEntry. * * @param domainName the domain name to query * @return the DNS entry read from the domain. Empty if no record is found. @@ -187,51 +181,21 @@ Optional resolveRecord(final String domainName) { * @return the first TXT entry of the DNS record. Empty if no record is found. */ Optional resolveRawRecord(final String domainName) { - // vertx-dns is async, kotlin coroutines allows us to await, similarly Java 21 new thread - // model would also allow us to await. For now, we will use CountDownLatch to block the - // current thread until the DNS resolution is complete. - LOG.debug("Resolving TXT records on domain: {}", domainName); - final CountDownLatch latch = new CountDownLatch(1); - final AtomicReference> record = new AtomicReference<>(Optional.empty()); - rawTxtRecordsExecutor.submit( - () -> { - dnsClient - .resolveTXT(domainName) - .onComplete( - ar -> { - if (ar.succeeded()) { - LOG.trace( - "TXT record resolved on domain {}. Result: {}", domainName, ar.result()); - record.set(ar.result().stream().findFirst()); - } else { - LOG.trace( - "TXT record not resolved on domain {}, because: {}", - domainName, - ar.cause().getMessage()); - } - latch.countDown(); - }); - }); - + LOG.trace("Resolving TXT records on domain: {}", domainName); try { - // causes the worker thread to wait. Once we move to Java 21, this can be simplified. - latch.await(); - } catch (InterruptedException e) { - LOG.debug("Interrupted while waiting for DNS resolution"); + // Future.await parks current virtual thread and waits for the result. Any failure is + // thrown as a Throwable. + return Future.await(dnsClient.resolveTXT(domainName)).stream().findFirst(); + } catch (final Throwable e) { + LOG.trace("Error while resolving TXT records on domain: {}", domainName, e); + return Optional.empty(); } - - return record.get(); } private boolean checkSignature( final ENRTreeRoot root, final SECP256K1.PublicKey pubKey, final SECP256K1.Signature sig) { - Bytes32 hash = + final Bytes32 hash = Hash.keccak256(Bytes.wrap(root.signedContent().getBytes(StandardCharsets.UTF_8))); return SECP256K1.verifyHashed(hash, sig, pubKey); } - - @Override - public void close() { - rawTxtRecordsExecutor.shutdown(); - } } diff --git a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/network/DefaultP2PNetwork.java b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/network/DefaultP2PNetwork.java index 573effa173a..55a536976fe 100644 --- a/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/network/DefaultP2PNetwork.java +++ b/ethereum/p2p/src/main/java/org/hyperledger/besu/ethereum/p2p/network/DefaultP2PNetwork.java @@ -82,7 +82,6 @@ import io.vertx.core.Future; import io.vertx.core.ThreadingModel; import io.vertx.core.Vertx; -import org.apache.commons.lang3.tuple.Pair; import org.apache.tuweni.bytes.Bytes; import org.apache.tuweni.devp2p.EthereumNodeRecord; import org.slf4j.Logger; @@ -152,7 +151,7 @@ public class DefaultP2PNetwork implements P2PNetwork { private final CountDownLatch shutdownLatch = new CountDownLatch(2); private final Duration shutdownTimeout = Duration.ofSeconds(15); private final Vertx vertx; - private final AtomicReference>> dnsDaemonRef = + private final AtomicReference> dnsDaemonRef = new AtomicReference<>(Optional.empty()); /** @@ -242,17 +241,16 @@ public void start() { 600000L, config.getDnsDiscoveryServerOverride().orElse(null)); - // TODO: Java 21, we can move to Virtual Thread model + // Use Java 21 virtual thread to deploy verticle final DeploymentOptions options = new DeploymentOptions() - .setThreadingModel(ThreadingModel.WORKER) + .setThreadingModel(ThreadingModel.VIRTUAL_THREAD) .setInstances(1) .setWorkerPoolSize(1); final Future deployId = vertx.deployVerticle(dnsDaemon, options); - final String dnsDaemonDeployId = - deployId.toCompletionStage().toCompletableFuture().join(); - dnsDaemonRef.set(Optional.of(Pair.of(dnsDaemonDeployId, dnsDaemon))); + deployId.toCompletionStage().toCompletableFuture().join(); + dnsDaemonRef.set(Optional.of(dnsDaemon)); }); final int listeningPort = rlpxAgent.start().join(); @@ -301,7 +299,7 @@ public void stop() { // since dnsDaemon is a vertx verticle, vertx.close will undeploy it. // However, we can safely call stop as well. - dnsDaemonRef.get().map(Pair::getRight).ifPresent(DNSDaemon::stop); + dnsDaemonRef.get().ifPresent(DNSDaemon::stop); peerConnectionScheduler.shutdownNow(); peerDiscoveryAgent.stop().whenComplete((res, err) -> shutdownLatch.countDown()); @@ -358,7 +356,7 @@ public boolean removeMaintainedConnectionPeer(final Peer peer) { @VisibleForTesting Optional getDnsDaemon() { - return dnsDaemonRef.get().map(Pair::getRight); + return dnsDaemonRef.get(); } @VisibleForTesting diff --git a/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/dns/DNSDaemonTest.java b/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/dns/DNSDaemonTest.java index cb465093d07..93074d37104 100644 --- a/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/dns/DNSDaemonTest.java +++ b/ethereum/p2p/src/test/java/org/hyperledger/besu/ethereum/p2p/discovery/dns/DNSDaemonTest.java @@ -16,7 +16,6 @@ import static org.assertj.core.api.Assertions.assertThat; -import java.io.IOException; import java.security.Security; import java.util.concurrent.atomic.AtomicInteger; @@ -38,12 +37,11 @@ class DNSDaemonTest { private static final String holeskyEnr = "enrtree://AKA3AM6LPBYEUDMVNU3BSVQJ5AD45Y7YPOHJLEF6W26QOE4VTUDPE@all.holesky.ethdisco.net"; - // private static MockDNSServer mockDNSServer; private final MockDnsServerVerticle mockDnsServerVerticle = new MockDnsServerVerticle(); private DNSDaemon dnsDaemon; @BeforeAll - static void setup() throws IOException { + static void setup() { Security.addProvider(new BouncyCastleProvider()); } @@ -68,7 +66,9 @@ void testDNSDaemon(final Vertx vertx, final VertxTestContext testContext) "localhost:" + mockDnsServerVerticle.port()); final DeploymentOptions options = - new DeploymentOptions().setThreadingModel(ThreadingModel.WORKER).setWorkerPoolSize(1); + new DeploymentOptions() + .setThreadingModel(ThreadingModel.VIRTUAL_THREAD) + .setWorkerPoolSize(1); vertx.deployVerticle(dnsDaemon, options); } @@ -109,7 +109,9 @@ void testDNSDaemonPeriodic(final Vertx vertx, final VertxTestContext testContext "localhost:" + mockDnsServerVerticle.port()); final DeploymentOptions options = - new DeploymentOptions().setThreadingModel(ThreadingModel.WORKER).setWorkerPoolSize(1); + new DeploymentOptions() + .setThreadingModel(ThreadingModel.VIRTUAL_THREAD) + .setWorkerPoolSize(1); vertx.deployVerticle(dnsDaemon, options); }