Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue #929: Perform async DNS resolution each time attempting connect… #930

Merged
merged 5 commits into from
Jan 2, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl;

import java.net.InetAddress;
import java.util.List;
import java.util.concurrent.CompletableFuture;

import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.api.ClientConfiguration;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.mockito.Mockito;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import com.google.common.collect.Lists;

import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.DefaultThreadFactory;

public class ConnectionPoolTest extends MockedPulsarServiceBaseTest {

String serviceUrl = "pulsar://non-existing-dns-name:" + BROKER_PORT;

@BeforeClass
@Override
protected void setup() throws Exception {
super.internalSetup();
}

@AfterClass
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}

@Test
public void testSingleIpAddress() throws Exception {
ClientConfiguration conf = new ClientConfiguration();
EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, new DefaultThreadFactory("test"));
ConnectionPool pool = Mockito.spy(new ConnectionPool(conf, eventLoop));
PulsarClientImpl client = new PulsarClientImpl(serviceUrl, conf, eventLoop, pool);

List<InetAddress> result = Lists.newArrayList();
result.add(InetAddress.getByName("127.0.0.1"));
Mockito.when(pool.resolveName("non-existing-dns-name")).thenReturn(CompletableFuture.completedFuture(result));

client.createProducer("persistent://sample/standalone/ns/my-topic");

client.close();
}

@Test
public void testDoubleIpAddress() throws Exception {
String serviceUrl = "pulsar://non-existing-dns-name:" + BROKER_PORT;

ClientConfiguration conf = new ClientConfiguration();
EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, new DefaultThreadFactory("test"));
ConnectionPool pool = Mockito.spy(new ConnectionPool(conf, eventLoop));
PulsarClientImpl client = new PulsarClientImpl(serviceUrl, conf, eventLoop, pool);

List<InetAddress> result = Lists.newArrayList();

// Add a non existent IP to the response to check that we're trying the 2nd address as well
result.add(InetAddress.getByName("127.0.0.99"));
result.add(InetAddress.getByName("127.0.0.1"));
Mockito.when(pool.resolveName("non-existing-dns-name")).thenReturn(CompletableFuture.completedFuture(result));

// Create producer should succeed by trying the 2nd IP
client.createProducer("persistent://sample/standalone/ns/my-topic");
client.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,18 @@

import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.*;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
Expand All @@ -38,30 +42,19 @@

import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.ClientConfiguration;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageBuilder;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConfiguration;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.ProducerImpl.OpSendMsg;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.DoubleByteBuf;
import org.apache.pulsar.common.api.Commands.ChecksumType;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata.Builder;
import org.mockito.cglib.proxy.Enhancer;
import org.mockito.cglib.proxy.MethodInterceptor;
import org.mockito.cglib.proxy.MethodProxy;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
Expand All @@ -70,8 +63,6 @@
import org.testng.annotations.Test;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.ResourceLeakDetector;

