Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update replicationCheckpoint to include accurate seqNo from SegmentInfos. #6122

Merged
merged 2 commits into from
Feb 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix 'org.apache.hc.core5.http.ParseException: Invalid protocol version' under JDK 16+ ([#4827](https://github.com/opensearch-project/OpenSearch/pull/4827))
- Fixed compression support for h2c protocol ([#4944](https://github.com/opensearch-project/OpenSearch/pull/4944))
- Support OpenSSL Provider with default Netty allocator ([#5460](https://github.com/opensearch-project/OpenSearch/pull/5460))
- Segment Replication - Fixed bug where inaccurate sequence numbers were sent during replication ([#6122](https://github.com/opensearch-project/OpenSearch/pull/6122))

### Security

Expand Down
49 changes: 37 additions & 12 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -1447,31 +1447,56 @@ public GatedCloseable<IndexCommit> acquireSafeIndexCommit() throws EngineExcepti
}

/**
* Returns the latest ReplicationCheckpoint that shard received.
* Compute and return the latest ReplicationCheckpoint for a particular shard.
* @return EMPTY checkpoint before the engine is opened and null for non-segrep enabled indices
*/
public ReplicationCheckpoint getLatestReplicationCheckpoint() {
    final Tuple<GatedCloseable<SegmentInfos>, ReplicationCheckpoint> latest = getLatestSegmentInfosAndCheckpoint();
    // A null tuple is passed through to the caller as a null checkpoint.
    ReplicationCheckpoint checkpoint = null;
    if (latest != null) {
        // Only the checkpoint half of the tuple is needed here; the SegmentInfos
        // reference is opened as a resource purely so that it is closed again
        // before this method returns.
        try (final GatedCloseable<SegmentInfos> ignored = latest.v1()) {
            checkpoint = latest.v2();
        } catch (IOException closeFailure) {
            throw new OpenSearchException("Error Closing SegmentInfos Snapshot", closeFailure);
        }
    }
    return checkpoint;
}

/**
* Compute and return the latest ReplicationCheckpoint for a shard and a GatedCloseable containing the corresponding SegmentInfos.
* The segments referenced by the SegmentInfos will remain on disk until the GatedCloseable is closed.
*
* Primary shards compute the seqNo used in the replication checkpoint from the fetched SegmentInfos.
* Replica shards compute the seqNo from its latest processed checkpoint, which only increases when refreshing on new segments.
*
* @return A {@link Tuple} containing SegmentInfos wrapped in a {@link GatedCloseable} and the {@link ReplicationCheckpoint} computed from the infos.
*
*/
public Tuple<GatedCloseable<SegmentInfos>, ReplicationCheckpoint> getLatestSegmentInfosAndCheckpoint() {
dreamer-89 marked this conversation as resolved.
Show resolved Hide resolved
if (indexSettings.isSegRepEnabled() == false) {
return null;
}
if (getEngineOrNull() == null) {
return ReplicationCheckpoint.empty(shardId);
return new Tuple<>(new GatedCloseable<>(null, () -> {}), ReplicationCheckpoint.empty(shardId));
}
try (final GatedCloseable<SegmentInfos> snapshot = getSegmentInfosSnapshot()) {
return Optional.ofNullable(snapshot.get())
.map(
segmentInfos -> new ReplicationCheckpoint(
// do not close the snapshot - caller will close it.
final GatedCloseable<SegmentInfos> snapshot = getSegmentInfosSnapshot();
return Optional.ofNullable(snapshot.get()).map(segmentInfos -> {
try {
return new Tuple<>(
snapshot,
new ReplicationCheckpoint(
this.shardId,
getOperationPrimaryTerm(),
segmentInfos.getGeneration(),
getProcessedLocalCheckpoint(),
shardRouting.primary() ? getEngine().getMaxSeqNoFromSegmentInfos(segmentInfos) : getProcessedLocalCheckpoint(),
segmentInfos.getVersion()
)
)
.orElse(ReplicationCheckpoint.empty(shardId));
} catch (IOException ex) {
throw new OpenSearchException("Error Closing SegmentInfos Snapshot", ex);
}
);
} catch (IOException e) {
throw new OpenSearchException("Error Fetching SegmentInfos and latest checkpoint", e);
}
}).orElseGet(() -> new Tuple<>(new GatedCloseable<>(null, () -> {}), ReplicationCheckpoint.empty(shardId)));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -899,7 +899,7 @@ public void commitSegmentInfos(SegmentInfos latestSegmentInfos, long maxSeqNo, l
final Map<String, String> userData = new HashMap<>(latestSegmentInfos.getUserData());
userData.put(LOCAL_CHECKPOINT_KEY, String.valueOf(processedCheckpoint));
userData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo));
latestSegmentInfos.setUserData(userData, true);
latestSegmentInfos.setUserData(userData, false);
mch2 marked this conversation as resolved.
Show resolved Hide resolved
latestSegmentInfos.commit(directory());
directory.sync(latestSegmentInfos.files(true));
directory.syncMetaData();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.store.ByteBuffersDataOutput;
import org.apache.lucene.store.ByteBuffersIndexOutput;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.common.util.concurrent.AbstractRefCounted;
import org.opensearch.index.shard.IndexShard;
Expand Down Expand Up @@ -44,16 +45,12 @@ public CopyState(ReplicationCheckpoint requestedReplicationCheckpoint, IndexShar
super("CopyState-" + shard.shardId());
this.requestedReplicationCheckpoint = requestedReplicationCheckpoint;
this.shard = shard;
this.segmentInfosRef = shard.getSegmentInfosSnapshot();
final Tuple<GatedCloseable<SegmentInfos>, ReplicationCheckpoint> latestSegmentInfosAndCheckpoint = shard
.getLatestSegmentInfosAndCheckpoint();
this.segmentInfosRef = latestSegmentInfosAndCheckpoint.v1();
this.replicationCheckpoint = latestSegmentInfosAndCheckpoint.v2();
SegmentInfos segmentInfos = this.segmentInfosRef.get();
this.metadataMap = shard.store().getSegmentMetadataMap(segmentInfos);
this.replicationCheckpoint = new ReplicationCheckpoint(
shard.shardId(),
shard.getOperationPrimaryTerm(),
segmentInfos.getGeneration(),
shard.getProcessedLocalCheckpoint(),
segmentInfos.getVersion()
);
this.commitRef = shard.acquireLastIndexCommit(false);

ByteBuffersDataOutput buffer = new ByteBuffersDataOutput();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingHelper;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.settings.ClusterSettings;
Expand Down Expand Up @@ -104,6 +105,60 @@ public void testReplicationCheckpointNotNullForSegRep() throws IOException {
closeShards(indexShard);
}

/**
 * Checks, on both a primary and a replica, that the checkpoint returned by
 * {@code getLatestSegmentInfosAndCheckpoint()} agrees with the SegmentInfos returned
 * next to it - first on freshly started shards, then again after documents have been
 * indexed, refreshed and replicated.
 */
public void testSegmentInfosAndReplicationCheckpointTuple() throws Exception {
    try (ReplicationGroup group = createGroup(1, settings, new NRTReplicationEngineFactory())) {
        group.startAll();
        final IndexShard primary = group.getPrimary();
        final IndexShard replica = group.getReplicas().get(0);

        // Before any indexing: replica first, then primary.
        verifyLatestInfosAndCheckpoint(replica);
        verifyLatestInfosAndCheckpoint(primary);
        // compareTo is used rather than equals because the segments generation is ignored
        // (replicas perform their own commits); the infos version is still expected to match.
        assertEquals(1, primary.getLatestReplicationCheckpoint().compareTo(replica.getLatestReplicationCheckpoint()));

        // Index some documents and copy the resulting segments over to the replica.
        final int docCount = randomIntBetween(10, 100);
        group.indexDocs(docCount);
        primary.refresh("test");
        replicateSegments(primary, List.of(replica));

        // After replication: replica, primary, and the replica once more.
        verifyLatestInfosAndCheckpoint(replica);
        verifyLatestInfosAndCheckpoint(primary);
        verifyLatestInfosAndCheckpoint(replica);
        assertEquals(1, primary.getLatestReplicationCheckpoint().compareTo(replica.getLatestReplicationCheckpoint()));
    }
}

/**
 * Fetches the latest SegmentInfos/checkpoint tuple from {@code shard}, asserts that the two
 * halves agree, and closes the SegmentInfos reference afterwards.
 */
private void verifyLatestInfosAndCheckpoint(IndexShard shard) throws IOException {
    final Tuple<GatedCloseable<SegmentInfos>, ReplicationCheckpoint> latest = shard.getLatestSegmentInfosAndCheckpoint();
    try (final GatedCloseable<SegmentInfos> infosRef = latest.v1()) {
        assertReplicationCheckpoint(shard, infosRef.get(), latest.v2());
    }
}

/**
 * Asserts that {@code checkpoint} agrees with {@code segmentInfos}: its sequence number must
 * equal the max sequence number the shard's engine reads from the infos, and its infos version
 * and segments generation must equal those of the infos.
 *
 * The values derived from {@code segmentInfos} are passed as the expected side and the
 * checkpoint's values as the actual side, so a failure message points at the checkpoint field
 * that is wrong.
 *
 * @param shard        shard whose engine is used to read the max sequence number from the infos
 * @param segmentInfos the infos the checkpoint is compared against; must not be null
 * @param checkpoint   the checkpoint under test
 * @throws IOException declared because reading the max sequence number from the infos may throw it
 */
private void assertReplicationCheckpoint(IndexShard shard, SegmentInfos segmentInfos, ReplicationCheckpoint checkpoint)
    throws IOException {
    assertNotNull(segmentInfos);
    assertEquals(shard.getEngine().getMaxSeqNoFromSegmentInfos(segmentInfos), checkpoint.getSeqNo());
    assertEquals(segmentInfos.getVersion(), checkpoint.getSegmentInfosVersion());
    assertEquals(segmentInfos.getGeneration(), checkpoint.getSegmentsGen());
}

public void testIsSegmentReplicationAllowed_WrongEngineType() throws IOException {
final IndexShard indexShard = newShard(false, settings, new InternalEngineFactory());
assertFalse(indexShard.isSegmentReplicationAllowed());
Expand Down
25 changes: 25 additions & 0 deletions server/src/test/java/org/opensearch/index/store/StoreTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
import org.opensearch.index.engine.Engine;
import org.opensearch.index.seqno.ReplicationTracker;
import org.opensearch.index.seqno.RetentionLease;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.shard.ShardId;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.indices.store.TransportNodesListShardStoreMetadata;
Expand Down Expand Up @@ -120,6 +121,7 @@
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.opensearch.index.seqno.SequenceNumbers.LOCAL_CHECKPOINT_KEY;
import static org.opensearch.index.store.remote.directory.RemoteSnapshotDirectory.SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY_MINIMUM_VERSION;
import static org.opensearch.test.VersionUtils.randomVersion;

Expand Down Expand Up @@ -1305,6 +1307,29 @@ public void testReadSegmentsFromOldIndicesFailure() throws IOException {
store.close();
}

/**
 * Verifies that {@code Store#commitSegmentInfos} persists the given local checkpoint and max
 * sequence number in the commit user data and that the persisted infos keep the version the
 * infos had before the call.
 */
public void testCommitSegmentInfos() throws IOException {
    final ShardId shardId = new ShardId("index", "_na_", 1);
    Store store = new Store(
        shardId,
        SEGMENT_REPLICATION_INDEX_SETTINGS,
        StoreTests.newDirectory(random()),
        new DummyShardLock(shardId)
    );
    commitRandomDocs(store);
    final SegmentInfos lastCommittedInfos = store.readLastCommittedSegmentsInfo();
    // Read the version BEFORE committing. commitSegmentInfos is handed this same instance, so
    // a version read afterwards would include any bump the call itself applied and the
    // comparison below could never fail.
    final long versionBeforeCommit = lastCommittedInfos.getVersion();
    final long expectedLocalCheckpoint = 1;
    final long expectedMaxSeqNo = 2;
    store.commitSegmentInfos(lastCommittedInfos, expectedMaxSeqNo, expectedLocalCheckpoint);

    final SegmentInfos updatedInfos = store.readLastCommittedSegmentsInfo();
    assertEquals(versionBeforeCommit, updatedInfos.getVersion());
    final Map<String, String> userData = updatedInfos.getUserData();
    assertEquals(expectedLocalCheckpoint, Long.parseLong(userData.get(LOCAL_CHECKPOINT_KEY)));
    assertEquals(expectedMaxSeqNo, Long.parseLong(userData.get(SequenceNumbers.MAX_SEQ_NO)));
    deleteContent(store.directory());
    IOUtils.close(store);
}

private void commitRandomDocs(Store store) throws IOException {
IndexWriter writer = indexRandomDocs(store);
writer.commit();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.util.Version;
import org.opensearch.common.collect.Map;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardTestCase;
Expand Down Expand Up @@ -48,14 +49,7 @@ public class CopyStateTests extends IndexShardTestCase {

public void testCopyStateCreation() throws IOException {
final IndexShard mockIndexShard = createMockIndexShard();
ReplicationCheckpoint testCheckpoint = new ReplicationCheckpoint(
mockIndexShard.shardId(),
mockIndexShard.getOperationPrimaryTerm(),
0L,
mockIndexShard.getProcessedLocalCheckpoint(),
0L
);
CopyState copyState = new CopyState(testCheckpoint, mockIndexShard);
CopyState copyState = new CopyState(ReplicationCheckpoint.empty(mockIndexShard.shardId()), mockIndexShard);
ReplicationCheckpoint checkpoint = copyState.getCheckpoint();
assertEquals(TEST_SHARD_ID, checkpoint.getShardId());
// version was never set so this should be zero
Expand All @@ -73,7 +67,18 @@ public static IndexShard createMockIndexShard() throws IOException {
when(mockShard.store()).thenReturn(mockStore);

SegmentInfos testSegmentInfos = new SegmentInfos(Version.LATEST.major);
when(mockShard.getSegmentInfosSnapshot()).thenReturn(new GatedCloseable<>(testSegmentInfos, () -> {}));
ReplicationCheckpoint testCheckpoint = new ReplicationCheckpoint(
mockShard.shardId(),
mockShard.getOperationPrimaryTerm(),
0L,
mockShard.getProcessedLocalCheckpoint(),
0L
);
final Tuple<GatedCloseable<SegmentInfos>, ReplicationCheckpoint> gatedCloseableReplicationCheckpointTuple = new Tuple<>(
new GatedCloseable<>(testSegmentInfos, () -> {}),
testCheckpoint
);
when(mockShard.getLatestSegmentInfosAndCheckpoint()).thenReturn(gatedCloseableReplicationCheckpointTuple);
when(mockStore.getSegmentMetadataMap(testSegmentInfos)).thenReturn(SI_SNAPSHOT.asMap());

IndexCommit mockIndexCommit = mock(IndexCommit.class);
Expand Down