Skip to content

Commit 4502f8d

Browse files
committed
fix concurrency bug
Signed-off-by: Amit Galitzky <amgalitz@amazon.com>
1 parent 3871e39 commit 4502f8d

File tree

3 files changed

+17
-14
lines changed

3 files changed

+17
-14
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)
1010

1111

1212
### Bug Fixes
13+
- Fixing concurrency bug on writer ([#1508](https://github.com/opensearch-project/anomaly-detection/pull/1508))
14+
1315
### Infrastructure
1416
### Documentation
1517
### Maintenance

src/main/java/org/opensearch/ad/task/ADTaskManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1928,7 +1928,7 @@ protected <T> void resetLatestConfigTaskState(
19281928
protected String triageState(Boolean hasResult, String error, Long rcfTotalUpdates) {
19291929
if (hasResult != null && hasResult) {
19301930
return TaskState.RUNNING.name();
1931-
} else if (rcfTotalUpdates != null && rcfTotalUpdates < TimeSeriesSettings.NUM_MIN_SAMPLES) {
1931+
} else if (rcfTotalUpdates == null || rcfTotalUpdates < TimeSeriesSettings.NUM_MIN_SAMPLES) {
19321932
return TaskState.INIT.name();
19331933
} else {
19341934
return TaskState.RUNNING.name();

src/test/java/org/opensearch/ad/ratelimit/ResultWriteWorkerTests.java

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -222,8 +222,9 @@ public void testConcurrentModificationException() throws InterruptedException {
222222
}
223223
}
224224

225-
private boolean runConcurrencyTest() throws InterruptedException {
226-
final int numberOfThreads = 20; final CountDownLatch startLatch = new CountDownLatch(1);
225+
private boolean runConcurrencyTest() throws InterruptedException {
226+
final int numberOfThreads = 20;
227+
final CountDownLatch startLatch = new CountDownLatch(1);
227228
final CountDownLatch endLatch = new CountDownLatch(numberOfThreads);
228229
final AtomicBoolean exceptionOccurred = new AtomicBoolean(false);
229230

@@ -254,30 +255,30 @@ private boolean runConcurrencyTest() throws InterruptedException {
254255
for (int j = 0; j < 30; j++) {
255256
try {
256257
String detectorId = "detector-" + (j % 3);
257-
resultWriteQueue.put(
258+
resultWriteQueue
259+
.put(
258260
new ADResultWriteRequest(
259-
System.currentTimeMillis() + j,
260-
detectorId,
261-
RequestPriority.MEDIUM,
262-
detectResult,
263-
null,
264-
null
261+
System.currentTimeMillis() + j,
262+
detectorId,
263+
RequestPriority.MEDIUM,
264+
detectResult,
265+
null,
266+
null
265267
)
266-
);
268+
);
267269
} catch (ConcurrentModificationException e) {
268270
exceptionOccurred.set(true);
269271
return;
270272
}
271273
}
272-
} catch (Exception ignored) {
273-
} finally {
274+
} catch (Exception ignored) {} finally {
274275
endLatch.countDown();
275276
}
276277
});
277278
thread.start();
278279
}
279280
startLatch.countDown();
280281
assertTrue("Test should complete", endLatch.await(30, TimeUnit.SECONDS));
281-
return exceptionOccurred.get();
282+
return exceptionOccurred.get();
282283
}
283284
}

0 commit comments

Comments
 (0)