Skip to content

Commit

Permalink
fix: npe and exceptions
Browse files Browse the repository at this point in the history
  • Loading branch information
salvatore-campagna committed Jul 11, 2023
1 parent 8636ad7 commit d61bd7a
Showing 1 changed file with 35 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.admin.cluster.stats.MappingVisitor;
Expand Down Expand Up @@ -375,8 +376,9 @@ public void onResponse(
),
e -> {
/*
* At this point rollup has been created successfully even if
* force-merge fails. So, we should not fail the rollup operation.
* At this point rollup has been created successfully even
* if force-merge fails. So, we should not fail the rollup
* operation.
*/
logger.error(
"Failed to force-merge rollup index ["
Expand Down Expand Up @@ -461,14 +463,40 @@ public void onResponse(

@Override
public void onFailure(Exception e) {
logger.error("error while starting downsampling persistent task", e);
listener.onFailure(e);
deleteRollupIndex(
sourceIndexName,
rollupIndexName,
parentTask,
listener,
new ElasticsearchException(
"Error while starting downsampling persistent task [" + persistentRollupTaskId + "]",
e
)
);
}
}),
e -> {
// TODO: handle task already existing waiting
logger.warn("failed to start downsample task or task already exists", e);
listener.onFailure(e);
if (e instanceof ResourceAlreadyExistsException) {
logger.warn(
"Downsampling persistent task [" + persistentRollupTaskId + "] already exists, waiting...",
e
);
persistentTasksService.waitForPersistentTasksCondition(existingTask -> {
PersistentTasksCustomMetadata.PersistentTask<?> existingPersistentTask = existingTask.getTask(
persistentRollupTaskId
);
final RollupShardPersistentTaskState existingPersistentTaskState =
(RollupShardPersistentTaskState) existingPersistentTask.getState();
return existingPersistentTaskState.done();
}, new TimeValue(1, TimeUnit.MINUTES), ActionListener.noop());
} else {
listener.onFailure(
new ElasticsearchException(
"Error while starting persistent downsampling task [" + persistentRollupTaskId + "] ",
e
)
);
}
}
)
);
Expand Down

0 comments on commit d61bd7a

Please sign in to comment.