HystrixCollapser completes without emitting #955

Closed
amitcse opened this issue Oct 21, 2015 · 31 comments

@amitcse

amitcse commented Oct 21, 2015

Hi, I am facing a strange issue in which the observable returned by HystrixCollapser completes without emitting any item. Because I compose HystrixCollapser.toObservable() with other RxJava operators, this causes items to go missing from the data flow.
While debugging, I found that the HystrixCollapser does call HystrixCommand.run(), and mapResponseToRequests is also called.
I am unable to debug this further because the behaviour appears to be completely random.

For reference, my Rx observable composition looks roughly like this:

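// collapser1..collapser4 are my own helpers; each returns a HystrixObservableCollapser's toObservable()
Observable.from(Arrays.asList(id1, id2, id3 /* , .... */))
        .flatMap(id -> collapser1(id))
        .flatMap(collapser1Result -> {
            // fan out to three more collapsers per first-stage result
            // (C2, C3, C4 are placeholders for their response types)
            Observable<C2> collapser2Observable = collapser2(collapser1Result);
            Observable<C3> collapser3Observable = collapser3(collapser1Result);
            Observable<C4> collapser4Observable = collapser4(collapser1Result);

            // zip emits only once all three sources have emitted
            return Observable.zip(collapser2Observable, collapser3Observable, collapser4Observable,
                    (collapser2Res, collapser3Res, collapser4Res)
                            -> makeFinalResult(collapser2Res, collapser3Res, collapser4Res));
        })
        .map(finalResult -> doSomeTransformations(finalResult))
        .toList().toBlocking().single();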

When any of collapser2/3/4 completes without emitting, zip completes without emitting as well, and the corresponding result goes missing from the final list.
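Why one empty collapser drops the whole combined result: zip pairs the n-th item of each source, so if one source completes with nothing, there is nothing to pair. A minimal stdlib-only model of that pairing rule over finite lists (not RxJava code; `zip3` and `TriFunction` are hypothetical names) makes the behaviour checkable without Hystrix on the classpath.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class ZipModel {
    // Hypothetical three-argument combiner (java.util.function has no TriFunction).
    @FunctionalInterface
    interface TriFunction<A, B, C, R> {
        R apply(A a, B b, C c);
    }

    // Models zip over finite sources: the i-th output combines the i-th item of
    // every source, so the output is only as long as the shortest source.
    static <A, B, C, R> List<R> zip3(List<A> a, List<B> b, List<C> c,
                                     TriFunction<A, B, C, R> combiner) {
        int n = Math.min(a.size(), Math.min(b.size(), c.size()));
        List<R> out = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            out.add(combiner.apply(a.get(i), b.get(i), c.get(i)));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> complete = zip3(Arrays.asList("r2"), Arrays.asList("r3"), Arrays.asList("r4"),
                (x, y, z) -> x + "+" + y + "+" + z);
        // collapser3 "completed without emitting": the combined result disappears
        List<String> lost = zip3(Arrays.asList("r2"), Collections.<String>emptyList(), Arrays.asList("r4"),
                (x, y, z) -> x + "+" + y + "+" + z);
        System.out.println(complete); // [r2+r3+r4]
        System.out.println(lost);     // []
    }
}
```

In RxJava 1.x, one way to make this loss loud instead of silent is to apply `single()` to each collapser observable before zipping: `single()` signals `NoSuchElementException` when its source completes empty, so the failure reaches `onError` rather than producing a shorter list.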

@amitcse
Author

amitcse commented Oct 21, 2015

Looks similar to #815

@amitcse amitcse closed this as completed Oct 21, 2015
@amitcse amitcse reopened this Oct 21, 2015
@mattrjacobs
Contributor

@amitcse Are you able to provide implementations of those collapsers that would work in a unit test?

@amitcse
Author

amitcse commented Oct 25, 2015

@mattrjacobs I don't see HystrixObservableCollapserTest.stressTestRequestCollapser in master. Can you share this test with me so that I can try writing a test for this behaviour and debug it further?

@mattrjacobs
Contributor

I think I removed this code when I fixed the previous HystrixObservableCollapser bug (#890). At the time, my thought was that I had replaced this success-only test with much better coverage of all paths through the collapser. If you're able to reproduce your bug, please let me know. Here is the test:

    @Test
    public void testTwoRequests() throws Exception {
        HystrixMockedTime time = new HystrixMockedTime();
        TestCollapserTimer timer = new TestCollapserTimer(time);
        HystrixObservableCollapser<String, String, String, String> collapser1 = new TestRequestCollapser(timer, 1);
        HystrixObservableCollapser<String, String, String, String> collapser2 = new TestRequestCollapser(timer, 2);
        Future<String> response1 = collapser1.observe().toBlocking().toFuture();
        Future<String> response2 = collapser2.observe().toBlocking().toFuture();
        timer.incrementTime(10); // let time pass that equals the default delay/period

        assertEquals("1", response1.get());
        assertEquals("2", response2.get());

        assertEquals(1, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size());

        HystrixCollapserMetrics metrics = collapser1.getMetrics();
        assertSame(metrics, collapser2.getMetrics());

        time.increment(1000);
        assertEquals(2L, metrics.getRollingCount(HystrixRollingNumberEvent.COLLAPSER_REQUEST_BATCHED));
        assertEquals(1L, metrics.getRollingCount(HystrixRollingNumberEvent.COLLAPSER_BATCH));
        assertEquals(0L, metrics.getRollingCount(HystrixRollingNumberEvent.RESPONSE_FROM_CACHE));
    }

    @Test
    public void stressTestRequestCollapser() throws Exception {
        for (int i = 0; i < 100; i++) {
            init();
            testTwoRequests();
            cleanup();
        }
    }

@amitcse
Author

amitcse commented Nov 1, 2015

Thanks. I will try this and let you know if I am able to find something.

@amitcse
Author

amitcse commented Nov 8, 2015

@mattrjacobs Sorry for the late reply. I do see stressTestRequestCollapser failing. It fails for me with 2000 iterations as well, and with 1000 iterations it fails on some runs.

Stacktrace:

Observable onError
java.util.concurrent.ExecutionException: Observable onError
    at rx.internal.operators.BlockingOperatorToFuture$2.getValue(BlockingOperatorToFuture.java:122)
    at rx.internal.operators.BlockingOperatorToFuture$2.get(BlockingOperatorToFuture.java:108)
    at com.netflix.hystrix.HystrixObservableCollapserTest.testTwoRequests(HystrixObservableCollapserTest.java:151)
    at com.netflix.hystrix.HystrixObservableCollapserTest.stressTestRequestCollapser(HystrixObservableCollapserTest.java:249)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:45)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:42)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:28)
    at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:30)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:263)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:68)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:47)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:231)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:60)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:229)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:50)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:222)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:300)
    at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.runTestClass(JUnitTestClassExecuter.java:105)
    at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.execute(JUnitTestClassExecuter.java:56)
    at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassProcessor.processTestClass(JUnitTestClassProcessor.java:64)
    at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:50)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
    at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
    at org.gradle.messaging.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
    at org.gradle.messaging.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
    at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
    at org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:106)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
    at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
    at org.gradle.messaging.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:360)
    at org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:54)
    at org.gradle.internal.concurrent.StoppableExecutorImpl$1.run(StoppableExecutorImpl.java:40)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: missing value!
    at com.netflix.hystrix.HystrixObservableCollapserTest$TestRequestCollapser.onMissingResponse(HystrixObservableCollapserTest.java:665)
    at com.netflix.hystrix.HystrixObservableCollapser$1$1.call(HystrixObservableCollapser.java:207)
    at rx.Observable$12.onError(Observable.java:4493)
    at rx.internal.operators.OperatorDoOnEach$1.onError(OperatorDoOnEach.java:65)
    at rx.internal.operators.OperatorDoOnEach$1.onError(OperatorDoOnEach.java:70)
    at rx.internal.operators.OperatorDoOnEach$1.onError(OperatorDoOnEach.java:70)
    at rx.internal.operators.OperatorOnErrorResumeNextViaFunction$1$1.onError(OperatorOnErrorResumeNextViaFunction.java:86)
    at rx.Observable$ThrowObservable$1.call(Observable.java:9600)
    at rx.Observable$ThrowObservable$1.call(Observable.java:9590)
    at rx.Observable.unsafeSubscribe(Observable.java:7710)
    at rx.internal.operators.OperatorOnErrorResumeNextViaFunction$1.onError(OperatorOnErrorResumeNextViaFunction.java:100)
    at com.netflix.hystrix.AbstractCommand$CommandHookApplication$1.onError(AbstractCommand.java:1334)
    at rx.internal.operators.OperatorDoOnEach$1.onError(OperatorDoOnEach.java:70)
    at com.netflix.hystrix.AbstractCommand$DeprecatedOnCompleteWithValueHookApplication$1.onError(AbstractCommand.java:1434)
    at rx.internal.operators.OperatorDoOnEach$1.onError(OperatorDoOnEach.java:70)
    at rx.internal.operators.OperatorDoOnEach$1.onError(OperatorDoOnEach.java:70)
    at rx.internal.operators.OperatorOnErrorResumeNextViaFunction$1$1.onError(OperatorOnErrorResumeNextViaFunction.java:86)
    at rx.internal.operators.OperatorDoOnEach$1.onError(OperatorDoOnEach.java:70)
    at rx.internal.operators.OperatorDoOnEach$1.onError(OperatorDoOnEach.java:70)
    at rx.internal.operators.OperatorDoOnEach$1.onError(OperatorDoOnEach.java:70)
    at rx.internal.operators.OperatorDoOnEach$1.onError(OperatorDoOnEach.java:70)
    at rx.internal.operators.OperatorOnErrorResumeNextViaFunction$1$1.onError(OperatorOnErrorResumeNextViaFunction.java:86)
    at rx.Observable$ThrowObservable$1.call(Observable.java:9600)
    at rx.Observable$ThrowObservable$1.call(Observable.java:9590)
    at rx.Observable.unsafeSubscribe(Observable.java:7710)
    at rx.internal.operators.OperatorOnErrorResumeNextViaFunction$1.onError(OperatorOnErrorResumeNextViaFunction.java:100)
    at rx.internal.operators.OperatorDoOnEach$1.onError(OperatorDoOnEach.java:70)
    at rx.internal.operators.OperatorDoOnEach$1.onError(OperatorDoOnEach.java:70)
    at rx.internal.operators.OperatorDoOnEach$1.onError(OperatorDoOnEach.java:70)
    at com.netflix.hystrix.AbstractCommand$DeprecatedOnFallbackHookApplication$1.onError(AbstractCommand.java:1514)
    at com.netflix.hystrix.AbstractCommand$FallbackHookApplication$1.onError(AbstractCommand.java:1404)
    at rx.Observable$ThrowObservable$1.call(Observable.java:9600)
    at rx.Observable$ThrowObservable$1.call(Observable.java:9590)
    at rx.Observable$2.call(Observable.java:162)
    at rx.Observable$2.call(Observable.java:154)
    at rx.Observable$2.call(Observable.java:162)
    at rx.Observable$2.call(Observable.java:154)
    at rx.Observable$2.call(Observable.java:162)
    at rx.Observable$2.call(Observable.java:154)
    at rx.Observable$2.call(Observable.java:162)
    at rx.Observable$2.call(Observable.java:154)
    at rx.Observable$2.call(Observable.java:162)
    at rx.Observable$2.call(Observable.java:154)
    at rx.Observable$2.call(Observable.java:162)
    at rx.Observable$2.call(Observable.java:154)
    at rx.Observable$2.call(Observable.java:162)
    at rx.Observable$2.call(Observable.java:154)
    at rx.Observable$2.call(Observable.java:162)
    at rx.Observable$2.call(Observable.java:154)
    at rx.Observable$2.call(Observable.java:162)
    at rx.Observable$2.call(Observable.java:154)
    at rx.Observable$2.call(Observable.java:162)
    at rx.Observable$2.call(Observable.java:154)
    at rx.Observable.unsafeSubscribe(Observable.java:7710)
    at rx.internal.operators.OperatorOnErrorResumeNextViaFunction$1.onError(OperatorOnErrorResumeNextViaFunction.java:100)
    at rx.internal.operators.OperatorDoOnEach$1.onError(OperatorDoOnEach.java:70)
    at rx.internal.operators.OperatorDoOnEach$1.onError(OperatorDoOnEach.java:70)
    at com.netflix.hystrix.AbstractCommand$HystrixObservableTimeoutOperator$1.run(AbstractCommand.java:951)
    at com.netflix.hystrix.strategy.concurrency.HystrixContextRunnable$1.call(HystrixContextRunnable.java:41)
    at com.netflix.hystrix.strategy.concurrency.HystrixContextRunnable$1.call(HystrixContextRunnable.java:37)
    at com.netflix.hystrix.strategy.concurrency.HystrixContextRunnable.run(HystrixContextRunnable.java:57)
    at com.netflix.hystrix.AbstractCommand$HystrixObservableTimeoutOperator$2.tick(AbstractCommand.java:971)
    at com.netflix.hystrix.util.HystrixTimer$1.run(HystrixTimer.java:98)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    ... 3 more

In this failure, though, onErrorResponse is called. In the failure I am seeing, mapResponseToRequests succeeds, but onNext is never called.
Do you see why this test may be failing?

@mattrjacobs
Contributor

@amitcse That does look like a real issue. Thanks for the repro. I'll re-add this unit test and figure out what's going on.

@amitranjan-fk

Hi @mattrjacobs, I was able to reproduce this issue (HystrixCollapser emitting an onComplete event without any onNext event) by writing a test API in my project that uses the same composition described above, with sample/dummy commands and collapsers, and load-testing that endpoint with wrk at 55 QPS for 45 minutes. I tried several runs, and the error rate was always below 0.001%.
However, I am still not able to reproduce it in the Hystrix test, perhaps because when the test runs in a for loop, each iteration blocks until it finishes, so the number of concurrent commands/collapsers never gets very high.
If I cannot write a test for this behaviour, I will share my test setup with you by copying it into a new project.
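To put that error rate in perspective, here is the arithmetic for one run, assuming the rate is counted per top-level request to the test endpoint (the comment does not say how it was measured):

```math
55\ \tfrac{\text{req}}{\text{s}} \times (45 \times 60)\ \text{s} = 148{,}500\ \text{requests}, \qquad 148{,}500 \times 0.001\% = 148{,}500 \times 10^{-5} \approx 1.49
```

So "below 0.001%" means at most one failed request per 45-minute run, which is consistent with the failure being very hard to hit in a short unit-test loop.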

Also, I would like to point out that the behaviour I am seeing is very similar to #257.

@mattrjacobs
Contributor

Interesting. Thanks for the details. I'll see if I can write a test that does more concurrent work and make the failure more likely.

@amitcse
Author

amitcse commented Nov 18, 2015

@mattrjacobs Were you able to reproduce it with a concurrent test? If you can give me an example of a concurrent test, I will also try to write one.

@mattrjacobs
Contributor

I haven't had time yet, but this is on my list for today. Will let you know.

@mattrjacobs
Contributor

@amitcse I just added a test that spins up m threads to each do n collapser submissions that should result in 3 emissions each.

On my local machine, with m=4, n=2 and 1000 sequential trials, I wasn't able to trigger the missing value case. I'm interested to see what happens in the CI environments.

Can you take a look at those tests, and see if there's anything you can think to add?
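The shape of that concurrency test can be approximated without Hystrix. The following is a stdlib-only sketch in which `MiniCollapser`, `Request` and `runTrial` are hypothetical names, not Hystrix classes. Requests queue up, a timer flushes them as one batch, responses are mapped back to requests, and a request with no response is completed exceptionally rather than left empty. The harness then counts requests from m submitting threads that never received a value. Each request here expects a single value, a simplification of the 3-emission setup described above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class CollapserHarness {
    // One queued request: its argument plus the future the caller waits on.
    static final class Request {
        final int arg;
        final CompletableFuture<String> result = new CompletableFuture<>();
        Request(int arg) { this.arg = arg; }
    }

    // Hypothetical collapser: requests queue up and a timer flushes them as one batch.
    static final class MiniCollapser {
        private final ConcurrentLinkedQueue<Request> pending = new ConcurrentLinkedQueue<>();
        private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();

        MiniCollapser(long windowMs) {
            timer.scheduleAtFixedRate(this::flush, windowMs, windowMs, TimeUnit.MILLISECONDS);
        }

        CompletableFuture<String> submit(int arg) {
            Request r = new Request(arg);
            pending.add(r);
            return r.result;
        }

        private void flush() {
            List<Request> batch = new ArrayList<>();
            Request next;
            while ((next = pending.poll()) != null) batch.add(next);
            // Stand-in for the batch command: one response per argument.
            Map<Integer, String> responses = new HashMap<>();
            for (Request r : batch) responses.put(r.arg, r.arg + ":" + r.arg);
            // Map responses back to requests; a missing response fails loudly.
            for (Request r : batch) {
                String value = responses.get(r.arg);
                if (value != null) r.result.complete(value);
                else r.result.completeExceptionally(new IllegalStateException("missing value for " + r.arg));
            }
        }

        void shutdown() { timer.shutdownNow(); }
    }

    // m threads each submit n requests; returns how many requests ended without a value.
    static int runTrial(int m, int n) {
        MiniCollapser collapser = new MiniCollapser(10);
        ExecutorService pool = Executors.newFixedThreadPool(m);
        List<Future<List<CompletableFuture<String>>>> perThread = new ArrayList<>();
        for (int t = 0; t < m; t++) {
            final int base = t * n;
            perThread.add(pool.submit(() -> {
                List<CompletableFuture<String>> futures = new ArrayList<>();
                for (int i = 0; i < n; i++) futures.add(collapser.submit(base + i));
                return futures;
            }));
        }
        int missing = 0;
        try {
            for (Future<List<CompletableFuture<String>>> submitted : perThread) {
                for (CompletableFuture<String> f : submitted.get()) {
                    try {
                        f.get(5, TimeUnit.SECONDS);
                    } catch (ExecutionException | TimeoutException e) {
                        missing++; // completed with an error, or never completed at all
                    }
                }
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
            collapser.shutdown();
        }
        return missing;
    }
}
```

Calling `runTrial(10, 5)` in a loop mirrors the m=10, n=5 trials discussed in this thread; in this simplified model every request should receive its value.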

@amitcse
Author

amitcse commented Nov 23, 2015

Thanks @mattrjacobs for the test. I ran it for 50000 trials with m=10, n=5 and was able to reproduce the failure on trial 943. Take a look at the trace; this is the same issue I am facing.

TRIAL : 942
Runnable done on thread : pool-943-thread-8
Runnable done on thread : pool-943-thread-1
Runnable done on thread : pool-943-thread-3
Runnable done on thread : pool-943-thread-7
Runnable done on thread : pool-943-thread-6
Runnable done on thread : pool-943-thread-2
Runnable done on thread : pool-943-thread-4
Runnable done on thread : pool-943-thread-9
Runnable done on thread : pool-943-thread-10
Runnable done on thread : pool-943-thread-5
Received : [0:0]
Received : [7:7]
Received : [9:9]
Received : [10:10]
Received : [11:11]
Received : [5:5]
Received : [3:3]
Received : [14:14]
Received : [6:6]
Received : [12:12]
Received : [1:1]
Received : [13:13]
Received : [19:19]
Received : [4:4]
Received : [2:2]
Received : [16:16]
Received : [17:17]
Received : [25:25]
Received : [15:15]
Received : [26:26]
Received : [27:27]
Received : [20:20]
Received : [21:21]
Received : [22:22]
Received : [23:23]
Received : [31:31]
Received : [30:30]
Received : [24:24]
Received : [18:18]
Received : [28:28]
Received : [34:34]
Received : [35:35]
Received : [33:33]
Received : [36:36]
Received : [39:39]
Received : [41:41]
Received : [37:37]
Received : [29:29]
Received : [43:43]
Received : [44:44]
Received : [38:38]
Received : [42:42]
Received : [8:8]
Received : [45:45]
Received : [46:46]
Received : [47:47]
Received : [48:48]
Received : [40:40]
Received : [32:32]
Received : [49:49]
TRIAL : 943
Runnable done on thread : pool-944-thread-4
Runnable done on thread : pool-944-thread-2
Runnable done on thread : pool-944-thread-5
Runnable done on thread : pool-944-thread-1
Runnable done on thread : pool-944-thread-7
Runnable done on thread : pool-944-thread-10
Runnable done on thread : pool-944-thread-3
Runnable done on thread : pool-944-thread-9
Runnable done on thread : pool-944-thread-8
Runnable done on thread : pool-944-thread-6
Received : []

Number of onNext events differ; expected: 1, actual: 0
java.lang.AssertionError: Number of onNext events differ; expected: 1, actual: 0
    at rx.observers.TestSubscriber.assertValueCount(TestSubscriber.java:479)
    at com.netflix.hystrix.HystrixObservableCollapserTest.testCollapserUnderConcurrency(HystrixObservableCollapserTest.java:625)
    at com.netflix.hystrix.HystrixObservableCollapserTest.testConcurrencyInTightLoop(HystrixObservableCollapserTest.java:636)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:45)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:42)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:28)
    at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:30)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:263)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:68)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:47)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:231)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:60)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:229)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:50)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:222)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:300)
    at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.runTestClass(JUnitTestClassExecuter.java:105)
    at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.execute(JUnitTestClassExecuter.java:56)
    at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassProcessor.processTestClass(JUnitTestClassProcessor.java:64)
    at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:50)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
    at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
    at org.gradle.messaging.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
    at org.gradle.messaging.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
    at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
    at org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:106)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
    at org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
    at org.gradle.messaging.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:360)
    at org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:54)
    at org.gradle.internal.concurrent.StoppableExecutorImpl$1.run(StoppableExecutorImpl.java:40)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

@mattrjacobs
Contributor

OK, I'm investigating this now.

@amitcse
Author

amitcse commented Nov 27, 2015

@mattrjacobs Any update on this?

@mattrjacobs
Contributor

Not yet. I haven't been able to replicate it.

@amitcse
Author

amitcse commented Dec 10, 2015

@mattrjacobs Were you able to reproduce it? Did you try the number of trials and threads that I mentioned above? I am able to reproduce it very easily on my machine. Can you also please check whether this issue may be caused by the race condition described in issue #257?

@mattrjacobs
Contributor

I can reproduce the failure above, but the root cause is a timeout on the underlying HystrixObservableCommand. My suspicion is that this happens when a command starts executing and then doesn't get scheduled again in time, due to bad luck with the thread scheduler. This manifests as a command exception, which shows up to the caller as missing data (due to the onMissingComplete). So I don't think this is related to any sort of concurrency issue.
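The key question in that explanation is what the caller sees when the underlying command times out. A minimal stdlib sketch contrasts the two options; `callWithDeadline` is a hypothetical helper standing in for the command, not Hystrix's timeout operator. With `loud == false` the timeout is swallowed into an empty result (the value is simply missing), and with `loud == true` it is rethrown with the `TimeoutException` as its cause.

```java
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimeoutSurfacing {
    // Hypothetical: run a batch call with a deadline and decide how a timeout
    // reaches the caller.
    static Optional<String> callWithDeadline(Callable<String> batch, long timeoutMs, boolean loud) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<String> future = executor.submit(batch);
        try {
            return Optional.of(future.get(timeoutMs, TimeUnit.MILLISECONDS));
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the slow call
            if (loud) {
                throw new IllegalStateException("batch timed out after " + timeoutMs + " ms", e);
            }
            return Optional.empty(); // silent path: the value is just missing
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        Callable<String> slow = () -> { Thread.sleep(1_000); return "late"; };
        System.out.println(callWithDeadline(slow, 50, false)); // Optional.empty
        try {
            callWithDeadline(slow, 50, true);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // batch timed out after 50 ms
        }
    }
}
```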

The issue you referenced (#257) was fixed in RxJava 0.18.2 and picked up in Hystrix 1.3.6. So I'm confident that issue has been resolved. There may be a different concurrency issue somewhere, but I still haven't been able to reproduce that.

Since you started the issue with a concrete example of something failing, can you share that?

@amitcse
Author

amitcse commented Dec 11, 2015

@mattrjacobs If this issue is caused by a HystrixObservableCommand timeout, shouldn't we get a timeout exception?

@amitcse amitcse closed this as completed Dec 11, 2015
@amitcse amitcse reopened this Dec 11, 2015
@amitcse
Author

amitcse commented Dec 11, 2015

The failure that I am seeing is the same as the concurrent test failure. There is no exception stack trace that I can share. The collapser doesn’t emit anything, and the data is simply missed. We can only detect this with an explicit check.

@mattrjacobs
Contributor

@amitcse You're correct on the timeout bit. I just found a way to replicate this more reliably: by running the HystrixObservableCommand on a non-shared I/O thread, I can usually get an error within the first 400 trials. Digging in further now.

@mattrjacobs
Contributor

I can get the non-delivery of values to go away reliably if I "speed up" the collapser. I tested 1000 trials of m=1000, n=500 many times.

I discovered a fairly large contention point in the unit test. Each collapser adds a callback to the same Archaius property, which uses a CopyOnWriteSet under the hood. This was dramatically slowing down later trials of the unit test. When I switched away from Archaius and just used non-configurable collapser properties, the issue was not reproducible.
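Why one callback per collapser gets expensive: a copy-on-write set copies its backing array on every add (`CopyOnWriteArraySet` also scans it for duplicates first), so the k-th registration copies about k elements and n registrations copy n(n-1)/2 in total. The sketch below is a hypothetical registry, not Archaius or `java.util.concurrent` code, that counts those copies so the quadratic growth is visible without timing anything.

```java
import java.util.Arrays;

class CopyOnWriteCost {
    // Hypothetical copy-on-write registry: every add allocates a new array
    // and copies all existing callbacks into it.
    static final class Registry {
        private volatile Runnable[] callbacks = new Runnable[0];
        private long elementsCopied = 0;

        synchronized void add(Runnable callback) {
            Runnable[] current = callbacks;
            Runnable[] next = Arrays.copyOf(current, current.length + 1);
            elementsCopied += current.length;
            next[current.length] = callback;
            callbacks = next;
        }

        synchronized long elementsCopied() {
            return elementsCopied;
        }
    }

    // Register one callback per collapser instance and report the total copy work.
    static long copiesFor(int collapsers) {
        Registry registry = new Registry();
        for (int i = 0; i < collapsers; i++) {
            registry.add(() -> { });
        }
        return registry.elementsCopied();
    }

    public static void main(String[] args) {
        for (int n : new int[] {1_000, 2_000, 4_000}) {
            // doubling n roughly quadruples the copying: n * (n - 1) / 2
            System.out.println(n + " registrations -> " + copiesFor(n) + " element copies");
        }
    }
}
```

Because each trial in the unit test creates new collapsers, registrations keep accumulating, which would fit the observation that later trials slowed down the most.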

I think the Archaius slowness is a separate issue, and well worth figuring out. I suspect the problem with collapsers involves an unexpected code path when things get extremely latent. I'll keep debugging that angle.

@amitcse
Author

amitcse commented Dec 29, 2015

@mattrjacobs Any update on finding the bug?

@mattrjacobs
Contributor

See comments in #1043 for some changes to address this. AFAICT, I don't see any failures of this sort anymore unless I run out of heap.

@amitcse Let me know if these changes make sense to you and if you think they should be merged.

@amitcse
Author

amitcse commented Jan 14, 2016

@mattrjacobs These changes will help surface the error instead of silently missing data, as happens currently. But the underlying issue still needs to be fixed. I frequently see this error in production, and to work around it we had to implement a retry command layer so that failures are masked from clients. How is collapsing used in the Netflix production environment? Have you not observed this issue?
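A retry layer of the kind described can be sketched generically: retry while the call yields no value, and fail explicitly once attempts run out. `retryUntilPresent` is a hypothetical helper, and modelling "completed without emitting" as an empty `Optional` is an assumption. As the comment says, this masks the symptom rather than fixing the cause.

```java
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.function.Supplier;

class RetryOnMissing {
    // Hypothetical retry wrapper: re-invoke the call while it yields no value,
    // up to maxAttempts, then fail loudly instead of returning nothing.
    static <T> T retryUntilPresent(Supplier<Optional<T>> call, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            Optional<T> result = call.get();
            if (result.isPresent()) {
                return result.get();
            }
        }
        throw new NoSuchElementException("no value after " + maxAttempts + " attempts");
    }
}
```

Bounding the attempts keeps a persistent failure visible as an exception instead of turning it into an endless retry loop.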

@mattrjacobs
Contributor

If you could include an example of code that's not receiving values, that would be helpful. We do use collapsing heavily in production, and we've never observed this issue ourselves.

@amitcse
Author

amitcse commented Jan 14, 2016

My production code is very similar to the test you wrote, except that actual network calls are made. Didn't the test help?

@mattrjacobs
Contributor

This unit test doesn't fail for me (nor in CI), so I think the problem you're seeing must be different.

@amitcse
Author

amitcse commented Jan 14, 2016

Are the tests not failing since you switched away from Archaius to non-configurable collapser properties, or did they never fail? I was under the impression that the same tests were failing for you too and that you were debugging the angle of things getting latent. Anyway, let's merge this code so that I can get stack traces with these changes. Hopefully those will help us get to the root cause. Meanwhile, I'll see if I can write something beyond those tests that may help us isolate the issue.

@mattrjacobs
Contributor

Yes, this code is already merged in. It's still using Archaius, just caching the properties setter, so not as many callbacks get used. As a result, no synthetic slowdowns are introduced.
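Caching means each collapser key pays the registration cost once instead of once per collapser instance. The following is a minimal sketch of that idea using `ConcurrentHashMap.computeIfAbsent`. `PropertiesCache` and `CollapserProperties` are hypothetical names, and this caches a properties object rather than reproducing the actual Hystrix change.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

class PropertiesCache {
    // Hypothetical stand-in for a properties object whose construction
    // registers a change callback with the configuration library.
    static final class CollapserProperties {
        final String key;

        CollapserProperties(String key, AtomicInteger registrations) {
            this.key = key;
            registrations.incrementAndGet(); // models one callback registration
        }
    }

    private final ConcurrentHashMap<String, CollapserProperties> cache = new ConcurrentHashMap<>();
    final AtomicInteger registrations = new AtomicInteger();

    // Build the properties at most once per collapser key and reuse them afterwards.
    CollapserProperties forKey(String key) {
        return cache.computeIfAbsent(key, k -> new CollapserProperties(k, registrations));
    }

    public static void main(String[] args) {
        PropertiesCache properties = new PropertiesCache();
        for (int i = 0; i < 10_000; i++) {
            properties.forKey("TestRequestCollapser"); // each new collapser instance asks again
        }
        System.out.println(properties.registrations.get()); // 1
    }
}
```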

These tests are not failing for me.

@mattrjacobs
Contributor

Closing due to inactivity. Please re-open if there's more to discuss

3 participants