Skip to content

Commit

Permalink
[improve][misc] reduce WhiteBox usage (apache#17578)
Browse files Browse the repository at this point in the history
Master Issue: apache#16912

Remove `WhiteBox` usage in `pulsar-client` and `pulsar-client-tools`. We then have only usages in `pulsar-broker`, `pulsar-io-elastic-search`, and `testmocks`.

cc @nicoloboschi @lhotari @Shoothzj @eolivelli 

### Documentation

<!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->

- [ ] `doc-required` 
(Your PR needs to update docs and you will update later)

- [x] `doc-not-needed` 
(Please explain why)

- [ ] `doc` 
(Your PR contains doc changes)

- [ ] `doc-complete`
(Docs have been already added)
  • Loading branch information
tisonkun authored Sep 10, 2022
1 parent 736aefe commit 06c14cb
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ public static class RootParams {

protected JCommander jcommander;
IUsageFormatter usageFormatter;
CmdProduce produceCommand;
CmdConsume consumeCommand;
protected CmdProduce produceCommand;
protected CmdConsume consumeCommand;
CmdGenerateDocumentation generateDocumentation;

public PulsarClientTool(Properties properties) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import org.jline.reader.impl.LineReaderImpl;
import org.jline.terminal.Terminal;
import org.jline.terminal.TerminalBuilder;
import org.powermock.reflect.Whitebox;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.BeforeMethod;
Expand Down Expand Up @@ -110,11 +109,11 @@ protected PulsarAdminBuilder createAdminBuilder(Properties properties) {

@Override
protected ClientShell createClientShell(Properties properties) {
final ClientShell clientShell = new ClientShell(properties);
final CmdProduce cmdProduce = mock(CmdProduce.class);
cmdProduceHolder.set(cmdProduce);
Whitebox.setInternalState(clientShell, "produceCommand", cmdProduceHolder.get());
return clientShell;
return new ClientShell(properties) {{
this.produceCommand = cmdProduce;
}};
}

@Override
Expand All @@ -127,7 +126,7 @@ protected void exit(int exitCode) {
}

private static class SystemExitCalledException extends RuntimeException {
private int code;
private final int code;

public SystemExitCalledException(int code) {
this.code = code;
Expand All @@ -145,7 +144,7 @@ public void setup() throws Exception {


@Test
public void testInteractiveMode() throws Exception{
public void testInteractiveMode() throws Exception {
Terminal terminal = TerminalBuilder.builder().build();
final MockLineReader linereader = new MockLineReader(terminal);

Expand All @@ -163,7 +162,7 @@ public void testInteractiveMode() throws Exception{
}

@Test
public void testFileMode() throws Exception{
public void testFileMode() throws Exception {
Terminal terminal = TerminalBuilder.builder().build();
final MockLineReader linereader = new MockLineReader(terminal);
final Properties props = new Properties();
Expand Down Expand Up @@ -194,7 +193,7 @@ public void testFileModeExitOnError() throws Exception {
try {
testPulsarShell.run((a) -> linereader, (a) -> terminal);
fail();
} catch (SystemExitCalledException ex) {
} catch (SystemExitCalledException ex) {
assertEquals(ex.code, 1);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -993,6 +993,11 @@ public EventLoopGroup eventLoopGroup() {
return eventLoopGroup;
}

@VisibleForTesting
void setLookup(LookupService lookup) {
this.lookup = lookup;
}

public LookupService getLookup() {
return lookup;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,14 @@
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotSame;
import static org.testng.Assert.assertSame;
import static org.testng.Assert.assertThrows;
import static org.testng.Assert.assertTrue;

import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
Expand All @@ -39,7 +38,6 @@
import io.netty.channel.EventLoopGroup;
import io.netty.util.HashedWheelTimer;
import io.netty.util.concurrent.DefaultThreadFactory;

import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
Expand All @@ -49,7 +47,6 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadFactory;
import java.util.regex.Pattern;

import lombok.Cleanup;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.api.PulsarClientException;
Expand All @@ -64,37 +61,22 @@
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.mockito.Mockito;
import org.powermock.reflect.Whitebox;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/**
* PulsarClientImpl unit tests.
*/
public class PulsarClientImplTest {
private PulsarClientImpl clientImpl;
private EventLoopGroup eventLoopGroup;

@BeforeMethod(alwaysRun = true)
public void setup() throws PulsarClientException {
ClientConfigurationData conf = new ClientConfigurationData();
conf.setServiceUrl("pulsar://localhost:6650");
initializeEventLoopGroup(conf);
clientImpl = new PulsarClientImpl(conf, eventLoopGroup);
}

private void initializeEventLoopGroup(ClientConfigurationData conf) {
ThreadFactory threadFactory = new DefaultThreadFactory("client-test-stats", Thread.currentThread().isDaemon());
eventLoopGroup = EventLoopUtil.newEventLoopGroup(conf.getNumIoThreads(), false, threadFactory);
}

@AfterMethod(alwaysRun = true)
public void teardown() throws Exception {
if (clientImpl != null) {
clientImpl.close();
clientImpl = null;
}
if (eventLoopGroup != null) {
eventLoopGroup.shutdownGracefully().get();
eventLoopGroup = null;
Expand All @@ -103,9 +85,13 @@ public void teardown() throws Exception {

@Test
public void testIsClosed() throws Exception {
assertFalse(clientImpl.isClosed());
clientImpl.close();
assertTrue(clientImpl.isClosed());
ClientConfigurationData conf = new ClientConfigurationData();
conf.setServiceUrl("pulsar://localhost:6650");
initializeEventLoopGroup(conf);
PulsarClientImpl client = new PulsarClientImpl(conf, eventLoopGroup);
assertFalse(client.isClosed());
client.close();
assertTrue(client.isClosed());
}

@Test
Expand Down Expand Up @@ -138,8 +124,11 @@ public void testConsumerIsClosed() throws Exception {
.thenReturn(CompletableFuture.completedFuture(mock(ProducerResponse.class)));
when(pool.getConnection(any(InetSocketAddress.class), any(InetSocketAddress.class)))
.thenReturn(CompletableFuture.completedFuture(cnx));
Whitebox.setInternalState(clientImpl, "cnxPool", pool);
Whitebox.setInternalState(clientImpl, "lookup", lookup);

ClientConfigurationData conf = new ClientConfigurationData();
conf.setServiceUrl("pulsar://localhost:6650");
PulsarClientImpl client = new PulsarClientImpl(conf, eventLoopGroup, pool);
client.setLookup(lookup);

List<ConsumerBase<byte[]>> consumers = new ArrayList<>();
/**
Expand All @@ -148,24 +137,24 @@ public void testConsumerIsClosed() throws Exception {
ConsumerConfigurationData<byte[]> consumerConf0 = new ConsumerConfigurationData<>();
consumerConf0.setSubscriptionName("test-subscription0");
consumerConf0.setTopicsPattern(Pattern.compile("test-topic"));
consumers.add((ConsumerBase) clientImpl.subscribeAsync(consumerConf0).get());
consumers.add((ConsumerBase<byte[]>) client.subscribeAsync(consumerConf0).get());
/**
* {@link org.apache.pulsar.client.impl.PulsarClientImpl#singleTopicSubscribeAsync}
*/
ConsumerConfigurationData<byte[]> consumerConf1 = new ConsumerConfigurationData<>();
consumerConf1.setSubscriptionName("test-subscription1");
consumerConf1.setTopicNames(Collections.singleton("test-topic"));
consumers.add((ConsumerBase) clientImpl.subscribeAsync(consumerConf1).get());
consumers.add((ConsumerBase<byte[]>) client.subscribeAsync(consumerConf1).get());
/**
* {@link org.apache.pulsar.client.impl.PulsarClientImpl#multiTopicSubscribeAsync}
*/
ConsumerConfigurationData<byte[]> consumerConf2 = new ConsumerConfigurationData<>();
consumerConf2.setSubscriptionName("test-subscription2");
consumers.add((ConsumerBase) clientImpl.subscribeAsync(consumerConf2).get());
consumers.add((ConsumerBase<byte[]>) client.subscribeAsync(consumerConf2).get());

consumers.forEach(consumer ->
assertNotSame(consumer.getState(), HandlerState.State.Closed));
clientImpl.close();
client.close();
consumers.forEach(consumer ->
assertSame(consumer.getState(), HandlerState.State.Closed));
}
Expand Down Expand Up @@ -199,27 +188,22 @@ public void testInitializeWithTimer() throws PulsarClientException {
client.timer().stop();
}

@Test(expectedExceptions = PulsarClientException.class)
@Test(expectedExceptions = PulsarClientException.InvalidConfigurationException.class)
public void testNewTransactionWhenDisable() throws Exception {
ClientConfigurationData conf = new ClientConfigurationData();
conf.setServiceUrl("pulsar://localhost:6650");
conf.setEnableTransaction(false);
PulsarClientImpl pulsarClient = null;
try {
pulsarClient = new PulsarClientImpl(conf);
} catch (PulsarClientException e) {
e.printStackTrace();
try (PulsarClientImpl client = new PulsarClientImpl(conf)) {
client.newTransaction();
}
pulsarClient.newTransaction();
}

@Test
public void testResourceCleanup() throws PulsarClientException {
ClientConfigurationData conf = clientImpl.conf;
public void testResourceCleanup() throws Exception {
ClientConfigurationData conf = new ClientConfigurationData();
conf.setServiceUrl("");
initializeEventLoopGroup(conf);
ConnectionPool connectionPool = new ConnectionPool(conf, eventLoopGroup);
try {
try (ConnectionPool connectionPool = new ConnectionPool(conf, eventLoopGroup)) {
assertThrows(() -> new PulsarClientImpl(conf, eventLoopGroup, connectionPool));
} finally {
// Externally passed eventLoopGroup should not be shutdown.
Expand All @@ -229,7 +213,10 @@ public void testResourceCleanup() throws PulsarClientException {

@Test
public void testInitializingWithExecutorProviders() throws PulsarClientException {
ClientConfigurationData conf = clientImpl.conf;
ClientConfigurationData conf = new ClientConfigurationData();
conf.setServiceUrl("pulsar://localhost:6650");
initializeEventLoopGroup(conf);

@Cleanup("shutdownNow")
ExecutorProvider executorProvider = new ExecutorProvider(2, "shared-executor");
@Cleanup("shutdownNow")
Expand All @@ -256,11 +243,14 @@ public void testInitializingWithExecutorProviders() throws PulsarClientException
expectedExceptionsMessageRegExp = "Both externalExecutorProvider and internalExecutorProvider must be " +
"specified or unspecified.")
public void testBothExecutorProvidersMustBeSpecified() throws PulsarClientException {
ClientConfigurationData conf = clientImpl.conf;
ClientConfigurationData conf = new ClientConfigurationData();
conf.setServiceUrl("pulsar://localhost:6650");
initializeEventLoopGroup(conf);
@Cleanup("shutdownNow")
ExecutorProvider executorProvider = new ExecutorProvider(2, "shared-executor");
@Cleanup
PulsarClientImpl client2 = PulsarClientImpl.builder().conf(conf)
PulsarClientImpl ignore = PulsarClientImpl.builder()
.conf(conf)
.internalExecutorProvider(executorProvider)
.build();
}
Expand Down

0 comments on commit 06c14cb

Please sign in to comment.