Skip to content

Commit

Permalink
Addressing review comment - adding tests to test concurrency
Browse files Browse the repository at this point in the history
Signed-off-by: Bharathwaj G <bharath78910@gmail.com>
  • Loading branch information
bharath-techie committed Jul 26, 2022
1 parent f4db20b commit 3f08013
Show file tree
Hide file tree
Showing 3 changed files with 165 additions and 0 deletions.
5 changes: 5 additions & 0 deletions server/src/main/java/org/opensearch/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -1078,6 +1078,11 @@ public DeletePitResponse freeReaderContextsIfFound(List<PitSearchContextIdForNod
if (getReaderContext(contextId.getSearchContextIdForNode().getSearchContextId()) != null) {
try (ReaderContext context = removeReaderContext(contextId.getSearchContextIdForNode().getSearchContextId().getId())) {
PitReaderContext pitReaderContext = (PitReaderContext) context;
if (context == null) {
DeletePitInfo deletePitInfo = new DeletePitInfo(true, contextId.getPitId());
deleteResults.add(deletePitInfo);
continue;
}
String pitId = pitReaderContext.getPitId();
boolean success = context != null;
DeletePitInfo deletePitInfo = new DeletePitInfo(success, pitId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,31 @@
import org.junit.After;
import org.junit.Before;
import org.opensearch.action.ActionFuture;
import org.opensearch.action.ActionListener;
import org.opensearch.action.LatchedActionListener;
import org.opensearch.action.search.CreatePitAction;
import org.opensearch.action.search.CreatePitRequest;
import org.opensearch.action.search.CreatePitResponse;
import org.opensearch.action.search.DeletePitAction;
import org.opensearch.action.search.DeletePitRequest;
import org.opensearch.action.search.DeletePitResponse;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.search.builder.PointInTimeBuilder;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static org.hamcrest.Matchers.containsString;
import static org.opensearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE;
Expand Down Expand Up @@ -199,4 +213,103 @@ public void testPitInvalidDefaultKeepAlive() {
.setTransientSettings(Settings.builder().putNull("*"))
);
}

public void testConcurrentCreates() throws InterruptedException {
CreatePitRequest createPitRequest = new CreatePitRequest(TimeValue.timeValueDays(1), true);
createPitRequest.setIndices(new String[] { "index" });

int concurrentRuns = randomIntBetween(20, 50);
AtomicInteger numSuccess = new AtomicInteger();
TestThreadPool testThreadPool = null;
try {
testThreadPool = new TestThreadPool(DeletePitMultiNodeTests.class.getName());
List<Runnable> operationThreads = new ArrayList<>();
CountDownLatch countDownLatch = new CountDownLatch(concurrentRuns);
Set<String> createSet = new HashSet<>();
for (int i = 0; i < concurrentRuns; i++) {
Runnable thread = () -> {
logger.info("Triggering pit create --->");
LatchedActionListener listener = new LatchedActionListener<>(new ActionListener<CreatePitResponse>() {
@Override
public void onResponse(CreatePitResponse createPitResponse) {
if (createSet.add(createPitResponse.getId())) {
numSuccess.incrementAndGet();
}
}

@Override
public void onFailure(Exception e) {}
}, countDownLatch);
client().execute(CreatePitAction.INSTANCE, createPitRequest, listener);
};
operationThreads.add(thread);
}
TestThreadPool finalTestThreadPool = testThreadPool;
operationThreads.forEach(runnable -> finalTestThreadPool.executor("generic").execute(runnable));
countDownLatch.await();
assertEquals(concurrentRuns, numSuccess.get());
} finally {
ThreadPool.terminate(testThreadPool, 500, TimeUnit.MILLISECONDS);
}
}

public void testConcurrentCreatesWithDeletes() throws InterruptedException, ExecutionException {
CreatePitRequest createPitRequest = new CreatePitRequest(TimeValue.timeValueDays(1), true);
createPitRequest.setIndices(new String[] { "index" });
List<String> pitIds = new ArrayList<>();
String id = client().execute(CreatePitAction.INSTANCE, createPitRequest).get().getId();
pitIds.add(id);
DeletePitRequest deletePITRequest = new DeletePitRequest(pitIds);
Set<String> createSet = new HashSet<>();
AtomicInteger numSuccess = new AtomicInteger();
TestThreadPool testThreadPool = null;
try {
testThreadPool = new TestThreadPool(CreatePitMultiNodeTests.class.getName());
int concurrentRuns = randomIntBetween(20, 50);

List<Runnable> operationThreads = new ArrayList<>();
CountDownLatch countDownLatch = new CountDownLatch(concurrentRuns);
long randomDeleteThread = randomLongBetween(0, concurrentRuns - 1);
for (int i = 0; i < concurrentRuns; i++) {
int currentThreadIteration = i;
Runnable thread = () -> {
if (currentThreadIteration == randomDeleteThread) {
LatchedActionListener listener = new LatchedActionListener<>(new ActionListener<CreatePitResponse>() {
@Override
public void onResponse(CreatePitResponse createPitResponse) {
if (createSet.add(createPitResponse.getId())) {
numSuccess.incrementAndGet();
}
}

@Override
public void onFailure(Exception e) {}
}, countDownLatch);
client().execute(CreatePitAction.INSTANCE, createPitRequest, listener);
} else {
LatchedActionListener listener = new LatchedActionListener<>(new ActionListener<DeletePitResponse>() {
@Override
public void onResponse(DeletePitResponse deletePitResponse) {
if (deletePitResponse.getDeletePitResults().get(0).isSuccessful()) {
numSuccess.incrementAndGet();
}
}

@Override
public void onFailure(Exception e) {}
}, countDownLatch);
client().execute(DeletePitAction.INSTANCE, deletePITRequest, listener);
}
};
operationThreads.add(thread);
}
TestThreadPool finalTestThreadPool = testThreadPool;
operationThreads.forEach(runnable -> finalTestThreadPool.executor("generic").execute(runnable));
countDownLatch.await();
assertEquals(concurrentRuns, numSuccess.get());

} finally {
ThreadPool.terminate(testThreadPool, 500, TimeUnit.MILLISECONDS);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import org.junit.After;
import org.junit.Before;
import org.opensearch.action.ActionFuture;
import org.opensearch.action.ActionListener;
import org.opensearch.action.LatchedActionListener;
import org.opensearch.action.search.CreatePitAction;
import org.opensearch.action.search.CreatePitRequest;
import org.opensearch.action.search.CreatePitResponse;
Expand All @@ -23,12 +25,16 @@
import org.opensearch.search.builder.PointInTimeBuilder;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static org.hamcrest.Matchers.blankOrNullString;
import static org.hamcrest.Matchers.containsString;
Expand Down Expand Up @@ -283,4 +289,45 @@ public void testDeleteWhileSearch() throws Exception {
thread.join();
}
}

public void testtConcurrentDeletes() throws InterruptedException, ExecutionException {
CreatePitResponse pitResponse = createPitOnIndex("index");
ensureGreen();
int concurrentRuns = randomIntBetween(20, 50);
List<String> pitIds = new ArrayList<>();
pitIds.add(pitResponse.getId());
DeletePitRequest deletePITRequest = new DeletePitRequest(pitIds);
AtomicInteger numDeleteAcknowledged = new AtomicInteger();
TestThreadPool testThreadPool = null;
try {
testThreadPool = new TestThreadPool(DeletePitMultiNodeTests.class.getName());
List<Runnable> operationThreads = new ArrayList<>();
CountDownLatch countDownLatch = new CountDownLatch(concurrentRuns);
for (int i = 0; i < concurrentRuns; i++) {
Runnable thread = () -> {
logger.info("Triggering pit delete --->");
LatchedActionListener listener = new LatchedActionListener<>(new ActionListener<DeletePitResponse>() {
@Override
public void onResponse(DeletePitResponse deletePitResponse) {
if (deletePitResponse.getDeletePitResults().get(0).isSuccessful()) {
numDeleteAcknowledged.incrementAndGet();
}
}

@Override
public void onFailure(Exception e) {}
}, countDownLatch);
client().execute(DeletePitAction.INSTANCE, deletePITRequest, listener);
};
operationThreads.add(thread);
}
TestThreadPool finalTestThreadPool = testThreadPool;
operationThreads.forEach(runnable -> finalTestThreadPool.executor("generic").execute(runnable));
countDownLatch.await();
assertEquals(concurrentRuns, numDeleteAcknowledged.get());
} finally {
ThreadPool.terminate(testThreadPool, 500, TimeUnit.MILLISECONDS);
}
}

}

0 comments on commit 3f08013

Please sign in to comment.