public class MessageIdTest extends BrokerTestBase {
private static final Logger log = LoggerFactory.getLogger(MessageIdTest.class);
Expand Down Expand Up @@ -264,24 +255,24 @@ public void partitionedProducerSend() throws PulsarClientException, PulsarAdminE
/**
* Verifies: different versions of broker-deployment (one broker understands Checksum and other
* doesn't in that case remove checksum before sending to broker-2)
*
*
* client first produce message with checksum and then retries to send message due to connection unavailable. But this time, if
* broker doesn't understand checksum: then client should remove checksum from the message before sending to broker.
*
* 1. stop broker
* 2. client compute checksum and add into message
* 3. produce 2 messages and corrupt 1 message
* 4. start broker with lower version (which doesn't support checksum)
* 5. client reconnects to broker and due to incompatibility of version: removes checksum from message
* 6. broker doesn't do checksum validation and persist message
*
* 1. stop broker
* 2. client compute checksum and add into message
* 3. produce 2 messages and corrupt 1 message
* 4. start broker with lower version (which doesn't support checksum)
* 5. client reconnects to broker and due to incompatibility of version: removes checksum from message
* 6. broker doesn't do checksum validation and persist message
* 7. client receives ack
*
*
* @throws Exception
*/
@Test
public void testChecksumVersionComptability() throws Exception {
final String topicName = "persistent://prop/use/ns-abc/topic1";

// 1. producer connect
ProducerImpl prod = (ProducerImpl) pulsarClient.createProducer(topicName);
ProducerImpl producer = spy(prod);
Expand All @@ -302,7 +293,8 @@ public void testChecksumVersionComptability() throws Exception {
// mock-value from brokerChecksumSupportedVersion
((PulsarClientImpl) pulsarClient).timer().stop();

ClientCnx mockClientCnx = spy(new ClientCnx((PulsarClientImpl) pulsarClient));
ClientCnx mockClientCnx = spy(
new ClientCnx(new ClientConfiguration(), ((PulsarClientImpl) pulsarClient).eventLoopGroup()));
doReturn(producer.brokerChecksumSupportedVersion() - 1).when(mockClientCnx).getRemoteEndpointProtocolVersion();
prod.setClientCnx(mockClientCnx);

Expand Down Expand Up @@ -366,7 +358,8 @@ public void testChecksumReconnection() throws Exception {
((PulsarClientImpl) pulsarClient).timer().stop();

// set clientCnx mock to get non-checksum supported version
ClientCnx mockClientCnx = spy(new ClientCnx((PulsarClientImpl) pulsarClient));
ClientCnx mockClientCnx = spy(
new ClientCnx(new ClientConfiguration(), ((PulsarClientImpl) pulsarClient).eventLoopGroup()));
doReturn(producer.brokerChecksumSupportedVersion() - 1).when(mockClientCnx).getRemoteEndpointProtocolVersion();
prod.setClientCnx(mockClientCnx);

Expand Down Expand Up @@ -408,22 +401,22 @@ public void testChecksumReconnection() throws Exception {
assertEquals(new String(msg.getData()), "message-3");

}


/**
* Verifies: if message is corrupted before sending to broker and if broker gives checksum error: then
* 1. Client-Producer recomputes checksum with modified data
* 2. Retry message-send again
* 3. Broker verifies checksum
* 3. Broker verifies checksum
* 4. client receives send-ack success
*
*
* @throws Exception
*/
@Test
public void testCorruptMessageRemove() throws Exception {

final String topicName = "persistent://prop/use/ns-abc/retry-topic";

ProducerConfiguration config = new ProducerConfiguration();
config.setSendTimeout(10, TimeUnit.MINUTES);
// 1. producer connect
Expand Down Expand Up @@ -489,25 +482,26 @@ public void testCorruptMessageRemove() throws Exception {
assertFalse(producer.verifyLocalBufferIsNotCorrupted(op));

assertEquals(producer.getPendingQueueSize(), 0);

// [2] test-recoverChecksumError functionality
stopBroker();
MessageImpl msg1 = (MessageImpl) MessageBuilder.create().setContent("message-1".getBytes()).build();
future = producer.sendAsync(msg1);
ClientCnx cnx = spy(new ClientCnx((PulsarClientImpl)pulsarClient) {});
ClientCnx cnx = spy(
new ClientCnx(new ClientConfiguration(), ((PulsarClientImpl) pulsarClient).eventLoopGroup()));
String exc = "broker is already stopped";
// when client-try to recover checksum by resending to broker: throw exception as broker is stopped
doThrow(new IllegalStateException(exc)).when(cnx).ctx();
try {
producer.recoverChecksumError(cnx, 1);
producer.recoverChecksumError(cnx, 1);
fail("it should call : resendMessages() => which should throw above mocked exception");
}catch(IllegalStateException e) {
assertEquals(exc, e.getMessage());
}

producer.close();
consumer.close();
producer = null; // clean reference of mocked producer
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.slf4j.LoggerFactory;

import io.netty.buffer.ByteBuf;
import io.netty.resolver.InetSocketAddressResolver;

public class BinaryProtoLookupService implements LookupService {

Expand All @@ -49,7 +50,10 @@ public BinaryProtoLookupService(PulsarClientImpl client, String serviceUrl, bool
URI uri;
try {
uri = new URI(serviceUrl);
this.serviceAddress = new InetSocketAddress(uri.getHost(), uri.getPort());

// Don't attempt to resolve the hostname in DNS at this point. It will be done each time when attempting to
// connect
this.serviceAddress = InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort());
} catch (Exception e) {
log.error("Invalid service-url {} provided {}", serviceUrl, e.getMessage(), e);
throw new PulsarClientException.InvalidServiceURL(e);
Expand All @@ -59,7 +63,8 @@ public BinaryProtoLookupService(PulsarClientImpl client, String serviceUrl, bool
/**
* Calls broker binaryProto-lookup api to find broker-service address which can serve a given topic.
*
* @param destination: topic-name
* @param destination:
* topic-name
* @return broker-socket-address that serves given topic
*/
public CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> getBroker(DestinationName destination) {
Expand All @@ -74,7 +79,6 @@ public CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(D
return getPartitionedTopicMetadata(serviceAddress, destination);
}


private CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> findBroker(InetSocketAddress socketAddress,
boolean authoritative, DestinationName destination) {
CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> addressFuture = new CompletableFuture<>();
Expand All @@ -93,7 +97,7 @@ private CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> findBroker
uri = new URI(serviceUrl);
}

InetSocketAddress responseBrokerAddress = new InetSocketAddress(uri.getHost(), uri.getPort());
InetSocketAddress responseBrokerAddress = InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort());

// (2) redirect to given address if response is: redirect
if (lookupDataResult.redirect) {
Expand Down Expand Up @@ -138,7 +142,6 @@ private CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> findBroker
return addressFuture;
}


private CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(InetSocketAddress socketAddress,
DestinationName destination) {

Expand Down Expand Up @@ -170,7 +173,7 @@ private CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(
}

public String getServiceUrl() {
return serviceAddress.toString();
return serviceAddress.toString();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.ClientConfiguration;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.BinaryProtoLookupService.LookupDataResult;
import org.apache.pulsar.common.api.Commands;
Expand Down Expand Up @@ -87,14 +88,12 @@ enum State {
None, SentConnectFrame, Ready
}

public ClientCnx(PulsarClientImpl pulsarClient) {
public ClientCnx(ClientConfiguration conf, EventLoopGroup eventLoopGroup) {
super(30, TimeUnit.SECONDS);
this.pendingLookupRequestSemaphore = new Semaphore(pulsarClient.getConfiguration().getConcurrentLookupRequest(),
true);
this.authentication = pulsarClient.getConfiguration().getAuthentication();
this.eventLoopGroup = pulsarClient.eventLoopGroup();
this.maxNumberOfRejectedRequestPerConnection = pulsarClient.getConfiguration()
.getMaxNumberOfRejectedRequestPerConnection();
this.pendingLookupRequestSemaphore = new Semaphore(conf.getConcurrentLookupRequest(), true);
this.authentication = conf.getAuthentication();
this.eventLoopGroup = eventLoopGroup;
this.maxNumberOfRejectedRequestPerConnection = conf.getMaxNumberOfRejectedRequestPerConnection();
this.state = State.None;
}

Expand Down
Loading