Skip to content

Commit 3871e39

Browse files
committed
fix concurrency bug
Signed-off-by: Amit Galitzky <amgalitz@amazon.com>
1 parent f5e9ff7 commit 3871e39

File tree

5 files changed

+90
-4
lines changed

5 files changed

+90
-4
lines changed

src/main/java/org/opensearch/timeseries/ratelimit/BatchWorker.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,8 +113,11 @@ protected void execute(Runnable afterProcessCallback, Runnable emptyQueueCallbac
113113
if (false == toProcess.isEmpty()) {
114114
final List<String> inflights = new ArrayList<>();
115115
for (RequestType request : toProcess) {
116-
inflightConfigs.add(request.getConfigId());
117-
inflights.add(request.getConfigId());
116+
String configId = request.getConfigId();
117+
if (configId != null) {
118+
inflightConfigs.add(configId);
119+
inflights.add(configId);
120+
}
118121
}
119122

120123
BatchRequestType batchRequest = toBatchRequest(toProcess);

src/main/java/org/opensearch/timeseries/ratelimit/RateLimitedRequestWorker.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.Random;
2727
import java.util.Set;
2828
import java.util.concurrent.BlockingQueue;
29+
import java.util.concurrent.ConcurrentHashMap;
2930
import java.util.concurrent.ConcurrentSkipListMap;
3031
import java.util.concurrent.LinkedBlockingQueue;
3132
import java.util.concurrent.TimeUnit;
@@ -249,7 +250,7 @@ public RateLimitedRequestWorker(
249250
this.stateTtl = stateTtl;
250251
this.nodeStateManager = nodeStateManager;
251252
this.context = context;
252-
this.inflightConfigs = new HashSet<>();
253+
this.inflightConfigs = ConcurrentHashMap.newKeySet();
253254
}
254255

255256
public String getWorkerName() {

src/main/java/org/opensearch/timeseries/ratelimit/SingleRequestWorker.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,10 @@ protected void execute(Runnable afterProcessCallback, Runnable emptyQueueCallbac
9292
if (false == queue.isEmpty()) {
9393
request = queue.poll();
9494
if (request != null) {
95-
inflightConfigs.add(request.getConfigId());
95+
String configId = request.getConfigId();
96+
if (configId != null) {
97+
inflightConfigs.add(configId);
98+
}
9699
}
97100
}
98101

src/test/java/org/opensearch/ad/ratelimit/ResultWriteWorkerTests.java

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,12 @@
2626
import java.util.ArrayList;
2727
import java.util.Arrays;
2828
import java.util.Collections;
29+
import java.util.ConcurrentModificationException;
2930
import java.util.HashSet;
3031
import java.util.List;
3132
import java.util.Random;
33+
import java.util.concurrent.CountDownLatch;
34+
import java.util.concurrent.TimeUnit;
3235
import java.util.concurrent.atomic.AtomicBoolean;
3336

3437
import org.opensearch.OpenSearchStatusException;
@@ -209,4 +212,72 @@ public void testOverloaded() {
209212
verify(resultHandler, times(1)).flush(any(), any());
210213
verify(nodeStateManager, times(1)).setException(eq(detectorId), any(OpenSearchRejectedExecutionException.class));
211214
}
215+
216+
public void testConcurrentModificationException() throws InterruptedException {
217+
for (int attempt = 0; attempt < 10; attempt++) {
218+
if (runConcurrencyTest()) {
219+
fail("ConcurrentModificationException occurred on attempt " + (attempt + 1));
220+
}
221+
Thread.sleep(5);
222+
}
223+
}
224+
225+
private boolean runConcurrencyTest() throws InterruptedException {
226+
final int numberOfThreads = 20; final CountDownLatch startLatch = new CountDownLatch(1);
227+
final CountDownLatch endLatch = new CountDownLatch(numberOfThreads);
228+
final AtomicBoolean exceptionOccurred = new AtomicBoolean(false);
229+
230+
doAnswer(invocation -> {
231+
ActionListener<ResultBulkResponse> listener = invocation.getArgument(1);
232+
try {
233+
Thread.sleep(5);
234+
listener.onResponse(new ResultBulkResponse(Collections.emptyList()));
235+
} catch (Exception e) {
236+
listener.onFailure(e);
237+
}
238+
return null;
239+
}).when(resultHandler).flush(any(), any());
240+
241+
doAnswer(invocation -> {
242+
Exception e = invocation.getArgument(1);
243+
if (e instanceof ConcurrentModificationException) {
244+
exceptionOccurred.set(true);
245+
}
246+
return null;
247+
}).when(nodeStateManager).setException(any(), any());
248+
249+
for (int i = 0; i < numberOfThreads; i++) {
250+
Thread thread = new Thread(() -> {
251+
try {
252+
startLatch.await();
253+
// Each thread adds requests
254+
for (int j = 0; j < 30; j++) {
255+
try {
256+
String detectorId = "detector-" + (j % 3);
257+
resultWriteQueue.put(
258+
new ADResultWriteRequest(
259+
System.currentTimeMillis() + j,
260+
detectorId,
261+
RequestPriority.MEDIUM,
262+
detectResult,
263+
null,
264+
null
265+
)
266+
);
267+
} catch (ConcurrentModificationException e) {
268+
exceptionOccurred.set(true);
269+
return;
270+
}
271+
}
272+
} catch (Exception ignored) {
273+
} finally {
274+
endLatch.countDown();
275+
}
276+
});
277+
thread.start();
278+
}
279+
startLatch.countDown();
280+
assertTrue("Test should complete", endLatch.await(30, TimeUnit.SECONDS));
281+
return exceptionOccurred.get();
282+
}
212283
}

src/test/java/org/opensearch/ad/task/ADTaskManagerTests.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -653,6 +653,14 @@ public void testUpdateLatestRealtimeTaskOnCoordinatingNode() {
653653
verify(actionListener, times(1)).onResponse(any());
654654
}
655655

656+
public void testTriageStateWithNullRcfTotalUpdates() {
657+
Boolean hasResult = true;
658+
String error = null;
659+
Long rcfTotalUpdates = null;
660+
String result = adTaskManager.triageState(hasResult, error, rcfTotalUpdates);
661+
assertEquals(TaskState.INIT.name(), result);
662+
}
663+
656664
public void testGetLocalADTaskProfilesByDetectorId() {
657665
doReturn(node1).when(clusterService).localNode();
658666
when(adTaskCacheManager.isHCTaskRunning(anyString())).thenReturn(true);

0 commit comments

Comments
 (0)