Skip to content

Commit 5c0e3c9

Browse files
authored
[Remote Segment Store] Make metadata file immutable (#8363)
Authored-by: Sachin Kale <kalsac@amazon.com>
1 parent c1c23b4 commit 5c0e3c9

File tree

12 files changed

+351
-289
lines changed

12 files changed

+351
-289
lines changed

server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -800,6 +800,7 @@ public void testReplicaHasDiffFilesThanPrimary() throws Exception {
800800
public void testPressureServiceStats() throws Exception {
801801
final String primaryNode = internalCluster().startDataOnlyNode();
802802
createIndex(INDEX_NAME);
803+
ensureYellow(INDEX_NAME);
803804
final String replicaNode = internalCluster().startDataOnlyNode();
804805
ensureGreen(INDEX_NAME);
805806

server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -320,6 +320,6 @@ public void testStaleCommitDeletionWithoutInvokeFlush() throws Exception {
320320
.get()
321321
.getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID);
322322
Path indexPath = Path.of(String.valueOf(absolutePath), indexUUID, "/0/segments/metadata");
323-
assertEquals(1, getFileCount(indexPath));
323+
assertEquals(numberOfIterations, getFileCount(indexPath));
324324
}
325325
}

server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.opensearch.index.seqno.SequenceNumbers;
3131
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
3232
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata;
33+
import org.opensearch.index.translog.Translog;
3334
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
3435
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
3536
import org.opensearch.threadpool.Scheduler;
@@ -359,12 +360,19 @@ void uploadMetadata(Collection<String> localSegmentsPostRefresh, SegmentInfos se
359360
userData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo));
360361
segmentInfosSnapshot.setUserData(userData, false);
361362

362-
remoteDirectory.uploadMetadata(
363-
localSegmentsPostRefresh,
364-
segmentInfosSnapshot,
365-
storeDirectory,
366-
indexShard.getOperationPrimaryTerm()
367-
);
363+
Translog.TranslogGeneration translogGeneration = indexShard.getEngine().translogManager().getTranslogGeneration();
364+
if (translogGeneration == null) {
365+
throw new UnsupportedOperationException("Encountered null TranslogGeneration while uploading metadata to remote segment store");
366+
} else {
367+
long translogFileGeneration = translogGeneration.translogFileGeneration;
368+
remoteDirectory.uploadMetadata(
369+
localSegmentsPostRefresh,
370+
segmentInfosSnapshot,
371+
storeDirectory,
372+
indexShard.getOperationPrimaryTerm(),
373+
translogFileGeneration
374+
);
375+
}
368376
}
369377

370378
private boolean uploadNewSegments(Collection<String> localSegmentsPostRefresh) throws IOException {

server/src/main/java/org/opensearch/index/store/RemoteDirectory.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,17 +13,24 @@
1313
import org.apache.lucene.store.IndexInput;
1414
import org.apache.lucene.store.IndexOutput;
1515
import org.apache.lucene.store.Lock;
16+
import org.opensearch.action.ActionListener;
17+
import org.opensearch.action.LatchedActionListener;
1618
import org.opensearch.common.blobstore.BlobContainer;
1719
import org.opensearch.common.blobstore.BlobMetadata;
1820

1921
import java.io.FileNotFoundException;
2022
import java.io.IOException;
2123
import java.io.InputStream;
2224
import java.nio.file.NoSuchFileException;
25+
import java.util.ArrayList;
2326
import java.util.Collection;
2427
import java.util.Collections;
28+
import java.util.List;
2529
import java.util.Map;
2630
import java.util.Set;
31+
import java.util.concurrent.CountDownLatch;
32+
import java.util.concurrent.atomic.AtomicReference;
33+
import java.util.stream.Collectors;
2734

2835
/**
2936
* A {@code RemoteDirectory} provides an abstraction layer for storing a list of files to a remote store.
@@ -61,6 +68,40 @@ public Collection<String> listFilesByPrefix(String filenamePrefix) throws IOExce
6168
return blobContainer.listBlobsByPrefix(filenamePrefix).keySet();
6269
}
6370

71+
public List<String> listFilesByPrefixInLexicographicOrder(String filenamePrefix, int limit) throws IOException {
72+
List<String> sortedBlobList = new ArrayList<>();
73+
AtomicReference<Exception> exception = new AtomicReference<>();
74+
final CountDownLatch latch = new CountDownLatch(1);
75+
LatchedActionListener<List<BlobMetadata>> actionListener = new LatchedActionListener<>(new ActionListener<>() {
76+
@Override
77+
public void onResponse(List<BlobMetadata> blobMetadata) {
78+
sortedBlobList.addAll(blobMetadata.stream().map(BlobMetadata::name).collect(Collectors.toList()));
79+
}
80+
81+
@Override
82+
public void onFailure(Exception e) {
83+
exception.set(e);
84+
}
85+
}, latch);
86+
87+
try {
88+
blobContainer.listBlobsByPrefixInSortedOrder(
89+
filenamePrefix,
90+
limit,
91+
BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC,
92+
actionListener
93+
);
94+
latch.await();
95+
} catch (InterruptedException e) {
96+
throw new IOException("Exception in listFilesByPrefixInLexicographicOrder with prefix: " + filenamePrefix, e);
97+
}
98+
if (exception.get() != null) {
99+
throw new IOException(exception.get());
100+
} else {
101+
return sortedBlobList;
102+
}
103+
}
104+
64105
/**
65106
* Removes an existing file in the directory.
66107
*

0 commit comments

Comments
 (0)