Skip to content

Early unsubscribe hook test #1445

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Dec 15, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -384,13 +384,23 @@ public void call() {
if (_cmd.commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.UNSUBSCRIBED)) {
if (!_cmd.executionResult.containsTerminalEvent()) {
_cmd.eventNotifier.markEvent(HystrixEventType.CANCELLED, _cmd.commandKey);
try {
executionHook.onUnsubscribe(_cmd);
} catch (Throwable hookEx) {
logger.warn("Error calling HystrixCommandExecutionHook.onUnsubscribe", hookEx);
}
_cmd.executionResultAtTimeOfCancellation = _cmd.executionResult
.addEvent((int) (System.currentTimeMillis() - _cmd.commandStartTimestamp), HystrixEventType.CANCELLED);
}
handleCommandEnd(false); //user code never ran
} else if (_cmd.commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.UNSUBSCRIBED)) {
if (!_cmd.executionResult.containsTerminalEvent()) {
_cmd.eventNotifier.markEvent(HystrixEventType.CANCELLED, _cmd.commandKey);
try {
executionHook.onUnsubscribe(_cmd);
} catch (Throwable hookEx) {
logger.warn("Error calling HystrixCommandExecutionHook.onUnsubscribe", hookEx);
}
_cmd.executionResultAtTimeOfCancellation = _cmd.executionResult
.addEvent((int) (System.currentTimeMillis() - _cmd.commandStartTimestamp), HystrixEventType.CANCELLED);
}
Expand Down Expand Up @@ -2169,6 +2179,11 @@ public <T> void onCacheHit(HystrixInvokable<T> commandInstance) {
actual.onCacheHit(commandInstance);
}

@Override
public <T> void onUnsubscribe(HystrixInvokable<T> commandInstance) {
actual.onUnsubscribe(commandInstance);
}

@SuppressWarnings({ "unchecked", "rawtypes" })
private <T> HystrixCommand<T> getHystrixCommandFromAbstractIfApplicable(HystrixInvokable<T> commandInstance) {
if (commandInstance instanceof HystrixCommand) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,17 @@ public <T> void onCacheHit(HystrixInvokable<T> commandInstance) {
//do nothing by default
}

/**
* Invoked with the command is unsubscribed before a terminal state
*
* @param commandInstance The executing HystrixInvokable instance.
*
* @since 1.5.9
*/
public <T> void onUnsubscribe(HystrixInvokable<T> commandInstance) {
//do nothing by default
}

/**
* DEPRECATED: Change usages of this to {@link #onExecutionStart}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4010,6 +4010,55 @@ public void call(TestHystrixCommand<Integer> command) {
});
}

@Test
public void testExecutionHookEarlyUnsubscribe() {
System.out.println("Running command.observe(), awaiting terminal state of Observable, then running assertions...");
final CountDownLatch latch = new CountDownLatch(1);

TestHystrixCommand<Integer> command = getCommand(ExecutionIsolationStrategy.THREAD, AbstractTestHystrixCommand.ExecutionResult.SUCCESS, 1000);
Observable<Integer> o = command.observe();

Subscription s = o.
doOnUnsubscribe(new Action0() {
@Override
public void call() {
System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : OnUnsubscribe");
latch.countDown();
}
}).
subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : OnCompleted");
latch.countDown();
}

@Override
public void onError(Throwable e) {
System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : OnError : " + e);
latch.countDown();
}

@Override
public void onNext(Integer i) {
System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : OnNext : " + i);
}
});

try {
Thread.sleep(15);
s.unsubscribe();
latch.await(3, TimeUnit.SECONDS);
TestableExecutionHook hook = command.getBuilder().executionHook;
assertTrue(hook.commandEmissionsMatch(0, 0, 0));
assertTrue(hook.executionEventsMatch(0, 0, 0));
assertTrue(hook.fallbackEventsMatch(0, 0, 0));
assertEquals("onStart - onThreadStart - !onRunStart - onExecutionStart - onUnsubscribe - onThreadComplete - ", hook.executionSequence.toString());
} catch (Exception e) {
throw new RuntimeException(e);
}
}

/**
* Short-circuit? : NO
* Thread/semaphore: THREAD
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,14 @@ public <T> void onCacheHit(HystrixInvokable<T> commandInstance) {
recordHookCall(executionSequence, "onCacheHit");
}

@Override
public <T> void onUnsubscribe(HystrixInvokable<T> commandInstance) {
super.onUnsubscribe(commandInstance);
recordHookCall(executionSequence, "onUnsubscribe");
}

/**
* DEPRECATED METHODS FOLLOW. The string representation starts with `!D!` to distinguish
* DEPRECATED METHODS FOLLOW. The string representation starts with `!` to distinguish
*/

AtomicInteger startExecute = new AtomicInteger();
Expand Down