Skip to content

Commit 9a9ec86

Browse files
RajatGupta02Rajat Guptaatrisandrrosscwperks
authored andcommitted
Add Channel Factory parameter to Translog (opensearch-project#18918)
* Add overload for channelFactory Signed-off-by: Rajat Gupta <gptrajat@amazon.com> * Fix tests Signed-off-by: Rajat Gupta <gptrajat@amazon.com> * Add Changelog entry Signed-off-by: Rajat Gupta <gptrajat@amazon.com> * Fix conflicts Signed-off-by: Rajat Gupta <gptrajat@amazon.com> * When update operations fail during preparation (e.g., version conflicts), (opensearch-project#18917) TransportShardBulkAction still triggers refresh even though no actual writes occurred. This fix checks if locationToSync is null (indicating no writes) and prevents refresh in such cases. Fixes opensearch-project#15261 Signed-off-by: Atri Sharma <atri.jiit@gmail.com> * Remove all entries from changelog to be released in 3.2 (opensearch-project#18989) Signed-off-by: Andrew Ross <andrross@amazon.com> * Add temporal routing processors for time-based document routing (opensearch-project#18966) Implements TemporalRoutingProcessor for ingest pipelines and TemporalRoutingSearchProcessor for search pipelines based on RFC opensearch-project#18920. Features: - Route documents to shards based on timestamp fields - Support hour, day, week, and month granularities - Optional hash bucketing for better distribution - Automatic search routing to relevant time ranges - ISO week format support The processors enable efficient time-based data organization for log and metrics workloads by co-locating documents from the same time period on the same shards. --------- Signed-off-by: Atri Sharma <atri.jiit@gmail.com> * Add CompletionStage variants to methods in the Client Interface and default to ActionListener impl (opensearch-project#18998) * Add CompletableFuture variables to methods in the Client Interface and default to ActionListener impl Signed-off-by: Craig Perkins <cwperx@amazon.com> * Add to CHANGELOG Signed-off-by: Craig Perkins <cwperx@amazon.com> * Fix typo in CHANGELOG Signed-off-by: Craig Perkins <cwperx@amazon.com> * Switch to CompletionStage Signed-off-by: Craig Perkins <cwperx@amazon.com> * Update CHANGELOG entry Signed-off-by: Craig Perkins <cwperx@amazon.com> --------- Signed-off-by: Craig Perkins <cwperx@amazon.com> * Expand fetch phase profiling to support inner hits and top hits aggregation phases (opensearch-project#18936) --------- Signed-off-by: Andre van de Ven <andrebvandeven@gmail.com> Signed-off-by: Andre van de Ven <113951599+andrevandeven@users.noreply.github.com> Signed-off-by: Andre van de Ven <andrevdv@amazon.com> Co-authored-by: Andre van de Ven <andrevdv@amazon.com> * IllegalArgumentException when scroll ID has a node no longer part of the Cluster (opensearch-project#19031) --------- Signed-off-by: Anurag Rai <anurag.rai@uber.com> Signed-off-by: Anurag Rai <91844619+anuragrai16@users.noreply.github.com> * Add Changelog entry Signed-off-by: Rajat Gupta <gptrajat@amazon.com> * Add secondary constructor Signed-off-by: Rajat Gupta <gptrajat@amazon.com> * Modify changelog Signed-off-by: Rajat Gupta <gptrajat@amazon.com> * Update changelog Signed-off-by: Rajat Gupta <gptrajat@amazon.com> * Add another constructor to fix breaking change check Signed-off-by: Rajat Gupta <gptrajat@amazon.com> --------- Signed-off-by: Rajat Gupta <gptrajat@amazon.com> Signed-off-by: Atri Sharma <atri.jiit@gmail.com> Signed-off-by: Andrew Ross <andrross@amazon.com> Signed-off-by: Craig Perkins <cwperx@amazon.com> Signed-off-by: Andre van de Ven <andrebvandeven@gmail.com> Signed-off-by: Andre van de Ven <113951599+andrevandeven@users.noreply.github.com> Signed-off-by: Andre van de Ven <andrevdv@amazon.com> Signed-off-by: Anurag Rai <anurag.rai@uber.com> Signed-off-by: Anurag Rai <91844619+anuragrai16@users.noreply.github.com> Co-authored-by: Rajat Gupta <gptrajat@amazon.com> Co-authored-by: Atri Sharma <atri.jiit@gmail.com> Co-authored-by: Andrew Ross <andrross@amazon.com> Co-authored-by: Craig Perkins <cwperx@amazon.com> Co-authored-by: Andre van de Ven <113951599+andrevandeven@users.noreply.github.com> Co-authored-by: Andre van de Ven <andrevdv@amazon.com> Co-authored-by: Anurag Rai <91844619+anuragrai16@users.noreply.github.com>
1 parent d2a3c27 commit 9a9ec86

File tree

14 files changed

+160
-104
lines changed

14 files changed

+160
-104
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1313
- Add StoreFactory plugin interface for custom Store implementations([#19091](https://github.com/opensearch-project/OpenSearch/pull/19091))
1414
- Use S3CrtClient for higher throughput while uploading files to S3 ([#18800](https://github.com/opensearch-project/OpenSearch/pull/18800))
1515
- Add a dynamic setting to change skip_cache_factor and min_frequency for querycache ([#18351](https://github.com/opensearch-project/OpenSearch/issues/18351))
16+
- Add overload constructor for Translog to accept Channel Factory as a parameter ([#18918](https://github.com/opensearch-project/OpenSearch/pull/18918))
1617

1718
### Changed
1819
- Add CompletionStage variants to methods in the Client Interface and default to ActionListener impl ([#18998](https://github.com/opensearch-project/OpenSearch/pull/18998))

server/src/main/java/org/opensearch/index/translog/InternalTranslogFactory.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@ public Translog newTranslog(
4141
globalCheckpointSupplier,
4242
primaryTermSupplier,
4343
persistedSequenceNumberConsumer,
44-
TranslogOperationHelper.DEFAULT
44+
TranslogOperationHelper.DEFAULT,
45+
null
4546
);
4647
}
4748

@@ -64,7 +65,8 @@ public Translog newTranslog(
6465
globalCheckpointSupplier,
6566
primaryTermSupplier,
6667
persistedSequenceNumberConsumer,
67-
translogOperationHelper
68+
translogOperationHelper,
69+
null
6870
);
6971
}
7072
}

server/src/main/java/org/opensearch/index/translog/LocalTranslog.java

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,8 @@ public LocalTranslog(
5050
final LongSupplier globalCheckpointSupplier,
5151
final LongSupplier primaryTermSupplier,
5252
final LongConsumer persistedSequenceNumberConsumer,
53-
final TranslogOperationHelper translogOperationHelper
53+
final TranslogOperationHelper translogOperationHelper,
54+
final ChannelFactory channelFactory
5455
) throws IOException {
5556
super(
5657
config,
@@ -59,7 +60,8 @@ public LocalTranslog(
5960
globalCheckpointSupplier,
6061
primaryTermSupplier,
6162
persistedSequenceNumberConsumer,
62-
translogOperationHelper
63+
translogOperationHelper,
64+
channelFactory
6365
);
6466
try {
6567
final Checkpoint checkpoint = readCheckpoint(location);
@@ -113,6 +115,30 @@ public LocalTranslog(
113115
}
114116
}
115117

118+
/**
119+
* Secondary constructor that does not accept ChannelFactory parameter.
120+
*/
121+
public LocalTranslog(
122+
final TranslogConfig config,
123+
final String translogUUID,
124+
TranslogDeletionPolicy deletionPolicy,
125+
final LongSupplier globalCheckpointSupplier,
126+
final LongSupplier primaryTermSupplier,
127+
final LongConsumer persistedSequenceNumberConsumer,
128+
final TranslogOperationHelper translogOperationHelper
129+
) throws IOException {
130+
this(
131+
config,
132+
translogUUID,
133+
deletionPolicy,
134+
globalCheckpointSupplier,
135+
primaryTermSupplier,
136+
persistedSequenceNumberConsumer,
137+
translogOperationHelper,
138+
null
139+
);
140+
}
141+
116142
/**
117143
* Ensures that the given location has be synced / written to the underlying storage.
118144
*

server/src/main/java/org/opensearch/index/translog/RemoteBlobStoreInternalTranslogFactory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,8 @@ public Translog newTranslog(
122122
startedPrimarySupplier,
123123
remoteTranslogTransferTracker,
124124
remoteStoreSettings,
125-
translogOperationHelper
125+
translogOperationHelper,
126+
null
126127
);
127128
}
128129
}

server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,8 @@ public RemoteFsTimestampAwareTranslog(
9090
startedPrimarySupplier,
9191
remoteTranslogTransferTracker,
9292
remoteStoreSettings,
93-
translogOperationHelper
93+
translogOperationHelper,
94+
null
9495
);
9596
logger = Loggers.getLogger(getClass(), shardId);
9697
this.metadataFilePinnedTimestampMap = new HashMap<>();

server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,8 @@ public RemoteFsTranslog(
108108
BooleanSupplier startedPrimarySupplier,
109109
RemoteTranslogTransferTracker remoteTranslogTransferTracker,
110110
RemoteStoreSettings remoteStoreSettings,
111-
TranslogOperationHelper translogOperationHelper
111+
TranslogOperationHelper translogOperationHelper,
112+
ChannelFactory channelFactory
112113
) throws IOException {
113114
super(
114115
config,
@@ -117,7 +118,8 @@ public RemoteFsTranslog(
117118
globalCheckpointSupplier,
118119
primaryTermSupplier,
119120
persistedSequenceNumberConsumer,
120-
translogOperationHelper
121+
translogOperationHelper,
122+
channelFactory
121123
);
122124
logger = Loggers.getLogger(getClass(), shardId);
123125
this.startedPrimarySupplier = startedPrimarySupplier;

server/src/main/java/org/opensearch/index/translog/Translog.java

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,7 @@ public abstract class Translog extends AbstractIndexShardComponent implements In
154154
protected final TranslogDeletionPolicy deletionPolicy;
155155
protected final LongConsumer persistedSequenceNumberConsumer;
156156
protected final TranslogOperationHelper translogOperationHelper;
157+
protected final ChannelFactory channelFactory;
157158

158159
/**
159160
* Creates a new Translog instance. This method will create a new transaction log unless the given {@link TranslogGeneration} is
@@ -182,7 +183,8 @@ public Translog(
182183
final LongSupplier globalCheckpointSupplier,
183184
final LongSupplier primaryTermSupplier,
184185
final LongConsumer persistedSequenceNumberConsumer,
185-
final TranslogOperationHelper translogOperationHelper
186+
final TranslogOperationHelper translogOperationHelper,
187+
final ChannelFactory channelFactory
186188
) throws IOException {
187189
super(config.getShardId(), config.getIndexSettings());
188190
this.config = config;
@@ -198,6 +200,31 @@ public Translog(
198200
this.location = config.getTranslogPath();
199201
Files.createDirectories(this.location);
200202
this.translogOperationHelper = translogOperationHelper;
203+
this.channelFactory = channelFactory != null ? channelFactory : FileChannel::open;
204+
}
205+
206+
/**
207+
* Constructor that does not accept channelFactory parameter but accepts translogOperationHelper
208+
*/
209+
public Translog(
210+
final TranslogConfig config,
211+
final String translogUUID,
212+
TranslogDeletionPolicy deletionPolicy,
213+
final LongSupplier globalCheckpointSupplier,
214+
final LongSupplier primaryTermSupplier,
215+
final LongConsumer persistedSequenceNumberConsumer,
216+
final TranslogOperationHelper translogOperationHelper
217+
) throws IOException {
218+
this(
219+
config,
220+
translogUUID,
221+
deletionPolicy,
222+
globalCheckpointSupplier,
223+
primaryTermSupplier,
224+
persistedSequenceNumberConsumer,
225+
translogOperationHelper,
226+
null
227+
);
201228
}
202229

203230
/**
@@ -218,7 +245,8 @@ public Translog(
218245
globalCheckpointSupplier,
219246
primaryTermSupplier,
220247
persistedSequenceNumberConsumer,
221-
TranslogOperationHelper.DEFAULT
248+
TranslogOperationHelper.DEFAULT,
249+
FileChannel::open
222250
);
223251
assert config.getIndexSettings().isDerivedSourceEnabled() == false; // For derived source supported index, it is incorrect to use
224252
// this constructor
@@ -324,7 +352,7 @@ protected void copyCheckpointTo(Path targetPath) throws IOException {
324352
}
325353

326354
TranslogReader openReader(Path path, Checkpoint checkpoint) throws IOException {
327-
FileChannel channel = FileChannel.open(path, StandardOpenOption.READ);
355+
FileChannel channel = getChannelFactory().open(path, StandardOpenOption.READ);
328356
try {
329357
assert Translog.parseIdFromFileName(path) == checkpoint.generation : "expected generation: "
330358
+ Translog.parseIdFromFileName(path)
@@ -1931,7 +1959,7 @@ protected void ensureOpen() {
19311959
}
19321960

19331961
ChannelFactory getChannelFactory() {
1934-
return FileChannel::open;
1962+
return this.channelFactory;
19351963
}
19361964

19371965
/**

server/src/main/java/org/opensearch/index/translog/TruncateTranslogAction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,8 @@ public long minTranslogGenRequired(List<TranslogReader> readers, TranslogWriter
222222
() -> translogGlobalCheckpoint,
223223
() -> primaryTerm,
224224
seqNo -> {},
225-
TranslogOperationHelper.DEFAULT
225+
TranslogOperationHelper.DEFAULT,
226+
null
226227
);
227228
Translog.Snapshot snapshot = translog.newSnapshot(0, Long.MAX_VALUE)
228229
) {

server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4182,7 +4182,8 @@ public void testRecoverFromForeignTranslog() throws IOException {
41824182
() -> SequenceNumbers.NO_OPS_PERFORMED,
41834183
primaryTerm::get,
41844184
seqNo -> {},
4185-
TranslogOperationHelper.DEFAULT
4185+
TranslogOperationHelper.DEFAULT,
4186+
null
41864187
);
41874188
translog.add(new Translog.Index("SomeBogusId", 0, primaryTerm.get(), "{}".getBytes(Charset.forName("UTF-8"))));
41884189
assertEquals(generation.translogFileGeneration, translog.currentFileGeneration());

0 commit comments

Comments
 (0)