fix: some serialization issues
salvatore-campagna committed Jul 13, 2023
1 parent b2e498f commit 0839060
Showing 5 changed files with 50 additions and 28 deletions.
RollupShardPersistentTaskState.java
@@ -7,6 +7,7 @@

package org.elasticsearch.xpack.core.rollup.action;

+import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.persistent.PersistentTaskState;
@@ -21,7 +22,7 @@
* @param rollupShardIndexerStatus An instance of {@link RollupShardIndexerStatus} with the rollupShardIndexerStatus of the rollup task
* @param tsid The latest successfully processed tsid component of a tuple (tsid, timestamp)
*/
-public record RollupShardPersistentTaskState(RollupShardIndexerStatus rollupShardIndexerStatus, String tsid)
+public record RollupShardPersistentTaskState(RollupShardIndexerStatus rollupShardIndexerStatus, BytesRef tsid)
implements
PersistentTaskState {

@@ -32,11 +33,11 @@ public record RollupShardPersistentTaskState(RollupShardIndexerStatus rollupShar
private static final ConstructingObjectParser<RollupShardPersistentTaskState, Void> PARSER = new ConstructingObjectParser<>(
NAME,
true,
-args -> new RollupShardPersistentTaskState((RollupShardIndexerStatus) args[0], (String) args[1])
+args -> new RollupShardPersistentTaskState((RollupShardIndexerStatus) args[0], (BytesRef) args[1])
);

public RollupShardPersistentTaskState(final StreamInput in) throws IOException {
-this(RollupShardIndexerStatus.readFromStream(in), in.readString());
+this(RollupShardIndexerStatus.readFromStream(in), in.readBytesRef());
}

@Override
@@ -55,7 +56,7 @@ public String getWriteableName() {
@Override
public void writeTo(StreamOutput out) throws IOException {
rollupShardIndexerStatus.writeTo(out);
-out.writeString(tsid);
+out.writeBytesRef(tsid);
}

@Override
Expand All @@ -82,7 +83,7 @@ public boolean failed() {
}

public static RollupShardPersistentTaskState readFromStream(final StreamInput in) throws IOException {
-return new RollupShardPersistentTaskState(RollupShardIndexerStatus.readFromStream(in), in.readString());
+return new RollupShardPersistentTaskState(RollupShardIndexerStatus.readFromStream(in), in.readBytesRef());
}

public static RollupShardPersistentTaskState fromXContent(final XContentParser parser) {
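
Why the String-to-BytesRef switch matters for serialization: a _tsid is opaque binary (an encoded representation of the dimension values), with no guarantee of being valid UTF-8, so writeString/readString could mangle it, while writeBytesRef/readBytesRef moves the bytes verbatim. A minimal round-trip sketch, not part of the commit, assuming a test context with the classes above on the classpath:

import java.io.IOException;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;

// Serialize the task state and read it back; the raw tsid bytes survive unchanged.
static RollupShardPersistentTaskState roundTrip(RollupShardPersistentTaskState state) throws IOException {
    try (BytesStreamOutput out = new BytesStreamOutput()) {
        state.writeTo(out); // writes the status enum, then the tsid as raw bytes
        try (StreamInput in = out.bytes().streamInput()) {
            return RollupShardPersistentTaskState.readFromStream(in);
        }
    }
}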
RollupShardIndexer.java
@@ -8,6 +8,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import org.apache.lucene.document.SortedSetDocValuesField;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.MatchNoDocsQuery;
@@ -37,9 +38,7 @@
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.index.mapper.DocCountFieldMapper;
import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper;
-import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.SearchExecutionContext;
-import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.DocValueFormat;
@@ -145,12 +144,20 @@ class RollupShardIndexer {
}

public DownsampleIndexerAction.ShardDownsampleResponse execute() throws IOException {
+final Query initialStateQuery = createQuery();
+if (initialStateQuery instanceof MatchNoDocsQuery) {
+return new DownsampleIndexerAction.ShardDownsampleResponse(indexShard.shardId(), task.getNumIndexed());
+}
long startTime = client.threadPool().relativeTimeInMillis();
task.setTotalShardDocCount(searcher.getDirectoryReader().numDocs());
task.setRollupShardIndexerStatus(RollupShardIndexerStatus.STARTED);
task.updatePersistentTaskState(
new RollupShardPersistentTaskState(RollupShardIndexerStatus.STARTED, null),
-ActionListener.wrap(response -> {}, e -> {
+ActionListener.wrap(response -> {
+final RollupShardTaskParams params = (RollupShardTaskParams) response.getParams();
+logger.info("Downsampling task [" + task.getPersistentTaskId() + "] on shard [" + params.shardId() + "] started");
+}, e -> {
+logger.error("Updating downsampling task state for [" + task.getPersistentTaskId() + "] failed (STARTED)", e);
throw new ElasticsearchException("Error while updating downsampling persistent task state", e);
})
);
@@ -159,7 +166,7 @@
final TimeSeriesIndexSearcher timeSeriesSearcher = new TimeSeriesIndexSearcher(searcher, List.of(this::checkCancelled));
TimeSeriesBucketCollector bucketCollector = new TimeSeriesBucketCollector(bulkProcessor);
bucketCollector.preCollection();
-timeSeriesSearcher.search(createQuery(), bucketCollector);
+timeSeriesSearcher.search(initialStateQuery, bucketCollector);
bucketCollector.postCollection();
}

@@ -178,9 +185,12 @@ public DownsampleIndexerAction.ShardDownsampleResponse execute() throws IOExcept
task.updatePersistentTaskState(
new RollupShardPersistentTaskState(RollupShardIndexerStatus.FAILED, null),
ActionListener.wrap(response -> {
+final RollupShardTaskParams params = (RollupShardTaskParams) response.getParams();
+logger.info("Downsampling task [" + task.getPersistentTaskId() + "] on shard [" + params.shardId() + "] failed");
task.markAsFailed(new ElasticsearchException("Invalid number of indexed documents"));
}, e -> {
task.markAsFailed(e);
logger.error("Updating downsampling task state for [" + task.getPersistentTaskId() + " failed (FAILED)");
throw new ElasticsearchException("Error while updating downsampling persistent task state", e);
})
);
@@ -198,7 +208,21 @@ public DownsampleIndexerAction.ShardDownsampleResponse execute() throws IOExcept
if (task.getNumFailed() > 0) {
task.updatePersistentTaskState(
new RollupShardPersistentTaskState(RollupShardIndexerStatus.FAILED, null),
-ActionListener.wrap(response -> task.setRollupShardIndexerStatus(RollupShardIndexerStatus.FAILED), e -> {
+ActionListener.wrap(response -> {
+final RollupShardTaskParams params = (RollupShardTaskParams) response.getParams();
+logger.info(
+"Downsampling task ["
++ task.getPersistentTaskId()
++ "] on shard ["
++ params.shardId()
++ "] failed indexing ["
++ task.getNumFailed()
++ "] documents"
+);
+task.markAsFailed(new ElasticsearchException("Invalid number of indexed documents"));
+}, e -> {
task.markAsFailed(e);
logger.error("Updating downsampling task state for [" + task.getPersistentTaskId() + " failed (FAILED)");
throw new ElasticsearchException("Error while updating downsampling persistent task state", e);
})
);
@@ -217,9 +241,12 @@ public DownsampleIndexerAction.ShardDownsampleResponse execute() throws IOExcept
task.updatePersistentTaskState(
new RollupShardPersistentTaskState(RollupShardIndexerStatus.COMPLETED, null),
ActionListener.wrap(response -> {
+final RollupShardTaskParams params = (RollupShardTaskParams) response.getParams();
+logger.info("Downsampling task [" + task.getPersistentTaskId() + "] on shard [" + params.shardId() + "] completed");
task.markAsCompleted();
}, e -> {
task.markAsFailed(e);
logger.error("Updating downsampling task state for [" + task.getPersistentTaskId() + " failed (COMPLETED)");
throw new ElasticsearchException("error while updating downsampling persistent task state", e);
})
);
@@ -237,19 +264,7 @@ private Query createQuery() throws IOException {
null,
Collections.emptyMap()
);
-return new BoolQueryBuilder().filter(new TermQueryBuilder(TimeSeriesIdFieldMapper.NAME, this.state.tsid()))
-.toQuery(searchExecutionContext);
-} else if (this.state.done()) {
-// NOTE: resuming a task expected to be completed (successfully or unsuccessfully) should not really happen.
-// We make sure we do nothing if it ever happens.
-logger.warn(
-"Resuming a downsampling task expected to be completed. Last processed entry ["
-+ this.state.tsid()
-+ "]. Status ["
-+ this.state.rollupShardIndexerStatus()
-+ "]"
-);
-return new MatchNoDocsQuery();
+return SortedSetDocValuesField.newSlowRangeQuery(TimeSeriesIdFieldMapper.NAME, this.state.tsid(), null, true, false);
}
return new MatchAllDocsQuery();
}
@@ -267,9 +282,12 @@ private void checkCancelled() {
task.updatePersistentTaskState(
new RollupShardPersistentTaskState(RollupShardIndexerStatus.CANCELLED, null),
ActionListener.wrap(response -> {
+final RollupShardTaskParams params = (RollupShardTaskParams) response.getParams();
+logger.info("Downsampling task [" + task.getPersistentTaskId() + "] on shard [" + params.shardId() + "] cancelled");
task.markAsFailed(new ElasticsearchException("Downsampling task cancelled"));
}, e -> {
task.markAsFailed(e);
logger.error("Updating downsampling task state for [" + task.getPersistentTaskId() + " failed (CANCELLED)");
throw new ElasticsearchException("Error while updating downsampling persistent task state", e);
})
);
RollupShardTaskParams.java
@@ -46,7 +46,7 @@ public record RollupShardTaskParams(
(String) args[1],
(Long) args[2],
(Long) args[3],
-(ShardId) args[4],
+ShardId.fromString((String) args[4]),
(String[]) args[5],
(String[]) args[6]
)
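
This was the ClassCastException half of the serialization fix: the ConstructingObjectParser hands back args[4] as the String the shard id was written as, so the blind (ShardId) cast failed whenever the task params were parsed back from XContent. ShardId#toString renders as "[indexName][shardNumber]" and ShardId.fromString reverses it; a round-trip sketch with hypothetical values (note the index UUID presumably cannot be recovered from that string form):

import org.elasticsearch.index.shard.ShardId;

ShardId original = new ShardId("metrics-tsdb", "_na_", 3);
String wireForm = original.toString();           // "[metrics-tsdb][3]"
ShardId restored = ShardId.fromString(wireForm); // equal to original, with the UUID defaulted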
(downsample transport action)
@@ -322,6 +322,9 @@ protected void masterOperation(
params,
ActionListener.wrap(
startedTask -> persistentTasksService.waitForPersistentTaskCondition(startedTask.getId(), runningTask -> {
+if (runningTask == null) {
+return false;
+}
RollupShardPersistentTaskState runningPersistentTaskState = (RollupShardPersistentTaskState) runningTask
.getState();
return runningPersistentTaskState != null && runningPersistentTaskState.done();
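
The null check guards the wait-for-completion predicate: waitForPersistentTaskCondition evaluates it against the task as currently found in cluster state, which can be null while the task is still being assigned (or after it has been removed), and the old code then threw an NPE instead of waiting. The intended shape of the predicate, as a standalone sketch:

import java.util.function.Predicate;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;

Predicate<PersistentTasksCustomMetadata.PersistentTask<?>> shardDownsampleDone = runningTask -> {
    if (runningTask == null) {
        return false; // task not visible in cluster state yet (or anymore): keep waiting
    }
    RollupShardPersistentTaskState state = (RollupShardPersistentTaskState) runningTask.getState();
    return state != null && state.done();
};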
DownsampleActionSingleNodeTests.java
@@ -150,7 +150,7 @@ public class DownsampleActionSingleNodeTests extends ESSingleNodeTestCase {

private static final int MAX_DIM_VALUES = 5;
private static final long MAX_NUM_BUCKETS = 10;
-private static final TimeValue TIMEOUT = new TimeValue(1, TimeUnit.MINUTES);
+private static final TimeValue TIMEOUT = new TimeValue(60, TimeUnit.MINUTES);

private String sourceIndex, rollupIndex;
private long startTime;
@@ -209,9 +209,9 @@ public void setup() throws IOException {
sourceIndex = randomAlphaOfLength(14).toLowerCase(Locale.ROOT);
rollupIndex = "rollup-" + sourceIndex;
startTime = randomLongBetween(946769284000L, 1607470084000L); // random date between 2000-2020
-docCount = randomIntBetween(10, 9000);
-numOfShards = randomIntBetween(1, 4);
-numOfReplicas = randomIntBetween(0, 3);
+docCount = randomIntBetween(1000, 9000);
+numOfShards = randomIntBetween(1, 1);
+numOfReplicas = randomIntBetween(0, 0);

// Values for keyword dimensions
dimensionValues = new ArrayList<>(MAX_DIM_VALUES);
