Skip to content

Commit 5409734

Browse files
[Segment Replication] Override segment replication handler for duplicate request from replica (opensearch-project#6693) (opensearch-project#6699)
* [Segment Replication] Override segment replication handler for new request from same replica * Spotless and use map.compute --------- (cherry picked from commit 6bbe31a) Signed-off-by: Suraj Singh <surajrider@gmail.com> Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com> Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
1 parent 1e30d68 commit 5409734

File tree

2 files changed

+9
-12
lines changed

2 files changed

+9
-12
lines changed

server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -147,16 +147,13 @@ void startSegmentCopy(GetSegmentFilesRequest request, ActionListener<GetSegmentF
147147
*/
148148
CopyState prepareForReplication(CheckpointInfoRequest request, FileChunkWriter fileChunkWriter) throws IOException {
149149
final CopyState copyState = getCachedCopyState(request.getCheckpoint());
150-
if (allocationIdToHandlers.putIfAbsent(
151-
request.getTargetAllocationId(),
152-
createTargetHandler(request.getTargetNode(), copyState, request.getTargetAllocationId(), fileChunkWriter)
153-
) != null) {
154-
throw new OpenSearchException(
155-
"Shard copy {} on node {} already replicating",
156-
request.getCheckpoint().getShardId(),
157-
request.getTargetNode()
158-
);
159-
}
150+
allocationIdToHandlers.compute(request.getTargetAllocationId(), (allocationId, segrepHandler) -> {
151+
if (segrepHandler != null) {
152+
logger.warn("Override handler for allocation id {}", request.getTargetAllocationId());
153+
cancelHandlers(handler -> handler.getAllocationId().equals(request.getTargetAllocationId()), "cancel due to retry");
154+
}
155+
return createTargetHandler(request.getTargetNode(), copyState, request.getTargetAllocationId(), fileChunkWriter);
156+
});
160157
return copyState;
161158
}
162159

server/src/test/java/org/opensearch/indices/replication/OngoingSegmentReplicationsTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
package org.opensearch.indices.replication;
1010

1111
import org.junit.Assert;
12-
import org.opensearch.OpenSearchException;
1312
import org.opensearch.action.ActionListener;
1413
import org.opensearch.cluster.metadata.IndexMetadata;
1514
import org.opensearch.cluster.node.DiscoveryNode;
@@ -277,7 +276,8 @@ public void testShardAlreadyReplicatingToNode() throws IOException {
277276
listener.onResponse(null);
278277
};
279278
replications.prepareForReplication(request, segmentSegmentFileChunkWriter);
280-
assertThrows(OpenSearchException.class, () -> { replications.prepareForReplication(request, segmentSegmentFileChunkWriter); });
279+
CopyState copyState = replications.prepareForReplication(request, segmentSegmentFileChunkWriter);
280+
assertEquals(1, copyState.refCount());
281281
}
282282

283283
public void testStartReplicationWithNoFilesToFetch() throws IOException {

0 commit comments

Comments
 (0)