Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)


### Bug Fixes
- Fixing concurrency bug on writer ([#1508](https://github.com/opensearch-project/anomaly-detection/pull/1508))

### Infrastructure
### Documentation
### Maintenance
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/opensearch/ad/task/ADTaskManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -1928,7 +1928,7 @@ protected <T> void resetLatestConfigTaskState(
protected String triageState(Boolean hasResult, String error, Long rcfTotalUpdates) {
if (hasResult != null && hasResult) {
return TaskState.RUNNING.name();
} else if (rcfTotalUpdates != null && rcfTotalUpdates < TimeSeriesSettings.NUM_MIN_SAMPLES) {
} else if (rcfTotalUpdates == null || rcfTotalUpdates < TimeSeriesSettings.NUM_MIN_SAMPLES) {
return TaskState.INIT.name();
} else {
return TaskState.RUNNING.name();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,11 @@ protected void execute(Runnable afterProcessCallback, Runnable emptyQueueCallbac
if (false == toProcess.isEmpty()) {
final List<String> inflights = new ArrayList<>();
for (RequestType request : toProcess) {
inflightConfigs.add(request.getConfigId());
inflights.add(request.getConfigId());
String configId = request.getConfigId();
if (configId != null) {
inflightConfigs.add(configId);
inflights.add(configId);
}
}

BatchRequestType batchRequest = toBatchRequest(toProcess);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Random;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -249,7 +250,7 @@ public RateLimitedRequestWorker(
this.stateTtl = stateTtl;
this.nodeStateManager = nodeStateManager;
this.context = context;
this.inflightConfigs = new HashSet<>();
this.inflightConfigs = ConcurrentHashMap.newKeySet();
}

public String getWorkerName() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,10 @@ protected void execute(Runnable afterProcessCallback, Runnable emptyQueueCallbac
if (false == queue.isEmpty()) {
request = queue.poll();
if (request != null) {
inflightConfigs.add(request.getConfigId());
String configId = request.getConfigId();
if (configId != null) {
inflightConfigs.add(configId);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,12 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.opensearch.OpenSearchStatusException;
Expand Down Expand Up @@ -209,4 +212,73 @@ public void testOverloaded() {
verify(resultHandler, times(1)).flush(any(), any());
verify(nodeStateManager, times(1)).setException(eq(detectorId), any(OpenSearchRejectedExecutionException.class));
}

public void testConcurrentModificationException() throws InterruptedException {
for (int attempt = 0; attempt < 10; attempt++) {
if (runConcurrencyTest()) {
fail("ConcurrentModificationException occurred on attempt " + (attempt + 1));
}
Thread.sleep(5);
}
}

private boolean runConcurrencyTest() throws InterruptedException {
final int numberOfThreads = 20;
final CountDownLatch startLatch = new CountDownLatch(1);
final CountDownLatch endLatch = new CountDownLatch(numberOfThreads);
final AtomicBoolean exceptionOccurred = new AtomicBoolean(false);

doAnswer(invocation -> {
ActionListener<ResultBulkResponse> listener = invocation.getArgument(1);
try {
Thread.sleep(5);
listener.onResponse(new ResultBulkResponse(Collections.emptyList()));
} catch (Exception e) {
listener.onFailure(e);
}
return null;
}).when(resultHandler).flush(any(), any());

doAnswer(invocation -> {
Exception e = invocation.getArgument(1);
if (e instanceof ConcurrentModificationException) {
exceptionOccurred.set(true);
}
return null;
}).when(nodeStateManager).setException(any(), any());

for (int i = 0; i < numberOfThreads; i++) {
Thread thread = new Thread(() -> {
try {
startLatch.await();
// Each thread adds requests
for (int j = 0; j < 30; j++) {
try {
String detectorId = "detector-" + (j % 3);
resultWriteQueue
.put(
new ADResultWriteRequest(
System.currentTimeMillis() + j,
detectorId,
RequestPriority.MEDIUM,
detectResult,
null,
null
)
);
} catch (ConcurrentModificationException e) {
exceptionOccurred.set(true);
return;
}
}
} catch (Exception ignored) {} finally {
endLatch.countDown();
}
});
thread.start();
}
startLatch.countDown();
assertTrue("Test should complete", endLatch.await(30, TimeUnit.SECONDS));
return exceptionOccurred.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -653,6 +653,14 @@ public void testUpdateLatestRealtimeTaskOnCoordinatingNode() {
verify(actionListener, times(1)).onResponse(any());
}

public void testTriageStateWithNullRcfTotalUpdates() {
Boolean hasResult = null;
String error = null;
Long rcfTotalUpdates = null;
String result = adTaskManager.triageState(hasResult, error, rcfTotalUpdates);
assertEquals(TaskState.INIT.name(), result);
}

public void testGetLocalADTaskProfilesByDetectorId() {
doReturn(node1).when(clusterService).localNode();
when(adTaskCacheManager.isHCTaskRunning(anyString())).thenReturn(true);
Expand Down
Loading