Skip to content

Commit

Permalink
Revert changes made to PublishCheckpointAction in opensearch-project#…
Browse files Browse the repository at this point in the history
…6366

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>
  • Loading branch information
Rishikesh1159 committed Mar 11, 2023
1 parent f25361f commit 5407a68
Showing 1 changed file with 29 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,12 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.AlreadyClosedException;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.ActionListenerResponseHandler;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.replication.ReplicationMode;
import org.opensearch.action.support.replication.ReplicationOperation;
import org.opensearch.action.support.replication.ReplicationResponse;
import org.opensearch.action.support.replication.ReplicationTask;
import org.opensearch.action.support.replication.TransportReplicationAction;
import org.opensearch.cluster.action.shard.ShardStateAction;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.io.stream.StreamInput;
Expand All @@ -39,12 +33,15 @@
import org.opensearch.node.NodeClosedException;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportException;
import org.opensearch.transport.TransportResponseHandler;
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.util.List;
import java.util.Objects;

import org.opensearch.action.support.replication.ReplicationMode;

/**
* Replication action responsible for publishing checkpoint to a replica shard.
*
Expand Down Expand Up @@ -110,33 +107,36 @@ public ReplicationMode getReplicationMode(IndexShard indexShard) {
* Publish checkpoint request to shard
*/
final void publish(IndexShard indexShard) {
String primaryAllocationId = indexShard.routingEntry().allocationId().getId();
long primaryTerm = indexShard.getPendingPrimaryTerm();
final ThreadContext threadContext = threadPool.getThreadContext();
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
// we have to execute under the system context so that if security is enabled the sync is authorized
threadContext.markAsSystemContext();
PublishCheckpointRequest request = new PublishCheckpointRequest(indexShard.getLatestReplicationCheckpoint());
final ReplicationCheckpoint checkpoint = request.getCheckpoint();
final ReplicationTask task = (ReplicationTask) taskManager.register("transport", "segrep_publish_checkpoint", request);
final ReplicationTimer timer = new ReplicationTimer();
timer.start();
transportService.sendChildRequest(
clusterService.localNode(),
transportPrimaryAction,
new ConcreteShardRequest<>(request, primaryAllocationId, primaryTerm),
task,
transportOptions,
new TransportResponseHandler<ReplicationResponse>() {
@Override
public ReplicationResponse read(StreamInput in) throws IOException {
return newResponseInstance(in);
}

final List<ShardRouting> replicationTargets = indexShard.getReplicationGroup().getReplicationTargets();
for (ShardRouting replicationTarget : replicationTargets) {
if (replicationTarget.primary()) {
continue;
}
final DiscoveryNode node = clusterService.state().nodes().get(replicationTarget.currentNodeId());
final ConcreteReplicaRequest<PublishCheckpointRequest> replicaRequest = new ConcreteReplicaRequest<>(
request,
replicationTarget.allocationId().getId(),
primaryTerm,
indexShard.getLastKnownGlobalCheckpoint(),
indexShard.getMaxSeqNoOfUpdatesOrDeletes()
);
final ReplicationTimer timer = new ReplicationTimer();
timer.start();
final ReplicationTask task = (ReplicationTask) taskManager.register("transport", "segrep_publish_checkpoint", request);
ActionListener<ReplicationOperation.ReplicaResponse> listener = new ActionListener<>() {
@Override
public void onResponse(ReplicationOperation.ReplicaResponse replicaResponse) {
public String executor() {
return ThreadPool.Names.SAME;
}

@Override
public void handleResponse(ReplicationResponse response) {
timer.stop();
logger.trace(
() -> new ParameterizedMessage(
Expand All @@ -151,7 +151,7 @@ public void onResponse(ReplicationOperation.ReplicaResponse replicaResponse) {
}

@Override
public void onFailure(Exception e) {
public void handleException(TransportException e) {
timer.stop();
logger.trace("[shardId {}] Failed to publish checkpoint, timing: {}", indexShard.shardId().getId(), timer.time());
task.setPhase("finished");
Expand All @@ -174,13 +174,8 @@ public void onFailure(Exception e) {
e
);
}
};
final ActionListenerResponseHandler<ReplicaResponse> handler = new ActionListenerResponseHandler<>(
listener,
ReplicaResponse::new
);
transportService.sendChildRequest(node, transportReplicaAction, replicaRequest, task, transportOptions, handler);
}
}
);
logger.trace(
() -> new ParameterizedMessage(
"[shardId {}] Publishing replication checkpoint [{}]",
Expand All @@ -197,7 +192,7 @@ protected void shardOperationOnPrimary(
IndexShard primary,
ActionListener<PrimaryResult<PublishCheckpointRequest, ReplicationResponse>> listener
) {
throw new OpenSearchException("PublishCheckpointAction should not hit primary shards");
ActionListener.completeWith(listener, () -> new PrimaryResult<>(request, new ReplicationResponse()));
}

@Override
Expand Down

0 comments on commit 5407a68

Please sign in to comment.