Fix flaky test in testApproximateRangeWithSizeOverDefault by adjusting totalHits assertion logic #16433

Closed · wants to merge 2 commits
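
Background for the assertion change, as a minimal sketch (illustrative class and method names, not part of this PR): Lucene's TopDocs.totalHits is an exact count only when its relation is EQUAL_TO; once the collector stops counting early it reports a lower bound with GREATER_THAN_OR_EQUAL_TO, so a fixed equality assertion can fail intermittently. The updated test therefore branches on the relation, as the sketch below does.

import org.apache.lucene.search.TotalHits;

public class TotalHitsAssertionSketch {
    // Relation-aware check: demand an exact match only when the count is exact,
    // otherwise treat the reported value as a lower bound capped by the doc count.
    static boolean matchesExpected(TotalHits totalHits, long expectedDocs, long maxDocs) {
        if (totalHits.relation == TotalHits.Relation.EQUAL_TO) {
            return totalHits.value == expectedDocs;
        }
        return expectedDocs <= totalHits.value && totalHits.value <= maxDocs;
    }

    public static void main(String[] args) {
        System.out.println(matchesExpected(new TotalHits(11000, TotalHits.Relation.EQUAL_TO), 11000, 12001));                  // true
        System.out.println(matchesExpected(new TotalHits(11500, TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO), 11000, 12001));  // true
    }
}
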
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -88,6 +88,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Workload Management] Make query groups persistent across process restarts [#16370](https://github.com/opensearch-project/OpenSearch/pull/16370)
- Fix inefficient Stream API call chains ending with count() ([#15386](https://github.com/opensearch-project/OpenSearch/pull/15386))
- Fix array hashCode calculation in ResyncReplicationRequest ([#16378](https://github.com/opensearch-project/OpenSearch/pull/16378))
- Fix missing fields in task index mapping to ensure proper task result storage ([#16201](https://github.com/opensearch-project/OpenSearch/pull/16201))

### Security

TasksIT.java

@@ -58,9 +58,14 @@
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.regex.Regex;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.io.Streams;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.tasks.TaskId;
import org.opensearch.core.tasks.resourcetracker.TaskResourceStats;
import org.opensearch.core.tasks.resourcetracker.TaskResourceUsage;
import org.opensearch.core.tasks.resourcetracker.TaskThreadUsage;
import org.opensearch.core.xcontent.MediaTypeRegistry;
import org.opensearch.index.mapper.StrictDynamicMappingException;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.tasks.Task;
@@ -73,11 +78,17 @@
import org.opensearch.transport.ReceiveTimeoutTransportException;
import org.opensearch.transport.TransportService;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
@@ -103,6 +114,8 @@
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.startsWith;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;

/**
* Integration tests for task management API
@@ -112,6 +125,26 @@
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, minNumDataNodes = 2)
public class TasksIT extends AbstractTasksIT {

protected final TaskInfo taskInfo = new TaskInfo(
new TaskId("fake", 1),
"test_type",
"test_action",
"test_description",
null,
0L,
1L,
false,
false,
TaskId.EMPTY_TASK_ID,
Collections.emptyMap(),
new TaskResourceStats(new HashMap<>() {
{
put("dummy-type1", new TaskResourceUsage(10, 20));
}
}, new TaskThreadUsage(30, 40)),
2L
);

public void testTaskCounts() {
// Run only on data nodes
ListTasksResponse response = client().admin()
@@ -879,46 +912,77 @@ public void testNodeNotFoundButTaskFound() throws Exception {
// Save a fake task that looks like it is from a node that isn't part of the cluster
CyclicBarrier b = new CyclicBarrier(2);
TaskResultsService resultsService = internalCluster().getInstance(TaskResultsService.class);
- resultsService.storeResult(
-     new TaskResult(
-         new TaskInfo(
-             new TaskId("fake", 1),
-             "test",
-             "test",
-             "",
-             null,
-             0,
-             0,
-             false,
-             false,
-             TaskId.EMPTY_TASK_ID,
-             Collections.emptyMap(),
-             null
-         ),
-         new RuntimeException("test")
-     ),
-     new ActionListener<Void>() {
resultsService.storeResult(new TaskResult(taskInfo, new RuntimeException("test")), new ActionListener<Void>() {
@Override
public void onResponse(Void response) {
try {
b.await();
} catch (InterruptedException | BrokenBarrierException e) {
onFailure(e);
}
}

@Override
public void onFailure(Exception e) {
throw new RuntimeException(e);
}
});
b.await();

// Now we can find it!
GetTaskResponse response = expectFinishedTask(new TaskId("fake:1"));
TaskResult taskResult = response.getTask();
TaskInfo task = taskResult.getTask();

assertEquals("fake", task.getTaskId().getNodeId());
assertEquals(1, task.getTaskId().getId());
assertEquals("test_type", task.getType());
assertEquals("test_action", task.getAction());
assertEquals("test_description", task.getDescription());
assertEquals(0L, task.getStartTime());
assertEquals(1L, task.getRunningTimeNanos());
assertFalse(task.isCancellable());
assertFalse(task.isCancelled());
assertEquals(TaskId.EMPTY_TASK_ID, task.getParentTaskId());
assertEquals(1, task.getResourceStats().getResourceUsageInfo().size());
assertEquals(30, task.getResourceStats().getThreadUsage().getThreadExecutions());
assertEquals(40, task.getResourceStats().getThreadUsage().getActiveThreads());
assertEquals(Long.valueOf(2L), task.getCancellationStartTime());

assertNotNull(taskResult.getError());
assertNull(taskResult.getResponse());
}

public void testStoreTaskResultFailsDueToMissingIndexMappingFields() throws IOException {
// given
TaskResultsService resultsService = spy(internalCluster().getInstance(TaskResultsService.class));

InputStream mockInputStream = getClass().getResourceAsStream("/org/opensearch/tasks/missing-fields-task-index-mapping.json");
ByteArrayOutputStream out = new ByteArrayOutputStream();
Streams.copy(mockInputStream, out);
String mockJsonString = out.toString(StandardCharsets.UTF_8.name());

// when & then
doReturn(mockJsonString).when(resultsService).taskResultIndexMapping();

CompletionException thrown = assertThrows(CompletionException.class, () -> {
CompletableFuture<Void> future = new CompletableFuture<>();

resultsService.storeResult(new TaskResult(taskInfo, new RuntimeException("test")), new ActionListener<Void>() {
@Override
public void onResponse(Void response) {
- try {
-     b.await();
- } catch (InterruptedException | BrokenBarrierException e) {
-     onFailure(e);
- }
future.complete(null);
}

@Override
public void onFailure(Exception e) {
- throw new RuntimeException(e);
future.completeExceptionally(e);
}
- }
- );
- b.await();
});

- // Now we can find it!
- GetTaskResponse response = expectFinishedTask(new TaskId("fake:1"));
- assertEquals("test", response.getTask().getTask().getAction());
- assertNotNull(response.getTask().getError());
- assertNull(response.getTask().getResponse());
future.join();
});

assertTrue(thrown.getCause() instanceof StrictDynamicMappingException);
}
}
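
The new testStoreTaskResultFailsDueToMissingIndexMappingFields relies on a Mockito spy so that only taskResultIndexMapping() is overridden while every other TaskResultsService call stays real. A self-contained sketch of that spy/doReturn pattern (the MappingProvider class here is purely illustrative, not OpenSearch code):

import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;

public class SpyStubSketch {
    // Illustrative stand-in for a service whose mapping source we want to swap out.
    static class MappingProvider {
        String taskResultIndexMapping() {
            return "{ \"dynamic\": \"strict\", \"properties\": { } }";
        }
    }

    public static void main(String[] args) {
        MappingProvider provider = spy(new MappingProvider());
        // Only this one method is stubbed; all other methods still run the real code.
        doReturn("{ \"dynamic\": \"strict\" }").when(provider).taskResultIndexMapping();
        System.out.println(provider.taskResultIndexMapping()); // prints the stubbed mapping
    }
}
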
TaskResultsService.java

@@ -85,7 +85,7 @@ public class TaskResultsService {

public static final String TASK_RESULT_MAPPING_VERSION_META_FIELD = "version";

- public static final int TASK_RESULT_MAPPING_VERSION = 4; // must match version in task-index-mapping.json
public static final int TASK_RESULT_MAPPING_VERSION = 5; // must match version in task-index-mapping.json

/**
* The backoff policy to use when saving a task result fails. The total wait
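
On the version bump above: TASK_RESULT_MAPPING_VERSION must stay in lockstep with the "version" stamped into the mapping's _meta block (bumped to 5 below). A minimal sketch of the kind of guard such a constant typically feeds, with an assumed helper name and not the actual TaskResultsService logic:

// Assumed helper, illustrative only: re-apply the mapping when the version
// recorded in the index _meta lags the version compiled into the code.
static boolean taskResultMappingOutdated(int versionInIndexMeta, int versionInCode) {
    return versionInIndexMeta < versionInCode;
}
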
task-index-mapping.json

@@ -1,7 +1,7 @@
{
"_doc" : {
"_meta": {
"version": 4
"version": 5
},
"dynamic" : "strict",
"properties" : {
@@ -34,6 +34,9 @@
"start_time_in_millis": {
"type": "long"
},
"cancellation_time_millis": {
"type": "long"
},
"type": {
"type": "keyword"
},
@@ -47,6 +50,10 @@
"headers": {
"type" : "object",
"enabled" : false
},
"resource_stats": {
"type" : "object",
"enabled" : false
}
}
},
ApproximatePointRangeQueryTests.java

@@ -21,6 +21,7 @@
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TotalHits;
import org.apache.lucene.search.TotalHits.Relation;
import org.apache.lucene.store.Directory;
import org.apache.lucene.tests.index.RandomIndexWriter;
import org.opensearch.search.internal.SearchContext;
@@ -175,6 +176,7 @@ public void testApproximateRangeWithSizeOverDefault() throws IOException {
try {
long lower = 0;
long upper = 12000;
long maxHits = 12001;
Query approximateQuery = new ApproximatePointRangeQuery(
"point",
pack(lower).bytes,
@@ -188,7 +190,13 @@ protected String toString(int dimension, byte[] value) {
};
IndexSearcher searcher = new IndexSearcher(reader);
TopDocs topDocs = searcher.search(approximateQuery, 11000);
- assertEquals(topDocs.totalHits, new TotalHits(11000, TotalHits.Relation.EQUAL_TO));

if (topDocs.totalHits.relation == Relation.EQUAL_TO) {
assertEquals(topDocs.totalHits.value, 11000);
} else {
assertTrue(11000 <= topDocs.totalHits.value);
assertTrue(maxHits >= topDocs.totalHits.value);
}
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -226,7 +234,7 @@ protected String toString(int dimension, byte[] value) {
}
};
Query query = LongPoint.newRangeQuery("point", lower, upper);
- ;

IndexSearcher searcher = new IndexSearcher(reader);
TopDocs topDocs = searcher.search(approximateQuery, 10);
TopDocs topDocs1 = searcher.search(query, 10);
@@ -235,7 +243,6 @@ protected String toString(int dimension, byte[] value) {
assertNotEquals(topDocs.totalHits, topDocs1.totalHits);
assertEquals(topDocs.totalHits, new TotalHits(10, TotalHits.Relation.EQUAL_TO));
assertEquals(topDocs1.totalHits, new TotalHits(101, TotalHits.Relation.EQUAL_TO));

} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -278,7 +285,7 @@ protected String toString(int dimension, byte[] value) {
}
};
Query query = LongPoint.newRangeQuery("point", lower, upper);
- ;

IndexSearcher searcher = new IndexSearcher(reader);
Sort sort = new Sort(new SortField("point", SortField.Type.LONG));
TopDocs topDocs = searcher.search(approximateQuery, 10, sort);
missing-fields-task-index-mapping.json (new file)

@@ -0,0 +1,63 @@
{
"_doc" : {
"_meta": {
"version": 5
},
"dynamic" : "strict",
"properties" : {
"completed": {
"type": "boolean"
},
"task" : {
"properties": {
"action": {
"type": "keyword"
},
"cancellable": {
"type": "boolean"
},
"cancelled": {
"type": "boolean"
},
"id": {
"type": "long"
},
"parent_task_id": {
"type": "keyword"
},
"node": {
"type": "keyword"
},
"running_time_in_nanos": {
"type": "long"
},
"start_time_in_millis": {
"type": "long"
},
"type": {
"type": "keyword"
},
"status": {
"type" : "object",
"enabled" : false
},
"description": {
"type": "text"
},
"headers": {
"type" : "object",
"enabled" : false
}
}
},
"response" : {
"type" : "object",
"enabled" : false
},
"error" : {
"type" : "object",
"enabled" : false
}
}
}
}
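
Note on the fixture above: it mirrors task-index-mapping.json but deliberately omits the newly added cancellation_time_millis and resource_stats fields while keeping "dynamic": "strict". Storing a task result that carries resource stats and a cancellation start time, as the shared taskInfo fixture does, should therefore be rejected, which is exactly what testStoreTaskResultFailsDueToMissingIndexMappingFields asserts via StrictDynamicMappingException.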