Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add test cases for unsubscribe of Hystrix commands before terminal events #1198

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 45 additions & 51 deletions hystrix-core/src/main/java/com/netflix/hystrix/AbstractCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,7 @@ public void call(Subscriber<? super R> observer) {
/* determine if we're allowed to execute */
if (circuitBreaker.allowRequest()) {
final TryableSemaphore executionSemaphore = getExecutionSemaphore();
final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
// acquire a permit
if (executionSemaphore.tryAcquire()) {
try {
Expand All @@ -393,10 +394,21 @@ public void call() {
// release the semaphore
// this is done here instead of below so that the acquire/release happens where it is guaranteed
// and not affected by the conditional circuit-breaker checks, timeouts, etc
executionSemaphore.release();
if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
executionSemaphore.release();
}

}
}).unsafeSubscribe(observer);
})
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
executionSemaphore.release();
}
}
})
.unsafeSubscribe(observer);
} catch (RuntimeException e) {
observer.onError(e);
}
Expand Down Expand Up @@ -444,24 +456,29 @@ public Observable<R> call(Throwable t) {

});

// any final cleanup needed
o = o.doOnTerminate(new Action0() {
final AtomicBoolean commandCleanupExecuted = new AtomicBoolean(false);
final Action0 commandCleanup = new Action0() {

@Override
public void call() {
Reference<TimerListener> tl = timeoutTimer.get();
if (tl != null) {
tl.clear();
}
if (commandCleanupExecuted.compareAndSet(false, true)) {
Reference<TimerListener> tl = timeoutTimer.get();
if (tl != null) {
tl.clear();
}

long userThreadLatency = System.currentTimeMillis() - executionResult.getStartTimestamp();
executionResult = executionResult.markUserThreadCompletion((int) userThreadLatency);
metrics.markCommandDone(executionResult, commandKey, threadPoolKey);
// record that we're completed
isExecutionComplete.set(true);
long userThreadLatency = System.currentTimeMillis() - executionResult.getStartTimestamp();
executionResult = executionResult.markUserThreadCompletion((int) userThreadLatency);
metrics.markCommandDone(executionResult, commandKey, threadPoolKey);
// record that we're completed
isExecutionComplete.set(true);
}
}

});
};

// any final cleanup needed
o = o.doOnTerminate(commandCleanup).doOnUnsubscribe(commandCleanup);

// put in cache
if (requestCacheEnabled) {
Expand Down Expand Up @@ -546,15 +563,6 @@ public Boolean call() {
}
}

run = run.doOnEach(new Action1<Notification<? super R>>() {

@Override
public void call(Notification<? super R> n) {
setRequestContextIfNeeded(currentRequestContext);
}


});
if (properties.executionTimeoutEnabled().get()) {
run = run.lift(new HystrixObservableTimeoutOperator<R>(_self));
}
Expand Down Expand Up @@ -652,8 +660,6 @@ public void call(Notification<? super R> n) {
}).doOnTerminate(new Action0() {
@Override
public void call() {
//if the command timed out, then we've reached this point in the calling thread
//but the Hystrix thread is still doing work. Let it handle these markers.
if (!isCommandTimedOut.get().equals(TimedOutStatus.TIMED_OUT)) {
handleThreadEnd();
}
Expand Down Expand Up @@ -684,7 +690,6 @@ public void call() {
//to handle these markers. Otherwise, the calling thread will perform these for us.
if (isCommandTimedOut.get().equals(TimedOutStatus.TIMED_OUT)) {
handleThreadEnd();

}
}
});
Expand Down Expand Up @@ -730,6 +735,7 @@ private Observable<R> getFallbackOrThrowException(final HystrixEventType eventTy
final AbstractCommand<R> _cmd = this;

final TryableSemaphore fallbackSemaphore = getFallbackSemaphore();
final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);

Observable<R> fallbackExecutionChain;

Expand All @@ -755,7 +761,18 @@ private Observable<R> getFallbackOrThrowException(final HystrixEventType eventTy

@Override
public void call() {
fallbackSemaphore.release();
if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
fallbackSemaphore.release();
}
}
})
.doOnUnsubscribe(new Action0() {

@Override
public void call() {
if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
fallbackSemaphore.release();
}
}
});
} else {
Expand Down Expand Up @@ -817,21 +834,6 @@ public Observable<R> call(Throwable t) {
}
}

}).doOnTerminate(new Action0() {

@Override
public void call() {
// record that we're completed (to handle non-successful events we do it here as well as at the end of executeCommand
isExecutionComplete.set(true);
}

}).doOnEach(new Action1<Notification<? super R>>() {

@Override
public void call(Notification<? super R> n) {
setRequestContextIfNeeded(currentRequestContext);
}

});
} else {
/* fallback is disabled so throw HystrixRuntimeException */
Expand All @@ -844,15 +846,7 @@ public void call(Notification<? super R> n) {
}
}

