Skip to content

Commit

Permalink
Merge pull request #604 from mattrjacobs/add-emit-events-to-hystrix-o…
Browse files Browse the repository at this point in the history
…bservable-command

Added 2 new HystrixEventTypes: EMIT and FALLBACK_EMIT
  • Loading branch information
mattrjacobs committed Feb 4, 2015
2 parents 464da00 + c7840a5 commit 50459a4
Show file tree
Hide file tree
Showing 6 changed files with 274 additions and 85 deletions.
103 changes: 82 additions & 21 deletions hystrix-core/src/main/java/com/netflix/hystrix/AbstractCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -551,27 +551,22 @@ public void call(Notification<? super R> n) {
}


}).lift(new HystrixObservableTimeoutOperator<>(_self)).map(new Func1<R, R>() {

boolean once = false;

}).lift(new HystrixObservableTimeoutOperator<>(_self)).doOnNext(new Action1<R>() {
@Override
public R call(R t1) {
if (!once) {
// report success
executionResult = executionResult.addEvents(HystrixEventType.SUCCESS);
once = true;
public void call(R r) {
if (shouldOutputOnNextEvents()) {
executionResult = executionResult.addEmission(HystrixEventType.EMIT);
eventNotifier.markEvent(HystrixEventType.EMIT, getCommandKey());
}
return t1;
}

}).doOnCompleted(new Action0() {

@Override
public void call() {
long duration = System.currentTimeMillis() - invocationStartTime;
metrics.addCommandExecutionTime(duration);
metrics.markSuccess(duration);
executionResult = executionResult.addEvents(HystrixEventType.SUCCESS);
circuitBreaker.markSuccess();
eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) duration, executionResult.events);
}
Expand Down Expand Up @@ -781,7 +776,15 @@ private Observable<R> getFallbackOrThrowException(final HystrixEventType eventTy
executionResult = executionResult.addEvents(eventType);
final AbstractCommand<R> _cmd = this;

return getFallbackWithProtection().doOnCompleted(new Action0() {
return getFallbackWithProtection().doOnNext(new Action1<R>() {
@Override
public void call(R r) {
if (shouldOutputOnNextEvents()) {
executionResult = executionResult.addEmission(HystrixEventType.FALLBACK_EMIT);
eventNotifier.markEvent(HystrixEventType.FALLBACK_EMIT, getCommandKey());
}
}
}).doOnCompleted(new Action0() {

@Override
public void call() {
Expand Down Expand Up @@ -886,6 +889,16 @@ protected void handleThreadEnd() {
}
}

/**
*
* @return if onNext events should be reported on
* This affects {@link HystrixRequestLog}, and {@link HystrixEventNotifier} currently.
* Metrics/hooks will be affected once they are in place
*/
protected boolean shouldOutputOnNextEvents() {
return false;
}

private static class HystrixObservableTimeoutOperator<R> implements Operator<R, R> {

final AbstractCommand<R> originalCommand;
Expand Down Expand Up @@ -1402,32 +1415,36 @@ protected static class ExecutionResult {
private final int executionTime;
private final Exception exception;
private final long commandRunStartTimeInNanos;
private final int numEmissions;
private final int numFallbackEmissions;

private ExecutionResult(HystrixEventType... events) {
this(Arrays.asList(events), -1, null);
this(Arrays.asList(events), -1, null, 0, 0);
}

public ExecutionResult setExecutionTime(int executionTime) {
return new ExecutionResult(events, executionTime, exception);
return new ExecutionResult(events, executionTime, exception, numEmissions, numFallbackEmissions);
}

public ExecutionResult setException(Exception e) {
return new ExecutionResult(events, executionTime, e);
return new ExecutionResult(events, executionTime, e, numEmissions, numFallbackEmissions);
}

private ExecutionResult(List<HystrixEventType> events, int executionTime, Exception e) {
private ExecutionResult(List<HystrixEventType> events, int executionTime, Exception e, int numEmissions, int numFallbackEmissions) {
// we are safe assigning the List reference instead of deep-copying
// because we control the original list in 'newEvent'
this.events = events;
this.executionTime = executionTime;
if (executionTime >= 0 ) {
this.commandRunStartTimeInNanos = System.nanoTime() - this.executionTime*1000*1000; // 1000*1000 will conver the milliseconds to nanoseconds
this.commandRunStartTimeInNanos = System.nanoTime() - this.executionTime*1000*1000; // 1000*1000 will convert the milliseconds to nanoseconds
}
else {
this.commandRunStartTimeInNanos = -1;
}
this.exception = e;

this.numEmissions = numEmissions;
this.numFallbackEmissions = numFallbackEmissions;
}

// we can return a static version since it's immutable
Expand All @@ -1440,10 +1457,14 @@ private ExecutionResult(List<HystrixEventType> events, int executionTime, Except
* @return
*/
public ExecutionResult addEvents(HystrixEventType... events) {
ArrayList<HystrixEventType> newEvents = new ArrayList<>();
newEvents.addAll(this.events);
Collections.addAll(newEvents, events);
return new ExecutionResult(Collections.unmodifiableList(newEvents), executionTime, exception);
return new ExecutionResult(getUpdatedList(this.events, events), executionTime, exception, numEmissions, numFallbackEmissions);
}

private static List<HystrixEventType> getUpdatedList(List<HystrixEventType> currentList, HystrixEventType... newEvents) {
ArrayList<HystrixEventType> updatedEvents = new ArrayList<>();
updatedEvents.addAll(currentList);
Collections.addAll(updatedEvents, newEvents);
return Collections.unmodifiableList(updatedEvents);
}

public int getExecutionTime() {
Expand All @@ -1456,6 +1477,28 @@ public int getExecutionTime() {
public Exception getException() {
return exception;
}

/**
* This method may be called many times for {@code HystrixEventType.EMIT} and {@link HystrixEventType.FALLBACK_EMIT}.
* To save on storage, on the first time we see that event type, it gets added to the event list, and the count gets incremented.
* @param eventType emission event
* @return "updated" {@link ExecutionResult}
*/
public ExecutionResult addEmission(HystrixEventType eventType) {
switch (eventType) {
case EMIT: if (events.contains(HystrixEventType.EMIT)) {
return new ExecutionResult(events, executionTime, exception, numEmissions + 1, numFallbackEmissions);
} else {
return new ExecutionResult(getUpdatedList(this.events, HystrixEventType.EMIT), executionTime, exception, numEmissions +1, numFallbackEmissions);
}
case FALLBACK_EMIT: if (events.contains(HystrixEventType.FALLBACK_EMIT)) {
return new ExecutionResult(events, executionTime, exception, numEmissions, numFallbackEmissions + 1);
} else {
return new ExecutionResult(getUpdatedList(this.events, HystrixEventType.FALLBACK_EMIT), executionTime, exception, numEmissions, numFallbackEmissions + 1);
}
default: return this;
}
}
}

/* ******************************************************************************** */
Expand Down Expand Up @@ -1610,6 +1653,24 @@ public List<HystrixEventType> getExecutionEvents() {
return executionResult.events;
}

/**
* Number of emissions of the execution of a command. Only interesting in the streaming case.
* @return number of <code>OnNext</code> emissions by a streaming command
*/
@Override
public int getNumberEmissions() {
return executionResult.numEmissions;
}

/**
* Number of emissions of the execution of a fallback. Only interesting in the streaming case.
* @return number of <code>OnNext</code> emissions by a streaming fallback
*/
@Override
public int getNumberFallbackEmissions() {
return executionResult.numFallbackEmissions;
}

/**
* The execution time of this command instance in milliseconds, or -1 if not executed.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,5 @@
* These are most often accessed via {@link HystrixRequestLog} or {@link HystrixCommand#getExecutionEvents()}.
*/
public enum HystrixEventType {
SUCCESS, FAILURE, TIMEOUT, SHORT_CIRCUITED, THREAD_POOL_REJECTED, SEMAPHORE_REJECTED, FALLBACK_SUCCESS, FALLBACK_FAILURE, FALLBACK_REJECTION, EXCEPTION_THROWN, RESPONSE_FROM_CACHE, COLLAPSED, BAD_REQUEST
EMIT, SUCCESS, FAILURE, TIMEOUT, SHORT_CIRCUITED, THREAD_POOL_REJECTED, SEMAPHORE_REJECTED, FALLBACK_EMIT, FALLBACK_SUCCESS, FALLBACK_FAILURE, FALLBACK_REJECTION, EXCEPTION_THROWN, RESPONSE_FROM_CACHE, COLLAPSED, BAD_REQUEST
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ public interface HystrixInvokableInfo<R> {

public List<HystrixEventType> getExecutionEvents();

public int getNumberEmissions();

public int getNumberFallbackEmissions();

public int getExecutionTimeInMilliseconds();

public long getCommandRunStartTimeInNanos();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.netflix.hystrix.HystrixCommandProperties.ExecutionIsolationStrategy;
import com.netflix.hystrix.strategy.executionhook.HystrixCommandExecutionHook;
import com.netflix.hystrix.strategy.properties.HystrixPropertiesStrategy;
import com.netflix.hystrix.strategy.eventnotifier.HystrixEventNotifier;

/**
* Used to wrap code that will execute potentially risky functionality (typically meaning a service call over the network)
Expand Down Expand Up @@ -49,6 +50,18 @@ protected HystrixObservableCommand(HystrixCommandGroupKey group) {
this(new Setter(group));
}

/**
*
* Overridden to true so that all onNext emissions are captured
*
* @return if onNext events should be reported on
* This affects {@link HystrixRequestLog}, and {@link HystrixEventNotifier} currently. Metrics/Hooks later
*/
@Override
protected boolean shouldOutputOnNextEvents() {
return true;
}

/**
* Construct a {@link HystrixObservableCommand} with defined {@link Setter} that allows injecting property and strategy overrides and other optional arguments.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,16 +150,24 @@ public Collection<HystrixInvokableInfo<?>> getAllExecutedCommands() {
* <li>TestCommand[FAILURE][1ms]</li>
* <li>TestCommand[THREAD_POOL_REJECTED][1ms]</li>
* <li>TestCommand[THREAD_POOL_REJECTED, FALLBACK_SUCCESS][1ms]</li>
* <li>TestCommand[EMIT, SUCCESS][1ms]</li>
* <li>TestCommand[EMITx5, SUCCESS][1ms]</li>
* <li>TestCommand[EMITx5, FAILURE, FALLBACK_EMITx6, FALLBACK_FAILURE][100ms]</li>
* <li>TestCommand[FAILURE, FALLBACK_SUCCESS][1ms], TestCommand[FAILURE, FALLBACK_SUCCESS, RESPONSE_FROM_CACHE][1ms]x4</li>
* <li>GetData[SUCCESS][1ms], PutData[SUCCESS][1ms], GetValues[SUCCESS][1ms], GetValues[SUCCESS, RESPONSE_FROM_CACHE][1ms], TestCommand[FAILURE, FALLBACK_FAILURE][1ms], TestCommand[FAILURE,
* FALLBACK_FAILURE, RESPONSE_FROM_CACHE][1ms]</li>
* </ul>
* <p>
* If a command has a multiplier such as <code>x4</code> that means this command was executed 4 times with the same events. The time in milliseconds is the sum of the 4 executions.
* If a command has a multiplier such as <code>x4</code>, that means this command was executed 4 times with the same events. The time in milliseconds is the sum of the 4 executions.
* <p>
* For example, <code>TestCommand[SUCCESS][15ms]x4</code> represents TestCommand being executed 4 times and the sum of those 4 executions was 15ms. These 4 each executed the run() method since
* <code>RESPONSE_FROM_CACHE</code> was not present as an event.
*
*
* If an EMIT or FALLBACK_EMIT has a multiplier such as <code>x5</code>, that means a <code>HystrixObservableCommand</code> was used and it emitted that number of <code>OnNext</code>s.
* <p>
* For example, <code>TestCommand[EMITx5, FAILURE, FALLBACK_EMITx6, FALLBACK_FAILURE][100ms]</code> represents TestCommand executing observably, emitted 5 <code>OnNext</code>s, then an <code>OnError</code>.
* This command also has an Observable fallback, and it emits 6 <code>OnNext</code>s, then an <code>OnCompleted</code>.
*
* @return String request log or "Unknown" if unable to instead of throwing an exception.
*/
public String getExecutedCommandsAsString() {
Expand All @@ -179,7 +187,26 @@ public String getExecutedCommandsAsString() {
//replicate functionality of Arrays.toString(events.toArray()) to append directly to existing StringBuilder
builder.append("[");
for (HystrixEventType event : events) {
builder.append(event).append(", ");
switch (event) {
case EMIT:
int numEmissions = command.getNumberEmissions();
if (numEmissions > 1) {
builder.append(event).append("x").append(numEmissions).append(", ");
} else {
builder.append(event).append(", ");
}
break;
case FALLBACK_EMIT:
int numFallbackEmissions = command.getNumberFallbackEmissions();
if (numFallbackEmissions > 1) {
builder.append(event).append("x").append(numFallbackEmissions).append(", ");
} else {
builder.append(event).append(", ");
}
break;
default:
builder.append(event).append(", ");
}
}
builder.setCharAt(builder.length() - 2, ']');
builder.setLength(builder.length() - 1);
Expand Down
Loading

0 comments on commit 50459a4

Please sign in to comment.