StartedShardUpdateTask task listener #82854


Merged: 2 commits, Jan 24, 2022
@@ -11,7 +11,6 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.MessageSupplier;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
@@ -616,43 +615,20 @@ public void messageReceived(StartedShardEntry request, TransportChannel channel,
SHARD_STARTED_ACTION_NAME,
request
);

var update = new StartedShardUpdateTask(request, listener);

clusterService.submitStateUpdateTask(
"shard-started " + request,
request,
update,
ClusterStateTaskConfig.build(Priority.URGENT),
shardStartedClusterStateTaskExecutor,
new ClusterStateTaskListener() {
@Override
public void onFailure(Exception e) {
final MessageSupplier msg = () -> new ParameterizedMessage(
"{} unexpected failure while starting shard [{}]",
request.shardId,
request
);
if (e instanceof FailedToCommitClusterStateException) {
logger.debug(msg, e);
} else {
logger.error(msg, e);
}
listener.onFailure(e);
}

@Override
public void onNoLongerMaster() {
logger.debug("{} no longer master while starting shard [{}]", request.shardId, request);
listener.onFailure(new NotMasterException("shard-started"));
}

@Override
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
listener.onResponse(TransportResponse.Empty.INSTANCE);
}
}
update
);
}
}

public static class ShardStartedClusterStateTaskExecutor implements ClusterStateTaskExecutor<StartedShardEntry> {
public static class ShardStartedClusterStateTaskExecutor implements ClusterStateTaskExecutor<StartedShardUpdateTask> {
private final AllocationService allocationService;
private final RerouteService rerouteService;

@@ -662,52 +638,54 @@ public ShardStartedClusterStateTaskExecutor(AllocationService allocationService,
}

@Override
public ClusterTasksResult<StartedShardEntry> execute(ClusterState currentState, List<StartedShardEntry> tasks) throws Exception {
ClusterTasksResult.Builder<StartedShardEntry> builder = ClusterTasksResult.builder();
List<StartedShardEntry> tasksToBeApplied = new ArrayList<>();
public ClusterTasksResult<StartedShardUpdateTask> execute(ClusterState currentState, List<StartedShardUpdateTask> tasks)
throws Exception {
ClusterTasksResult.Builder<StartedShardUpdateTask> builder = ClusterTasksResult.builder();
List<StartedShardUpdateTask> tasksToBeApplied = new ArrayList<>();
List<ShardRouting> shardRoutingsToBeApplied = new ArrayList<>(tasks.size());
Set<ShardRouting> seenShardRoutings = new HashSet<>(); // to prevent duplicates
final Map<Index, IndexLongFieldRange> updatedTimestampRanges = new HashMap<>();
for (StartedShardEntry task : tasks) {
final ShardRouting matched = currentState.getRoutingTable().getByAllocationId(task.shardId, task.allocationId);
for (StartedShardUpdateTask task : tasks) {
StartedShardEntry entry = task.getEntry();
final ShardRouting matched = currentState.getRoutingTable().getByAllocationId(entry.shardId, entry.allocationId);
if (matched == null) {
// tasks that correspond to non-existent shards are marked as successful. The reason is that we resend shard started
// events on every cluster state publishing that does not contain the shard as started yet. This means that old stale
// requests might still be in flight even after the shard has already been started or failed on the master. We just
// ignore these requests for now.
logger.debug("{} ignoring shard started task [{}] (shard does not exist anymore)", task.shardId, task);
logger.debug("{} ignoring shard started task [{}] (shard does not exist anymore)", entry.shardId, entry);
builder.success(task);
} else {
if (matched.primary() && task.primaryTerm > 0) {
final IndexMetadata indexMetadata = currentState.metadata().index(task.shardId.getIndex());
if (matched.primary() && entry.primaryTerm > 0) {
final IndexMetadata indexMetadata = currentState.metadata().index(entry.shardId.getIndex());
assert indexMetadata != null;
final long currentPrimaryTerm = indexMetadata.primaryTerm(task.shardId.id());
if (currentPrimaryTerm != task.primaryTerm) {
assert currentPrimaryTerm > task.primaryTerm
final long currentPrimaryTerm = indexMetadata.primaryTerm(entry.shardId.id());
if (currentPrimaryTerm != entry.primaryTerm) {
assert currentPrimaryTerm > entry.primaryTerm
: "received a primary term with a higher term than in the "
+ "current cluster state (received ["
+ task.primaryTerm
+ entry.primaryTerm
+ "] but current is ["
+ currentPrimaryTerm
+ "])";
logger.debug(
"{} ignoring shard started task [{}] (primary term {} does not match current term {})",
task.shardId,
task,
task.primaryTerm,
entry.shardId,
entry,
entry.primaryTerm,
currentPrimaryTerm
);
builder.success(task);
continue;
}
}
if (matched.initializing() == false) {
assert matched.active() : "expected active shard routing for task " + task + " but found " + matched;
assert matched.active() : "expected active shard routing for task " + entry + " but found " + matched;
// same as above, this might have been a stale in-flight request, so we just ignore.
logger.debug(
"{} ignoring shard started task [{}] (shard exists but is not initializing: {})",
task.shardId,
task,
entry.shardId,
entry,
matched
);
builder.success(task);
@@ -716,29 +694,29 @@ public ClusterTasksResult<StartedShardEntry> execute(ClusterState currentState,
if (seenShardRoutings.contains(matched)) {
logger.trace(
"{} ignoring shard started task [{}] (already scheduled to start {})",
task.shardId,
task,
entry.shardId,
entry,
matched
);
tasksToBeApplied.add(task);
} else {
logger.debug("{} starting shard {} (shard started task: [{}])", task.shardId, matched, task);
logger.debug("{} starting shard {} (shard started task: [{}])", entry.shardId, matched, entry);
tasksToBeApplied.add(task);
shardRoutingsToBeApplied.add(matched);
seenShardRoutings.add(matched);

// expand the timestamp range recorded in the index metadata if needed
final Index index = task.shardId.getIndex();
final Index index = entry.shardId.getIndex();
IndexLongFieldRange currentTimestampMillisRange = updatedTimestampRanges.get(index);
final IndexMetadata indexMetadata = currentState.metadata().index(index);
if (currentTimestampMillisRange == null) {
currentTimestampMillisRange = indexMetadata.getTimestampRange();
}
final IndexLongFieldRange newTimestampMillisRange;
newTimestampMillisRange = currentTimestampMillisRange.extendWithShardRange(
task.shardId.id(),
entry.shardId.id(),
indexMetadata.getNumberOfShards(),
task.timestampRange
entry.timestampRange
);
if (newTimestampMillisRange != currentTimestampMillisRange) {
updatedTimestampRanges.put(index, newTimestampMillisRange);
@@ -872,6 +850,43 @@ public int hashCode() {
}
}

public static class StartedShardUpdateTask implements ClusterStateTaskListener {
Comment from the PR author:
This change is similar to #82795, though I wonder if it makes sense to gentrify this class to cover both the Started and Failed entries.

That would result in less code; however, some of the declarations might get longer.

Reply from a contributor:
s/gentrify/generify/?

I'm not sure there's much to be gained from extracting a common base class for the two requests at the moment. It wouldn't reduce any code duplication AFAIK. I think it makes more sense to aim for splitting ShardStateAction into separate action classes for the shard-started and shard-failed functionality since they're conceptually quite independent. The sendShardAction method is not really doing anything that TransportMasterNodeAction couldn't do today, although I would like to change the behaviour here so that when a new master is elected we send all the retries in a single transport request.


private final StartedShardEntry entry;
private final ActionListener<TransportResponse.Empty> listener;

public StartedShardUpdateTask(StartedShardEntry entry, ActionListener<TransportResponse.Empty> listener) {
this.entry = entry;
this.listener = listener;
}

public StartedShardEntry getEntry() {
return entry;
}

@Override
public void onFailure(Exception e) {
if (e instanceof NotMasterException) {
logger.debug(() -> new ParameterizedMessage("{} no longer master while starting shard [{}]", entry.shardId, entry));
} else if (e instanceof FailedToCommitClusterStateException) {
logger.debug(() -> new ParameterizedMessage("{} unexpected failure while starting shard [{}]", entry.shardId, entry), e);
} else {
logger.error(() -> new ParameterizedMessage("{} unexpected failure while starting shard [{}]", entry.shardId, entry), e);
}
listener.onFailure(e);
}

@Override
public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
listener.onResponse(TransportResponse.Empty.INSTANCE);
}

@Override
public String toString() {
return "StartedShardUpdateTask{entry=" + entry + ", listener=" + listener + "}";
}
}
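The "gentrify" (generify) idea floated in the review thread above could be sketched roughly as below. This is a minimal, self-contained Java sketch under stated assumptions: `ShardUpdateTask`, `Listener`, and the string entries are hypothetical stand-ins, not the real Elasticsearch types (`ClusterStateTaskListener`, `ActionListener<TransportResponse.Empty>`, `StartedShardEntry`, `FailedShardEntry`), and the real logging and exception handling are omitted.

```java
// Hypothetical sketch of a single generic task class shared by the
// shard-started and shard-failed paths, as suggested in the review thread.
// All names here are stand-ins, not actual Elasticsearch API.
import java.util.Objects;

public class ShardUpdateTaskSketch {

    // Simplified stand-in for ActionListener<TransportResponse.Empty>.
    interface Listener {
        void onResponse();

        void onFailure(Exception e);
    }

    // Generic over the entry type, so a started entry and a failed entry
    // could share one task implementation.
    static final class ShardUpdateTask<E> {
        private final E entry;
        private final Listener listener;

        ShardUpdateTask(E entry, Listener listener) {
            this.entry = Objects.requireNonNull(entry);
            this.listener = Objects.requireNonNull(listener);
        }

        E getEntry() {
            return entry;
        }

        // Mirrors ClusterStateTaskListener#onFailure: propagate to the caller.
        void onFailure(Exception e) {
            listener.onFailure(e);
        }

        // Mirrors clusterStateProcessed: ack the caller once the new state applies.
        void clusterStateProcessed() {
            listener.onResponse();
        }

        @Override
        public String toString() {
            return "ShardUpdateTask{entry=" + entry + "}";
        }
    }

    public static void main(String[] args) {
        StringBuilder log = new StringBuilder();
        ShardUpdateTask<String> task = new ShardUpdateTask<>("started[shard-0]", new Listener() {
            @Override
            public void onResponse() {
                log.append("acked started[shard-0]");
            }

            @Override
            public void onFailure(Exception e) {
                log.append("failed: ").append(e.getMessage());
            }
        });
        task.clusterStateProcessed();
        System.out.println(log);
        System.out.println(task);
    }
}
```

As the thread notes, the trade-off is less duplicated code against longer declarations such as `ClusterStateTaskExecutor<ShardUpdateTask<StartedShardEntry>>` at every use site.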

public static class NoLongerPrimaryShardException extends ElasticsearchException {

public NoLongerPrimaryShardException(ShardId shardId, String msg) {