return fallbackLogicApplied.doOnTerminate(new Action0() {

@Override
public void call() {
// record that we're completed (to handle non-successful events we do it here as well as at the end of executeCommand
isExecutionComplete.set(true);
}

}).doOnEach(new Action1<Notification<? super R>>() {
return fallbackLogicApplied.doOnEach(new Action1<Notification<? super R>>() {

@Override
public void call(Notification<? super R> n) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1489,7 +1489,6 @@ public void call(C command) {
********************* END SEMAPHORE-ISOLATED Execution Hook Tests ***********************************
*/


/**
* Abstract methods defining a way to instantiate each of the described commands.
* {@link HystrixCommandTest} and {@link HystrixObservableCommandTest} should each provide concrete impls for
Expand Down
137 changes: 137 additions & 0 deletions hystrix-core/src/test/java/com/netflix/hystrix/HystrixCommandTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,13 @@
import rx.Observable;
import rx.Observer;
import rx.Scheduler;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func0;
import rx.observers.TestSubscriber;
import rx.schedulers.Schedulers;

import java.io.IOException;
import java.util.List;
Expand Down Expand Up @@ -2980,6 +2984,139 @@ protected Integer run() throws Exception {
assertFalse(executionAttempted.get());
}

@Test
public void testEarlyUnsubscribeDuringExecution() {
class AsyncCommand extends HystrixCommand<Boolean> {

public AsyncCommand() {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ASYNC")));
}

@Override
protected Boolean run() {
try {
Thread.sleep(100);
return true;
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}
}

HystrixCommand<Boolean> cmd = new AsyncCommand();

final CountDownLatch latch = new CountDownLatch(1);

Observable<Boolean> o = cmd.toObservable();
Subscription s = o.
doOnUnsubscribe(new Action0() {
@Override
public void call() {
System.out.println("OnUnsubscribe");
latch.countDown();
}
}).
subscribe(new Subscriber<Boolean>() {
@Override
public void onCompleted() {
System.out.println("OnCompleted");
latch.countDown();
}

@Override
public void onError(Throwable e) {
System.out.println("OnError : " + e);
}

@Override
public void onNext(Boolean b) {
System.out.println("OnNext : " + b);
}
});

try {
s.unsubscribe();
assertTrue(latch.await(200, TimeUnit.MILLISECONDS));
assertEquals("Number of execution semaphores in use", 0, cmd.getExecutionSemaphore().getNumberOfPermitsUsed());
assertEquals("Number of fallback semaphores in use", 0, cmd.getFallbackSemaphore().getNumberOfPermitsUsed());
assertTrue(cmd.isExecutionComplete());
assertTrue(cmd.isExecutedInThread());
System.out.println("EventCounts : " + cmd.getEventCounts());
System.out.println("Execution Time : " + cmd.getExecutionTimeInMilliseconds());
System.out.println("Is Successful : " + cmd.isSuccessfulExecution());
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}

@Test
public void testEarlyUnsubscribeDuringFallback() {
class AsyncCommand extends HystrixCommand<Boolean> {

public AsyncCommand() {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ASYNC")));
}

@Override
protected Boolean run() {
throw new RuntimeException("run failure");
}

@Override
protected Boolean getFallback() {
try {
Thread.sleep(100);
return false;
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}
}

HystrixCommand<Boolean> cmd = new AsyncCommand();

final CountDownLatch latch = new CountDownLatch(1);

Observable<Boolean> o = cmd.toObservable();
Subscription s = o.
doOnUnsubscribe(new Action0() {
@Override
public void call() {
System.out.println("OnUnsubscribe");
latch.countDown();
}
}).
subscribe(new Subscriber<Boolean>() {
@Override
public void onCompleted() {
System.out.println("OnCompleted");
latch.countDown();
}

@Override
public void onError(Throwable e) {
System.out.println("OnError : " + e);
}

@Override
public void onNext(Boolean b) {
System.out.println("OnNext : " + b);
}
});

try {
Thread.sleep(10); //give fallback a chance to fire
s.unsubscribe();
assertTrue(latch.await(200, TimeUnit.MILLISECONDS));
assertEquals("Number of execution semaphores in use", 0, cmd.getExecutionSemaphore().getNumberOfPermitsUsed());
assertEquals("Number of fallback semaphores in use", 0, cmd.getFallbackSemaphore().getNumberOfPermitsUsed());
assertTrue(cmd.isExecutionComplete());
assertTrue(cmd.isExecutedInThread());
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}

/* ******************************************************************************** */
/* ******************************************************************************** */
/* private HystrixCommand class implementations for unit testing */
Expand Down
Loading