Skip to content

Commit

Permalink
Improve the AbstracAuditor to allow implementation better handle the …
Browse files Browse the repository at this point in the history
…InterruptedException.
  • Loading branch information
becketqin authored and Jiangjie Qin committed Sep 26, 2017
1 parent 1e83f3a commit 70082d5
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public void onTick(AuditStats lastStats) {
}

@Override
public void onClosed(AuditStats currentStats, AuditStats nextStats) {
public void onClosed(AuditStats currentStats, AuditStats nextStats, long timeout) {
AUDIT_LOG.info("Logging auditing stats on closure...");
printSummary(currentStats);
printSummary(nextStats);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
* To use this class, users need to implement the following methods:
* <pre>
* {@link #onTick(AuditStats)}
* {@link #onClosed(AuditStats, AuditStats)}
* {@link #onClosed(AuditStats, AuditStats, long)}
* {@link #createAuditStats()}
* {@link #getAuditKey(Object, String, Long, Long, Long, AuditType)}
* {@link #auditToken(Object, Object)}
Expand Down Expand Up @@ -86,8 +86,11 @@ public abstract class AbstractAuditor<K, V> extends Thread implements Auditor<K,
private volatile long _ticks;

// The shutdown flag and latches.
protected final AtomicBoolean _started = new AtomicBoolean(false);
protected final AtomicBoolean _shutdown = new AtomicBoolean(false);
private final AtomicBoolean _started = new AtomicBoolean(false);
private final Object _shutdownLock = new Object();
private volatile boolean _shutdown;
// The shutdown flag and deadline.
private volatile long _shutdownDeadline;

/**
* Construct the abstract auditor.
Expand Down Expand Up @@ -124,24 +127,22 @@ public void configure(Map<String, ?> configs) {
_nextTick = _enableAutoTick ?
(_time.milliseconds() / _reportingIntervalMs) * _reportingIntervalMs + _reportingIntervalMs : Long.MAX_VALUE;
_ticks = 0;
_shutdown = false;
_shutdownDeadline = -1L;
}

@Override
public void run() {
if (_enableAutoTick) {
LOG.info("Starting auditor...");
try {
while (!_shutdown.get()) {
while (!_shutdown) {
try {
long now = _time.milliseconds();
if (now >= _nextTick + _reportingDelayMs) {
tick();
}
try {
Thread.sleep(Math.max(0, _nextTick + _reportingDelayMs - now));
} catch (InterruptedException ie) {
// Let it go.
}
waitForNextTick(now);
} catch (Exception e) {
// We catch all the exceptions from the user's onTick() call but not exit.
LOG.error("Auditor encounter exception.", e);
Expand All @@ -150,14 +151,25 @@ public void run() {
} finally {
_currentStats.close();
_nextStats.close();
_shutdown.set(true);
onClosed(_currentStats, _nextStats);
_shutdown = true;
onClosed(_currentStats, _nextStats, Math.min(0, _shutdownDeadline - System.currentTimeMillis()));
}
} else {
LOG.info("Auto auditing is set to false. Automatic ticking is disabled.");
}
}

private void waitForNextTick(long now) {
try {
synchronized (_shutdownLock) {
if (!_shutdown) {
_shutdownLock.wait(Math.max(0, _nextTick + _reportingDelayMs - now));
}
}
} catch (InterruptedException ie) {
// Let it go.
}
}

// protected methods
/**
Expand Down Expand Up @@ -214,10 +226,23 @@ protected void tick() {
onTick(tickAndGetStats());
}

/**
* Check if the auditor is shutting down.
*
* @return true if the auditor is shutting down, false otherwise.
*/
protected boolean isShuttingDown() {
return _shutdown;
}

/**
* This method is called when a reporting interval is reached and the abstract auditor rolls out a new AuditStats.
* The old AuditStats will be closed and passed to this method as the argument. The subclass must implement this
* method to handle the AuditStats for the previous reporting interval.
* <p>
* Note that the implementation should expect to be interrupted when the auditor is shutdown and the shutdown
* timeout has passed.
* </p>
*
* @param lastStats The AuditStats of the previous reporting interval.
*/
Expand All @@ -229,8 +254,10 @@ protected void tick() {
*
* @param currentStats The stats for the current reporting period.
* @param nextStats The stats for the next reporting period.
* @param timeout The timeout to close the auditor. The onClosed() call will be interrupted if it did not return
* before timeout.
*/
public abstract void onClosed(AuditStats currentStats, AuditStats nextStats);
public abstract void onClosed(AuditStats currentStats, AuditStats nextStats, long timeout);

/**
* Create a new AuditStats. This method will be called when the abstract auditor rolls out a new AuditStat for a new
Expand Down Expand Up @@ -292,22 +319,32 @@ public void record(Object auditToken,
} catch (IllegalStateException ise) {
// Ignore this exception and retry because we might be ticking.
}
} while (!done && !_shutdown.get());
} while (!done && !_shutdown);
}

@Override
public void close(long timeout, TimeUnit unit) {
LOG.info("Closing auditor with timeout {} {}", timeout, unit);
if (_shutdown.compareAndSet(false, true)) {
interrupt();
long timeoutMillis = unit.toMillis(timeout);
long now = _time.milliseconds();
// Set shutdown flag
synchronized (_shutdownLock) {
if (!_shutdown) {
// Handle long overflow
_shutdownDeadline = Long.min(Long.MAX_VALUE - now, timeoutMillis) + now;
_shutdown = true;
_shutdownLock.notify();
}
}
try {
if (timeout > 0) {
this.join(unit.toMillis(timeout));
this.join(timeoutMillis);
}
} catch (InterruptedException e) {
LOG.warn("Auditor closure interrupted");
}
// Interrupt after timeout.
interrupt();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ public void onTick(AuditStats lastStats) {
}

@Override
public void onClosed(AuditStats currentStats, AuditStats nextStats) {
public void onClosed(AuditStats currentStats, AuditStats nextStats, long timeout) {

}

Expand Down

0 comments on commit 70082d5

Please sign in to comment.