diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardPersistentTaskState.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardPersistentTaskState.java
index 2752d07ff770c..45885507b4af2 100644
--- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardPersistentTaskState.java
+++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupShardPersistentTaskState.java
@@ -7,6 +7,7 @@
 
 package org.elasticsearch.xpack.core.rollup.action;
 
+import org.apache.lucene.util.BytesRef;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.persistent.PersistentTaskState;
@@ -21,7 +22,7 @@
  * @param rollupShardIndexerStatus An instance of {@link RollupShardIndexerStatus} with the rollupShardIndexerStatus of the rollup task
  * @param tsid The latest successfully processed tsid component of a tuple (tsid, timestamp)
  */
-public record RollupShardPersistentTaskState(RollupShardIndexerStatus rollupShardIndexerStatus, String tsid)
+public record RollupShardPersistentTaskState(RollupShardIndexerStatus rollupShardIndexerStatus, BytesRef tsid)
     implements PersistentTaskState {
 
@@ -32,11 +33,11 @@ public record RollupShardPersistentTaskState(RollupShardIndexerStatus rollupShar
     private static final ConstructingObjectParser<RollupShardPersistentTaskState, Void> PARSER = new ConstructingObjectParser<>(
         NAME,
         true,
-        args -> new RollupShardPersistentTaskState((RollupShardIndexerStatus) args[0], (String) args[1])
+        args -> new RollupShardPersistentTaskState((RollupShardIndexerStatus) args[0], (BytesRef) args[1])
     );
 
     public RollupShardPersistentTaskState(final StreamInput in) throws IOException {
-        this(RollupShardIndexerStatus.readFromStream(in), in.readString());
+        this(RollupShardIndexerStatus.readFromStream(in), in.readBytesRef());
     }
 
     @Override
@@ -55,7 +56,7 @@ public String getWriteableName() {
     @Override
     public void writeTo(StreamOutput out) throws IOException {
         rollupShardIndexerStatus.writeTo(out);
-        out.writeString(tsid);
+        out.writeBytesRef(tsid);
     }
 
     @Override
@@ -82,7 +83,7 @@ public boolean failed() {
     }
 
     public static RollupShardPersistentTaskState readFromStream(final StreamInput in) throws IOException {
-        return new RollupShardPersistentTaskState(RollupShardIndexerStatus.readFromStream(in), in.readString());
+        return new RollupShardPersistentTaskState(RollupShardIndexerStatus.readFromStream(in), in.readBytesRef());
     }
 
     public static RollupShardPersistentTaskState fromXContent(final XContentParser parser) {
diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/RollupShardIndexer.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/RollupShardIndexer.java
index aa38ac9e56ec2..4d47b60ac154f 100644
--- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/RollupShardIndexer.java
+++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/RollupShardIndexer.java
@@ -8,6 +8,7 @@
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.apache.lucene.document.SortedSetDocValuesField;
 import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.search.MatchAllDocsQuery;
 import org.apache.lucene.search.MatchNoDocsQuery;
@@ -37,9 +38,7 @@
 import org.elasticsearch.index.mapper.DateFieldMapper;
 import org.elasticsearch.index.mapper.DocCountFieldMapper;
 import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper;
-import org.elasticsearch.index.query.BoolQueryBuilder;
 import org.elasticsearch.index.query.SearchExecutionContext;
-import org.elasticsearch.index.query.TermQueryBuilder;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.search.DocValueFormat;
@@ -145,12 +144,20 @@ class RollupShardIndexer {
     }
 
     public DownsampleIndexerAction.ShardDownsampleResponse execute() throws IOException {
+        final Query initialStateQuery = createQuery();
+        if (initialStateQuery instanceof MatchNoDocsQuery) {
+            return new DownsampleIndexerAction.ShardDownsampleResponse(indexShard.shardId(), task.getNumIndexed());
+        }
         long startTime = client.threadPool().relativeTimeInMillis();
         task.setTotalShardDocCount(searcher.getDirectoryReader().numDocs());
         task.setRollupShardIndexerStatus(RollupShardIndexerStatus.STARTED);
         task.updatePersistentTaskState(
             new RollupShardPersistentTaskState(RollupShardIndexerStatus.STARTED, null),
-            ActionListener.wrap(response -> {}, e -> {
+            ActionListener.wrap(response -> {
+                final RollupShardTaskParams params = (RollupShardTaskParams) response.getParams();
+                logger.info("Downsampling task [" + task.getPersistentTaskId() + "] on shard " + params.shardId() + " started");
+            }, e -> {
+                logger.error("Updating downsampling task state for [" + task.getPersistentTaskId() + "] failed (STARTED)", e);
                 throw new ElasticsearchException("Error while updating downsampling persistent task state", e);
             })
         );
@@ -159,7 +166,7 @@ public DownsampleIndexerAction.ShardDownsampleResponse execute() throws IOExcept
         final TimeSeriesIndexSearcher timeSeriesSearcher = new TimeSeriesIndexSearcher(searcher, List.of(this::checkCancelled));
         TimeSeriesBucketCollector bucketCollector = new TimeSeriesBucketCollector(bulkProcessor);
         bucketCollector.preCollection();
-        timeSeriesSearcher.search(createQuery(), bucketCollector);
+        timeSeriesSearcher.search(initialStateQuery, bucketCollector);
         bucketCollector.postCollection();
     }
 
@@ -178,9 +185,12 @@ public DownsampleIndexerAction.ShardDownsampleResponse execute() throws IOExcept
             task.updatePersistentTaskState(
                 new RollupShardPersistentTaskState(RollupShardIndexerStatus.FAILED, null),
                 ActionListener.wrap(response -> {
+                    final RollupShardTaskParams params = (RollupShardTaskParams) response.getParams();
+                    logger.info("Downsampling task [" + task.getPersistentTaskId() + "] on shard " + params.shardId() + " failed");
                     task.markAsFailed(new ElasticsearchException("Invalid number of indexed documents"));
                 }, e -> {
                     task.markAsFailed(e);
+                    logger.error("Updating downsampling task state for [" + task.getPersistentTaskId() + "] failed (FAILED)");
                     throw new ElasticsearchException("Error while updating downsampling persistent task state", e);
                 })
             );
@@ -198,7 +208,21 @@ public DownsampleIndexerAction.ShardDownsampleResponse execute() throws IOExcept
         if (task.getNumFailed() > 0) {
             task.updatePersistentTaskState(
                 new RollupShardPersistentTaskState(RollupShardIndexerStatus.FAILED, null),
-                ActionListener.wrap(response -> task.setRollupShardIndexerStatus(RollupShardIndexerStatus.FAILED), e -> {
+                ActionListener.wrap(response -> {
+                    final RollupShardTaskParams params = (RollupShardTaskParams) response.getParams();
+                    logger.info(
+                        "Downsampling task ["
+                            + task.getPersistentTaskId()
+                            + "] on shard ["
+                            + params.shardId()
+                            + "] failed indexing ["
+                            + task.getNumFailed()
+                            + "] documents"
+                    );
+                    task.markAsFailed(new ElasticsearchException("Invalid number of indexed documents"));
+                }, e -> {
+                    task.markAsFailed(e);
+                    logger.error("Updating downsampling task state for [" + task.getPersistentTaskId() + "] failed (FAILED)");
                     throw new ElasticsearchException("Error while updating downsampling persistent task state", e);
                 })
             );
@@ -217,9 +241,12 @@ public DownsampleIndexerAction.ShardDownsampleResponse execute() throws IOExcept
             task.updatePersistentTaskState(
                 new RollupShardPersistentTaskState(RollupShardIndexerStatus.COMPLETED, null),
                 ActionListener.wrap(response -> {
+                    final RollupShardTaskParams params = (RollupShardTaskParams) response.getParams();
+                    logger.info("Downsampling task [" + task.getPersistentTaskId() + "] on shard " + params.shardId() + " completed");
                     task.markAsCompleted();
                 }, e -> {
                     task.markAsFailed(e);
+                    logger.error("Updating downsampling task state for [" + task.getPersistentTaskId() + "] failed (COMPLETED)");
                     throw new ElasticsearchException("error while updating downsampling persistent task state", e);
                 })
             );
@@ -237,19 +264,7 @@ private Query createQuery() throws IOException {
                 null,
                 Collections.emptyMap()
             );
-            return new BoolQueryBuilder().filter(new TermQueryBuilder(TimeSeriesIdFieldMapper.NAME, this.state.tsid()))
-                .toQuery(searchExecutionContext);
-        } else if (this.state.done()) {
-            // NOTE: resuming a task expected to be completed (successfully or unsuccessfully) should not really happen.
-            // We make sure we do nothing if it ever happens.
-            logger.warn(
-                "Resuming a downsampling task expected to be completed. Last processed entry ["
-                    + this.state.tsid()
-                    + "]. Status ["
-                    + this.state.rollupShardIndexerStatus()
-                    + "]"
-            );
-            return new MatchNoDocsQuery();
+            return SortedSetDocValuesField.newSlowRangeQuery(TimeSeriesIdFieldMapper.NAME, this.state.tsid(), null, true, false);
         }
         return new MatchAllDocsQuery();
     }
@@ -267,9 +282,12 @@ private void checkCancelled() {
             task.updatePersistentTaskState(
                 new RollupShardPersistentTaskState(RollupShardIndexerStatus.CANCELLED, null),
                 ActionListener.wrap(response -> {
+                    final RollupShardTaskParams params = (RollupShardTaskParams) response.getParams();
+                    logger.info("Downsampling task [" + task.getPersistentTaskId() + "] on shard " + params.shardId() + " cancelled");
                     task.markAsFailed(new ElasticsearchException("Downsamplig task cancelled"));
                 }, e -> {
                     task.markAsFailed(e);
+                    logger.error("Updating downsampling task state for [" + task.getPersistentTaskId() + "] failed (CANCELLED)");
                     throw new ElasticsearchException("Error while updating downsampling persistent task state", e);
                 })
             );
diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/RollupShardTaskParams.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/RollupShardTaskParams.java
index 80d5a01defd5f..0b17c23787c98 100644
--- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/RollupShardTaskParams.java
+++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/RollupShardTaskParams.java
@@ -46,7 +46,7 @@ public record RollupShardTaskParams(
             (String) args[1],
             (Long) args[2],
             (Long) args[3],
-            (ShardId) args[4],
+            ShardId.fromString((String) args[4]),
             (String[]) args[5],
             (String[]) args[6]
         )
diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleAction.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleAction.java
index aa346b21c6445..093b69682cb8b 100644
--- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleAction.java
+++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/downsample/TransportDownsampleAction.java
@@ -322,6 +322,9 @@ protected void masterOperation(
                 params,
                 ActionListener.wrap(
                     startedTask -> persistentTasksService.waitForPersistentTaskCondition(startedTask.getId(), runningTask -> {
+                        if (runningTask == null) {
+                            return false;
+                        }
                         RollupShardPersistentTaskState runningPersistentTaskState = (RollupShardPersistentTaskState) runningTask
                             .getState();
                         return runningPersistentTaskState != null && runningPersistentTaskState.done();
diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/downsample/DownsampleActionSingleNodeTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/downsample/DownsampleActionSingleNodeTests.java
index 9bd53cd74b262..7882bdc553cc4 100644
--- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/downsample/DownsampleActionSingleNodeTests.java
+++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/downsample/DownsampleActionSingleNodeTests.java
@@ -150,7 +150,7 @@ public class DownsampleActionSingleNodeTests extends ESSingleNodeTestCase {
 
     private static final int MAX_DIM_VALUES = 5;
     private static final long MAX_NUM_BUCKETS = 10;
-    private static final TimeValue TIMEOUT = new TimeValue(1, TimeUnit.MINUTES);
+    private static final TimeValue TIMEOUT = new TimeValue(60, TimeUnit.MINUTES);
 
     private String sourceIndex, rollupIndex;
     private long startTime;
@@ -209,9 +209,9 @@ public void setup() throws IOException {
         sourceIndex = randomAlphaOfLength(14).toLowerCase(Locale.ROOT);
         rollupIndex = "rollup-" + sourceIndex;
         startTime = randomLongBetween(946769284000L, 1607470084000L); // random date between 2000-2020
-        docCount = randomIntBetween(10, 9000);
-        numOfShards = randomIntBetween(1, 4);
-        numOfReplicas = randomIntBetween(0, 3);
+        docCount = randomIntBetween(1000, 9000);
+        numOfShards = randomIntBetween(1, 1);
+        numOfReplicas = randomIntBetween(0, 0);
 
         // Values for keyword dimensions
         dimensionValues = new ArrayList<>(MAX_DIM_VALUES);