Skip to content

Commit

Permalink
feat: Refactor Node DNS Resolver to use vertx virtual threads (hyperl…
Browse files Browse the repository at this point in the history
…edger#7189)

Signed-off-by: Usman Saleem <usman@usmans.info>
  • Loading branch information
usmansaleem authored Jun 8, 2024
1 parent 40d6b26 commit 54a3e6a
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ public void start() {
public void stop() {
LOG.info("Stopping DNSDaemon for {}", enrLink);
periodicTaskId.ifPresent(vertx::cancelTimer);
dnsResolver.close();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,9 @@
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

import com.google.common.base.Splitter;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.dns.DnsClient;
import io.vertx.core.dns.DnsClientOptions;
Expand All @@ -42,9 +39,8 @@

// Adapted from https://github.com/tmio/tuweni and licensed under Apache 2.0
/** Resolves a set of ENR nodes from a host name. */
public class DNSResolver implements AutoCloseable {
public class DNSResolver {
private static final Logger LOG = LoggerFactory.getLogger(DNSResolver.class);
private final ExecutorService rawTxtRecordsExecutor = Executors.newSingleThreadExecutor();
private final String enrLink;
private long seq;
private final DnsClient dnsClient;
Expand Down Expand Up @@ -118,7 +114,7 @@ public long sequence() {
private void visitTree(final ENRTreeLink link, final DNSVisitor visitor) {
Optional<DNSEntry> optionalEntry = resolveRecord(link.domainName());
if (optionalEntry.isEmpty()) {
LOG.debug("No DNS record found for {}", link.domainName());
LOG.trace("No DNS record found for {}", link.domainName());
return;
}

Expand Down Expand Up @@ -146,32 +142,30 @@ private boolean internalVisit(
final String entryName, final String domainName, final DNSVisitor visitor) {
final Optional<DNSEntry> optionalDNSEntry = resolveRecord(entryName + "." + domainName);
if (optionalDNSEntry.isEmpty()) {
LOG.debug("No DNS record found for {}", entryName + "." + domainName);
return true;
}

final DNSEntry entry = optionalDNSEntry.get();
if (entry instanceof ENRNode node) {
// TODO: this always return true because the visitor is reference to list.add
return visitor.visit(node.nodeRecord());
} else if (entry instanceof DNSEntry.ENRTree tree) {
for (String e : tree.entries()) {
// TODO: When would this ever return false?
boolean keepGoing = internalVisit(e, domainName, visitor);
if (!keepGoing) {
return false;
switch (entry) {
case ENRNode node -> {
return visitor.visit(node.nodeRecord());
}
case DNSEntry.ENRTree tree -> {
for (String e : tree.entries()) {
boolean keepGoing = internalVisit(e, domainName, visitor);
if (!keepGoing) {
return false;
}
}
}
} else if (entry instanceof ENRTreeLink link) {
visitTree(link, visitor);
} else {
LOG.debug("Unsupported type of node {}", entry);
case ENRTreeLink link -> visitTree(link, visitor);
default -> LOG.debug("Unsupported type of node {}", entry);
}
return true;
}

/**
* Resolves one DNS record associated with the given domain name.
* Maps TXT DNS record to DNSEntry.
*
* @param domainName the domain name to query
* @return the DNS entry read from the domain. Empty if no record is found.
Expand All @@ -187,51 +181,21 @@ Optional<DNSEntry> resolveRecord(final String domainName) {
* @return the first TXT entry of the DNS record. Empty if no record is found.
*/
Optional<String> resolveRawRecord(final String domainName) {
// vertx-dns is async, kotlin coroutines allows us to await, similarly Java 21 new thread
// model would also allow us to await. For now, we will use CountDownLatch to block the
// current thread until the DNS resolution is complete.
LOG.debug("Resolving TXT records on domain: {}", domainName);
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Optional<String>> record = new AtomicReference<>(Optional.empty());
rawTxtRecordsExecutor.submit(
() -> {
dnsClient
.resolveTXT(domainName)
.onComplete(
ar -> {
if (ar.succeeded()) {
LOG.trace(
"TXT record resolved on domain {}. Result: {}", domainName, ar.result());
record.set(ar.result().stream().findFirst());
} else {
LOG.trace(
"TXT record not resolved on domain {}, because: {}",
domainName,
ar.cause().getMessage());
}
latch.countDown();
});
});

LOG.trace("Resolving TXT records on domain: {}", domainName);
try {
// causes the worker thread to wait. Once we move to Java 21, this can be simplified.
latch.await();
} catch (InterruptedException e) {
LOG.debug("Interrupted while waiting for DNS resolution");
// Future.await parks current virtual thread and waits for the result. Any failure is
// thrown as a Throwable.
return Future.await(dnsClient.resolveTXT(domainName)).stream().findFirst();
} catch (final Throwable e) {
LOG.trace("Error while resolving TXT records on domain: {}", domainName, e);
return Optional.empty();
}

return record.get();
}

private boolean checkSignature(
final ENRTreeRoot root, final SECP256K1.PublicKey pubKey, final SECP256K1.Signature sig) {
Bytes32 hash =
final Bytes32 hash =
Hash.keccak256(Bytes.wrap(root.signedContent().getBytes(StandardCharsets.UTF_8)));
return SECP256K1.verifyHashed(hash, sig, pubKey);
}

@Override
public void close() {
rawTxtRecordsExecutor.shutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@
import io.vertx.core.Future;
import io.vertx.core.ThreadingModel;
import io.vertx.core.Vertx;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.devp2p.EthereumNodeRecord;
import org.slf4j.Logger;
Expand Down Expand Up @@ -152,7 +151,7 @@ public class DefaultP2PNetwork implements P2PNetwork {
private final CountDownLatch shutdownLatch = new CountDownLatch(2);
private final Duration shutdownTimeout = Duration.ofSeconds(15);
private final Vertx vertx;
private final AtomicReference<Optional<Pair<String, DNSDaemon>>> dnsDaemonRef =
private final AtomicReference<Optional<DNSDaemon>> dnsDaemonRef =
new AtomicReference<>(Optional.empty());

/**
Expand Down Expand Up @@ -242,17 +241,16 @@ public void start() {
600000L,
config.getDnsDiscoveryServerOverride().orElse(null));

// TODO: Java 21, we can move to Virtual Thread model
// Use Java 21 virtual thread to deploy verticle
final DeploymentOptions options =
new DeploymentOptions()
.setThreadingModel(ThreadingModel.WORKER)
.setThreadingModel(ThreadingModel.VIRTUAL_THREAD)
.setInstances(1)
.setWorkerPoolSize(1);

final Future<String> deployId = vertx.deployVerticle(dnsDaemon, options);
final String dnsDaemonDeployId =
deployId.toCompletionStage().toCompletableFuture().join();
dnsDaemonRef.set(Optional.of(Pair.of(dnsDaemonDeployId, dnsDaemon)));
deployId.toCompletionStage().toCompletableFuture().join();
dnsDaemonRef.set(Optional.of(dnsDaemon));
});

final int listeningPort = rlpxAgent.start().join();
Expand Down Expand Up @@ -301,7 +299,7 @@ public void stop() {

// since dnsDaemon is a vertx verticle, vertx.close will undeploy it.
// However, we can safely call stop as well.
dnsDaemonRef.get().map(Pair::getRight).ifPresent(DNSDaemon::stop);
dnsDaemonRef.get().ifPresent(DNSDaemon::stop);

peerConnectionScheduler.shutdownNow();
peerDiscoveryAgent.stop().whenComplete((res, err) -> shutdownLatch.countDown());
Expand Down Expand Up @@ -358,7 +356,7 @@ public boolean removeMaintainedConnectionPeer(final Peer peer) {

@VisibleForTesting
Optional<DNSDaemon> getDnsDaemon() {
return dnsDaemonRef.get().map(Pair::getRight);
return dnsDaemonRef.get();
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

import static org.assertj.core.api.Assertions.assertThat;

import java.io.IOException;
import java.security.Security;
import java.util.concurrent.atomic.AtomicInteger;

Expand All @@ -38,12 +37,11 @@
class DNSDaemonTest {
private static final String holeskyEnr =
"enrtree://AKA3AM6LPBYEUDMVNU3BSVQJ5AD45Y7YPOHJLEF6W26QOE4VTUDPE@all.holesky.ethdisco.net";
// private static MockDNSServer mockDNSServer;
private final MockDnsServerVerticle mockDnsServerVerticle = new MockDnsServerVerticle();
private DNSDaemon dnsDaemon;

@BeforeAll
static void setup() throws IOException {
static void setup() {
Security.addProvider(new BouncyCastleProvider());
}

Expand All @@ -68,7 +66,9 @@ void testDNSDaemon(final Vertx vertx, final VertxTestContext testContext)
"localhost:" + mockDnsServerVerticle.port());

final DeploymentOptions options =
new DeploymentOptions().setThreadingModel(ThreadingModel.WORKER).setWorkerPoolSize(1);
new DeploymentOptions()
.setThreadingModel(ThreadingModel.VIRTUAL_THREAD)
.setWorkerPoolSize(1);
vertx.deployVerticle(dnsDaemon, options);
}

Expand Down Expand Up @@ -109,7 +109,9 @@ void testDNSDaemonPeriodic(final Vertx vertx, final VertxTestContext testContext
"localhost:" + mockDnsServerVerticle.port());

final DeploymentOptions options =
new DeploymentOptions().setThreadingModel(ThreadingModel.WORKER).setWorkerPoolSize(1);
new DeploymentOptions()
.setThreadingModel(ThreadingModel.VIRTUAL_THREAD)
.setWorkerPoolSize(1);
vertx.deployVerticle(dnsDaemon, options);
}

Expand Down

0 comments on commit 54a3e6a

Please sign in to comment.