Skip to content

Commit

Permalink
Merge branch 'opensearch-project:main' into index-metadata-mutate
Browse files Browse the repository at this point in the history
  • Loading branch information
shourya035 committed Apr 17, 2024
2 parents da6b91a + b899a27 commit 4e15cf9
Show file tree
Hide file tree
Showing 6 changed files with 103 additions and 29 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix UOE While building Exists query for nested search_as_you_type field ([#12048](https://github.com/opensearch-project/OpenSearch/pull/12048))
- Client with Java 8 runtime and Apache HttpClient 5 Transport fails with java.lang.NoSuchMethodError: java.nio.ByteBuffer.flip()Ljava/nio/ByteBuffer ([#13100](https://github.com/opensearch-project/opensearch-java/pull/13100))
- Fix implement mark() and markSupported() in class FilterStreamInput ([#13098](https://github.com/opensearch-project/OpenSearch/pull/13098))
- Fix snapshot _status API to return correct status for partial snapshots ([#12812](https://github.com/opensearch-project/OpenSearch/pull/12812))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -572,7 +572,7 @@ public void testRestoreIndexWithMissingShards() throws Exception {
List<SnapshotStatus> snapshotStatuses = snapshotsStatusResponse.getSnapshots();
assertEquals(snapshotStatuses.size(), 1);
logger.trace("current snapshot status [{}]", snapshotStatuses.get(0));
assertTrue(snapshotStatuses.get(0).getState().completed());
assertThat(getSnapshot("test-repo", "test-snap-2").state(), equalTo(SnapshotState.PARTIAL));
}, 1, TimeUnit.MINUTES);
SnapshotsStatusResponse snapshotsStatusResponse = clusterAdmin().prepareSnapshotStatus("test-repo")
.setSnapshots("test-snap-2")
Expand All @@ -589,7 +589,6 @@ public void testRestoreIndexWithMissingShards() throws Exception {
// After it was marked as completed in the cluster state - we need to check if it's completed on the file system as well
assertBusy(() -> {
SnapshotInfo snapshotInfo = getSnapshot("test-repo", "test-snap-2");
assertTrue(snapshotInfo.state().completed());
assertEquals(SnapshotState.PARTIAL, snapshotInfo.state());
}, 1, TimeUnit.MINUTES);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import org.opensearch.action.admin.cluster.snapshots.status.SnapshotIndexShardStatus;
import org.opensearch.action.admin.cluster.snapshots.status.SnapshotStats;
import org.opensearch.action.admin.cluster.snapshots.status.SnapshotStatus;
import org.opensearch.action.admin.cluster.snapshots.status.SnapshotsStatusRequest;
import org.opensearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse;
import org.opensearch.client.Client;
import org.opensearch.cluster.SnapshotsInProgress;
Expand Down Expand Up @@ -101,13 +100,9 @@ public void testStatusApiConsistency() {
assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS));
assertThat(snapshotInfo.version(), equalTo(Version.CURRENT));

final List<SnapshotStatus> snapshotStatus = clusterAdmin().snapshotsStatus(
new SnapshotsStatusRequest("test-repo", new String[] { "test-snap" })
).actionGet().getSnapshots();
assertThat(snapshotStatus.size(), equalTo(1));
final SnapshotStatus snStatus = snapshotStatus.get(0);
assertEquals(snStatus.getStats().getStartTime(), snapshotInfo.startTime());
assertEquals(snStatus.getStats().getTime(), snapshotInfo.endTime() - snapshotInfo.startTime());
final SnapshotStatus snapshotStatus = getSnapshotStatus("test-repo", "test-snap");
assertEquals(snapshotStatus.getStats().getStartTime(), snapshotInfo.startTime());
assertEquals(snapshotStatus.getStats().getTime(), snapshotInfo.endTime() - snapshotInfo.startTime());
}

public void testStatusAPICallForShallowCopySnapshot() {
Expand Down Expand Up @@ -357,6 +352,22 @@ public void testSnapshotStatusOnFailedSnapshot() throws Exception {
assertEquals(SnapshotsInProgress.State.FAILED, snapshotsStatusResponse.getSnapshots().get(0).getState());
}

public void testSnapshotStatusOnPartialSnapshot() throws Exception {
final String dataNode = internalCluster().startDataOnlyNode();
final String repoName = "test-repo";
final String snapshotName = "test-snap";
final String indexName = "test-idx";
createRepository(repoName, "fs");
// create an index with a single shard on the data node, that will be stopped
createIndex(indexName, singleShardOneNode(dataNode));
index(indexName, "_doc", "some_doc_id", "foo", "bar");
logger.info("--> stopping data node before creating snapshot");
stopNode(dataNode);
startFullSnapshot(repoName, snapshotName, true).get();
final SnapshotStatus snapshotStatus = getSnapshotStatus(repoName, snapshotName);
assertEquals(SnapshotsInProgress.State.PARTIAL, snapshotStatus.getState());
}

public void testStatusAPICallInProgressShallowSnapshot() throws Exception {
internalCluster().startClusterManagerOnlyNode();
internalCluster().startDataOnlyNode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -356,11 +356,11 @@ private void loadRepositoryData(
state = SnapshotsInProgress.State.FAILED;
break;
case SUCCESS:
case PARTIAL:
// Translating both PARTIAL and SUCCESS to SUCCESS for now
// TODO: add the differentiation on the metadata level in the next major release
state = SnapshotsInProgress.State.SUCCESS;
break;
case PARTIAL:
state = SnapshotsInProgress.State.PARTIAL;
break;
default:
throw new IllegalArgumentException("Unknown snapshot state " + snapshotInfo.state());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -747,7 +747,12 @@ public void writeTo(StreamOutput out) throws IOException {
snapshot.writeTo(out);
out.writeBoolean(includeGlobalState);
out.writeBoolean(partial);
out.writeByte(state.value());
if ((out.getVersion().before(Version.V_3_0_0)) && state == State.PARTIAL) {
// Setting to SUCCESS for partial snapshots in older versions to maintain backward compatibility
out.writeByte(State.SUCCESS.value());
} else {
out.writeByte(state.value());
}
out.writeList(indices);
out.writeLong(startTime);
out.writeMap(shards, (o, v) -> v.writeTo(o), (o, v) -> v.writeTo(o));
Expand Down Expand Up @@ -937,7 +942,8 @@ public enum State {
STARTED((byte) 1, false),
SUCCESS((byte) 2, true),
FAILED((byte) 3, true),
ABORTED((byte) 4, false);
ABORTED((byte) 4, false),
PARTIAL((byte) 5, false);

private final byte value;

Expand Down Expand Up @@ -968,6 +974,8 @@ public static State fromValue(byte value) {
return FAILED;
case 4:
return ABORTED;
case 5:
return PARTIAL;
default:
throw new IllegalArgumentException("No snapshot state for value [" + value + "]");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,8 +262,12 @@ public void testAlreadyOnNewCheckpoint() {
}

@TestLogging(reason = "Getting trace logs from replication package", value = "org.opensearch.indices.replication:TRACE")
public void testShardAlreadyReplicating() {
public void testShardAlreadyReplicating() throws InterruptedException {
// in this case shard is already replicating and we receive an ahead checkpoint with same pterm.
// ongoing replication is not cancelled and new one does not start.
CountDownLatch blockGetCheckpointMetadata = new CountDownLatch(1);
CountDownLatch continueGetCheckpointMetadata = new CountDownLatch(1);
CountDownLatch replicationCompleteLatch = new CountDownLatch(1);
SegmentReplicationSource source = new TestReplicationSource() {
@Override
public void getCheckpointMetadata(
Expand All @@ -272,11 +276,13 @@ public void getCheckpointMetadata(
ActionListener<CheckpointInfoResponse> listener
) {
try {
blockGetCheckpointMetadata.await();
final CopyState copyState = new CopyState(primaryShard);
listener.onResponse(
new CheckpointInfoResponse(copyState.getCheckpoint(), copyState.getMetadataMap(), copyState.getInfosBytes())
);
blockGetCheckpointMetadata.countDown();
continueGetCheckpointMetadata.await();
try (final CopyState copyState = new CopyState(primaryShard)) {
listener.onResponse(
new CheckpointInfoResponse(copyState.getCheckpoint(), copyState.getMetadataMap(), copyState.getInfosBytes())
);
}
} catch (InterruptedException | IOException e) {
throw new RuntimeException(e);
}
Expand All @@ -297,24 +303,73 @@ public void getSegmentFiles(
final SegmentReplicationTarget target = spy(
new SegmentReplicationTarget(
replicaShard,
primaryShard.getLatestReplicationCheckpoint(),
initialCheckpoint,
source,
mock(SegmentReplicationTargetService.SegmentReplicationListener.class)
new SegmentReplicationTargetService.SegmentReplicationListener() {
@Override
public void onReplicationDone(SegmentReplicationState state) {
replicationCompleteLatch.countDown();
}

@Override
public void onReplicationFailure(
SegmentReplicationState state,
ReplicationFailedException e,
boolean sendShardFailure
) {
Assert.fail("Replication should not fail");
}
}
)
);

final SegmentReplicationTargetService spy = spy(sut);
doReturn(false).when(spy).processLatestReceivedCheckpoint(eq(replicaShard), any());
// Start first round of segment replication.
spy.startReplication(target);
// wait until we are at getCheckpointMetadata stage
blockGetCheckpointMetadata.await(5, TimeUnit.MINUTES);

// Start second round of segment replication, this should fail to start as first round is still in-progress
spy.onNewCheckpoint(newPrimaryCheckpoint, replicaShard);
verify(spy, times(1)).processLatestReceivedCheckpoint(eq(replicaShard), any());
blockGetCheckpointMetadata.countDown();
// try and insert a new target directly - it should fail immediately and alert listener
spy.startReplication(
new SegmentReplicationTarget(
replicaShard,
aheadCheckpoint,
source,
new SegmentReplicationTargetService.SegmentReplicationListener() {
@Override
public void onReplicationDone(SegmentReplicationState state) {
Assert.fail("Should not succeed");
}

@Override
public void onReplicationFailure(
SegmentReplicationState state,
ReplicationFailedException e,
boolean sendShardFailure
) {
assertFalse(sendShardFailure);
assertEquals("Shard " + replicaShard.shardId() + " is already replicating", e.getMessage());
}
}
)
);

// Start second round of segment replication through onNewCheckpoint, this should fail to start as first round is still in-progress
// aheadCheckpoint is of same pterm but higher version
assertTrue(replicaShard.shouldProcessCheckpoint(aheadCheckpoint));
spy.onNewCheckpoint(aheadCheckpoint, replicaShard);
verify(spy, times(0)).processLatestReceivedCheckpoint(eq(replicaShard), any());
// start replication is not invoked with aheadCheckpoint
verify(spy, times(0)).startReplication(
eq(replicaShard),
eq(aheadCheckpoint),
any(SegmentReplicationTargetService.SegmentReplicationListener.class)
);
continueGetCheckpointMetadata.countDown();
replicationCompleteLatch.await(5, TimeUnit.MINUTES);
}

public void testOnNewCheckpointFromNewPrimaryCancelOngoingReplication() throws InterruptedException {
public void testShardAlreadyReplicating_HigherPrimaryTermReceived() throws InterruptedException {
// Create a spy of Target Service so that we can verify invocation of startReplication call with specific checkpoint on it.
SegmentReplicationTargetService serviceSpy = spy(sut);
doNothing().when(serviceSpy).updateVisibleCheckpoint(anyLong(), any());
Expand Down

0 comments on commit 4e15cf9

Please sign in to comment.