Skip to content

Commit

Permalink
Remove one unnecesary test and simply some code in a test (opensearch…
Browse files Browse the repository at this point in the history
…-project#14360)

Signed-off-by: Liyun Xiu <xiliyun@amazon.com>
  • Loading branch information
chishui authored and harshavamsi committed Jul 12, 2024
1 parent a25bd85 commit f77d7e5
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ private void runBulkTestWithRandomDocs(boolean shouldSetBatchSize) throws Except
int numRequests = scaledRandomIntBetween(32, 128);
BulkRequest bulkRequest = new BulkRequest();
if (shouldSetBatchSize) {
bulkRequest.batchSize(numRequests);
bulkRequest.batchSize(scaledRandomIntBetween(2, numRequests));
}
for (int i = 0; i < numRequests; i++) {
IndexRequest indexRequest = new IndexRequest("index").id(Integer.toString(i)).setPipeline("_id");
Expand All @@ -214,6 +214,9 @@ private void runBulkTestWithRandomDocs(boolean shouldSetBatchSize) throws Except
);
assertThat(indexResponse, notNullValue());
assertThat(indexResponse.getId(), equalTo(Integer.toString(i)));
// verify field of successful doc
Map<String, Object> successDoc = client().prepareGet("index", indexResponse.getId()).get().getSourceAsMap();
assertThat(successDoc.get("processed"), equalTo(true));
assertEquals(DocWriteResponse.Result.CREATED, indexResponse.getResult());
}
}
Expand All @@ -223,51 +226,6 @@ private void runBulkTestWithRandomDocs(boolean shouldSetBatchSize) throws Except
assertTrue(deletePipelineResponse.isAcknowledged());
}

public void testBulkWithIngestFailuresBatch() throws Exception {
createIndex("index");

BytesReference source = BytesReference.bytes(
jsonBuilder().startObject()
.field("description", "my_pipeline")
.startArray("processors")
.startObject()
.startObject("test")
.endObject()
.endObject()
.endArray()
.endObject()
);
PutPipelineRequest putPipelineRequest = new PutPipelineRequest("_id", source, MediaTypeRegistry.JSON);
client().admin().cluster().putPipeline(putPipelineRequest).get();

BulkRequest bulkRequest = new BulkRequest();
bulkRequest.batchSize(2);
bulkRequest.add(
new IndexRequest("index").id("_fail").setPipeline("_id").source(Requests.INDEX_CONTENT_TYPE, "field", "value", "fail", true)
);
bulkRequest.add(
new IndexRequest("index").id("_success").setPipeline("_id").source(Requests.INDEX_CONTENT_TYPE, "field", "value", "fail", false)
);

BulkResponse response = client().bulk(bulkRequest).actionGet();
MatcherAssert.assertThat(response.getItems().length, equalTo(bulkRequest.requests().size()));

Map<String, BulkItemResponse> results = Arrays.stream(response.getItems())
.collect(Collectors.toMap(BulkItemResponse::getId, r -> r));

MatcherAssert.assertThat(results.keySet(), containsInAnyOrder("_fail", "_success"));
assertNotNull(results.get("_fail").getFailure());
assertNull(results.get("_success").getFailure());

// verify field of successful doc
Map<String, Object> successDoc = client().prepareGet("index", "_success").get().getSourceAsMap();
assertThat(successDoc.get("processed"), equalTo(true));

// cleanup
AcknowledgedResponse deletePipelineResponse = client().admin().cluster().prepareDeletePipeline("_id").get();
assertTrue(deletePipelineResponse.isAcknowledged());
}

public void testBulkWithIngestFailuresAndDropBatch() throws Exception {
createIndex("index");

Expand Down
27 changes: 11 additions & 16 deletions server/src/test/java/org/opensearch/ingest/IngestServiceTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,15 @@
import org.junit.Before;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
Expand All @@ -97,7 +99,6 @@
import java.util.function.LongSupplier;
import java.util.stream.Collectors;

import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatcher;
import org.mockito.invocation.InvocationOnMock;

Expand Down Expand Up @@ -1923,27 +1924,21 @@ public void testExecuteBulkRequestInBatchWithExceptionAndDropInCallback() {
return null;
}).when(mockCompoundProcessor).batchExecute(any(), any());

@SuppressWarnings("unchecked")
final BiConsumer<Integer, Exception> failureHandler = mock(BiConsumer.class);
@SuppressWarnings("unchecked")
final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class);
final IntConsumer dropHandler = mock(IntConsumer.class);
final Map<Integer, Exception> failureHandler = new HashMap<>();
final Map<Thread, Exception> completionHandler = new HashMap<>();
final List<Integer> dropHandler = new ArrayList<>();
ingestService.executeBulkRequest(
3,
bulkRequest.requests(),
failureHandler,
completionHandler,
dropHandler,
failureHandler::put,
completionHandler::put,
dropHandler::add,
Names.WRITE,
bulkRequest
);
ArgumentCaptor<Integer> failureSlotCaptor = ArgumentCaptor.forClass(Integer.class);
verify(failureHandler, times(1)).accept(failureSlotCaptor.capture(), any());
assertEquals(1, failureSlotCaptor.getValue().intValue());
ArgumentCaptor<Integer> dropSlotCaptor = ArgumentCaptor.forClass(Integer.class);
verify(dropHandler, times(1)).accept(dropSlotCaptor.capture());
assertEquals(2, dropSlotCaptor.getValue().intValue());
verify(completionHandler, times(1)).accept(Thread.currentThread(), null);
assertEquals(Set.of(1), failureHandler.keySet());
assertEquals(List.of(2), dropHandler);
assertEquals(Set.of(Thread.currentThread()), completionHandler.keySet());
verify(mockCompoundProcessor, times(1)).batchExecute(any(), any());
verify(mockCompoundProcessor, never()).execute(any(), any());
}
Expand Down

0 comments on commit f77d7e5

Please sign in to comment.