Skip to content

Commit d422f60

Browse files
author
Aditya Khera
committed
rebased with merged-segment-warmer-stats
Signed-off-by: Aditya Khera <kheraadi@amazon.com>
1 parent e4ea4cc commit d422f60

File tree

12 files changed

+82
-86
lines changed

12 files changed

+82
-86
lines changed

server/src/main/java/org/opensearch/index/engine/MergedSegmentWarmer.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@
1616
import org.apache.lucene.index.SegmentReader;
1717
import org.opensearch.cluster.service.ClusterService;
1818
import org.opensearch.common.logging.Loggers;
19-
import org.opensearch.index.merge.MergedSegmentWarmerPressureService;
2019
import org.opensearch.index.merge.MergedSegmentTransferTracker;
20+
import org.opensearch.index.merge.MergedSegmentWarmerPressureService;
2121
import org.opensearch.index.shard.IndexShard;
2222
import org.opensearch.indices.recovery.RecoverySettings;
2323
import org.opensearch.transport.TransportService;
@@ -99,9 +99,9 @@ boolean shouldWarm() {
9999
return false;
100100
}
101101

102-
if (mergedSegmentWarmerPressureService.isEnabled() &&
103-
mergedSegmentWarmerPressureService.shouldWarm(mergedSegmentReplicationTracker.stats()) == false) {
104-
mergedSegmentReplicationTracker.incrementTotalRejectedWarms();
102+
if (mergedSegmentWarmerPressureService.isEnabled()
103+
&& mergedSegmentWarmerPressureService.shouldWarm(mergedSegmentTransferTracker.stats()) == false) {
104+
mergedSegmentTransferTracker.incrementTotalRejectedCount();
105105
return false;
106106
}
107107

server/src/main/java/org/opensearch/index/merge/MergedSegmentTransferTracker.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ public class MergedSegmentTransferTracker {
2727
private final CounterMetric totalBytesDownloaded = new CounterMetric();
2828
private final CounterMetric totalUploadTimeMillis = new CounterMetric();
2929
private final CounterMetric totalDownloadTimeMillis = new CounterMetric();
30+
private final CounterMetric totalRejectedCount = new CounterMetric();
3031
private final CounterMetric ongoingWarms = new CounterMetric();
3132

3233
public void incrementTotalWarmInvocationsCount() {
@@ -45,6 +46,10 @@ public void incrementTotalWarmFailureCount() {
4546
totalWarmFailureCount.inc();
4647
}
4748

49+
public void incrementTotalRejectedCount() {
50+
totalRejectedCount.inc();
51+
}
52+
4853
public void addTotalWarmTimeMillis(long time) {
4954
totalWarmTimeMillis.inc(time);
5055
}
@@ -57,11 +62,11 @@ public void addTotalDownloadTimeMillis(long time) {
5762
totalDownloadTimeMillis.inc(time);
5863
}
5964

60-
public void addTotalBytesUploaded(long bytes) {
65+
public void addTotalBytesSent(long bytes) {
6166
totalBytesUploaded.inc(bytes);
6267
}
6368

64-
public void addTotalBytesDownloaded(long bytes) {
69+
public void addTotalBytesReceived(long bytes) {
6570
totalBytesDownloaded.inc(bytes);
6671
}
6772

@@ -75,6 +80,7 @@ public MergedSegmentWarmerStats stats() {
7580
totalBytesDownloaded.count(),
7681
totalUploadTimeMillis.count(),
7782
totalDownloadTimeMillis.count(),
83+
totalRejectedCount.count(),
7884
ongoingWarms.count()
7985
);
8086
return stats;

server/src/main/java/org/opensearch/index/merge/MergedSegmentWarmerPressureService.java

Lines changed: 17 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import java.util.function.Predicate;
2020

2121
/**
22-
* Service that applies throttling predicates to determine if merged segment warming should proceed.
22+
* Service that applies throttling rules to determine if merged segment warming should proceed.
2323
* Evaluates conditions like concurrency limits and applies backpressure when thresholds are exceeded.
2424
*
2525
* @opensearch.internal
@@ -31,16 +31,16 @@ public class MergedSegmentWarmerPressureService {
3131

3232
private final PressureSettings pressureSettings;
3333

34-
private final List<ThrottlePredicate> throttlePredicates;
34+
private final List<Rule> throttleRules;
3535

3636
public MergedSegmentWarmerPressureService(IndexShard indexShard) {
3737
this.pressureSettings = new PressureSettings(indexShard);
3838
this.logger = Loggers.getLogger(MergedSegmentWarmerPressureService.class, indexShard.shardId());
39-
this.throttlePredicates = List.of(new ConcurrencyLimiterPredicate(indexShard, pressureSettings));
39+
this.throttleRules = List.of(new ConcurrencyLimiterRule(indexShard, pressureSettings));
4040
}
4141

4242
public boolean isEnabled() {
43-
return pressureSettings.isMergedSegmentWarmerPressureServiceEnabled();
43+
return pressureSettings.isEnabled();
4444
}
4545

4646
/**
@@ -51,7 +51,7 @@ public boolean isEnabled() {
5151
* @return true if all predicates pass, false if any predicate fails
5252
*/
5353
public boolean shouldWarm(MergedSegmentWarmerStats stats) {
54-
return throttlePredicates.stream().allMatch(throttlePredicate -> {
54+
return throttleRules.stream().allMatch(throttlePredicate -> {
5555
boolean res = throttlePredicate.test(stats);
5656
if (res == false && logger.isTraceEnabled()) logger.trace(throttlePredicate.rejectionMessage(stats));
5757
return res;
@@ -63,44 +63,39 @@ public boolean shouldWarm(MergedSegmentWarmerStats stats) {
6363
*
6464
* @opensearch.internal
6565
*/
66-
private static abstract class ThrottlePredicate implements Predicate<MergedSegmentWarmerStats> {
66+
private static abstract class Rule implements Predicate<MergedSegmentWarmerStats> {
6767

6868
final PressureSettings pressureSettings;
6969
final IndexShard indexShard;
7070

71-
private ThrottlePredicate(IndexShard indexShard, PressureSettings pressureSettings) {
71+
private Rule(IndexShard indexShard, PressureSettings pressureSettings) {
7272
this.pressureSettings = pressureSettings;
7373
this.indexShard = indexShard;
7474
}
7575

7676
/**
77-
* Returns the name of the predicate.
77+
* Returns the name of the rule.
7878
*
7979
* @return the name using class name.
8080
*/
8181
abstract String name();
8282

8383
String rejectionMessage(MergedSegmentWarmerStats statsSnapshot) {
84-
return String.format(
85-
Locale.ROOT,
86-
"Merged segment warm rejected for shard [%s] by predicate: %s ",
87-
indexShard.shardId(),
88-
name()
89-
);
84+
return String.format(Locale.ROOT, "Merged segment warm rejected for shard [%s] by rule: %s | ", indexShard.shardId(), name());
9085
}
9186
}
9287

9388
/**
9489
* Predicate that limits concurrent segment warming operations to prevent blocking merges.
9590
* This is important because if all threads are blocked in merge operations and segment warming
96-
* is slow, we don't want to block new merges from proceeding. The predicate ensures there are
91+
* is slow, we don't want to block new merges from proceeding. The rule ensures there are
9792
* always enough threads available for merge operations by limiting concurrent warm operations
9893
* based on a configurable factor of the maximum concurrent merges.
9994
*/
100-
private static class ConcurrencyLimiterPredicate extends ThrottlePredicate {
101-
private final String NAME = "Concurrency limiter predicate for merged segment warmer throttling";
95+
private static class ConcurrencyLimiterRule extends Rule {
96+
private final String NAME = "Concurrency limiter rule for merged segment warmer throttling";
10297

103-
private ConcurrencyLimiterPredicate(IndexShard indexShard, PressureSettings pressureSettings) {
98+
private ConcurrencyLimiterRule(IndexShard indexShard, PressureSettings pressureSettings) {
10499
super(indexShard, pressureSettings);
105100
}
106101

@@ -118,15 +113,15 @@ String rejectionMessage(MergedSegmentWarmerStats stats) {
118113
long maxAllowed = calculateMaxAllowedConcurrentWarms(indexShard.getMaxMergesAllowed());
119114
return super.rejectionMessage(stats) + String.format(
120115
Locale.ROOT,
121-
"\nCurrent ongoing warms: %d, max allowed: %d",
122-
stats.getOngoingWarms(),
116+
"Current ongoing warms: %d, Max allowed: %d",
117+
stats.getOngoingCount(),
123118
maxAllowed
124119
);
125120
}
126121

127122
@Override
128123
public boolean test(MergedSegmentWarmerStats statsSnapshot) {
129-
long onGoingWarms = statsSnapshot.getOngoingWarms();
124+
long onGoingWarms = statsSnapshot.getOngoingCount();
130125
long maxAllowedWarms = calculateMaxAllowedConcurrentWarms(indexShard.getMaxMergesAllowed());
131126
return maxAllowedWarms > onGoingWarms;
132127
}
@@ -148,7 +143,7 @@ private IndexSettings indexSettings() {
148143
return indexShard.indexSettings();
149144
}
150145

151-
boolean isMergedSegmentWarmerPressureServiceEnabled() {
146+
boolean isEnabled() {
152147
return indexSettings().isMergedSegmentWarmerPressureEnabled();
153148
}
154149

server/src/main/java/org/opensearch/index/merge/MergedSegmentWarmerStats.java

Lines changed: 33 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -57,27 +57,27 @@ public class MergedSegmentWarmerStats implements Writeable, ToXContentFragment {
5757
public MergedSegmentWarmerStats() {}
5858

5959
public MergedSegmentWarmerStats(StreamInput in) throws IOException {
60-
totalWarmInvocationsCount = in.readVLong();
61-
totalWarmTimeMillis = in.readVLong();
62-
totalWarmFailureCount = in.readVLong();
63-
totalBytesUploaded = in.readVLong();
64-
totalBytesDownloaded = in.readVLong();
65-
totalUploadTimeMillis = in.readVLong();
66-
totalDownloadTimeMillis = in.readVLong();
60+
totalInvocationsCount = in.readVLong();
61+
totalTimeMillis = in.readVLong();
62+
totalFailureCount = in.readVLong();
63+
totalBytesSent = in.readVLong();
64+
totalBytesReceived = in.readVLong();
65+
totalSendTimeMillis = in.readVLong();
66+
totalReceiveTimeMillis = in.readVLong();
6767
totalRejectedCount = in.readVLong();
68-
ongoingWarms = in.readVLong();
68+
ongoingCount = in.readVLong();
6969
}
7070

7171
public synchronized void add(
72-
long totalWarmInvocationsCount,
73-
long totalWarmTimeMillis,
74-
long totalWarmFailureCount,
75-
long totalBytesUploaded,
76-
long totalBytesDownloaded,
77-
long totalUploadTimeMillis,
78-
long totalDownloadTimeMillis,
72+
long totalInvocationsCount,
73+
long totalTimeMillis,
74+
long totalFailureCount,
75+
long totalBytesSent,
76+
long totalBytesReceived,
77+
long totalSendTimeMillis,
78+
long totalReceiveTimeMillis,
7979
long totalRejectedCount,
80-
long ongoingWarms
80+
long ongoingCount
8181
) {
8282
this.totalInvocationsCount += totalInvocationsCount;
8383
this.totalTimeMillis += totalTimeMillis;
@@ -86,7 +86,7 @@ public synchronized void add(
8686
this.totalBytesReceived += totalBytesReceived;
8787
this.totalSendTimeMillis += totalSendTimeMillis;
8888
this.totalReceiveTimeMillis += totalReceiveTimeMillis;
89-
this.totalRejectedCount += totalRejectedWarms;
89+
this.totalRejectedCount += totalRejectedCount;
9090
this.ongoingCount += ongoingCount;
9191
}
9292

@@ -98,14 +98,14 @@ public synchronized void addTotals(MergedSegmentWarmerStats mergedSegmentWarmerS
9898
if (mergedSegmentWarmerStats == null) {
9999
return;
100100
}
101-
this.totalWarmInvocationsCount += mergedSegmentWarmerStats.totalWarmInvocationsCount;
102-
this.totalWarmTimeMillis += mergedSegmentWarmerStats.totalWarmTimeMillis;
103-
this.totalWarmFailureCount += mergedSegmentWarmerStats.totalWarmFailureCount;
104-
this.totalBytesUploaded += mergedSegmentWarmerStats.totalBytesUploaded;
105-
this.totalBytesDownloaded += mergedSegmentWarmerStats.totalBytesDownloaded;
106-
this.totalUploadTimeMillis += mergedSegmentWarmerStats.totalUploadTimeMillis;
107-
this.totalDownloadTimeMillis += mergedSegmentWarmerStats.totalDownloadTimeMillis;
108-
this.totalRejectedCount += mergedSegmentWarmerStats.totalRejectedWarms;
101+
this.totalInvocationsCount += mergedSegmentWarmerStats.totalInvocationsCount;
102+
this.totalTimeMillis += mergedSegmentWarmerStats.totalTimeMillis;
103+
this.totalFailureCount += mergedSegmentWarmerStats.totalFailureCount;
104+
this.totalBytesSent += mergedSegmentWarmerStats.totalBytesSent;
105+
this.totalBytesReceived += mergedSegmentWarmerStats.totalBytesReceived;
106+
this.totalSendTimeMillis += mergedSegmentWarmerStats.totalSendTimeMillis;
107+
this.totalReceiveTimeMillis += mergedSegmentWarmerStats.totalReceiveTimeMillis;
108+
this.totalRejectedCount += mergedSegmentWarmerStats.totalRejectedCount;
109109
}
110110

111111
public long getTotalInvocationsCount() {
@@ -147,13 +147,13 @@ public long getTotalRejectedCount() {
147147
@Override
148148
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
149149
builder.startObject(Fields.MERGED_SEGMENT_WARMER);
150-
builder.field(Fields.TOTAL_WARM_INVOCATIONS_COUNT, totalWarmInvocationsCount);
151-
builder.field(Fields.TOTAL_WARM_TIME_MILLIS, totalWarmTimeMillis);
152-
builder.field(Fields.TOTAL_WARM_FAILURE_COUNT, totalWarmFailureCount);
153-
builder.humanReadableField(Fields.TOTAL_BYTES_UPLOADED, Fields.TOTAL_BYTES_UPLOADED, new ByteSizeValue(totalBytesUploaded));
154-
builder.humanReadableField(Fields.TOTAL_BYTES_DOWNLOADED, Fields.TOTAL_BYTES_DOWNLOADED, new ByteSizeValue(totalBytesDownloaded));
155-
builder.field(Fields.TOTAL_UPLOAD_TIME_MILLIS, totalUploadTimeMillis);
156-
builder.field(Fields.TOTAL_DOWNLOAD_TIME_MILLIS, totalDownloadTimeMillis);
150+
builder.field(Fields.TOTAL_INVOCATIONS_COUNT, totalInvocationsCount);
151+
builder.field(Fields.TOTAL_TIME_MILLIS, totalTimeMillis);
152+
builder.field(Fields.TOTAL_FAILURE_COUNT, totalFailureCount);
153+
builder.humanReadableField(Fields.TOTAL_BYTES_SENT, Fields.TOTAL_SENT_SIZE, getTotalSentSize());
154+
builder.humanReadableField(Fields.TOTAL_BYTES_RECEIVED, Fields.TOTAL_RECEIVED_SIZE, getTotalReceivedSize());
155+
builder.humanReadableField(Fields.TOTAL_SEND_TIME_MILLIS, Fields.TOTAL_SEND_TIME, getTotalSendTime());
156+
builder.humanReadableField(Fields.TOTAL_RECEIVE_TIME_MILLIS, Fields.TOTAL_RECEIVE_TIME, getTotalReceiveTime());
157157
builder.field(Fields.TOTAL_REJECTED_COUNT, totalRejectedCount);
158158
builder.field(Fields.ONGOING_COUNT, ongoingCount);
159159
builder.endObject();
@@ -167,7 +167,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
167167
*/
168168
static final class Fields {
169169
static final String MERGED_SEGMENT_WARMER = "merged_segment_warmer";
170-
static final String WARM_INVOCATIONS_COUNT = "total_invocations_count";
170+
static final String TOTAL_INVOCATIONS_COUNT = "total_invocations_count";
171171
static final String TOTAL_TIME_MILLIS = "total_time_millis";
172172
static final String TOTAL_FAILURE_COUNT = "total_failure_count";
173173
static final String TOTAL_BYTES_SENT = "total_bytes_sent";

server/src/main/java/org/opensearch/index/shard/IndexShard.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@
4848
import org.apache.lucene.search.QueryCachingPolicy;
4949
import org.apache.lucene.search.ReferenceManager;
5050
import org.apache.lucene.search.Sort;
51-
import org.apache.lucene.search.UsageTrackingQueryCachingPolicy;
5251
import org.apache.lucene.store.AlreadyClosedException;
5352
import org.apache.lucene.store.BufferedChecksumIndexInput;
5453
import org.apache.lucene.store.ChecksumIndexInput;
@@ -158,9 +157,9 @@
158157
import org.opensearch.index.mapper.SourceToParse;
159158
import org.opensearch.index.mapper.Uid;
160159
import org.opensearch.index.merge.MergeStats;
160+
import org.opensearch.index.merge.MergedSegmentTransferTracker;
161161
import org.opensearch.index.merge.MergedSegmentWarmerPressureService;
162162
import org.opensearch.index.merge.MergedSegmentWarmerStats;
163-
import org.opensearch.index.merge.MergedSegmentTransferTracker;
164163
import org.opensearch.index.recovery.RecoveryStats;
165164
import org.opensearch.index.refresh.RefreshStats;
166165
import org.opensearch.index.remote.RemoteSegmentStats;
@@ -1573,7 +1572,7 @@ public MergeStats mergeStats() {
15731572
}
15741573

15751574
public MergedSegmentWarmerStats mergedSegmentWarmerStats() {
1576-
return mergedSegmentReplicationTracker.stats();
1575+
return mergedSegmentTransferTracker.stats();
15771576
}
15781577

15791578
public SegmentsStats segmentStats(boolean includeSegmentFileSizes, boolean includeUnloadedSegments) {

server/src/main/java/org/opensearch/indices/replication/AbstractSegmentReplicationTarget.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -282,11 +282,6 @@ private boolean validateLocalChecksum(StoreFileMetadata file) {
282282
}
283283
}
284284

285-
protected void updateMergedSegmentFileRecoveryBytes(String fileName, long bytesRecovered) {
286-
indexShard.mergedSegmentReplicationTracker().addTotalBytesDownloaded(bytesRecovered);
287-
updateFileRecoveryBytes(fileName, bytesRecovered);
288-
}
289-
290285
/**
291286
* Updates the state to reflect recovery progress for the given file and
292287
* updates the last access time for the target.

server/src/main/java/org/opensearch/indices/replication/MergedSegmentReplicationTarget.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ public MergedSegmentReplicationTarget retryCopy() {
7878
}
7979

8080
protected void updateMergedSegmentFileRecoveryBytes(String fileName, long bytesRecovered) {
81-
indexShard.mergedSegmentTransferTracker().addTotalBytesDownloaded(bytesRecovered);
81+
indexShard.mergedSegmentTransferTracker().addTotalBytesReceived(bytesRecovered);
8282
updateFileRecoveryBytes(fileName, bytesRecovered);
8383
}
8484
}

server/src/main/java/org/opensearch/indices/replication/checkpoint/RemoteStorePublishMergedSegmentAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ public void beforeUpload(String file) {}
180180
@Override
181181
public void onSuccess(String file) {
182182
localToRemoteStoreFilenames.put(file, indexShard.getRemoteDirectory().getExistingRemoteFilename(file));
183-
indexShard.mergedSegmentTransferTracker().addTotalBytesUploaded(checkpoint.getMetadataMap().get(file).length());
183+
indexShard.mergedSegmentTransferTracker().addTotalBytesSent(checkpoint.getMetadataMap().get(file).length());
184184
}
185185

186186
@Override

server/src/main/java/org/opensearch/rest/action/cat/RestIndicesAction.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -665,10 +665,13 @@ protected Table getTableWithHeader(final RestRequest request, final PageToken pa
665665
);
666666

667667
table.addCell(
668-
"merged_segment_warmer.total_rejected_warms",
669-
"alias:mswtrw,mergedSegmentWarmerTotalRejectedWarms;default:false;text-align:right;desc:UPDATE"
668+
"merged_segment_warmer.total_rejected_count",
669+
"alias:mswtrc,mergedSegmentWarmerTotalRejectedCount;default:false;text-align:right;desc:total number of merged segment warm operations rejected by the pressure service"
670+
);
671+
table.addCell(
672+
"pri.merged_segment_warmer.total_rejected_count",
673+
"default:false;text-align:right;desc:total number of merged segment warm operations rejected by the pressure service"
670674
);
671-
table.addCell("pri.merged_segment_warmer.total_rejected_warms", "default:false;text-align:right;desc:UPDATE");
672675

673676
table.addCell("refresh.total", "sibling:pri;alias:rto,refreshTotal;default:false;text-align:right;desc:total refreshes");
674677
table.addCell("pri.refresh.total", "default:false;text-align:right;desc:total refreshes");
@@ -1091,11 +1094,9 @@ protected Table buildTable(
10911094
table.addCell(mergedSegmentWarmerTotalStats == null ? null : mergedSegmentWarmerTotalStats.getTotalSendTime());
10921095
table.addCell(mergedSegmentWarmerPrimaryStats == null ? null : mergedSegmentWarmerPrimaryStats.getTotalSendTime());
10931096

1097+
table.addCell(totalStats.getMergedSegmentWarmer() == null ? null : totalStats.getMergedSegmentWarmer().getTotalRejectedCount());
10941098
table.addCell(
1095-
totalStats.getMergedSegmentWarmer() == null ? null : totalStats.getMergedSegmentWarmer().getTotalRejectedWarms()
1096-
);
1097-
table.addCell(
1098-
primaryStats.getMergedSegmentWarmer() == null ? null : primaryStats.getMergedSegmentWarmer().getTotalRejectedWarms()
1099+
primaryStats.getMergedSegmentWarmer() == null ? null : primaryStats.getMergedSegmentWarmer().getTotalRejectedCount()
10991100
);
11001101

11011102
table.addCell(totalStats.getRefresh() == null ? null : totalStats.getRefresh().getTotal());

server/src/main/java/org/opensearch/rest/action/cat/RestNodesAction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -308,8 +308,8 @@ protected Table getTableWithHeader(final RestRequest request) {
308308
"alias:mswtst,mergedSegmentWarmerTotalSendTime;default:false;text-align:right;desc:total wallclock time spent sending merged segments by a primary shard"
309309
);
310310
table.addCell(
311-
"merged_segment_warmer.total_rejected_warms",
312-
"alias:mswtrw,mergedSegmentWarmerTotalRejectedWarms;default:false;text-align:right;desc:UPDATE"
311+
"merged_segment_warmer.total_rejected_count",
312+
"alias:mswtrc,mergedSegmentWarmerTotalRejectedCount;default:false;text-align:right;desc:total number of merged segment warm operations rejected by the pressure service"
313313
);
314314

315315
table.addCell("refresh.total", "alias:rto,refreshTotal;default:false;text-align:right;desc:total refreshes");

0 commit comments

Comments
 (0)