Skip to content

Allow regular data streams to be migrated to tsdb data streams. #83843

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 15 commits into from
Feb 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,27 @@
package org.elasticsearch.datastreams;

import org.elasticsearch.client.Request;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.common.time.DateFormatter;
import org.elasticsearch.common.time.FormatNames;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.test.rest.yaml.ObjectPath;

import java.io.IOException;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Map;

import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.backingIndexEqualTo;
import static org.hamcrest.Matchers.aMapWithSize;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;

public class TsdbDataStreamRestIT extends ESRestTestCase {

Expand Down Expand Up @@ -84,6 +88,57 @@ public class TsdbDataStreamRestIT extends ESRestTestCase {
}
}""";

private static final String NON_TSDB_TEMPLATE = """
{
"index_patterns": ["k8s*"],
"template": {
"settings":{
"index": {
"number_of_replicas": 0,
"number_of_shards": 2
}
},
"mappings":{
"properties": {
"@timestamp" : {
"type": "date"
},
"metricset": {
"type": "keyword"
},
"k8s": {
"properties": {
"pod": {
"properties": {
"uid": {
"type": "keyword"
},
"name": {
"type": "keyword"
},
"ip": {
"type": "ip"
},
"network": {
"properties": {
"tx": {
"type": "long"
},
"rx": {
"type": "long"
}
}
}
}
}
}
}
}
}
},
"data_stream": {}
}""";

private static final String DOC = """
{
"@timestamp": "$time",
Expand Down Expand Up @@ -235,6 +290,82 @@ public void testSubsequentRollovers() throws Exception {
}
}

public void testMigrateRegularDataStreamToTsdbDataStream() throws Exception {
// Create a non tsdb template
var putComposableIndexTemplateRequest = new Request("POST", "/_index_template/1");
putComposableIndexTemplateRequest.setJsonEntity(NON_TSDB_TEMPLATE);
assertOK(client().performRequest(putComposableIndexTemplateRequest));

// Index a few docs and sometimes rollover
int numRollovers = 4;
int numDocs = 32;
var currentTime = Instant.now();
var currentMinus30Days = currentTime.minus(30, ChronoUnit.DAYS);
for (int i = 0; i < numRollovers; i++) {
for (int j = 0; j < numDocs; j++) {
var indexRequest = new Request("POST", "/k8s/_doc");
var time = Instant.ofEpochMilli(randomLongBetween(currentMinus30Days.toEpochMilli(), currentTime.toEpochMilli()));
indexRequest.setJsonEntity(DOC.replace("$time", formatInstant(time)));
var response = client().performRequest(indexRequest);
assertOK(response);
var responseBody = entityAsMap(response);
// i rollovers and +1 offset:
assertThat((String) responseBody.get("_index"), backingIndexEqualTo("k8s", i + 1));
}
var rolloverRequest = new Request("POST", "/k8s/_rollover");
var rolloverResponse = client().performRequest(rolloverRequest);
assertOK(rolloverResponse);
var rolloverResponseBody = entityAsMap(rolloverResponse);
assertThat(rolloverResponseBody.get("rolled_over"), is(true));
}

var getDataStreamsRequest = new Request("GET", "/_data_stream");
var getDataStreamResponse = client().performRequest(getDataStreamsRequest);
assertOK(getDataStreamResponse);
var dataStreams = entityAsMap(getDataStreamResponse);
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.name"), equalTo("k8s"));
assertThat(ObjectPath.evaluate(dataStreams, "data_streams.0.generation"), equalTo(5));
for (int i = 0; i < 5; i++) {
String backingIndex = ObjectPath.evaluate(dataStreams, "data_streams.0.indices." + i + ".index_name");
assertThat(backingIndex, backingIndexEqualTo("k8s", i + 1));
var indices = getIndex(backingIndex);
var escapedBackingIndex = backingIndex.replace(".", "\\.");
assertThat(ObjectPath.evaluate(indices, escapedBackingIndex + ".data_stream"), equalTo("k8s"));
assertThat(ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.mode"), nullValue());
assertThat(ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.time_series.start_time"), nullValue());
assertThat(ObjectPath.evaluate(indices, escapedBackingIndex + ".settings.index.time_series.end_time"), nullValue());
}

// Update template
putComposableIndexTemplateRequest = new Request("POST", "/_index_template/1");
putComposableIndexTemplateRequest.setJsonEntity(TEMPLATE);
assertOK(client().performRequest(putComposableIndexTemplateRequest));

var rolloverRequest = new Request("POST", "/k8s/_rollover");
var rolloverResponse = client().performRequest(rolloverRequest);
assertOK(rolloverResponse);
var rolloverResponseBody = entityAsMap(rolloverResponse);
assertThat(rolloverResponseBody.get("rolled_over"), is(true));
var newIndex = (String) rolloverResponseBody.get("new_index");
assertThat(newIndex, backingIndexEqualTo("k8s", 6));

// Ingest documents that will land in the new tsdb backing index:
for (int i = 0; i < numDocs; i++) {
var indexRequest = new Request("POST", "/k8s/_doc");
indexRequest.setJsonEntity(DOC.replace("$time", formatInstant(currentTime)));
var response = client().performRequest(indexRequest);
assertOK(response);
var responseBody = entityAsMap(response);
assertThat((String) responseBody.get("_index"), backingIndexEqualTo("k8s", 6));
}

// Fail if documents target older non tsdb backing index:
var indexRequest = new Request("POST", "/k8s/_doc");
indexRequest.setJsonEntity(DOC.replace("$time", formatInstant(currentMinus30Days)));
var e = expectThrows(ResponseException.class, () -> client().performRequest(indexRequest));
assertThat(e.getMessage(), containsString("is outside of ranges of currently writable indices"));
}

private static Map<?, ?> getIndex(String indexName) throws IOException {
var getIndexRequest = new Request("GET", "/" + indexName + "?human");
var response = client().performRequest(getIndexRequest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,16 @@ public Settings getAdditionalIndexSettings(
) {
if (dataStreamName != null) {
DataStream dataStream = metadata.dataStreams().get(dataStreamName);
// First backing index is created and then data stream is rolled over (in a single cluster state update).
// So at this point we can't check index_mode==time_series,
// so checking that index_mode==null|standard and templateIndexMode == TIME_SERIES
boolean migrating = dataStream != null
&& (dataStream.getIndexMode() == null || dataStream.getIndexMode() == IndexMode.STANDARD)
&& templateIndexMode == IndexMode.TIME_SERIES;
IndexMode indexMode;
if (dataStream != null) {
if (migrating) {
indexMode = IndexMode.TIME_SERIES;
} else if (dataStream != null) {
indexMode = dataStream.getIndexMode();
} else {
indexMode = templateIndexMode;
Expand All @@ -50,7 +58,7 @@ public Settings getAdditionalIndexSettings(
TimeValue lookAheadTime = IndexSettings.LOOK_AHEAD_TIME.get(allSettings);
final Instant start;
final Instant end;
if (dataStream == null) {
if (dataStream == null || migrating) {
start = resolvedAt.minusMillis(lookAheadTime.getMillis());
end = resolvedAt.plusMillis(lookAheadTime.getMillis());
} else {
Expand Down
Loading