Skip to content

Commit

Permalink
elastic: Add DataStream#getIndicesPastRetention (#94194)
Browse files Browse the repository at this point in the history
Commit: b0aa7b8d4f30e47d5797f044ad6b3c5cb1145377
  • Loading branch information
Andrei Dan authored and sourcegraph-bot committed Mar 6, 2023
1 parent 0d48827 commit e43ae1f
Show file tree
Hide file tree
Showing 4 changed files with 444 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.scheduler.SchedulerEngine;
Expand Down Expand Up @@ -176,8 +177,7 @@ void run(ClusterState state) {
}

private void maybeExecuteRollover(ClusterState state, DataStream dataStream) {
IndexMetadata writeIndex = state.metadata().index(dataStream.getWriteIndex());
if (writeIndex != null && isManagedByDLM(dataStream, writeIndex)) {
if (dataStream.isIndexManagedByDLM(dataStream.getWriteIndex(), state.metadata()::index)) {
RolloverRequest rolloverRequest = defaultRolloverRequestSupplier.apply(dataStream.getName());
transportActionsDeduplicator.executeOnce(
rolloverRequest,
Expand All @@ -190,47 +190,29 @@ private void maybeExecuteRollover(ClusterState state, DataStream dataStream) {
private void maybeExecuteRetention(ClusterState state, DataStream dataStream) {
TimeValue retention = getRetentionConfiguration(dataStream);
if (retention != null) {
List<Index> backingIndices = dataStream.getIndices();
// we'll look at the current write index in the next run if it's rolled over (and not the write index anymore)
for (int i = 0; i < backingIndices.size() - 1; i++) {
IndexMetadata backingIndex = state.metadata().index(backingIndices.get(i));
if (backingIndex == null || isManagedByDLM(dataStream, backingIndex) == false) {
continue;
}
Metadata metadata = state.metadata();
List<Index> backingIndicesOlderThanRetention = dataStream.getIndicesPastRetention(metadata::index, nowSupplier);

if (isTimeToBeDeleted(dataStream.getName(), backingIndex, nowSupplier, retention)) {
// there's an opportunity here to batch the delete requests (i.e. delete 100 indices / request)
// let's start simple and reevaluate
DeleteIndexRequest deleteRequest = new DeleteIndexRequest(backingIndex.getIndex().getName()).masterNodeTimeout(
TimeValue.MAX_VALUE
);
for (Index index : backingIndicesOlderThanRetention) {
IndexMetadata backingIndex = metadata.index(index);
assert backingIndex != null : "the data stream backing indices must exist";

// time to delete the index
transportActionsDeduplicator.executeOnce(
deleteRequest,
ActionListener.noop(),
(req, reqListener) -> deleteIndex(deleteRequest, retention, reqListener)
);
}
// there's an opportunity here to batch the delete requests (i.e. delete 100 indices / request)
// let's start simple and reevaluate
DeleteIndexRequest deleteRequest = new DeleteIndexRequest(backingIndex.getIndex().getName()).masterNodeTimeout(
TimeValue.MAX_VALUE
);

// time to delete the index
transportActionsDeduplicator.executeOnce(
deleteRequest,
ActionListener.noop(),
(req, reqListener) -> deleteIndex(deleteRequest, retention, reqListener)
);
}
}
}

/**
 * Checks if the provided index is ready to be deleted according to the configured retention.
 * <p>
 * The age of the index is measured from the date returned by
 * {@code getCreationOrRolloverDate(dataStreamName, backingIndex)}.
 *
 * @param dataStreamName      name of the data stream, passed through to the lifecycle date lookup
 * @param backingIndex        metadata of the index being evaluated
 * @param nowSupplier         supplies the current time in milliseconds; read exactly once
 * @param configuredRetention the retention period the index age is compared against
 * @return {@code true} when at least {@code configuredRetention} has elapsed since the index lifecycle date
 */
static boolean isTimeToBeDeleted(
    String dataStreamName,
    IndexMetadata backingIndex,
    LongSupplier nowSupplier,
    TimeValue configuredRetention
) {
    TimeValue indexLifecycleDate = getCreationOrRolloverDate(dataStreamName, backingIndex);

    long nowMillis = nowSupplier.getAsLong();
    // Compare the elapsed time against the retention instead of adding the retention to the date: the sum can
    // overflow for very large values and wrap to a negative number, which would wrongly report the index as expired.
    long elapsedMillis = nowMillis - indexLifecycleDate.getMillis();
    return elapsedMillis >= configuredRetention.getMillis();
}

private void rolloverDataStream(RolloverRequest rolloverRequest, ActionListener<Void> listener) {
// "saving" the rollover target name here so we don't capture the entire request
String rolloverTarget = rolloverRequest.getRolloverTarget();
Expand Down Expand Up @@ -306,15 +288,6 @@ static TimeValue getCreationOrRolloverDate(String rolloverTarget, IndexMetadata
}
}

/**
 * Tells whether DLM is in charge of the given index: the index must have no lifecycle policy name and the parent
 * data stream must have a lifecycle.
 * Deliberately thin for now; it is the single place to change once the index.lifecycle.prefer_ilm setting is
 * introduced, at which point it will encapsulate a bit more logic.
 */
private static boolean isManagedByDLM(DataStream parentDataStream, IndexMetadata indexMetadata) {
    if (indexMetadata.getLifecyclePolicyName() != null) {
        return false;
    }
    return parentDataStream.getLifecycle() != null;
}

private RolloverRequest getDefaultRolloverRequest(String dataStream) {
RolloverRequest rolloverRequest = new RolloverRequest(dataStream, null).masterNodeTimeout(TimeValue.MAX_VALUE);
rolloverRequest.setConditions(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public void setupServices() {
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, builtInClusterSettings);
ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool, clusterSettings);

now = randomNonNegativeLong();
now = System.currentTimeMillis();
Clock clock = Clock.fixed(Instant.ofEpochMilli(now), ZoneId.of(randomFrom(ZoneId.getAvailableZoneIds())));
clientSeenRequests = new CopyOnWriteArrayList<>();

Expand Down Expand Up @@ -192,50 +192,6 @@ public void testDataStreamsWithoutLifecycleAreSkipped() {
assertThat(clientSeenRequests.isEmpty(), is(true));
}

public void testIsTimeToBeDeleted() {
    String dataStreamName = "metrics-foo";

    // Scenario 1: the index was created 3s ago and rolled over 2s ago, so the rollover date drives the outcome.
    IndexMetadata.Builder rolledOverBuilder = IndexMetadata.builder(DataStream.getDefaultBackingIndexName(dataStreamName, 1))
        .settings(settings(Version.CURRENT))
        .numberOfShards(1)
        .numberOfReplicas(1)
        .creationDate(now - 3000L);
    MaxAgeCondition maxAge = new MaxAgeCondition(TimeValue.timeValueMillis(now - 2000L));
    RolloverInfo rolloverInfo = new RolloverInfo(dataStreamName, List.of(maxAge), now - 2000L);
    rolledOverBuilder.putRolloverInfo(rolloverInfo);
    IndexMetadata rolledIndex = rolledOverBuilder.build();

    // 1s of retention has already elapsed since the rollover ...
    boolean rolledPastOneSecond = DataLifecycleService.isTimeToBeDeleted(
        dataStreamName,
        rolledIndex,
        () -> now,
        TimeValue.timeValueMillis(1000)
    );
    assertThat(rolledPastOneSecond, is(true));

    // ... whereas 5s of retention has not
    boolean rolledPastFiveSeconds = DataLifecycleService.isTimeToBeDeleted(
        dataStreamName,
        rolledIndex,
        () -> now,
        TimeValue.timeValueMillis(5000)
    );
    assertThat(rolledPastFiveSeconds, is(false));

    // Scenario 2: if rollover info is missing the creation date should be used
    IndexMetadata.Builder neverRolledBuilder = IndexMetadata.builder(DataStream.getDefaultBackingIndexName(dataStreamName, 1))
        .settings(settings(Version.CURRENT))
        .numberOfShards(1)
        .numberOfReplicas(1)
        .creationDate(now - 3000L);
    IndexMetadata noRolloverIndex = neverRolledBuilder.build();

    boolean createdPastTwoSeconds = DataLifecycleService.isTimeToBeDeleted(
        dataStreamName,
        noRolloverIndex,
        () -> now,
        TimeValue.timeValueMillis(2000)
    );
    assertThat(createdPastTwoSeconds, is(true));

    boolean createdPastFiveSeconds = DataLifecycleService.isTimeToBeDeleted(
        dataStreamName,
        noRolloverIndex,
        () -> now,
        TimeValue.timeValueMillis(5000)
    );
    assertThat(createdPastFiveSeconds, is(false));
}

private DataStream createDataStream(
Metadata.Builder builder,
String dataStreamName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.apache.lucene.index.PointValues;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.action.admin.indices.rollover.RolloverInfo;
import org.elasticsearch.cluster.Diff;
import org.elasticsearch.cluster.SimpleDiffable;
import org.elasticsearch.common.Strings;
Expand All @@ -20,6 +21,7 @@
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.time.DateFormatter;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexMode;
Expand All @@ -44,6 +46,7 @@
import java.util.Set;
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.function.Predicate;

public final class DataStream implements SimpleDiffable<DataStream>, ToXContentObject {

Expand Down Expand Up @@ -284,7 +287,7 @@ public boolean isHidden() {

/**
* Determines whether this data stream is replicated from elsewhere,
* for example a remote cluster.
* for example a remote cluster
*
* @return Whether this data stream is replicated.
*/
Expand Down Expand Up @@ -572,6 +575,104 @@ public DataStream snapshot(Collection<String> indicesInSnapshot) {
);
}

/**
 * Collects the backing indices that are managed by DLM and are past the retention configured in this
 * data stream's lifecycle.
 * NOTE: the write index is never part of the result, as retention is usually handled differently for it
 * (i.e. it first needs to be rolled over).
 *
 * @return the matching backing indices, or an empty list when this data stream has no lifecycle or the
 *         lifecycle has no data retention configured
 */
public List<Index> getIndicesPastRetention(Function<String, IndexMetadata> indexMetadataSupplier, LongSupplier nowSupplier) {
    if (lifecycle != null && lifecycle.getDataRetention() != null) {
        List<Index> expiredIndices = getIndicesOlderThan(
            lifecycle.getDataRetention(),
            indexMetadataSupplier,
            this::isIndexManagedByDLM,
            nowSupplier
        );
        // a data stream must always have a write index, so it is excluded when executing retention
        expiredIndices.remove(getWriteIndex());
        return expiredIndices;
    }
    return List.of();
}

/**
 * Returns the backing indices that are older than the provided age.
 * The index age is calculated from the rollover or index creation date.
 * Note that the write index is also evaluated and could be returned in the list
 * of results.
 * If an indices predicate is provided the returned list of indices will be filtered
 * according to the predicate definition. This is useful for things like "return only
 * the backing indices that are managed by DLM".
 * Indices for which the metadata supplier returns {@code null} (i.e. deleted indices) are skipped.
 *
 * @param age                   the minimum age an index must have reached to be returned
 * @param indexMetadataSupplier resolves an index name to its metadata; may return {@code null}
 * @param indicesPredicate      optional additional filter, only applied to indices that are old enough
 * @param nowSupplier           supplies the current time in milliseconds; read once per invocation so that
 *                              every index is evaluated against the same point in time
 * @return a new mutable list with the matching indices, in backing index order
 */
public List<Index> getIndicesOlderThan(
    TimeValue age,
    Function<String, IndexMetadata> indexMetadataSupplier,
    @Nullable Predicate<IndexMetadata> indicesPredicate,
    LongSupplier nowSupplier
) {
    List<Index> olderIndices = new ArrayList<>();
    // the current time is loop-invariant: read it once so all indices are compared against the same "now"
    long nowMillis = nowSupplier.getAsLong();
    for (Index index : indices) {
        IndexMetadata indexMetadata = indexMetadataSupplier.apply(index.getName());
        if (indexMetadata == null) {
            // we would normally throw exception in a situation like this however, this is meant to be a helper method
            // so let's ignore deleted indices
            continue;
        }
        TimeValue indexLifecycleDate = getCreationOrRolloverDate(name, indexMetadata);
        // Compare the elapsed time against the age instead of adding the age to the date: the sum can overflow
        // for very large values and wrap to a negative number, which would wrongly report the index as old enough.
        long elapsedMillis = nowMillis - indexLifecycleDate.getMillis();
        if (elapsedMillis >= age.getMillis()) {
            if (indicesPredicate == null || indicesPredicate.test(indexMetadata)) {
                olderIndices.add(index);
            }
        }
    }
    return olderIndices;
}

/**
 * Tells whether the given backing index is managed by DLM as part of this data stream.
 * The answer is {@code false} when the index is not one of this data stream's backing indices, or when
 * the supplier cannot provide its metadata.
 */
public boolean isIndexManagedByDLM(Index index, Function<String, IndexMetadata> indexMetadataSupplier) {
    if (indices.contains(index)) {
        IndexMetadata backingIndexMetadata = indexMetadataSupplier.apply(index.getName());
        // missing metadata means the index was deleted
        return backingIndexMetadata != null && isIndexManagedByDLM(backingIndexMetadata);
    }
    return false;
}

/**
 * The raw definition of an index being managed by DLM: the index has no lifecycle policy name and this
 * data stream has a lifecycle. It is a shallow method for now, but more logic will land here once there is
 * a setting to control whether ILM takes precedence.
 * No validation is done to make sure the index is part of this data stream, hence the private access modifier.
 */
private boolean isIndexManagedByDLM(IndexMetadata indexMetadata) {
    boolean hasNoLifecyclePolicy = indexMetadata.getLifecyclePolicyName() == null;
    return hasNoLifecyclePolicy && lifecycle != null;
}

/**
 * Resolves the date from which the lifecycle of the provided index is measured.
 * The rollover information recorded for the provided data stream name (as the rollover target) wins;
 * when the index has not been rolled over for that data stream name the index creation date is used.
 */
static TimeValue getCreationOrRolloverDate(String dataStreamName, IndexMetadata index) {
    RolloverInfo rolloverInfo = index.getRolloverInfos().get(dataStreamName);
    long lifecycleDateMillis = rolloverInfo != null ? rolloverInfo.getTime() : index.getCreationDate();
    return TimeValue.timeValueMillis(lifecycleDateMillis);
}

/**
* Generates the name of the index that conforms to the default naming convention for backing indices
* on data streams given the specified data stream name and generation and the current system time.
Expand Down
Loading

0 comments on commit e43ae1f

Please sign in to comment.