Skip to content

Commit 02ba992

Browse files
committed
Add stopped event
1 parent daf6f10 commit 02ba992

File tree

3 files changed

+70
-20
lines changed

3 files changed

+70
-20
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexer.java

Lines changed: 28 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,11 @@
2323
* An abstract class that builds an index incrementally. A background job can be launched using {@link #maybeTriggerAsyncJob(long)},
2424
* it will create the index from the source index up to the last complete bucket that is allowed to be built (based on job position).
2525
* Only one background job can run simultaneously and {@link #onFinish} is called when the job
26-
* finishes. {@link #onStop()} is called after the current search returns when the job is stopped early via a call
27-
* to {@link #stop()}. {@link #onFailure(Exception)} is called if the job fails with an exception and {@link #onAbort()}
26+
* finishes before state is persisted. The indexer can be stopped early by a call to {@link #stop()} which will
27+
* trigger the {@link #onStopping()} and {@link #onStopped()} methods.
28+
* {@link #onFailure(Exception)} is called if the job fails with an exception and {@link #onAbort()}
2829
* is called if the indexer is aborted while a job is running. The indexer must be started ({@link #start()}
2930
* to allow a background job to run when {@link #maybeTriggerAsyncJob(long)} is called.
30-
* {@link #stop()} can be used to stop the background job without aborting the indexer.
3131
*
3232
* In a nutshell this is a 2 cycle engine: 1st it sends a query, 2nd it indexes documents based on the response, sends the next query,
3333
* indexes, queries, indexes, ... until a condition lets the engine pause until the source provides new input.
@@ -87,10 +87,10 @@ public synchronized IndexerState start() {
8787

8888
/**
8989
* Sets the internal state to {@link IndexerState#STOPPING} if an async job is
90-
* running in the background, {@link #onStop()} will be called when the background job
90+
* running in the background, {@link #onStopped()} will be called when the background job
9191
* detects that the indexer is stopped.
9292
* If there is no job running when this function is called the returned
93-
* state is {@link IndexerState#STOPPED} and {@link #onStop()} will not be called.
93+
* state is {@link IndexerState#STOPPED} and {@link #onStopped()} will not be called.
9494
*
9595
* @return The new state for the indexer (STOPPED, STOPPING or ABORTING if the job was already aborted).
9696
*/
@@ -249,20 +249,30 @@ public synchronized boolean maybeTriggerAsyncJob(long now) {
249249

250250
/**
251251
* Called when a background job finishes before the internal state changes from {@link IndexerState#INDEXING} back to
252-
* {@link IndexerState#STARTED}.
252+
* {@link IndexerState#STARTED} and before {@link #doSaveState(IndexerState, Object, Runnable)} is called
253253
*
254254
* @param listener listener to call after done
255255
*/
256256
protected abstract void onFinish(ActionListener<Void> listener);
257257

258+
258259
/**
259-
* Called when the indexer is stopped. This is only called when the indexer is stopped
260-
* via {@link #stop()} as opposed to {@link #onFinish(ActionListener)} which is called
261-
* when the indexer's work is done.
260+
* Called when a background job stops after internal state has changed from {@link IndexerState#STOPPING}
261+
* to {@link IndexerState#STOPPED} and before state is persisted via {@link #doSaveState(IndexerState, Object, Runnable)}.
262+
* This is only called when the indexer is stopped due to a call to {@link #stop()}
262263
*/
263-
protected void onStop() {
264+
protected void onStopping() {
264265
}
265266

267+
/**
268+
* Called when the indexer is stopped after {@link #onStopping()} and {@link #doSaveState(IndexerState, Object, Runnable)}
269+
* have been called.
270+
*/
271+
protected void onStopped() {
272+
}
273+
274+
275+
266276
/**
267277
* Called when a background job detects that the indexer is aborted causing the
268278
* async execution to stop.
@@ -280,16 +290,18 @@ private void finishWithIndexingFailure(Exception exc) {
280290
}
281291

282292
private IndexerState finishAndSetState() {
283-
AtomicBoolean callOnStop = new AtomicBoolean(false);
284-
AtomicBoolean callOnAbort = new AtomicBoolean(false);
293+
AtomicBoolean callOnStopping = new AtomicBoolean();
294+
AtomicBoolean callOnAbort = new AtomicBoolean();
285295
IndexerState updatedState = state.updateAndGet(prev -> {
296+
callOnAbort.set(false);
297+
callOnStopping.set(false);
286298
switch (prev) {
287299
case INDEXING:
288300
// ready for another job
289301
return IndexerState.STARTED;
290302

291303
case STOPPING:
292-
callOnStop.set(true);
304+
callOnStopping.set(true);
293305
// must be started again
294306
return IndexerState.STOPPED;
295307

@@ -311,8 +323,8 @@ private IndexerState finishAndSetState() {
311323
}
312324
});
313325

314-
if (callOnStop.get()) {
315-
onStop();
326+
if (callOnStopping.get()) {
327+
onStopping();
316328
} else if (callOnAbort.get()) {
317329
onAbort();
318330
}
@@ -412,7 +424,7 @@ private boolean checkState(IndexerState currentState) {
412424

413425
case STOPPING:
414426
logger.info("Indexer job encountered [" + IndexerState.STOPPING + "] state, halting indexer.");
415-
doSaveState(finishAndSetState(), getPosition(), () -> {});
427+
doSaveState(finishAndSetState(), getPosition(), this::onStopped);
416428
return false;
417429

418430
case STOPPED:

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/indexing/AsyncTwoPhaseIndexerTests.java

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,13 @@ public class AsyncTwoPhaseIndexerTests extends ESTestCase {
3636

3737
AtomicBoolean isFinished = new AtomicBoolean(false);
3838
AtomicBoolean isStopped = new AtomicBoolean(false);
39+
AtomicBoolean isStopping = new AtomicBoolean(false);
3940

4041
@Before
4142
public void reset() {
4243
isFinished.set(false);
4344
isStopped.set(false);
45+
isStopping.set(false);
4446
}
4547

4648
private class MockIndexer extends AsyncTwoPhaseIndexer<Integer, MockJobStats> {
@@ -112,7 +114,7 @@ protected void doNextBulk(BulkRequest request, ActionListener<BulkResponse> next
112114

113115
@Override
114116
protected void doSaveState(IndexerState state, Integer position, Runnable next) {
115-
int expectedStep = stoppedBeforeFinished ? 3 : 5;
117+
int expectedStep = stoppedBeforeFinished ? 4 : 5;
116118
assertThat(step, equalTo(expectedStep));
117119
++step;
118120
next.run();
@@ -132,7 +134,16 @@ protected void onFinish(ActionListener<Void> listener) {
132134
}
133135

134136
@Override
135-
protected void onStop() {
137+
protected void onStopping() {
138+
assertThat(step, equalTo(3));
139+
++step;
140+
assertTrue(isStopping.compareAndSet(false, true));
141+
}
142+
143+
@Override
144+
protected void onStopped() {
145+
assertThat(step, equalTo(5));
146+
++step;
136147
assertTrue(isStopped.compareAndSet(false, true));
137148
}
138149

@@ -268,6 +279,26 @@ public void testStateMachineBrokenSearch() throws InterruptedException {
268279
}
269280
}
270281

282+
public void testStop_AfterIndexerIsFinished() throws InterruptedException {
283+
AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
284+
final ExecutorService executor = Executors.newFixedThreadPool(1);
285+
try {
286+
CountDownLatch countDownLatch = new CountDownLatch(1);
287+
MockIndexer indexer = new MockIndexer(executor, state, 2, countDownLatch, false);
288+
indexer.start();
289+
assertTrue(indexer.maybeTriggerAsyncJob(System.currentTimeMillis()));
290+
countDownLatch.countDown();
291+
assertTrue(awaitBusy(() -> isFinished.get()));
292+
293+
indexer.stop();
294+
assertFalse(isStopping.get());
295+
assertFalse(isStopped.get());
296+
assertThat(indexer.getState(), equalTo(IndexerState.STOPPED));
297+
} finally {
298+
executor.shutdownNow();
299+
}
300+
}
301+
271302
public void testStop_WhileIndexing() throws InterruptedException {
272303
AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
273304
final ExecutorService executor = Executors.newFixedThreadPool(1);
@@ -282,6 +313,7 @@ public void testStop_WhileIndexing() throws InterruptedException {
282313
countDownLatch.countDown();
283314

284315
assertThat(indexer.getPosition(), equalTo(2));
316+
assertTrue(awaitBusy(() -> isStopping.get()));
285317
assertTrue(awaitBusy(() -> isStopped.get()));
286318
assertFalse(isFinished.get());
287319
} finally {

x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,8 @@ public synchronized void stop() {
238238

239239
IndexerState state = getIndexer().stop();
240240
if (state == IndexerState.STOPPED) {
241-
getIndexer().doSaveState(state, getIndexer().getPosition(), () -> getIndexer().onStop());
241+
getIndexer().onStopping();
242+
getIndexer().doSaveState(state, getIndexer().getPosition(), () -> getIndexer().onStopped());
242243
}
243244
}
244245

@@ -592,7 +593,12 @@ protected void onFinish(ActionListener<Void> listener) {
592593
}
593594

594595
@Override
595-
protected void onStop() {
596+
protected void onStopping() {
597+
transformTask.setTaskStateStopped();
598+
}
599+
600+
@Override
601+
protected void onStopped() {
596602
auditor.info(transformConfig.getId(), "Indexer has stopped");
597603
logger.info("Data frame transform [{}] indexer has stopped", transformConfig.getId());
598604
transformTask.setTaskStateStopped();

0 commit comments

Comments
 (0)