Skip to content

Commit ad12ef4

Browse files
authored
Fixing concurrency bug on writer (#1508) (#1520)
* fix concurrency bug * fix concurrency bug --------- Signed-off-by: Amit Galitzky <amgalitz@amazon.com>
1 parent 5db65e6 commit ad12ef4

File tree

7 files changed

+94
-5
lines changed

7 files changed

+94
-5
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)
1010

1111

1212
### Bug Fixes
13+
- Fixing concurrency bug on writer ([#1508](https://github.com/opensearch-project/anomaly-detection/pull/1508))
14+
1315
### Infrastructure
1416
### Documentation
1517
### Maintenance

src/main/java/org/opensearch/ad/task/ADTaskManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1928,7 +1928,7 @@ protected <T> void resetLatestConfigTaskState(
19281928
protected String triageState(Boolean hasResult, String error, Long rcfTotalUpdates) {
19291929
if (hasResult != null && hasResult) {
19301930
return TaskState.RUNNING.name();
1931-
} else if (rcfTotalUpdates != null && rcfTotalUpdates < TimeSeriesSettings.NUM_MIN_SAMPLES) {
1931+
} else if (rcfTotalUpdates == null || rcfTotalUpdates < TimeSeriesSettings.NUM_MIN_SAMPLES) {
19321932
return TaskState.INIT.name();
19331933
} else {
19341934
return TaskState.RUNNING.name();

src/main/java/org/opensearch/timeseries/ratelimit/BatchWorker.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,8 +113,11 @@ protected void execute(Runnable afterProcessCallback, Runnable emptyQueueCallbac
113113
if (false == toProcess.isEmpty()) {
114114
final List<String> inflights = new ArrayList<>();
115115
for (RequestType request : toProcess) {
116-
inflightConfigs.add(request.getConfigId());
117-
inflights.add(request.getConfigId());
116+
String configId = request.getConfigId();
117+
if (configId != null) {
118+
inflightConfigs.add(configId);
119+
inflights.add(configId);
120+
}
118121
}
119122

120123
BatchRequestType batchRequest = toBatchRequest(toProcess);

src/main/java/org/opensearch/timeseries/ratelimit/RateLimitedRequestWorker.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.Random;
2727
import java.util.Set;
2828
import java.util.concurrent.BlockingQueue;
29+
import java.util.concurrent.ConcurrentHashMap;
2930
import java.util.concurrent.ConcurrentSkipListMap;
3031
import java.util.concurrent.LinkedBlockingQueue;
3132
import java.util.concurrent.TimeUnit;
@@ -249,7 +250,7 @@ public RateLimitedRequestWorker(
249250
this.stateTtl = stateTtl;
250251
this.nodeStateManager = nodeStateManager;
251252
this.context = context;
252-
this.inflightConfigs = new HashSet<>();
253+
this.inflightConfigs = ConcurrentHashMap.newKeySet();
253254
}
254255

255256
public String getWorkerName() {

src/main/java/org/opensearch/timeseries/ratelimit/SingleRequestWorker.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,10 @@ protected void execute(Runnable afterProcessCallback, Runnable emptyQueueCallbac
9292
if (false == queue.isEmpty()) {
9393
request = queue.poll();
9494
if (request != null) {
95-
inflightConfigs.add(request.getConfigId());
95+
String configId = request.getConfigId();
96+
if (configId != null) {
97+
inflightConfigs.add(configId);
98+
}
9699
}
97100
}
98101

src/test/java/org/opensearch/ad/ratelimit/ResultWriteWorkerTests.java

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,12 @@
2626
import java.util.ArrayList;
2727
import java.util.Arrays;
2828
import java.util.Collections;
29+
import java.util.ConcurrentModificationException;
2930
import java.util.HashSet;
3031
import java.util.List;
3132
import java.util.Random;
33+
import java.util.concurrent.CountDownLatch;
34+
import java.util.concurrent.TimeUnit;
3235
import java.util.concurrent.atomic.AtomicBoolean;
3336

3437
import org.opensearch.OpenSearchStatusException;
@@ -209,4 +212,73 @@ public void testOverloaded() {
209212
verify(resultHandler, times(1)).flush(any(), any());
210213
verify(nodeStateManager, times(1)).setException(eq(detectorId), any(OpenSearchRejectedExecutionException.class));
211214
}
215+
216+
public void testConcurrentModificationException() throws InterruptedException {
217+
for (int attempt = 0; attempt < 10; attempt++) {
218+
if (runConcurrencyTest()) {
219+
fail("ConcurrentModificationException occurred on attempt " + (attempt + 1));
220+
}
221+
Thread.sleep(5);
222+
}
223+
}
224+
225+
private boolean runConcurrencyTest() throws InterruptedException {
226+
final int numberOfThreads = 20;
227+
final CountDownLatch startLatch = new CountDownLatch(1);
228+
final CountDownLatch endLatch = new CountDownLatch(numberOfThreads);
229+
final AtomicBoolean exceptionOccurred = new AtomicBoolean(false);
230+
231+
doAnswer(invocation -> {
232+
ActionListener<ResultBulkResponse> listener = invocation.getArgument(1);
233+
try {
234+
Thread.sleep(5);
235+
listener.onResponse(new ResultBulkResponse(Collections.emptyList()));
236+
} catch (Exception e) {
237+
listener.onFailure(e);
238+
}
239+
return null;
240+
}).when(resultHandler).flush(any(), any());
241+
242+
doAnswer(invocation -> {
243+
Exception e = invocation.getArgument(1);
244+
if (e instanceof ConcurrentModificationException) {
245+
exceptionOccurred.set(true);
246+
}
247+
return null;
248+
}).when(nodeStateManager).setException(any(), any());
249+
250+
for (int i = 0; i < numberOfThreads; i++) {
251+
Thread thread = new Thread(() -> {
252+
try {
253+
startLatch.await();
254+
// Each thread adds requests
255+
for (int j = 0; j < 30; j++) {
256+
try {
257+
String detectorId = "detector-" + (j % 3);
258+
resultWriteQueue
259+
.put(
260+
new ADResultWriteRequest(
261+
System.currentTimeMillis() + j,
262+
detectorId,
263+
RequestPriority.MEDIUM,
264+
detectResult,
265+
null,
266+
null
267+
)
268+
);
269+
} catch (ConcurrentModificationException e) {
270+
exceptionOccurred.set(true);
271+
return;
272+
}
273+
}
274+
} catch (Exception ignored) {} finally {
275+
endLatch.countDown();
276+
}
277+
});
278+
thread.start();
279+
}
280+
startLatch.countDown();
281+
assertTrue("Test should complete", endLatch.await(30, TimeUnit.SECONDS));
282+
return exceptionOccurred.get();
283+
}
212284
}

src/test/java/org/opensearch/ad/task/ADTaskManagerTests.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -653,6 +653,14 @@ public void testUpdateLatestRealtimeTaskOnCoordinatingNode() {
653653
verify(actionListener, times(1)).onResponse(any());
654654
}
655655

656+
public void testTriageStateWithNullRcfTotalUpdates() {
657+
Boolean hasResult = null;
658+
String error = null;
659+
Long rcfTotalUpdates = null;
660+
String result = adTaskManager.triageState(hasResult, error, rcfTotalUpdates);
661+
assertEquals(TaskState.INIT.name(), result);
662+
}
663+
656664
public void testGetLocalADTaskProfilesByDetectorId() {
657665
doReturn(node1).when(clusterService).localNode();
658666
when(adTaskCacheManager.isHCTaskRunning(anyString())).thenReturn(true);

0 commit comments

Comments
 (0)