Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix flaky test testDuplicateConcurrentSubscribeCommand #11089

Conversation

hangc0276
Copy link
Contributor

Motivation

When running test, it get the following exception accidentally

Error:  testDuplicateConcurrentSubscribeCommand(org.apache.pulsar.broker.service.ServerCnxTest)  Time elapsed: 0.75 s  <<< FAILURE!
java.lang.AssertionError: Response is not CommandError but org.apache.pulsar.common.api.proto.CommandSuccess@6d68db expected [true] but found [false]
	at org.testng.Assert.fail(Assert.java:99)
	at org.testng.Assert.failNotEquals(Assert.java:1037)
	at org.testng.Assert.assertTrue(Assert.java:45)
	at org.apache.pulsar.broker.service.ServerCnxTest.testDuplicateConcurrentSubscribeCommand(ServerCnxTest.java:767)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:132)
	at org.testng.internal.InvokeMethodRunnable.runOne(InvokeMethodRunnable.java:45)
	at org.testng.internal.InvokeMethodRunnable.call(InvokeMethodRunnable.java:73)
	at org.testng.internal.InvokeMethodRunnable.call(InvokeMethodRunnable.java:11)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

The reason is in the test logic, it subscribe the topic in Exclusive mode first, and then subscribe second time and check the subscribe status. If the first subscribe status returns faster than the second subscribe, it will get the above exception.

Modifications

  1. Use Awaitility.await().untilAsserted to wait the second subscribe status return, otherwise the test will be terminated by test timeout(30s)
  2. format ServerCnx#handleSubscribe code style

@mattisonchao
Copy link
Member

mattisonchao commented Jun 25, 2021

If the first subscription status returns faster than the second subscription, the second subscription will always return sendSuccessResponse, because the first subscription is completed and will not cause an exception, then they will enter this if branch:

ServerCnx.java  953:961

                         if (existingConsumerFuture != null) {
                            if (existingConsumerFuture.isDone() && !existingConsumerFuture.isCompletedExceptionally()) {
                                Consumer consumer = existingConsumerFuture.getNow(null);
                                log.info("[{}] Consumer with the same id is already created:"
                                         + " consumerId={}, consumer={}",
                                         remoteAddress, consumerId, consumer);
                                commandSender.sendSuccessResponse(requestId);
                                return null;
                            } else { //... some code}

So I don’t think we need Awaitility,

Do I understand it right? : )

@codelipenghui codelipenghui merged commit b71bc65 into apache:master Jun 25, 2021
bharanic-dev pushed a commit to bharanic-dev/pulsar that referenced this pull request Mar 18, 2022
### Motivation
When running test, it get the following exception accidentally
```
Error:  testDuplicateConcurrentSubscribeCommand(org.apache.pulsar.broker.service.ServerCnxTest)  Time elapsed: 0.75 s  <<< FAILURE!
java.lang.AssertionError: Response is not CommandError but org.apache.pulsar.common.api.proto.CommandSuccess@6d68db expected [true] but found [false]
	at org.testng.Assert.fail(Assert.java:99)
	at org.testng.Assert.failNotEquals(Assert.java:1037)
	at org.testng.Assert.assertTrue(Assert.java:45)
	at org.apache.pulsar.broker.service.ServerCnxTest.testDuplicateConcurrentSubscribeCommand(ServerCnxTest.java:767)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:132)
	at org.testng.internal.InvokeMethodRunnable.runOne(InvokeMethodRunnable.java:45)
	at org.testng.internal.InvokeMethodRunnable.call(InvokeMethodRunnable.java:73)
	at org.testng.internal.InvokeMethodRunnable.call(InvokeMethodRunnable.java:11)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
```
The reason is in the test logic, it subscribe the topic in `Exclusive` mode first, and then subscribe second time and check the subscribe status. If the first subscribe status returns faster than the second subscribe, it will get the above exception.

### Modifications
1. Use `Awaitility.await().untilAsserted` to wait the second subscribe status return, otherwise the test will be terminated by test timeout(30s)
2. format `ServerCnx#handleSubscribe` code style
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants