Skip to content

Commit

Permalink
Removing files from FileTransferTracker as well
Browse files Browse the repository at this point in the history
Signed-off-by: Gaurav Bafna <gbbafna@amazon.com>
  • Loading branch information
gbbafna committed Jan 2, 2023
1 parent a626f3e commit e1f2c43
Show file tree
Hide file tree
Showing 7 changed files with 55 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1124,7 +1124,7 @@ private void createConfiguration() {
if (nodeName != null) {
baseConfig.put("node.name", nodeName);
}
baseConfig.put("path.repo", "/Users/gbbafna/git/OpenSearch/build/testclusters/repo");
baseConfig.put("path.repo", confPathRepo.toAbsolutePath().toString());
baseConfig.put("path.data", confPathData.toAbsolutePath().toString());
baseConfig.put("path.logs", confPathLogs.toAbsolutePath().toString());
baseConfig.put("path.shared_data", workingDir.resolve("sharedData").toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,11 +248,8 @@ public void close() throws IOException {
protected long getMinReferencedGen() throws IOException {
assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
long minReferencedGen = Math.min(
Math.min(
deletionPolicy.minTranslogGenRequired(readers, current),
minGenerationForSeqNo(deletionPolicy.getLocalCheckpointOfSafeCommit() + 1, current, readers)
),
minGenerationForSeqNo(minSeqNoRequired, current, readers)
deletionPolicy.minTranslogGenRequired(readers, current),
minGenerationForSeqNo(Math.min(deletionPolicy.getLocalCheckpointOfSafeCommit() + 1, minSeqNoRequired), current, readers)
);
assert minReferencedGen >= getMinFileGeneration() : "deletion policy requires a minReferenceGen of ["
+ minReferencedGen
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ public void onFailure(TransferFileSnapshot fileSnapshot, Exception e) {
});
}

@Override
public void onDelete(String name) {
fileTransferTracker.remove(name);
}

public Set<TransferFileSnapshot> exclusionFilter(Set<TransferFileSnapshot> original) {
return original.stream()
.filter(fileSnapshot -> fileTransferTracker.get(fileSnapshot.getName()) != TransferState.SUCCESS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,9 @@ private TransferFileSnapshot prepareMetadata(TransferSnapshot transferSnapshot)
public void deleteTranslog(long primaryTerm, long generation) throws IOException {
String ckpFileName = Translog.getCommitCheckpointFileName(generation);
String translogFilename = Translog.getFilename(generation);
// ToDo - Take care of metadata file cleanup
fileTransferListener.onDelete(ckpFileName);
fileTransferListener.onDelete(translogFilename);
List<String> files = List.of(ckpFileName, translogFilename);
transferService.deleteBlobs(remoteBaseTransferPath.add(String.valueOf(primaryTerm)), files);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot;

/**
* The listener to be invoked on the completion or failure of a {@link TransferFileSnapshot}
* The listener to be invoked on the completion or failure of a {@link TransferFileSnapshot} or deletion of file
*
* @opensearch.internal
*/
Expand All @@ -29,4 +29,6 @@ public interface FileTransferListener {
* @param e the exception while processing the {@link TransferFileSnapshot}
*/
void onFailure(TransferFileSnapshot fileSnapshot, Exception e);

void onDelete(String name);
}
Original file line number Diff line number Diff line change
Expand Up @@ -404,13 +404,13 @@ public void testSimpleOperationsUpload() throws IOException {
assertThat(snapshot.totalOperations(), equalTo(ops.size()));
}

assertEquals(translog.allUploaded().size(), 4);
assertEquals(translog.allUploaded().size(), 2);

addToTranslogAndListAndUpload(translog, ops, new Translog.Index("1", 1, primaryTerm.get(), new byte[] { 1 }));
assertEquals(translog.allUploaded().size(), 6);
assertEquals(translog.allUploaded().size(), 4);

translog.rollGeneration();
assertEquals(translog.allUploaded().size(), 6);
assertEquals(translog.allUploaded().size(), 4);

Set<String> mdFiles = blobStoreTransferService.listAll(
repository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())).add("metadata")
Expand Down Expand Up @@ -451,6 +451,38 @@ public void testSimpleOperationsUpload() throws IOException {
assertArrayEquals(ckp, content);
}
}

// expose the new checkpoint (simulating a commit), before we trim the translog
translog.deletionPolicy.setLocalCheckpointOfSafeCommit(0);
// simulating the remote segment upload .
translog.minSeqNoRequired(0);
// This should not trim anything
translog.trimUnreferencedReaders();
assertEquals(translog.allUploaded().size(), 4);
assertEquals(
blobStoreTransferService.listAll(
repository.basePath()
.add(shardId.getIndex().getUUID())
.add(String.valueOf(shardId.id()))
.add(String.valueOf(primaryTerm.get()))
).size(),
4
);

// This should trim tlog-2.* files as it contains seq no 0
translog.minSeqNoRequired(1);
translog.trimUnreferencedReaders();
assertEquals(translog.allUploaded().size(), 2);
assertEquals(
blobStoreTransferService.listAll(
repository.basePath()
.add(shardId.getIndex().getUUID())
.add(String.valueOf(shardId.id()))
.add(String.valueOf(primaryTerm.get()))
).size(),
2
);

}

private Long populateTranslogOps(boolean withMissingOps) throws IOException {
Expand Down Expand Up @@ -640,6 +672,7 @@ public void doRun() throws BrokenBarrierException, InterruptedException, IOExcep
// expose the new checkpoint (simulating a commit), before we trim the translog
lastCommittedLocalCheckpoint.set(localCheckpoint);
deletionPolicy.setLocalCheckpointOfSafeCommit(localCheckpoint);
translog.minSeqNoRequired(localCheckpoint + 1);
translog.trimUnreferencedReaders();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ public void onSuccess(TransferFileSnapshot fileSnapshot) {
public void onFailure(TransferFileSnapshot fileSnapshot, Exception e) {
fileTransferFailed.incrementAndGet();
}

@Override
public void onDelete(String name) {

}
},
r -> r
);
Expand Down

0 comments on commit e1f2c43

Please sign in to comment.