Skip to content

Add support for providing absolute start time to SearchRequest #37142

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,10 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
public static final int DEFAULT_PRE_FILTER_SHARD_SIZE = 128;
public static final int DEFAULT_BATCHED_REDUCE_SIZE = 512;

private static final long DEFAULT_ABSOLUTE_START_MILLIS = -1;

private final String localClusterAlias;
private final long absoluteStartMillis;

private SearchType searchType = SearchType.DEFAULT;

Expand Down Expand Up @@ -95,6 +98,7 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest

public SearchRequest() {
this.localClusterAlias = null;
this.absoluteStartMillis = DEFAULT_ABSOLUTE_START_MILLIS;
}

/**
Expand All @@ -115,6 +119,7 @@ public SearchRequest(SearchRequest searchRequest) {
this.source = searchRequest.source;
this.types = searchRequest.types;
this.localClusterAlias = searchRequest.localClusterAlias;
this.absoluteStartMillis = searchRequest.absoluteStartMillis;
}

/**
Expand All @@ -138,12 +143,17 @@ public SearchRequest(String[] indices, SearchSourceBuilder source) {
}

/**
* Creates a new search request by providing the alias of the cluster where it will be executed. Used when a {@link SearchRequest}
* is created and executed as part of a cross-cluster search request performing local reduction on each cluster.
* The coordinating CCS node provides the alias to prefix index names with in the returned search results.
* Creates a new search request by providing the alias of the cluster where it will be executed, as well as the current time in
* milliseconds from the epoch time. Used when a {@link SearchRequest} is created and executed as part of a cross-cluster search
* request performing local reduction on each cluster. The coordinating CCS node provides the alias to prefix index names with in
* the returned search results, and the current time to be used on the remote clusters to ensure that the same value is used.
*/
SearchRequest(String localClusterAlias) {
SearchRequest(String localClusterAlias, long absoluteStartMillis) {
this.localClusterAlias = Objects.requireNonNull(localClusterAlias, "cluster alias must not be null");
if (absoluteStartMillis < 0) {
throw new IllegalArgumentException("absoluteStartMillis must not be negative but was [" + absoluteStartMillis + "]");
}
this.absoluteStartMillis = absoluteStartMillis;
}

/**
Expand All @@ -155,10 +165,7 @@ public SearchRequest(String[] indices, SearchSourceBuilder source) {
public SearchRequest(StreamInput in) throws IOException {
super(in);
searchType = SearchType.fromId(in.readByte());
indices = new String[in.readVInt()];
for (int i = 0; i < indices.length; i++) {
indices[i] = in.readString();
}
indices = in.readStringArray();
routing = in.readOptionalString();
preference = in.readOptionalString();
scroll = in.readOptionalWriteable(Scroll::new);
Expand All @@ -175,19 +182,22 @@ public SearchRequest(StreamInput in) throws IOException {
//TODO update version after backport
if (in.getVersion().onOrAfter(Version.V_7_0_0)) {
localClusterAlias = in.readOptionalString();
if (localClusterAlias != null) {
absoluteStartMillis = in.readVLong();
} else {
absoluteStartMillis = DEFAULT_ABSOLUTE_START_MILLIS;
}
} else {
localClusterAlias = null;
absoluteStartMillis = DEFAULT_ABSOLUTE_START_MILLIS;
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeByte(searchType.id());
out.writeVInt(indices.length);
for (String index : indices) {
out.writeString(index);
}
out.writeStringArray(indices);
out.writeOptionalString(routing);
out.writeOptionalString(preference);
out.writeOptionalWriteable(scroll);
Expand All @@ -204,6 +214,9 @@ public void writeTo(StreamOutput out) throws IOException {
//TODO update version after backport
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
out.writeOptionalString(localClusterAlias);
if (localClusterAlias != null) {
out.writeVLong(absoluteStartMillis);
}
}
}

Expand Down Expand Up @@ -243,6 +256,17 @@ String getLocalClusterAlias() {
return localClusterAlias;
}

/**
* Returns the current time in milliseconds from the time epoch, to be used for the execution of this search request. Used to
* ensure that the same value, determined by the coordinating node, is used on all nodes involved in the execution of the search
* request. When created through {@link #SearchRequest(String, long)}, this method returns the provided current time, otherwise
* it will return {@link System#currentTimeMillis()}.
*
*/
long getOrCreateAbsoluteStartMillis() {
return absoluteStartMillis == DEFAULT_ABSOLUTE_START_MILLIS ? System.currentTimeMillis() : absoluteStartMillis;
}

/**
* Sets the indices the search will be executed on.
*/
Expand Down Expand Up @@ -435,7 +459,6 @@ public Boolean allowPartialSearchResults() {
return this.allowPartialSearchResults;
}


/**
* Sets the number of shard results that should be reduced at once on the coordinating node. This value should be used as a protection
* mechanism to reduce the memory overhead per search request if the potential number of shards in the request can be large.
Expand Down Expand Up @@ -498,13 +521,6 @@ public int getPreFilterShardSize() {
return preFilterShardSize;
}

/**
* Returns <code>true</code> iff the maxConcurrentShardRequest is set.
*/
boolean isMaxConcurrentShardRequestsSet() {
return maxConcurrentShardRequests != 0;
}

/**
* @return true if the request only has suggest
*/
Expand Down Expand Up @@ -538,7 +554,7 @@ public String getDescription() {
}

@Override
public void readFrom(StreamInput in) throws IOException {
public void readFrom(StreamInput in) {
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}

Expand All @@ -564,14 +580,15 @@ public boolean equals(Object o) {
Objects.equals(preFilterShardSize, that.preFilterShardSize) &&
Objects.equals(indicesOptions, that.indicesOptions) &&
Objects.equals(allowPartialSearchResults, that.allowPartialSearchResults) &&
Objects.equals(localClusterAlias, that.localClusterAlias);
Objects.equals(localClusterAlias, that.localClusterAlias) &&
absoluteStartMillis == that.absoluteStartMillis;
}

@Override
public int hashCode() {
return Objects.hash(searchType, Arrays.hashCode(indices), routing, preference, source, requestCache,
scroll, Arrays.hashCode(types), indicesOptions, batchedReduceSize, maxConcurrentShardRequests, preFilterShardSize,
allowPartialSearchResults, localClusterAlias);
allowPartialSearchResults, localClusterAlias, absoluteStartMillis);
}

@Override
Expand All @@ -590,6 +607,7 @@ public String toString() {
", preFilterShardSize=" + preFilterShardSize +
", allowPartialSearchResults=" + allowPartialSearchResults +
", localClusterAlias=" + localClusterAlias +
", getOrCreateAbsoluteStartMillis=" + absoluteStartMillis +
", source=" + source + '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -180,10 +180,9 @@ long getRelativeCurrentNanos() {

@Override
protected void doExecute(Task task, SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
final long absoluteStartMillis = System.currentTimeMillis();
final long relativeStartNanos = System.nanoTime();
final SearchTimeProvider timeProvider =
new SearchTimeProvider(absoluteStartMillis, relativeStartNanos, System::nanoTime);
new SearchTimeProvider(searchRequest.getOrCreateAbsoluteStartMillis(), relativeStartNanos, System::nanoTime);
ActionListener<SearchSourceBuilder> rewriteListener = ActionListener.wrap(source -> {
if (source != searchRequest.source()) {
// only set it if it changed - we don't allow null values to be set but it might be already null be we want to catch
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@
import java.util.List;

import static org.elasticsearch.test.EqualsHashCodeTestUtils.checkEqualsAndHashCode;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;

public class SearchRequestTests extends AbstractSearchTestCase {

Expand All @@ -48,12 +51,19 @@ protected SearchRequest createSearchRequest() throws IOException {
if (randomBoolean()) {
return super.createSearchRequest();
}
//clusterAlias does not have public getter/setter hence we randomize it only in this test specifically.
SearchRequest searchRequest = new SearchRequest(randomAlphaOfLengthBetween(5, 10));
//clusterAlias and absoluteStartMillis do not have public getters/setters hence we randomize them only in this test specifically.
SearchRequest searchRequest = new SearchRequest(randomAlphaOfLengthBetween(5, 10), randomNonNegativeLong());
RandomSearchRequestGenerator.randomSearchRequest(searchRequest, this::createSearchSourceBuilder);
return searchRequest;
}

public void testClusterAliasValidation() {
expectThrows(NullPointerException.class, () -> new SearchRequest(null, 0));
expectThrows(IllegalArgumentException.class, () -> new SearchRequest("", -1));
SearchRequest searchRequest = new SearchRequest("", 0);
assertNull(searchRequest.validate());
}

public void testSerialization() throws Exception {
SearchRequest searchRequest = createSearchRequest();
SearchRequest deserializedRequest = copyWriteable(searchRequest, namedWriteableRegistry, SearchRequest::new);
Expand All @@ -69,22 +79,32 @@ public void testClusterAliasSerialization() throws IOException {
//TODO update version after backport
if (version.before(Version.V_7_0_0)) {
assertNull(deserializedRequest.getLocalClusterAlias());
assertAbsoluteStartMillisIsCurrentTime(deserializedRequest);
} else {
assertEquals(searchRequest.getLocalClusterAlias(), deserializedRequest.getLocalClusterAlias());
assertEquals(searchRequest.getOrCreateAbsoluteStartMillis(), deserializedRequest.getOrCreateAbsoluteStartMillis());
}
}

//TODO rename and update version after backport
public void testReadFromPre7_0_0() throws IOException {
String msg = "AAEBBWluZGV4AAAAAQACAAAA/////w8AAAAAAAAA/////w8AAAAAAAACAAAAAAABAAMCBAUBAAKABACAAQIAAA==";
try (StreamInput in = StreamInput.wrap(Base64.getDecoder().decode(msg))) {
in.setVersion(Version.V_6_6_0);
in.setVersion(VersionUtils.randomVersionBetween(random(), Version.V_6_4_0, VersionUtils.getPreviousVersion(Version.V_7_0_0)));
SearchRequest searchRequest = new SearchRequest(in);
assertArrayEquals(new String[]{"index"}, searchRequest.indices());
assertNull(searchRequest.getLocalClusterAlias());
assertAbsoluteStartMillisIsCurrentTime(searchRequest);
}
}

private static void assertAbsoluteStartMillisIsCurrentTime(SearchRequest searchRequest) {
long before = System.currentTimeMillis();
long absoluteStartMillis = searchRequest.getOrCreateAbsoluteStartMillis();
long after = System.currentTimeMillis();
assertThat(absoluteStartMillis, allOf(greaterThanOrEqualTo(before), lessThanOrEqualTo(after)));
}

public void testIllegalArguments() {
SearchRequest searchRequest = new SearchRequest();
assertNotNull(searchRequest.indices());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,18 @@

import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.test.ESSingleNodeTestCase;

public class TransportSearchActionSingleNodeTests extends ESSingleNodeTestCase {

public void testLocalClusterAlias() {
long nowInMillis = System.currentTimeMillis();
IndexRequest indexRequest = new IndexRequest("test");
indexRequest.id("1");
indexRequest.source("field", "value");
Expand All @@ -37,7 +41,7 @@ public void testLocalClusterAlias() {
assertEquals(RestStatus.CREATED, indexResponse.status());

{
SearchRequest searchRequest = new SearchRequest("local");
SearchRequest searchRequest = new SearchRequest("local", nowInMillis);
SearchResponse searchResponse = client().search(searchRequest).actionGet();
assertEquals(1, searchResponse.getHits().getTotalHits().value);
SearchHit[] hits = searchResponse.getHits().getHits();
Expand All @@ -48,7 +52,7 @@ public void testLocalClusterAlias() {
assertEquals("1", hit.getId());
}
{
SearchRequest searchRequest = new SearchRequest("");
SearchRequest searchRequest = new SearchRequest("", nowInMillis);
SearchResponse searchResponse = client().search(searchRequest).actionGet();
assertEquals(1, searchResponse.getHits().getTotalHits().value);
SearchHit[] hits = searchResponse.getHits().getHits();
Expand All @@ -59,4 +63,58 @@ public void testLocalClusterAlias() {
assertEquals("1", hit.getId());
}
}

public void testAbsoluteStartMillis() {
{
IndexRequest indexRequest = new IndexRequest("test-1970.01.01");
indexRequest.id("1");
indexRequest.source("date", "1970-01-01");
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
IndexResponse indexResponse = client().index(indexRequest).actionGet();
assertEquals(RestStatus.CREATED, indexResponse.status());
}
{
IndexRequest indexRequest = new IndexRequest("test-1982.01.01");
indexRequest.id("1");
indexRequest.source("date", "1982-01-01");
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
IndexResponse indexResponse = client().index(indexRequest).actionGet();
assertEquals(RestStatus.CREATED, indexResponse.status());
}
{
SearchRequest searchRequest = new SearchRequest();
SearchResponse searchResponse = client().search(searchRequest).actionGet();
assertEquals(2, searchResponse.getHits().getTotalHits().value);
}
{
SearchRequest searchRequest = new SearchRequest("<test-{now/d}>");
searchRequest.indicesOptions(IndicesOptions.fromOptions(true, true, true, true));
SearchResponse searchResponse = client().search(searchRequest).actionGet();
assertEquals(0, searchResponse.getTotalShards());
}
{
SearchRequest searchRequest = new SearchRequest("", 0);
SearchResponse searchResponse = client().search(searchRequest).actionGet();
assertEquals(2, searchResponse.getHits().getTotalHits().value);
}
{
SearchRequest searchRequest = new SearchRequest("", 0);
searchRequest.indices("<test-{now/d}>");
SearchResponse searchResponse = client().search(searchRequest).actionGet();
assertEquals(1, searchResponse.getHits().getTotalHits().value);
assertEquals("test-1970.01.01", searchResponse.getHits().getHits()[0].getIndex());
}
{
SearchRequest searchRequest = new SearchRequest("", 0);
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
RangeQueryBuilder rangeQuery = new RangeQueryBuilder("date");
rangeQuery.gte("1970-01-01");
rangeQuery.lt("1982-01-01");
sourceBuilder.query(rangeQuery);
searchRequest.source(sourceBuilder);
SearchResponse searchResponse = client().search(searchRequest).actionGet();
assertEquals(1, searchResponse.getHits().getTotalHits().value);
assertEquals("test-1970.01.01", searchResponse.getHits().getHits()[0].getIndex());
}
}
}