From c90b6eac285055959d419ae67c369421355e78f8 Mon Sep 17 00:00:00 2001 From: Shourya Dutta Biswas <114977491+shourya035@users.noreply.github.com> Date: Fri, 25 Aug 2023 12:22:53 +0530 Subject: [PATCH] [Remote Store] Add total upload and download time from remote store to nodes stats (#9454) --------- Signed-off-by: Shourya Dutta Biswas <114977491+shourya035@users.noreply.github.com> --- CHANGELOG.md | 2 +- .../indices/stats/IndexStatsIT.java | 2 + .../RemoteSegmentStatsFromNodesStatsIT.java | 67 ++++++---- .../index/remote/RemoteSegmentStats.java | 41 +++++- .../remote/RemoteSegmentTransferTracker.java | 49 +++++--- .../shard/RemoteStoreRefreshListener.java | 56 +++++---- .../store/DirectoryFileTransferTracker.java | 118 +++++++++++------- .../org/opensearch/index/store/Store.java | 8 +- .../cluster/node/stats/NodeStatsTests.java | 4 + .../stats/RemoteStoreStatsTestHelper.java | 7 +- .../RemoteSegmentTransferTrackerTests.java | 50 ++++++-- .../RemoteStorePressureServiceTests.java | 2 +- 12 files changed, 285 insertions(+), 121 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 11524d3d66161..99b8120ee93df 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -48,7 +48,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Change http code on create index API with bad input raising NotXContentException from 500 to 400 ([#4773](https://github.com/opensearch-project/OpenSearch/pull/4773)) - Improve summary error message for invalid setting updates ([#4792](https://github.com/opensearch-project/OpenSearch/pull/4792)) - [Remote Store] Add Segment download stats to remotestore stats API ([#8718](https://github.com/opensearch-project/OpenSearch/pull/8718)) -- [Remote Store] Add remote segment transfer stats on NodesStats API ([#9168](https://github.com/opensearch-project/OpenSearch/pull/9168) [#9393](https://github.com/opensearch-project/OpenSearch/pull/9393)) +- [Remote Store] Add remote segment transfer stats on NodesStats API ([#9168](https://github.com/opensearch-project/OpenSearch/pull/9168) [#9393](https://github.com/opensearch-project/OpenSearch/pull/9393) [#9454](https://github.com/opensearch-project/OpenSearch/pull/9454)) - Return 409 Conflict HTTP status instead of 503 on failure to concurrently execute snapshots ([#8986](https://github.com/opensearch-project/OpenSearch/pull/5855)) ### Deprecated diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/stats/IndexStatsIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/stats/IndexStatsIT.java index af5191d7d2039..1a131a2a7eb3d 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/stats/IndexStatsIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/stats/IndexStatsIT.java @@ -1457,6 +1457,8 @@ private void assertZeroRemoteSegmentStats(RemoteSegmentStats remoteSegmentStats) assertEquals(0, remoteSegmentStats.getTotalRefreshBytesLag()); assertEquals(0, remoteSegmentStats.getMaxRefreshBytesLag()); assertEquals(0, remoteSegmentStats.getMaxRefreshTimeLag()); + assertEquals(0, remoteSegmentStats.getTotalUploadTime()); + assertEquals(0, remoteSegmentStats.getTotalDownloadTime()); } /** diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteSegmentStatsFromNodesStatsIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteSegmentStatsFromNodesStatsIT.java index 19ad43b503ab7..c2e79ea2de5ef 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteSegmentStatsFromNodesStatsIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteSegmentStatsFromNodesStatsIT.java @@ -67,7 +67,8 @@ public void testNodesStatsParityWithOnlyPrimaryShards() { indexSingleDoc(secondIndex, true); long cumulativeUploadsSucceeded = 0, cumulativeUploadsStarted = 0, cumulativeUploadsFailed = 0; - long total_bytes_lag = 0, max_bytes_lag = 0, max_time_lag = 0; + long totalBytesLag = 0, maxBytesLag = 0, maxTimeLag = 0; + long totalUploadTime = 0; // Fetch upload stats RemoteStoreStatsResponse remoteStoreStatsFirstIndex = client(randomDataNode).admin() .cluster() @@ -77,9 +78,10 @@ public void testNodesStatsParityWithOnlyPrimaryShards() { cumulativeUploadsSucceeded += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesSucceeded; cumulativeUploadsStarted += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesStarted; cumulativeUploadsFailed += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesFailed; - total_bytes_lag += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag; - max_bytes_lag = Math.max(max_bytes_lag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag); - max_time_lag = Math.max(max_time_lag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().refreshTimeLagMs); + totalBytesLag += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag; + maxBytesLag = Math.max(maxBytesLag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag); + maxTimeLag = Math.max(maxTimeLag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().refreshTimeLagMs); + totalUploadTime += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().totalUploadTimeInMs; RemoteStoreStatsResponse remoteStoreStatsSecondIndex = client(randomDataNode).admin() .cluster() @@ -90,9 +92,10 @@ public void testNodesStatsParityWithOnlyPrimaryShards() { cumulativeUploadsSucceeded += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesSucceeded; cumulativeUploadsStarted += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesStarted; cumulativeUploadsFailed += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesFailed; - total_bytes_lag += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag; - max_bytes_lag = Math.max(max_bytes_lag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag); - max_time_lag = Math.max(max_time_lag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().refreshTimeLagMs); + totalBytesLag += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag; + maxBytesLag = Math.max(maxBytesLag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag); + maxTimeLag = Math.max(maxTimeLag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().refreshTimeLagMs); + totalUploadTime += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().totalUploadTimeInMs; // Fetch nodes stats NodesStatsResponse nodesStatsResponse = client().admin() @@ -101,12 +104,13 @@ public void testNodesStatsParityWithOnlyPrimaryShards() { .setIndices(new CommonStatsFlags().set(CommonStatsFlags.Flag.Segments, true)) .get(); RemoteSegmentStats remoteSegmentStats = nodesStatsResponse.getNodes().get(0).getIndices().getSegments().getRemoteSegmentStats(); - assertEquals(cumulativeUploadsSucceeded, remoteSegmentStats.getUploadBytesSucceeded()); - assertEquals(cumulativeUploadsStarted, remoteSegmentStats.getUploadBytesStarted()); + assertTrue(cumulativeUploadsSucceeded > 0 && cumulativeUploadsSucceeded == remoteSegmentStats.getUploadBytesSucceeded()); + assertTrue(cumulativeUploadsStarted > 0 && cumulativeUploadsStarted == remoteSegmentStats.getUploadBytesStarted()); assertEquals(cumulativeUploadsFailed, remoteSegmentStats.getUploadBytesFailed()); - assertEquals(total_bytes_lag, remoteSegmentStats.getTotalRefreshBytesLag()); - assertEquals(max_bytes_lag, remoteSegmentStats.getMaxRefreshBytesLag()); - assertEquals(max_time_lag, remoteSegmentStats.getMaxRefreshTimeLag()); + assertEquals(totalBytesLag, remoteSegmentStats.getTotalRefreshBytesLag()); + assertEquals(maxBytesLag, remoteSegmentStats.getMaxRefreshBytesLag()); + assertEquals(maxTimeLag, remoteSegmentStats.getMaxRefreshTimeLag()); + assertTrue(totalUploadTime > 0 && totalUploadTime == remoteSegmentStats.getTotalUploadTime()); } /** @@ -180,13 +184,16 @@ private void assertZeroRemoteSegmentStats(RemoteSegmentStats remoteSegmentStats) assertEquals(0, remoteSegmentStats.getTotalRefreshBytesLag()); assertEquals(0, remoteSegmentStats.getMaxRefreshBytesLag()); assertEquals(0, remoteSegmentStats.getMaxRefreshTimeLag()); + assertEquals(0, remoteSegmentStats.getTotalUploadTime()); + assertEquals(0, remoteSegmentStats.getTotalDownloadTime()); } private static void assertNodeStatsParityAcrossNodes(String firstIndex, String secondIndex) { for (String dataNode : internalCluster().getDataNodeNames()) { long cumulativeUploadsSucceeded = 0, cumulativeUploadsStarted = 0, cumulativeUploadsFailed = 0; long cumulativeDownloadsSucceeded = 0, cumulativeDownloadsStarted = 0, cumulativeDownloadsFailed = 0; - long total_bytes_lag = 0, max_bytes_lag = 0, max_time_lag = 0; + long totalBytesLag = 0, maxBytesLag = 0, maxTimeLag = 0; + long totalUploadTime = 0, totalDownloadTime = 0; // Fetch upload stats RemoteStoreStatsResponse remoteStoreStatsFirstIndex = client(dataNode).admin() .cluster() @@ -202,9 +209,12 @@ private static void assertNodeStatsParityAcrossNodes(String firstIndex, String s .getSegmentStats().directoryFileTransferTrackerStats.transferredBytesStarted; cumulativeDownloadsFailed += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0] .getSegmentStats().directoryFileTransferTrackerStats.transferredBytesFailed; - total_bytes_lag += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag; - max_bytes_lag = Math.max(max_bytes_lag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag); - max_time_lag = Math.max(max_time_lag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().refreshTimeLagMs); + totalBytesLag += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag; + maxBytesLag = Math.max(maxBytesLag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag); + maxTimeLag = Math.max(maxTimeLag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().refreshTimeLagMs); + totalUploadTime += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().totalUploadTimeInMs; + totalDownloadTime += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0] + .getSegmentStats().directoryFileTransferTrackerStats.totalTransferTimeInMs; RemoteStoreStatsResponse remoteStoreStatsSecondIndex = client(dataNode).admin() .cluster() @@ -220,9 +230,12 @@ private static void assertNodeStatsParityAcrossNodes(String firstIndex, String s .getSegmentStats().directoryFileTransferTrackerStats.transferredBytesStarted; cumulativeDownloadsFailed += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0] .getSegmentStats().directoryFileTransferTrackerStats.transferredBytesFailed; - total_bytes_lag += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag; - max_bytes_lag = Math.max(max_bytes_lag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag); - max_time_lag = Math.max(max_time_lag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().refreshTimeLagMs); + totalBytesLag += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag; + maxBytesLag = Math.max(maxBytesLag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag); + maxTimeLag = Math.max(maxTimeLag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().refreshTimeLagMs); + totalUploadTime += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().totalUploadTimeInMs; + totalDownloadTime += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0] + .getSegmentStats().directoryFileTransferTrackerStats.totalTransferTimeInMs; // Fetch nodes stats NodesStatsResponse nodesStatsResponse = client().admin() @@ -237,9 +250,19 @@ private static void assertNodeStatsParityAcrossNodes(String firstIndex, String s assertEquals(cumulativeDownloadsSucceeded, remoteSegmentStats.getDownloadBytesSucceeded()); assertEquals(cumulativeDownloadsStarted, remoteSegmentStats.getDownloadBytesStarted()); assertEquals(cumulativeDownloadsFailed, remoteSegmentStats.getDownloadBytesFailed()); - assertEquals(total_bytes_lag, remoteSegmentStats.getTotalRefreshBytesLag()); - assertEquals(max_bytes_lag, remoteSegmentStats.getMaxRefreshBytesLag()); - assertEquals(max_time_lag, remoteSegmentStats.getMaxRefreshTimeLag()); + assertEquals(totalBytesLag, remoteSegmentStats.getTotalRefreshBytesLag()); + assertEquals(maxBytesLag, remoteSegmentStats.getMaxRefreshBytesLag()); + assertEquals(maxTimeLag, remoteSegmentStats.getMaxRefreshTimeLag()); + // Ensure that total upload time has non-zero value if there has been segments uploaded from the node + if (cumulativeUploadsStarted > 0) { + assertTrue(totalUploadTime > 0); + } + assertEquals(totalUploadTime, remoteSegmentStats.getTotalUploadTime()); + // Ensure that total download time has non-zero value if there has been segments downloaded to the node + if (cumulativeDownloadsStarted > 0) { + assertTrue(totalDownloadTime > 0); + } + assertEquals(totalDownloadTime, remoteSegmentStats.getTotalDownloadTime()); } } } diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteSegmentStats.java b/server/src/main/java/org/opensearch/index/remote/RemoteSegmentStats.java index 0ff61d49c00f8..ace026e28ab7c 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteSegmentStats.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteSegmentStats.java @@ -67,6 +67,14 @@ public class RemoteSegmentStats implements Writeable, ToXContentFragment { * Used to check for data freshness in the remote store */ private long totalRefreshBytesLag; + /** + * Total time spent in uploading segments to remote store + */ + private long totalUploadTime; + /** + * Total time spent in downloading segments from remote store + */ + private long totalDownloadTime; public RemoteSegmentStats() {} @@ -89,8 +97,10 @@ public RemoteSegmentStats(StreamInput in) throws IOException { This would have to be removed after the new field addition PRs are also backported to 2.x. If possible we would need to ensure that all field addition PRs are backported at once */ - if (in.getVersion().onOrAfter(Version.V_3_0_0)) { + if (in.getVersion().onOrAfter(Version.CURRENT)) { totalRefreshBytesLag = in.readLong(); + totalUploadTime = in.readLong(); + totalDownloadTime = in.readLong(); } } @@ -115,9 +125,12 @@ public RemoteSegmentStats(RemoteSegmentTransferTracker.Stats trackerStats) { // Aggregations would be performed on the add method this.maxRefreshBytesLag = trackerStats.bytesLag; this.totalRefreshBytesLag = trackerStats.bytesLag; + this.totalUploadTime = trackerStats.totalUploadTimeInMs; + this.totalDownloadTime = trackerStats.directoryFileTransferTrackerStats.totalTransferTimeInMs; } // Getter and setters. All are visible for testing + // Setters are only used for testing public long getUploadBytesStarted() { return uploadBytesStarted; } @@ -190,6 +203,22 @@ public void addTotalRefreshBytesLag(long totalRefreshBytesLag) { this.totalRefreshBytesLag += totalRefreshBytesLag; } + public long getTotalUploadTime() { + return totalUploadTime; + } + + public void addTotalUploadTime(long totalUploadTime) { + this.totalUploadTime += totalUploadTime; + } + + public long getTotalDownloadTime() { + return totalDownloadTime; + } + + public void addTotalDownloadTime(long totalDownloadTime) { + this.totalDownloadTime += totalDownloadTime; + } + /** * Adds existing stats. Used for stats roll-ups at index or node level * @@ -206,6 +235,8 @@ public void add(RemoteSegmentStats existingStats) { this.maxRefreshTimeLag = Math.max(this.maxRefreshTimeLag, existingStats.getMaxRefreshTimeLag()); this.maxRefreshBytesLag = Math.max(this.maxRefreshBytesLag, existingStats.getMaxRefreshBytesLag()); this.totalRefreshBytesLag += existingStats.getTotalRefreshBytesLag(); + this.totalUploadTime += existingStats.getTotalUploadTime(); + this.totalDownloadTime += existingStats.getTotalDownloadTime(); } } @@ -229,8 +260,10 @@ public void writeTo(StreamOutput out) throws IOException { This would have to be removed after the new field addition PRs are also backported to 2.x. If possible we would need to ensure that all field addition PRs are backported at once */ - if (out.getVersion().onOrAfter(Version.V_3_0_0)) { + if (out.getVersion().onOrAfter(Version.CURRENT)) { out.writeLong(totalRefreshBytesLag); + out.writeLong(totalUploadTime); + out.writeLong(totalDownloadTime); } } @@ -258,6 +291,7 @@ private void buildUploadStats(XContentBuilder builder) throws IOException { builder.humanReadableField(Fields.MAX_BYTES, Fields.MAX, new ByteSizeValue(maxRefreshBytesLag)); builder.endObject(); builder.humanReadableField(Fields.MAX_REFRESH_TIME_LAG_IN_MILLIS, Fields.MAX_REFRESH_TIME_LAG, new TimeValue(maxRefreshTimeLag)); + builder.humanReadableField(Fields.TOTAL_TIME_SPENT_IN_MILLIS, Fields.TOTAL_TIME_SPENT, new TimeValue(totalUploadTime)); } private void buildDownloadStats(XContentBuilder builder) throws IOException { @@ -266,6 +300,7 @@ private void buildDownloadStats(XContentBuilder builder) throws IOException { builder.humanReadableField(Fields.SUCCEEDED_BYTES, Fields.SUCCEEDED, new ByteSizeValue(downloadBytesSucceeded)); builder.humanReadableField(Fields.FAILED_BYTES, Fields.FAILED, new ByteSizeValue(downloadBytesFailed)); builder.endObject(); + builder.humanReadableField(Fields.TOTAL_TIME_SPENT_IN_MILLIS, Fields.TOTAL_TIME_SPENT, new TimeValue(totalDownloadTime)); } static final class Fields { @@ -287,5 +322,7 @@ static final class Fields { static final String TOTAL_BYTES = "total_bytes"; static final String MAX = "max"; static final String MAX_BYTES = "max_bytes"; + static final String TOTAL_TIME_SPENT = "total_time_spent"; + static final String TOTAL_TIME_SPENT_IN_MILLIS = "total_time_spent_in_millis"; } } diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteSegmentTransferTracker.java b/server/src/main/java/org/opensearch/index/remote/RemoteSegmentTransferTracker.java index 1531f74597a03..95902fd375145 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteSegmentTransferTracker.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteSegmentTransferTracker.java @@ -96,17 +96,17 @@ public class RemoteSegmentTransferTracker { /** * Cumulative sum of size in bytes of segment files for which upload has started during remote refresh. */ - private volatile long uploadBytesStarted; + private final AtomicLong uploadBytesStarted = new AtomicLong(); /** * Cumulative sum of size in bytes of segment files for which upload has failed during remote refresh. */ - private volatile long uploadBytesFailed; + private final AtomicLong uploadBytesFailed = new AtomicLong(); /** * Cumulative sum of size in bytes of segment files for which upload has succeeded during remote refresh. */ - private volatile long uploadBytesSucceeded; + private final AtomicLong uploadBytesSucceeded = new AtomicLong(); /** * Cumulative sum of count of remote refreshes that have started. @@ -123,6 +123,11 @@ public class RemoteSegmentTransferTracker { */ private volatile long totalUploadsSucceeded; + /** + * Cumulative sum of time taken in remote refresh (in milliseconds) [Tracked per file] + */ + private AtomicLong totalUploadTimeInMs = new AtomicLong(); + /** * Cumulative sum of rejection counts for this shard. */ @@ -316,31 +321,31 @@ public long getBytesLag() { } public long getUploadBytesStarted() { - return uploadBytesStarted; + return uploadBytesStarted.get(); } public void addUploadBytesStarted(long size) { - uploadBytesStarted += size; + uploadBytesStarted.getAndAdd(size); } public long getUploadBytesFailed() { - return uploadBytesFailed; + return uploadBytesFailed.get(); } public void addUploadBytesFailed(long size) { - uploadBytesFailed += size; + uploadBytesFailed.getAndAdd(size); } public long getUploadBytesSucceeded() { - return uploadBytesSucceeded; + return uploadBytesSucceeded.get(); } public void addUploadBytesSucceeded(long size) { - uploadBytesSucceeded += size; + uploadBytesSucceeded.getAndAdd(size); } public long getInflightUploadBytes() { - return uploadBytesStarted - uploadBytesFailed - uploadBytesSucceeded; + return uploadBytesStarted.get() - uploadBytesFailed.get() - uploadBytesSucceeded.get(); } public long getTotalUploadsStarted() { @@ -508,7 +513,7 @@ boolean isUploadTimeMsAverageReady() { return uploadTimeMsMovingAverageReference.get().getAverage(); } - public void addUploadTimeMs(long timeMs) { + public void addTimeForCompletedUploadSync(long timeMs) { synchronized (uploadTimeMsMutex) { this.uploadTimeMsMovingAverageReference.get().record(timeMs); } @@ -525,6 +530,14 @@ void updateUploadTimeMsMovingAverageWindowSize(int updatedSize) { } } + public void addTotalUploadTimeInMs(long fileUploadTimeInMs) { + this.totalUploadTimeInMs.addAndGet(fileUploadTimeInMs); + } + + public long getTotalUploadTimeInMs() { + return totalUploadTimeInMs.get(); + } + public DirectoryFileTransferTracker getDirectoryFileTransferTracker() { return directoryFileTransferTracker; } @@ -537,9 +550,9 @@ public RemoteSegmentTransferTracker.Stats stats() { timeMsLag, localRefreshSeqNo, remoteRefreshSeqNo, - uploadBytesStarted, - uploadBytesSucceeded, - uploadBytesFailed, + uploadBytesStarted.get(), + uploadBytesSucceeded.get(), + uploadBytesFailed.get(), totalUploadsStarted, totalUploadsSucceeded, totalUploadsFailed, @@ -550,6 +563,7 @@ public RemoteSegmentTransferTracker.Stats stats() { uploadBytesPerSecMovingAverageReference.get().getAverage(), uploadTimeMsMovingAverageReference.get().getAverage(), getBytesLag(), + totalUploadTimeInMs.get(), directoryFileTransferTracker.stats() ); } @@ -578,6 +592,7 @@ public static class Stats implements Writeable { public final long lastSuccessfulRemoteRefreshBytes; public final double uploadBytesMovingAverage; public final double uploadBytesPerSecMovingAverage; + public final long totalUploadTimeInMs; public final double uploadTimeMovingAverage; public final long bytesLag; public final DirectoryFileTransferTracker.Stats directoryFileTransferTrackerStats; @@ -602,6 +617,7 @@ public Stats( double uploadBytesPerSecMovingAverage, double uploadTimeMovingAverage, long bytesLag, + long totalUploadTimeInMs, DirectoryFileTransferTracker.Stats directoryFileTransferTrackerStats ) { this.shardId = shardId; @@ -623,6 +639,7 @@ public Stats( this.uploadBytesPerSecMovingAverage = uploadBytesPerSecMovingAverage; this.uploadTimeMovingAverage = uploadTimeMovingAverage; this.bytesLag = bytesLag; + this.totalUploadTimeInMs = totalUploadTimeInMs; this.directoryFileTransferTrackerStats = directoryFileTransferTrackerStats; } @@ -647,6 +664,7 @@ public Stats(StreamInput in) throws IOException { this.uploadBytesPerSecMovingAverage = in.readDouble(); this.uploadTimeMovingAverage = in.readDouble(); this.bytesLag = in.readLong(); + this.totalUploadTimeInMs = in.readLong(); this.directoryFileTransferTrackerStats = in.readOptionalWriteable(DirectoryFileTransferTracker.Stats::new); } catch (IOException e) { throw e; @@ -674,6 +692,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeDouble(uploadBytesPerSecMovingAverage); out.writeDouble(uploadTimeMovingAverage); out.writeLong(bytesLag); + out.writeLong(totalUploadTimeInMs); out.writeOptionalWriteable(directoryFileTransferTrackerStats); } @@ -702,6 +721,7 @@ public boolean equals(Object obj) { && Double.compare(this.uploadBytesPerSecMovingAverage, other.uploadBytesPerSecMovingAverage) == 0 && Double.compare(this.uploadTimeMovingAverage, other.uploadTimeMovingAverage) == 0 && this.bytesLag == other.bytesLag + && this.totalUploadTimeInMs == other.totalUploadTimeInMs && this.directoryFileTransferTrackerStats.equals(other.directoryFileTransferTrackerStats); } @@ -727,6 +747,7 @@ public int hashCode() { uploadBytesPerSecMovingAverage, uploadTimeMovingAverage, bytesLag, + totalUploadTimeInMs, directoryFileTransferTrackerStats ); } diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index 2f0d11fb6a8b3..e8a9ec866ac01 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -89,7 +89,6 @@ public final class RemoteStoreRefreshListener extends CloseableRetryableRefreshL private long primaryTerm; private volatile Iterator backoffDelayIterator; private final SegmentReplicationCheckpointPublisher checkpointPublisher; - private final UploadListener statsListener; public RemoteStoreRefreshListener( IndexShard indexShard, @@ -117,26 +116,6 @@ public RemoteStoreRefreshListener( this.segmentTracker = segmentTracker; resetBackOffDelayIterator(); this.checkpointPublisher = checkpointPublisher; - this.statsListener = new UploadListener() { - @Override - public void beforeUpload(String file) { - // Start tracking the upload bytes started - segmentTracker.addUploadBytesStarted(segmentTracker.getLatestLocalFileNameLengthMap().get(file)); - } - - @Override - public void onSuccess(String file) { - // Track upload success - segmentTracker.addUploadBytesSucceeded(segmentTracker.getLatestLocalFileNameLengthMap().get(file)); - segmentTracker.addToLatestUploadedFiles(file); - } - - @Override - public void onFailure(String file) { - // Track upload failure - segmentTracker.addUploadBytesFailed(segmentTracker.getLatestLocalFileNameLengthMap().get(file)); - } - }; } @Override @@ -373,6 +352,8 @@ private void uploadNewSegments(Collection localSegmentsPostRefresh, Acti GroupedActionListener batchUploadListener = new GroupedActionListener<>(mappedListener, filteredFiles.size()); for (String src : filteredFiles) { + // Initializing listener here to ensure that the stats increment operations are thread-safe + UploadListener statsListener = createUploadListener(); ActionListener aggregatedListener = ActionListener.wrap(resp -> { statsListener.onSuccess(src); batchUploadListener.onResponse(resp); @@ -443,12 +424,43 @@ private void updateFinalStatusInSegmentTracker(boolean uploadStatus, long bytesB segmentTracker.incrementTotalUploadsSucceeded(); segmentTracker.addUploadBytes(bytesUploaded); segmentTracker.addUploadBytesPerSec((bytesUploaded * 1_000L) / Math.max(1, timeTakenInMS)); - segmentTracker.addUploadTimeMs(timeTakenInMS); + segmentTracker.addTimeForCompletedUploadSync(timeTakenInMS); } else { segmentTracker.incrementTotalUploadsFailed(); } } + /** + * Creates an {@link UploadListener} containing the stats population logic which would be triggered before and after segment upload events + */ + private UploadListener createUploadListener() { + return new UploadListener() { + private long uploadStartTime = 0; + + @Override + public void beforeUpload(String file) { + // Start tracking the upload bytes started + segmentTracker.addUploadBytesStarted(segmentTracker.getLatestLocalFileNameLengthMap().get(file)); + uploadStartTime = System.currentTimeMillis(); + } + + @Override + public void onSuccess(String file) { + // Track upload success + segmentTracker.addUploadBytesSucceeded(segmentTracker.getLatestLocalFileNameLengthMap().get(file)); + segmentTracker.addToLatestUploadedFiles(file); + segmentTracker.addTotalUploadTimeInMs(Math.max(1, System.currentTimeMillis() - uploadStartTime)); + } + + @Override + public void onFailure(String file) { + // Track upload failure + segmentTracker.addUploadBytesFailed(segmentTracker.getLatestLocalFileNameLengthMap().get(file)); + segmentTracker.addTotalUploadTimeInMs(Math.max(1, System.currentTimeMillis() - uploadStartTime)); + } + }; + } + @Override protected Logger getLogger() { return logger; diff --git a/server/src/main/java/org/opensearch/index/store/DirectoryFileTransferTracker.java b/server/src/main/java/org/opensearch/index/store/DirectoryFileTransferTracker.java index 5e12517becaf2..7ad48cb56a33b 100644 --- a/server/src/main/java/org/opensearch/index/store/DirectoryFileTransferTracker.java +++ b/server/src/main/java/org/opensearch/index/store/DirectoryFileTransferTracker.java @@ -8,6 +8,7 @@ package org.opensearch.index.store; +import org.apache.lucene.store.Directory; import org.opensearch.common.util.MovingAverage; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; @@ -15,128 +16,150 @@ import java.io.IOException; import java.util.Objects; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; /** - * Tracks the amount of bytes transferred between two {@link org.apache.lucene.store.Directory} instances + * Tracks the amount of bytes transferred between two {@link Directory} instances * * @opensearch.internal */ public class DirectoryFileTransferTracker { /** - * Cumulative size of files (in bytes) attempted to be transferred over from the source {@link org.apache.lucene.store.Directory} + * Cumulative size of files (in bytes) attempted to be transferred over from the source {@link Directory} */ - private volatile long transferredBytesStarted; + private final AtomicLong transferredBytesStarted = new AtomicLong(); /** - * Cumulative size of files (in bytes) successfully transferred over from the source {@link org.apache.lucene.store.Directory} + * Cumulative size of files (in bytes) successfully transferred over from the source {@link Directory} */ - private volatile long transferredBytesFailed; + private final AtomicLong transferredBytesFailed = new AtomicLong(); /** - * Cumulative size of files (in bytes) failed in transfer over from the source {@link org.apache.lucene.store.Directory} + * Cumulative size of files (in bytes) failed in transfer over from the source {@link Directory} */ - private volatile long transferredBytesSucceeded; + private final AtomicLong transferredBytesSucceeded = new AtomicLong(); /** - * Time in milliseconds for the last successful transfer from the source {@link org.apache.lucene.store.Directory} + * Time in milliseconds for the last successful transfer from the source {@link Directory} */ - private volatile long lastTransferTimestampMs; + private final AtomicLong lastTransferTimestampMs = new AtomicLong(); /** - * Provides moving average over the last N total size in bytes of files transferred from the source {@link org.apache.lucene.store.Directory}. + * Cumulative time in milliseconds spent in successful transfers from the source {@link Directory} + */ + private final AtomicLong totalTransferTimeInMs = new AtomicLong(); + + /** + * Provides moving average over the last N total size in bytes of files transferred from the source {@link Directory}. * N is window size */ - private volatile MovingAverage transferredBytesMovingAverageReference; + private final AtomicReference transferredBytesMovingAverageReference; - private volatile long lastSuccessfulTransferInBytes; + private final AtomicLong lastSuccessfulTransferInBytes = new AtomicLong(); /** - * Provides moving average over the last N transfer speed (in bytes/s) of segment files transferred from the source {@link org.apache.lucene.store.Directory}. + * Provides moving average over the last N transfer speed (in bytes/s) of segment files transferred from the source {@link Directory}. * N is window size */ - private volatile MovingAverage transferredBytesPerSecMovingAverageReference; + private final AtomicReference transferredBytesPerSecMovingAverageReference; private final int DIRECTORY_FILES_TRANSFER_DEFAULT_WINDOW_SIZE = 20; + // Getters and Setters, all are visible for testing public long getTransferredBytesStarted() { - return transferredBytesStarted; + return transferredBytesStarted.get(); } public void addTransferredBytesStarted(long size) { - transferredBytesStarted += size; + transferredBytesStarted.getAndAdd(size); } public long getTransferredBytesFailed() { - return transferredBytesFailed; + return transferredBytesFailed.get(); } - public void addTransferredBytesFailed(long size) { - transferredBytesFailed += size; + public void addTransferredBytesFailed(long size, long startTimeInMs) { + transferredBytesFailed.getAndAdd(size); + addTotalTransferTimeInMs(Math.max(1, System.currentTimeMillis() - startTimeInMs)); } public long getTransferredBytesSucceeded() { - return transferredBytesSucceeded; + return transferredBytesSucceeded.get(); } public void addTransferredBytesSucceeded(long size, long startTimeInMs) { - transferredBytesSucceeded += size; - updateLastSuccessfulTransferSize(size); + transferredBytesSucceeded.getAndAdd(size); + updateSuccessfulTransferSize(size); long currentTimeInMs = System.currentTimeMillis(); updateLastTransferTimestampMs(currentTimeInMs); long timeTakenInMS = Math.max(1, currentTimeInMs - startTimeInMs); + addTotalTransferTimeInMs(timeTakenInMS); addTransferredBytesPerSec((size * 1_000L) / timeTakenInMS); } public boolean isTransferredBytesPerSecAverageReady() { - return transferredBytesPerSecMovingAverageReference.isReady(); + return transferredBytesPerSecMovingAverageReference.get().isReady(); } public double getTransferredBytesPerSecAverage() { - return transferredBytesPerSecMovingAverageReference.getAverage(); + return transferredBytesPerSecMovingAverageReference.get().getAverage(); } - // Visible for testing public void addTransferredBytesPerSec(long bytesPerSec) { - this.transferredBytesPerSecMovingAverageReference.record(bytesPerSec); + this.transferredBytesPerSecMovingAverageReference.get().record(bytesPerSec); } public boolean isTransferredBytesAverageReady() { - return transferredBytesMovingAverageReference.isReady(); + return transferredBytesMovingAverageReference.get().isReady(); } public double getTransferredBytesAverage() { - return transferredBytesMovingAverageReference.getAverage(); + return transferredBytesMovingAverageReference.get().getAverage(); + } + + public void updateLastSuccessfulTransferInBytes(long size) { + lastSuccessfulTransferInBytes.set(size); } - // Visible for testing - public void updateLastSuccessfulTransferSize(long size) { - lastSuccessfulTransferInBytes = size; - this.transferredBytesMovingAverageReference.record(size); + public void updateSuccessfulTransferSize(long size) { + updateLastSuccessfulTransferInBytes(size); + this.transferredBytesMovingAverageReference.get().record(size); } public long getLastTransferTimestampMs() { - return lastTransferTimestampMs; + return lastTransferTimestampMs.get(); } - // Visible for testing public void updateLastTransferTimestampMs(long downloadTimestampInMs) { - this.lastTransferTimestampMs = downloadTimestampInMs; + this.lastTransferTimestampMs.set(downloadTimestampInMs); + } + + public void addTotalTransferTimeInMs(long totalTransferTimeInMs) { + this.totalTransferTimeInMs.addAndGet(totalTransferTimeInMs); + } + + public long getTotalTransferTimeInMs() { + return totalTransferTimeInMs.get(); } public DirectoryFileTransferTracker() { - transferredBytesMovingAverageReference = new MovingAverage(DIRECTORY_FILES_TRANSFER_DEFAULT_WINDOW_SIZE); - transferredBytesPerSecMovingAverageReference = new MovingAverage(DIRECTORY_FILES_TRANSFER_DEFAULT_WINDOW_SIZE); + transferredBytesMovingAverageReference = new AtomicReference<>(new MovingAverage(DIRECTORY_FILES_TRANSFER_DEFAULT_WINDOW_SIZE)); + transferredBytesPerSecMovingAverageReference = new AtomicReference<>( + new MovingAverage(DIRECTORY_FILES_TRANSFER_DEFAULT_WINDOW_SIZE) + ); } public DirectoryFileTransferTracker.Stats stats() { return new Stats( - transferredBytesStarted, - transferredBytesFailed, - transferredBytesSucceeded, - lastTransferTimestampMs, - transferredBytesMovingAverageReference.getAverage(), - lastSuccessfulTransferInBytes, - transferredBytesPerSecMovingAverageReference.getAverage() + transferredBytesStarted.get(), + transferredBytesFailed.get(), + transferredBytesSucceeded.get(), + lastTransferTimestampMs.get(), + totalTransferTimeInMs.get(), + transferredBytesMovingAverageReference.get().getAverage(), + lastSuccessfulTransferInBytes.get(), + transferredBytesPerSecMovingAverageReference.get().getAverage() ); } @@ -150,6 +173,7 @@ public static class Stats implements Writeable { public final long transferredBytesFailed; public final long transferredBytesSucceeded; public final long lastTransferTimestampMs; + public final long totalTransferTimeInMs; public final double transferredBytesMovingAverage; public final long lastSuccessfulTransferInBytes; public final double transferredBytesPerSecMovingAverage; @@ -159,6 +183,7 @@ public Stats( long transferredBytesFailed, long downloadBytesSucceeded, long lastTransferTimestampMs, + long totalTransferTimeInMs, double transferredBytesMovingAverage, long lastSuccessfulTransferInBytes, double transferredBytesPerSecMovingAverage @@ -167,6 +192,7 @@ public Stats( this.transferredBytesFailed = transferredBytesFailed; this.transferredBytesSucceeded = downloadBytesSucceeded; this.lastTransferTimestampMs = lastTransferTimestampMs; + this.totalTransferTimeInMs = totalTransferTimeInMs; this.transferredBytesMovingAverage = transferredBytesMovingAverage; this.lastSuccessfulTransferInBytes = lastSuccessfulTransferInBytes; this.transferredBytesPerSecMovingAverage = transferredBytesPerSecMovingAverage; @@ -177,6 +203,7 @@ public Stats(StreamInput in) throws IOException { this.transferredBytesFailed = in.readLong(); this.transferredBytesSucceeded = in.readLong(); this.lastTransferTimestampMs = in.readLong(); + this.totalTransferTimeInMs = in.readLong(); this.transferredBytesMovingAverage = in.readDouble(); this.lastSuccessfulTransferInBytes = in.readLong(); this.transferredBytesPerSecMovingAverage = in.readDouble(); @@ -188,6 +215,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeLong(transferredBytesFailed); out.writeLong(transferredBytesSucceeded); out.writeLong(lastTransferTimestampMs); + out.writeLong(totalTransferTimeInMs); out.writeDouble(transferredBytesMovingAverage); out.writeLong(lastSuccessfulTransferInBytes); out.writeDouble(transferredBytesPerSecMovingAverage); @@ -203,6 +231,7 @@ public boolean equals(Object obj) { && transferredBytesFailed == stats.transferredBytesFailed && transferredBytesSucceeded == stats.transferredBytesSucceeded && lastTransferTimestampMs == stats.lastTransferTimestampMs + && totalTransferTimeInMs == stats.totalTransferTimeInMs && Double.compare(stats.transferredBytesMovingAverage, transferredBytesMovingAverage) == 0 && lastSuccessfulTransferInBytes == stats.lastSuccessfulTransferInBytes && Double.compare(stats.transferredBytesPerSecMovingAverage, transferredBytesPerSecMovingAverage) == 0; @@ -215,6 +244,7 @@ public int hashCode() { transferredBytesFailed, transferredBytesSucceeded, lastTransferTimestampMs, + totalTransferTimeInMs, transferredBytesMovingAverage, lastSuccessfulTransferInBytes, transferredBytesPerSecMovingAverage diff --git a/server/src/main/java/org/opensearch/index/store/Store.java b/server/src/main/java/org/opensearch/index/store/Store.java index 4f51994a6ac2f..b3ea2cdd02e21 100644 --- a/server/src/main/java/org/opensearch/index/store/Store.java +++ b/server/src/main/java/org/opensearch/index/store/Store.java @@ -950,14 +950,14 @@ public void copyFrom(Directory from, String src, String dest, IOContext context) long fileSize = from.fileLength(src); beforeDownload(fileSize); boolean success = false; + long startTime = System.currentTimeMillis(); try { - long startTime = System.currentTimeMillis(); super.copyFrom(from, src, dest, context); success = true; afterDownload(fileSize, startTime); } finally { if (!success) { - downloadFailed(fileSize); + downloadFailed(fileSize, startTime); } } } @@ -983,8 +983,8 @@ private void afterDownload(long fileSize, long startTimeInMs) { /** * Updates the amount of bytes failed in download */ - private void downloadFailed(long fileSize) { - directoryFileTransferTracker.addTransferredBytesFailed(fileSize); + private void downloadFailed(long fileSize, long startTimeInMs) { + directoryFileTransferTracker.addTransferredBytesFailed(fileSize, startTimeInMs); } } diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java index fbe70748adf2d..8a450b99904cf 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java @@ -460,6 +460,8 @@ public void testSerialization() throws IOException { assertEquals(remoteSegmentStats.getMaxRefreshTimeLag(), deserializedRemoteSegmentStats.getMaxRefreshTimeLag()); assertEquals(remoteSegmentStats.getMaxRefreshBytesLag(), deserializedRemoteSegmentStats.getMaxRefreshBytesLag()); assertEquals(remoteSegmentStats.getTotalRefreshBytesLag(), deserializedRemoteSegmentStats.getTotalRefreshBytesLag()); + assertEquals(remoteSegmentStats.getTotalUploadTime(), deserializedRemoteSegmentStats.getTotalUploadTime()); + assertEquals(remoteSegmentStats.getTotalDownloadTime(), deserializedRemoteSegmentStats.getTotalDownloadTime()); } } } @@ -793,6 +795,8 @@ private static NodeIndicesStats getNodeIndicesStats(boolean remoteStoreStats) { remoteSegmentStats.addTotalRefreshBytesLag(5L); remoteSegmentStats.addMaxRefreshBytesLag(2L); remoteSegmentStats.setMaxRefreshTimeLag(2L); + remoteSegmentStats.addTotalUploadTime(20L); + remoteSegmentStats.addTotalDownloadTime(20L); } return indicesStats; } diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStatsTestHelper.java b/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStatsTestHelper.java index 7430ccaed725b..e2a0209503976 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStatsTestHelper.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStatsTestHelper.java @@ -46,6 +46,7 @@ static RemoteSegmentTransferTracker.Stats createStatsForNewPrimary(ShardId shard 0, 0, 0, + 10, createZeroDirectoryFileTransferStats() ); } @@ -71,6 +72,7 @@ static RemoteSegmentTransferTracker.Stats createStatsForNewReplica(ShardId shard 0, 0, 0, + 0, createSampleDirectoryFileTransferStats() ); } @@ -96,16 +98,17 @@ static RemoteSegmentTransferTracker.Stats createStatsForRemoteStoreRestoredPrima 0, 0, 100, + 10, createSampleDirectoryFileTransferStats() ); } static DirectoryFileTransferTracker.Stats createSampleDirectoryFileTransferStats() { - return new DirectoryFileTransferTracker.Stats(10, 0, 10, 12345, 5, 5, 5); + return new DirectoryFileTransferTracker.Stats(10, 0, 10, 12345, 5, 5, 5, 10); } static DirectoryFileTransferTracker.Stats createZeroDirectoryFileTransferStats() { - return new DirectoryFileTransferTracker.Stats(0, 0, 0, 0, 0, 0, 0); + return new DirectoryFileTransferTracker.Stats(0, 0, 0, 0, 0, 0, 0, 0); } static ShardRouting createShardRouting(ShardId shardId, boolean isPrimary) { diff --git a/server/src/test/java/org/opensearch/index/remote/RemoteSegmentTransferTrackerTests.java b/server/src/test/java/org/opensearch/index/remote/RemoteSegmentTransferTrackerTests.java index 94934d5b4dca6..10fe3f95ab47c 100644 --- a/server/src/test/java/org/opensearch/index/remote/RemoteSegmentTransferTrackerTests.java +++ b/server/src/test/java/org/opensearch/index/remote/RemoteSegmentTransferTrackerTests.java @@ -242,10 +242,10 @@ public void testAddDownloadBytesFailed() { pressureSettings.getUploadTimeMovingAverageWindowSize() ); long bytesToAdd = randomLongBetween(1000, 1000000); - pressureTracker.getDirectoryFileTransferTracker().addTransferredBytesFailed(bytesToAdd); + pressureTracker.getDirectoryFileTransferTracker().addTransferredBytesFailed(bytesToAdd, System.currentTimeMillis()); assertEquals(bytesToAdd, pressureTracker.getDirectoryFileTransferTracker().getTransferredBytesFailed()); long moreBytesToAdd = randomLongBetween(1000, 10000); - pressureTracker.getDirectoryFileTransferTracker().addTransferredBytesFailed(moreBytesToAdd); + pressureTracker.getDirectoryFileTransferTracker().addTransferredBytesFailed(moreBytesToAdd, System.currentTimeMillis()); assertEquals(bytesToAdd + moreBytesToAdd, pressureTracker.getDirectoryFileTransferTracker().getTransferredBytesFailed()); } @@ -473,18 +473,18 @@ public void testIsUploadTimeMsAverageReady() { long sum = 0; for (int i = 1; i < uploadTimeMovingAverageWindowSize; i++) { - pressureTracker.addUploadTimeMs(i); + pressureTracker.addTimeForCompletedUploadSync(i); sum += i; assertFalse(pressureTracker.isUploadTimeMsAverageReady()); assertEquals((double) sum / i, pressureTracker.getUploadTimeMsAverage(), 0.0d); } - pressureTracker.addUploadTimeMs(uploadTimeMovingAverageWindowSize); + pressureTracker.addTimeForCompletedUploadSync(uploadTimeMovingAverageWindowSize); sum += uploadTimeMovingAverageWindowSize; assertTrue(pressureTracker.isUploadTimeMsAverageReady()); assertEquals((double) sum / uploadTimeMovingAverageWindowSize, pressureTracker.getUploadTimeMsAverage(), 0.0d); - pressureTracker.addUploadTimeMs(100); + pressureTracker.addTimeForCompletedUploadSync(100); sum = sum + 100 - 1; assertEquals((double) sum / uploadTimeMovingAverageWindowSize, pressureTracker.getUploadTimeMsAverage(), 0.0d); } @@ -501,18 +501,18 @@ public void testIsDownloadBytesAverageReady() { long sum = 0; for (int i = 1; i < 20; i++) { - pressureTracker.getDirectoryFileTransferTracker().updateLastSuccessfulTransferSize(i); + pressureTracker.getDirectoryFileTransferTracker().updateSuccessfulTransferSize(i); sum += i; assertFalse(pressureTracker.getDirectoryFileTransferTracker().isTransferredBytesAverageReady()); assertEquals((double) sum / i, pressureTracker.getDirectoryFileTransferTracker().getTransferredBytesAverage(), 0.0d); } - pressureTracker.getDirectoryFileTransferTracker().updateLastSuccessfulTransferSize(20); + pressureTracker.getDirectoryFileTransferTracker().updateSuccessfulTransferSize(20); sum += 20; assertTrue(pressureTracker.getDirectoryFileTransferTracker().isTransferredBytesAverageReady()); assertEquals((double) sum / 20, pressureTracker.getDirectoryFileTransferTracker().getTransferredBytesAverage(), 0.0d); - pressureTracker.getDirectoryFileTransferTracker().updateLastSuccessfulTransferSize(100); + pressureTracker.getDirectoryFileTransferTracker().updateSuccessfulTransferSize(100); sum = sum + 100 - 1; assertEquals((double) sum / 20, pressureTracker.getDirectoryFileTransferTracker().getTransferredBytesAverage(), 0.0d); } @@ -545,6 +545,38 @@ public void testIsDownloadBytesPerSecAverageReady() { assertEquals((double) sum / 20, pressureTracker.getDirectoryFileTransferTracker().getTransferredBytesPerSecAverage(), 0.0d); } + public void testAddTotalUploadTimeInMs() { + pressureTracker = new RemoteSegmentTransferTracker( + shardId, + directoryFileTransferTracker, + pressureSettings.getUploadBytesMovingAverageWindowSize(), + pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), + pressureSettings.getUploadTimeMovingAverageWindowSize() + ); + long timeToAdd = randomLongBetween(100, 200); + pressureTracker.addTotalUploadTimeInMs(timeToAdd); + assertEquals(timeToAdd, pressureTracker.getTotalUploadTimeInMs()); + long moreTimeToAdd = randomLongBetween(100, 200); + pressureTracker.addTotalUploadTimeInMs(moreTimeToAdd); + assertEquals(timeToAdd + moreTimeToAdd, pressureTracker.getTotalUploadTimeInMs()); + } + + public void testAddTotalTransferTimeMs() { + pressureTracker = new RemoteSegmentTransferTracker( + shardId, + directoryFileTransferTracker, + pressureSettings.getUploadBytesMovingAverageWindowSize(), + pressureSettings.getUploadBytesPerSecMovingAverageWindowSize(), + pressureSettings.getUploadTimeMovingAverageWindowSize() + ); + long timeToAdd = randomLongBetween(100, 200); + pressureTracker.getDirectoryFileTransferTracker().addTotalTransferTimeInMs(timeToAdd); + assertEquals(timeToAdd, pressureTracker.getDirectoryFileTransferTracker().getTotalTransferTimeInMs()); + long moreTimeToAdd = randomLongBetween(100, 200); + pressureTracker.getDirectoryFileTransferTracker().addTotalTransferTimeInMs(moreTimeToAdd); + assertEquals(timeToAdd + moreTimeToAdd, pressureTracker.getDirectoryFileTransferTracker().getTotalTransferTimeInMs()); + } + /** * Tests whether RemoteSegmentTransferTracker.Stats object generated correctly from RemoteSegmentTransferTracker. * */ @@ -625,7 +657,7 @@ private RemoteSegmentTransferTracker constructTracker() { pressureSettings.getUploadTimeMovingAverageWindowSize() ); segmentPressureTracker.incrementTotalUploadsFailed(); - segmentPressureTracker.addUploadTimeMs(System.nanoTime() / 1_000_000L + randomIntBetween(10, 100)); + segmentPressureTracker.addTimeForCompletedUploadSync(System.nanoTime() / 1_000_000L + randomIntBetween(10, 100)); segmentPressureTracker.addUploadBytes(99); segmentPressureTracker.updateRemoteRefreshTimeMs(System.nanoTime() / 1_000_000L + randomIntBetween(10, 100)); segmentPressureTracker.incrementRejectionCount(); diff --git a/server/src/test/java/org/opensearch/index/remote/RemoteStorePressureServiceTests.java b/server/src/test/java/org/opensearch/index/remote/RemoteStorePressureServiceTests.java index d79e5ae99b696..e164269d96a3d 100644 --- a/server/src/test/java/org/opensearch/index/remote/RemoteStorePressureServiceTests.java +++ b/server/src/test/java/org/opensearch/index/remote/RemoteStorePressureServiceTests.java @@ -108,7 +108,7 @@ public void testValidateSegmentUploadLag() { pressureTracker.updateRemoteRefreshSeqNo(3); AtomicLong sum = new AtomicLong(); IntStream.range(0, 20).forEach(i -> { - pressureTracker.addUploadTimeMs(i); + pressureTracker.addTimeForCompletedUploadSync(i); sum.addAndGet(i); }); double avg = (double) sum.get() / 20;