Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue #929: Perform async DNS resolution each time attempting connect… #930

Merged
merged 5 commits into from
Jan 2, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Issue #929: Perform async DNS resolution each time attempting connect…
…ion to service Url

Also attempt to connecto to all IP addresses present in DNS record
  • Loading branch information
merlimat committed Nov 30, 2017
commit 019bfed6fece25e02d2abfe4f7212fb1b76694ba
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.slf4j.LoggerFactory;

import io.netty.buffer.ByteBuf;
import io.netty.resolver.InetSocketAddressResolver;

public class BinaryProtoLookupService implements LookupService {

Expand All @@ -49,7 +50,10 @@ public BinaryProtoLookupService(PulsarClientImpl client, String serviceUrl, bool
URI uri;
try {
uri = new URI(serviceUrl);
this.serviceAddress = new InetSocketAddress(uri.getHost(), uri.getPort());

// Don't attempt to resolve the hostname in DNS at this point. It will be done each time when attempting to
// connect
this.serviceAddress = InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort());
} catch (Exception e) {
log.error("Invalid service-url {} provided {}", serviceUrl, e.getMessage(), e);
throw new PulsarClientException.InvalidServiceURL(e);
Expand All @@ -59,7 +63,8 @@ public BinaryProtoLookupService(PulsarClientImpl client, String serviceUrl, bool
/**
* Calls broker binaryProto-lookup api to find broker-service address which can serve a given topic.
*
* @param destination: topic-name
* @param destination:
* topic-name
* @return broker-socket-address that serves given topic
*/
public CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> getBroker(DestinationName destination) {
Expand All @@ -74,7 +79,6 @@ public CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(D
return getPartitionedTopicMetadata(serviceAddress, destination);
}


private CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> findBroker(InetSocketAddress socketAddress,
boolean authoritative, DestinationName destination) {
CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> addressFuture = new CompletableFuture<>();
Expand All @@ -93,7 +97,7 @@ private CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> findBroker
uri = new URI(serviceUrl);
}

InetSocketAddress responseBrokerAddress = new InetSocketAddress(uri.getHost(), uri.getPort());
InetSocketAddress responseBrokerAddress = InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort());

// (2) redirect to given address if response is: redirect
if (lookupDataResult.redirect) {
Expand Down Expand Up @@ -138,7 +142,6 @@ private CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> findBroker
return addressFuture;
}


private CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(InetSocketAddress socketAddress,
DestinationName destination) {

Expand Down Expand Up @@ -170,7 +173,7 @@ private CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(
}

public String getServiceUrl() {
return serviceAddress.toString();
return serviceAddress.toString();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.security.cert.X509Certificate;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -37,6 +40,7 @@

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
Expand All @@ -47,6 +51,9 @@
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.netty.resolver.dns.DnsNameResolver;
import io.netty.resolver.dns.DnsNameResolverBuilder;
import io.netty.util.concurrent.Future;

public class ConnectionPool implements Closeable {
private final ConcurrentHashMap<InetSocketAddress, ConcurrentMap<Integer, CompletableFuture<ClientCnx>>> pool;
Expand All @@ -55,6 +62,8 @@ public class ConnectionPool implements Closeable {
private final EventLoopGroup eventLoopGroup;
private final int maxConnectionsPerHosts;

private final DnsNameResolver dnsResolver;

private static final int MaxMessageSize = 5 * 1024 * 1024;
public static final String TLS_HANDLER = "tls";

Expand Down Expand Up @@ -101,6 +110,8 @@ public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast("handler", new ClientCnx(client));
}
});

this.dnsResolver = new DnsNameResolverBuilder(eventLoopGroup.next()).build();
}

private static final Random random = new Random();
Expand All @@ -114,18 +125,20 @@ public CompletableFuture<ClientCnx> getConnection(final InetSocketAddress addres
* <p>
* The connection can either be created or be coming from the pool itself.
* <p>
* When specifying multiple addresses, the logicalAddress is used as a tag for the broker,
* while the physicalAddress is where the connection is actually happening.
* When specifying multiple addresses, the logicalAddress is used as a tag for the broker, while the physicalAddress
* is where the connection is actually happening.
* <p>
* These two addresses can be different when the client is forced to connect through
* a proxy layer. Essentially, the pool is using the logical address as a way to
* decide whether to reuse a particular connection.
* These two addresses can be different when the client is forced to connect through a proxy layer. Essentially, the
* pool is using the logical address as a way to decide whether to reuse a particular connection.
*
* @param logicalAddress the address to use as the broker tag
* @param physicalAddress the real address where the TCP connection should be made
* @param logicalAddress
* the address to use as the broker tag
* @param physicalAddress
* the real address where the TCP connection should be made
* @return a future that will produce the ClientCnx object
*/
public CompletableFuture<ClientCnx> getConnection(InetSocketAddress logicalAddress, InetSocketAddress physicalAddress) {
public CompletableFuture<ClientCnx> getConnection(InetSocketAddress logicalAddress,
InetSocketAddress physicalAddress) {
if (maxConnectionsPerHosts == 0) {
// Disable pooling
return createConnection(logicalAddress, physicalAddress, -1);
Expand All @@ -146,17 +159,10 @@ private CompletableFuture<ClientCnx> createConnection(InetSocketAddress logicalA
final CompletableFuture<ClientCnx> cnxFuture = new CompletableFuture<ClientCnx>();

// Trigger async connect to broker
bootstrap.connect(physicalAddress).addListener((ChannelFuture future) -> {
if (!future.isSuccess()) {
log.warn("Failed to open connection to {} : {}", physicalAddress, future.cause().getClass().getSimpleName());
cnxFuture.completeExceptionally(new PulsarClientException(future.cause()));
cleanupConnection(logicalAddress, connectionKey, cnxFuture);
return;
}

log.info("[{}] Connected to server", future.channel());
createConnection(physicalAddress).thenAccept(channel -> {
log.info("[{}] Connected to server", channel);

future.channel().closeFuture().addListener(v -> {
channel.closeFuture().addListener(v -> {
// Remove connection from pool when it gets closed
if (log.isDebugEnabled()) {
log.debug("Removing closed connection from pool: {}", v);
Expand All @@ -166,10 +172,10 @@ private CompletableFuture<ClientCnx> createConnection(InetSocketAddress logicalA

// We are connected to broker, but need to wait until the connect/connected handshake is
// complete
final ClientCnx cnx = (ClientCnx) future.channel().pipeline().get("handler");
if (!future.channel().isActive() || cnx == null) {
final ClientCnx cnx = (ClientCnx) channel.pipeline().get("handler");
if (!channel.isActive() || cnx == null) {
if (log.isDebugEnabled()) {
log.debug("[{}] Connection was already closed by the time we got notified", future.channel());
log.debug("[{}] Connection was already closed by the time we got notified", channel);
}
cnxFuture.completeExceptionally(new ChannelException("Connection already closed"));
return;
Expand All @@ -195,14 +201,91 @@ private CompletableFuture<ClientCnx> createConnection(InetSocketAddress logicalA
cnx.ctx().close();
return null;
});
}).exceptionally(exception -> {
log.warn("Failed to open connection to {} : {}", physicalAddress, exception.getClass().getSimpleName());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there any specific reason to log exception.getClass().getSimpleName() instead message?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We had that already in the existing code. It was to just use "ConnectionError" type names I think

cnxFuture.completeExceptionally(new PulsarClientException(exception));
cleanupConnection(logicalAddress, connectionKey, cnxFuture);
return null;
});

return cnxFuture;
}

/**
* Resolve DNS asynchronously and attempt to connect to any IP address returned by DNS server
*/
private CompletableFuture<Channel> createConnection(InetSocketAddress unresolvedAddress) {
String hostname = unresolvedAddress.getHostString();
int port = unresolvedAddress.getPort();

// Resolve DNS --> Attempt to connect to all IP addresses until once succeeds
return resolveName(hostname)
.thenCompose(inetAddresses -> connectToResolvedAddresses(inetAddresses.iterator(), port));
}

/**
* Try to connect to a sequence of IP addresses until a successfull connection can be made, or fail if no address is
* working
*/
private CompletableFuture<Channel> connectToResolvedAddresses(Iterator<InetAddress> unresolvedAddresses, int port) {
CompletableFuture<Channel> future = new CompletableFuture<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good overall. 👍

If the good address is last in the iterator, instead of iterating to resolve every time, should we add some sort of affinity to reverse order? That'll complicate the logic though.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The thing is that DNS servers typically do that rotation of IPs already, for load balancing.

In any case, right now we're just trying to 1st IP :) (and then we stick to it...)


connectToAddress(unresolvedAddresses.next(), port).thenAccept(channel -> {
// Successfully connected to server
future.complete(channel);
}).exceptionally(exception -> {
if (unresolvedAddresses.hasNext()) {
// Try next IP address
connectToResolvedAddresses(unresolvedAddresses, port).thenAccept(channel -> {
future.complete(channel);
}).exceptionally(ex -> {
// This is already unwinding the recursive call
future.completeExceptionally(ex);
return null;
});
} else {
// Failed to connect to any IP address
future.completeExceptionally(exception);
}
return null;
});

return future;
}

private CompletableFuture<List<InetAddress>> resolveName(String hostname) {
CompletableFuture<List<InetAddress>> future = new CompletableFuture<>();
dnsResolver.resolveAll(hostname).addListener((Future<List<InetAddress>> resolveFuture) -> {
if (resolveFuture.isSuccess()) {
future.complete(resolveFuture.get());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it possible that resolveFuture.get() returns null incase it doesn't find or it will not return successful future when it doesn't find?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that if the dns operation completes successfully, it should always carry at least 1 IP address

} else {
future.completeExceptionally(resolveFuture.cause());
}
});
return future;
}

/**
* Attempt to establish a TCP connection to an already resolved single IP address
*/
private CompletableFuture<Channel> connectToAddress(InetAddress ipAddress, int port) {
CompletableFuture<Channel> future = new CompletableFuture<>();

bootstrap.connect(ipAddress, port).addListener((ChannelFuture channelFuture) -> {
if (channelFuture.isSuccess()) {
future.complete(channelFuture.channel());
} else {
future.completeExceptionally(channelFuture.cause());
}
});

return future;
}

@Override
public void close() throws IOException {
eventLoopGroup.shutdownGracefully();
dnsResolver.close();
}

private void cleanupConnection(InetSocketAddress address, int connectionKey,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> getBroker(D
uri = new URI(serviceUrl);
}

InetSocketAddress brokerAddress = new InetSocketAddress(uri.getHost(), uri.getPort());
InetSocketAddress brokerAddress = InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort());
return CompletableFuture.completedFuture(Pair.of(brokerAddress, brokerAddress));
} catch (Exception e) {
// Failed to parse url
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ private void performLookup(long clientRequestId, String topic, String brokerServ
return;
}

InetSocketAddress addr = new InetSocketAddress(brokerURI.getHost(), brokerURI.getPort());
InetSocketAddress addr = InetSocketAddress.createUnresolved(brokerURI.getHost(), brokerURI.getPort());
if (log.isDebugEnabled()) {
log.debug("Getting connections to '{}'", addr);
}
Expand Down