Skip to content

Commit

Permalink
Make auditor start and shutdown multi-thread safe. (#69)
Browse files Browse the repository at this point in the history
  • Loading branch information
becketqin authored Sep 26, 2017
1 parent 44f19cb commit 1e83f3a
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.linkedin.kafka.clients.auditing.AuditType;
import com.linkedin.kafka.clients.auditing.Auditor;
import com.linkedin.kafka.clients.auditing.LoggingAuditor;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
Expand Down Expand Up @@ -85,7 +86,8 @@ public abstract class AbstractAuditor<K, V> extends Thread implements Auditor<K,
private volatile long _ticks;

// The shutdown flag and latches.
protected volatile boolean _shutdown;
protected final AtomicBoolean _started = new AtomicBoolean(false);
protected final AtomicBoolean _shutdown = new AtomicBoolean(false);

/**
* Construct the abstract auditor.
Expand Down Expand Up @@ -122,15 +124,14 @@ public void configure(Map<String, ?> configs) {
_nextTick = _enableAutoTick ?
(_time.milliseconds() / _reportingIntervalMs) * _reportingIntervalMs + _reportingIntervalMs : Long.MAX_VALUE;
_ticks = 0;
_shutdown = false;
}

@Override
public void run() {
if (_enableAutoTick) {
LOG.info("Starting auditor...");
try {
while (!_shutdown) {
while (!_shutdown.get()) {
try {
long now = _time.milliseconds();
if (now >= _nextTick + _reportingDelayMs) {
Expand All @@ -149,7 +150,7 @@ public void run() {
} finally {
_currentStats.close();
_nextStats.close();
_shutdown = true;
_shutdown.set(true);
onClosed(_currentStats, _nextStats);
}
} else {
Expand Down Expand Up @@ -264,10 +265,12 @@ protected abstract Object getAuditKey(Object auditToken,

@Override
public void start() {
// Initialize the stats before starting auditor.
_currentStats = createAuditStats();
_nextStats = createAuditStats();
super.start();
if (_started.compareAndSet(false, true)) {
// Initialize the stats before starting auditor.
_currentStats = createAuditStats();
_nextStats = createAuditStats();
super.start();
}
}

@Override
Expand All @@ -289,14 +292,15 @@ public void record(Object auditToken,
} catch (IllegalStateException ise) {
// Ignore this exception and retry because we might be ticking.
}
} while (!done && !_shutdown);
} while (!done && !_shutdown.get());
}

@Override
public void close(long timeout, TimeUnit unit) {
LOG.info("Closing auditor with timeout {} {}", timeout, unit);
_shutdown = true;
interrupt();
if (_shutdown.compareAndSet(false, true)) {
interrupt();
}
try {
if (timeout > 0) {
this.join(unit.toMillis(timeout));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,11 @@ public void testClose() {
config.put(AbstractAuditor.REPORTING_INTERVAL_MS, "60000");
auditor.configure(config);
auditor.start();
// calling start twice should have no impact.
auditor.start();

auditor.close();
// Calling close twice should have no impact.
auditor.close();

assertEquals(auditor.getState(), Thread.State.TERMINATED, "The auditor thread should have exited.");
Expand Down

0 comments on commit 1e83f3a

Please sign in to comment.