[Remote Store] Add capability of doing flush as determined by the translog (#12992)

Signed-off-by: Shubh Sahu <shubhvs@amazon.com>
astute-decipher authored and Shubh Sahu committed Apr 25, 2024
1 parent 443dd81 commit c9e2677
Showing 10 changed files with 126 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -24,6 +24,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add an individual setting of rate limiter for segment replication ([#12959](https://github.com/opensearch-project/OpenSearch/pull/12959))
- [Streaming Indexing] Ensure support of the new transport by security plugin ([#13174](https://github.com/opensearch-project/OpenSearch/pull/13174))
- Add cluster setting to dynamically configure the buckets for filter rewrite optimization. ([#13179](https://github.com/opensearch-project/OpenSearch/pull/13179))
- [Remote Store] Add capability of doing flush as determined by the translog ([#12992](https://github.com/opensearch-project/OpenSearch/pull/12992))

### Dependencies
- Bump `org.apache.commons:commons-configuration2` from 2.10.0 to 2.10.1 ([#12896](https://github.com/opensearch-project/OpenSearch/pull/12896))
@@ -29,6 +29,7 @@
import org.opensearch.index.IndexSettings;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardClosedException;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.Translog.Durability;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.RemoteStoreSettings;
@@ -58,6 +59,11 @@

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.SEGMENTS;
import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.TRANSLOG;
import static org.opensearch.index.remote.RemoteStoreEnums.DataType.DATA;
import static org.opensearch.index.remote.RemoteStoreEnums.DataType.METADATA;
import static org.opensearch.index.shard.IndexShardTestCase.getTranslog;
import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
@@ -871,4 +877,45 @@ public void testLocalOnlyTranslogCleanupOnNodeRestart() throws Exception {
refresh(INDEX_NAME);
assertHitCount(client(dataNode).prepareSearch(INDEX_NAME).setSize(0).get(), searchableDocs + 15);
}

public void testFlushOnTooManyRemoteTranslogFiles() throws Exception {
internalCluster().startClusterManagerOnlyNode();
String datanode = internalCluster().startDataOnlyNodes(1).get(0);
createIndex(INDEX_NAME, remoteStoreIndexSettings(0, 10000L, -1));
ensureGreen(INDEX_NAME);

ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
updateSettingsRequest.persistentSettings(
Settings.builder().put(RemoteStoreSettings.CLUSTER_REMOTE_MAX_TRANSLOG_READERS.getKey(), "100")
);
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());

IndexShard indexShard = getIndexShard(datanode, INDEX_NAME);
Path translogLocation = getTranslog(indexShard).location();
assertFalse(indexShard.shouldPeriodicallyFlush());

try (Stream<Path> files = Files.list(translogLocation)) {
long totalFiles = files.filter(f -> f.getFileName().toString().endsWith(Translog.TRANSLOG_FILE_SUFFIX)).count();
assertEquals(totalFiles, 1L);
}

// Index 100 documents via 100 bulk requests; no flush should be triggered yet
for (int i = 0; i < 100; i++) {
indexBulk(INDEX_NAME, 1);
}

try (Stream<Path> files = Files.list(translogLocation)) {
long totalFiles = files.filter(f -> f.getFileName().toString().endsWith(Translog.TRANSLOG_FILE_SUFFIX)).count();
assertEquals(totalFiles, 101L);
}
// Will flush and trim the translog readers
indexBulk(INDEX_NAME, 1);

assertBusy(() -> {
try (Stream<Path> files = Files.list(translogLocation)) {
long totalFiles = files.filter(f -> f.getFileName().toString().endsWith(Translog.TRANSLOG_FILE_SUFFIX)).count();
assertEquals(totalFiles, 1L);
}
}, 30, TimeUnit.SECONDS);
}
}
@@ -731,7 +731,8 @@ public void apply(Settings value, Settings current, Settings previous) {
IoBasedAdmissionControllerSettings.INDEXING_IO_USAGE_LIMIT,

RemoteStoreSettings.CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING,
RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING
RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING,
RemoteStoreSettings.CLUSTER_REMOTE_MAX_TRANSLOG_READERS
)
)
);
@@ -2924,7 +2924,7 @@ public void restoreFromRepository(Repository repository, ActionListener<Boolean>
*
* @return {@code true} if the engine should be flushed
*/
boolean shouldPeriodicallyFlush() {
public boolean shouldPeriodicallyFlush() {
final Engine engine = getEngineOrNull();
if (engine != null) {
try {
@@ -4508,6 +4508,7 @@ public Durability getTranslogDurability() {
/**
* Schedules a flush or translog generation roll if needed but will not schedule more than one concurrently. The operation will be
* executed asynchronously on the flush thread pool.
* Can also schedule a flush if the translog manager determines one is needed.
*/
public void afterWriteOperation() {
if (shouldPeriodicallyFlush() || shouldRollTranslogGeneration()) {
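
The updated javadoc above captures the new behaviour: a flush can now be scheduled either by the usual size-based check or because the translog manager itself asks for one, and at most one flush/roll task is scheduled at a time. Below is a minimal, self-contained sketch of that combined decision; the class name and method signatures are simplified stand-ins, not the actual IndexShard code.

import java.util.concurrent.atomic.AtomicBoolean;

// Simplified stand-in for the flush-scheduling logic described above (hypothetical names).
public class FlushSchedulingSketch {
    private final AtomicBoolean flushOrRollRunning = new AtomicBoolean(false);

    // Stand-in for the translog's own decision, e.g. "too many remote translog readers".
    boolean translogShouldFlush(int translogReaders, int maxReaders) {
        return translogReaders >= maxReaders;
    }

    // Stand-in for shouldPeriodicallyFlush(): the translog's decision OR the uncommitted-size threshold.
    boolean shouldPeriodicallyFlush(long uncommittedBytes, long flushThresholdBytes, int translogReaders, int maxReaders) {
        return translogShouldFlush(translogReaders, maxReaders) || uncommittedBytes > flushThresholdBytes;
    }

    // Stand-in for afterWriteOperation(): schedule at most one asynchronous flush at a time.
    void afterWriteOperation(long uncommittedBytes, long flushThresholdBytes, int translogReaders, int maxReaders, Runnable asyncFlush) {
        if (shouldPeriodicallyFlush(uncommittedBytes, flushThresholdBytes, translogReaders, maxReaders)
            && flushOrRollRunning.compareAndSet(false, true)) {
            try {
                asyncFlush.run(); // the real code submits this to the flush thread pool
            } finally {
                flushOrRollRunning.set(false); // the real code resets the flag when the async task completes
            }
        }
    }

    public static void main(String[] args) {
        FlushSchedulingSketch shard = new FlushSchedulingSketch();
        // 101 translog readers against a threshold of 100 schedules a flush even though the
        // uncommitted size (1 MB) is far below the 512 MB size threshold.
        shard.afterWriteOperation(1L << 20, 512L << 20, 101, 100, () -> System.out.println("flush scheduled"));
    }
}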
@@ -436,6 +436,12 @@ public String getTranslogUUID() {
* @return if the translog should be flushed
*/
public boolean shouldPeriodicallyFlush(long localCheckpointOfLastCommit, long flushThreshold) {
/*
 * This can trigger a flush, depending on the translog implementation.
 */
if (translog.shouldFlush()) {
return true;
}
// This is the minimum seqNo that is referred to in the translog and considered for calculating the translog size
long minTranslogRefSeqNo = translog.getMinUnreferencedSeqNoInSegments(localCheckpointOfLastCommit + 1);
final long minReferencedTranslogGeneration = translog.getMinGenerationForSeqNo(minTranslogRefSeqNo).translogFileGeneration;
@@ -632,4 +632,15 @@ public long getMinUnreferencedSeqNoInSegments(long minUnrefCheckpointInLastCommit) {
int availablePermits() {
return syncPermit.availablePermits();
}

/**
* Checks whether the shard should be flushed based on translog files.
* Returns {@code true} when the number of translog readers breaches the threshold
* determined by the {@code cluster.remote_store.translog.max_readers} setting.
* @return {@code true} if the shard should be flushed
*/
@Override
protected boolean shouldFlush() {
return readers.size() >= translogTransferManager.getMaxRemoteTranslogReadersSettings();
}
}
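
The override above is just a comparison of the current reader count against the dynamic cluster setting; the threshold is read on every call rather than cached, so an update to cluster.remote_store.translog.max_readers takes effect on the next write. A rough standalone illustration of that pattern, with simplified hypothetical types in place of RemoteFsTranslog and RemoteStoreSettings, is shown below.

import java.util.function.IntSupplier;

// Toy model of the read path behind shouldFlush(); names and structure are illustrative only.
public final class DynamicThresholdSketch {

    // Stand-in for RemoteStoreSettings: holds the dynamic value in a volatile field.
    static final class SettingsHolder {
        private volatile int maxReaders = 1000; // default of cluster.remote_store.translog.max_readers
        void update(int newValue) { maxReaders = newValue; } // invoked by the settings-update consumer
        int maxReaders() { return maxReaders; }
    }

    // Stand-in for the remote translog: never caches the threshold, reads it on every check.
    static final class RemoteTranslogSketch {
        private final IntSupplier maxReaders;
        private int readers; // closed translog generations currently referenced
        RemoteTranslogSketch(IntSupplier maxReaders) { this.maxReaders = maxReaders; }
        void generationRolled() { readers++; }
        boolean shouldFlush() { return readers >= maxReaders.getAsInt(); }
    }

    public static void main(String[] args) {
        SettingsHolder settings = new SettingsHolder();
        RemoteTranslogSketch translog = new RemoteTranslogSketch(settings::maxReaders);
        for (int i = 0; i < 150; i++) translog.generationRolled();
        System.out.println(translog.shouldFlush()); // false: 150 < 1000
        settings.update(100);                       // dynamic update, as in the integration test above
        System.out.println(translog.shouldFlush()); // true: 150 >= 100
    }
}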
@@ -2082,4 +2082,13 @@ public static String createEmptyTranslog(
public long getMinUnreferencedSeqNoInSegments(long minUnrefCheckpointInLastCommit) {
return minUnrefCheckpointInLastCommit;
}

/**
* Checks whether the shard should be flushed based on translog files.
* Each translog implementation can provide its own decision logic.
* @return {@code true} if the shard should be flushed
*/
protected boolean shouldFlush() {
return false;
}
}
@@ -577,4 +577,8 @@ public void onFailure(Exception e) {
throw e;
}
}

public int getMaxRemoteTranslogReadersSettings() {
return this.remoteStoreSettings.getMaxRemoteTranslogReaders();
}
}
@@ -54,8 +54,20 @@ public class RemoteStoreSettings {
Property.Dynamic
);

/**
* Controls the maximum number of referenced remote translog files. If the threshold is breached, the shard is flushed.
*/
public static final Setting<Integer> CLUSTER_REMOTE_MAX_TRANSLOG_READERS = Setting.intSetting(
"cluster.remote_store.translog.max_readers",
1000,
100,
Property.Dynamic,
Property.NodeScope
);

private volatile TimeValue clusterRemoteTranslogBufferInterval;
private volatile int minRemoteSegmentMetadataFiles;
private volatile int maxRemoteTranslogReaders;

public RemoteStoreSettings(Settings settings, ClusterSettings clusterSettings) {
this.clusterRemoteTranslogBufferInterval = CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.get(settings);
@@ -69,6 +81,10 @@ public RemoteStoreSettings(Settings settings, ClusterSettings clusterSettings) {
CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING,
this::setMinRemoteSegmentMetadataFiles
);

maxRemoteTranslogReaders = CLUSTER_REMOTE_MAX_TRANSLOG_READERS.get(settings);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_REMOTE_MAX_TRANSLOG_READERS, this::setMaxRemoteTranslogReaders);

}

// Exclusively for testing, please do not use it elsewhere.
@@ -87,4 +103,12 @@ private void setMinRemoteSegmentMetadataFiles(int minRemoteSegmentMetadataFiles)
public int getMinRemoteSegmentMetadataFiles() {
return this.minRemoteSegmentMetadataFiles;
}

public int getMaxRemoteTranslogReaders() {
return maxRemoteTranslogReaders;
}

private void setMaxRemoteTranslogReaders(int maxRemoteTranslogReaders) {
this.maxRemoteTranslogReaders = maxRemoteTranslogReaders;
}
}
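
Since the setting is registered as Dynamic and NodeScope, it can be changed on a live cluster. A sketch of doing so from Java is below, mirroring the ClusterUpdateSettingsRequest usage in the integration test above; the helper class name is hypothetical, the import paths assume the transport-style Client available at this commit, and the same change can be made with a PUT _cluster/settings request.

import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.client.Client;
import org.opensearch.common.settings.Settings;
import org.opensearch.indices.RemoteStoreSettings;

// Hypothetical helper: lowers the reader threshold so shards flush after fewer remote translog files.
public final class TranslogReaderThresholdUpdater {

    public static void setMaxTranslogReaders(Client client, int maxReaders) {
        ClusterUpdateSettingsRequest request = new ClusterUpdateSettingsRequest();
        request.persistentSettings(
            Settings.builder().put(RemoteStoreSettings.CLUSTER_REMOTE_MAX_TRANSLOG_READERS.getKey(), maxReaders)
        );
        // Values below the setting's minimum of 100 are rejected with an IllegalArgumentException.
        client.admin().cluster().updateSettings(request).actionGet();
    }
}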
@@ -66,4 +66,24 @@ public void testSegmentMetadataRetention() {
);
assertEquals(15, remoteStoreSettings.getMinRemoteSegmentMetadataFiles());
}

public void testMaxRemoteReferencedTranslogFiles() {
// Test default value
assertEquals(1000, remoteStoreSettings.getMaxRemoteTranslogReaders());

// Test override with valid value
clusterSettings.applySettings(
Settings.builder().put(RemoteStoreSettings.CLUSTER_REMOTE_MAX_TRANSLOG_READERS.getKey(), "500").build()
);
assertEquals(500, remoteStoreSettings.getMaxRemoteTranslogReaders());

// Test override with value less than minimum
assertThrows(
IllegalArgumentException.class,
() -> clusterSettings.applySettings(
Settings.builder().put(RemoteStoreSettings.CLUSTER_REMOTE_MAX_TRANSLOG_READERS.getKey(), "99").build()
)
);
assertEquals(500, remoteStoreSettings.getMaxRemoteTranslogReaders());
}
}
