Skip to content

Commit

Permalink
Added stats debug logs in remote store stats publisher and added metric collector in async flow
Browse files Browse the repository at this point in the history

Signed-off-by: vikasvb90 <vikasvb@amazon.com>
  • Loading branch information
vikasvb90 committed Nov 26, 2023
1 parent bd280b6 commit 6ed696f
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ final class S3ClientSettings {
static final Setting.AffixSetting<TimeValue> REQUEST_TIMEOUT_SETTING = Setting.affixKeySetting(
PREFIX,
"request_timeout",
key -> Setting.timeSetting(key, TimeValue.timeValueMinutes(10), Property.NodeScope)
key -> Setting.timeSetting(key, TimeValue.timeValueMinutes(5), Property.NodeScope)
);

/** The connection timeout for connecting to s3. */
Expand All @@ -198,14 +198,14 @@ final class S3ClientSettings {
static final Setting.AffixSetting<Integer> MAX_CONNECTIONS_SETTING = Setting.affixKeySetting(
PREFIX,
"max_connections",
key -> Setting.intSetting(key, 100, Property.NodeScope)
key -> Setting.intSetting(key, 500, Property.NodeScope)
);

/** Connection acquisition timeout for new connections to S3. */
static final Setting.AffixSetting<TimeValue> CONNECTION_ACQUISITION_TIMEOUT = Setting.affixKeySetting(
PREFIX,
"connection_acquisition_timeout",
key -> Setting.timeSetting(key, TimeValue.timeValueMinutes(20), Property.NodeScope)
key -> Setting.timeSetting(key, TimeValue.timeValueMinutes(15), Property.NodeScope)
);

/** The maximum pending connections to S3. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,25 +99,32 @@ public S3RepositoryPlugin(final Settings settings, final Path configPath) {
@Override
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
List<ExecutorBuilder<?>> executorBuilders = new ArrayList<>();
int halfProcMaxAt5 = halfAllocatedProcessorsMaxFive(allocatedProcessors(settings));
int halfProc = halfNumberOfProcessors(allocatedProcessors(settings));

Check warning on line 102 in plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java

View check run for this annotation

Codecov / codecov/patch

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java#L102

Added line #L102 was not covered by tests
executorBuilders.add(
new FixedExecutorBuilder(settings, URGENT_FUTURE_COMPLETION, urgentPoolCount(settings), 10_000, URGENT_FUTURE_COMPLETION)
);
executorBuilders.add(new ScalingExecutorBuilder(URGENT_STREAM_READER, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
executorBuilders.add(new ScalingExecutorBuilder(URGENT_STREAM_READER, 1, halfProc, TimeValue.timeValueMinutes(5)));

Check warning on line 106 in plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java

View check run for this annotation

Codecov / codecov/patch

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java#L106

Added line #L106 was not covered by tests
executorBuilders.add(
new FixedExecutorBuilder(settings, PRIORITY_FUTURE_COMPLETION, priorityPoolCount(settings), 10_000, PRIORITY_FUTURE_COMPLETION)
new ScalingExecutorBuilder(PRIORITY_FUTURE_COMPLETION, 1, allocatedProcessors(settings), TimeValue.timeValueMinutes(5))

Check warning on line 108 in plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java

View check run for this annotation

Codecov / codecov/patch

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java#L108

Added line #L108 was not covered by tests
);
executorBuilders.add(new ScalingExecutorBuilder(PRIORITY_STREAM_READER, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
executorBuilders.add(new ScalingExecutorBuilder(PRIORITY_STREAM_READER, 1, halfProc, TimeValue.timeValueMinutes(5)));

Check warning on line 110 in plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java

View check run for this annotation

Codecov / codecov/patch

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java#L110

Added line #L110 was not covered by tests

executorBuilders.add(new FixedExecutorBuilder(settings, FUTURE_COMPLETION, normalPoolCount(settings), 10_000, FUTURE_COMPLETION));
executorBuilders.add(
new ScalingExecutorBuilder(STREAM_READER, 1, 3 * allocatedProcessors(settings), TimeValue.timeValueMinutes(5))
new ScalingExecutorBuilder(FUTURE_COMPLETION, 1, allocatedProcessors(settings), TimeValue.timeValueMinutes(5))

Check warning on line 113 in plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java

View check run for this annotation

Codecov / codecov/patch

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java#L112-L113

Added lines #L112 - L113 were not covered by tests
);
executorBuilders.add(

Check warning on line 115 in plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java

View check run for this annotation

Codecov / codecov/patch

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java#L115

Added line #L115 was not covered by tests
new ScalingExecutorBuilder(
STREAM_READER,
allocatedProcessors(settings),
4 * allocatedProcessors(settings),
TimeValue.timeValueMinutes(5)

Check warning on line 120 in plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java

View check run for this annotation

Codecov / codecov/patch

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java#L118-L120

Added lines #L118 - L120 were not covered by tests
)
);
return executorBuilders;
}

static int halfAllocatedProcessorsMaxFive(final int allocatedProcessors) {
return boundedBy((allocatedProcessors + 1) / 2, 1, 5);
static int halfNumberOfProcessors(int numberOfProcessors) {
return (numberOfProcessors + 1) / 2;

Check warning on line 127 in plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java

View check run for this annotation

Codecov / codecov/patch

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java#L127

Added line #L127 was not covered by tests
}

S3RepositoryPlugin(final Settings settings, final Path configPath, final S3Service service, final S3AsyncService s3AsyncService) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import software.amazon.awssdk.metrics.MetricPublisher;
import software.amazon.awssdk.metrics.MetricRecord;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.blobstore.BlobStore;

import java.time.Duration;
Expand All @@ -21,6 +23,7 @@

public class StatsMetricPublisher {

private static final Logger LOGGER = LogManager.getLogger(StatsMetricPublisher.class);
private final Stats stats = new Stats();

private final Map<BlobStore.Metric, Stats> extendedStats = new HashMap<>() {
Expand All @@ -35,6 +38,7 @@ public class StatsMetricPublisher {
public MetricPublisher listObjectsMetricPublisher = new MetricPublisher() {
@Override
public void publish(MetricCollection metricCollection) {
LOGGER.debug(() -> "List objects request metrics: " + metricCollection);

Check warning on line 41 in plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/StatsMetricPublisher.java

View check run for this annotation

Codecov / codecov/patch

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/StatsMetricPublisher.java#L41

Added line #L41 was not covered by tests
for (MetricRecord<?> metricRecord : metricCollection) {
switch (metricRecord.metric().name()) {
case "ApiCallDuration":
Expand Down Expand Up @@ -64,6 +68,7 @@ public void close() {}
public MetricPublisher deleteObjectsMetricPublisher = new MetricPublisher() {
@Override
public void publish(MetricCollection metricCollection) {
LOGGER.debug(() -> "Delete objects request metrics: " + metricCollection);

Check warning on line 71 in plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/StatsMetricPublisher.java

View check run for this annotation

Codecov / codecov/patch

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/StatsMetricPublisher.java#L71

Added line #L71 was not covered by tests
for (MetricRecord<?> metricRecord : metricCollection) {
switch (metricRecord.metric().name()) {
case "ApiCallDuration":
Expand Down Expand Up @@ -93,6 +98,7 @@ public void close() {}
public MetricPublisher getObjectMetricPublisher = new MetricPublisher() {
@Override
public void publish(MetricCollection metricCollection) {
LOGGER.debug(() -> "Get object request metrics: " + metricCollection);
for (MetricRecord<?> metricRecord : metricCollection) {
switch (metricRecord.metric().name()) {
case "ApiCallDuration":
Expand Down Expand Up @@ -122,6 +128,7 @@ public void close() {}
public MetricPublisher putObjectMetricPublisher = new MetricPublisher() {
@Override
public void publish(MetricCollection metricCollection) {
LOGGER.debug(() -> "Put object request metrics: " + metricCollection);
for (MetricRecord<?> metricRecord : metricCollection) {
switch (metricRecord.metric().name()) {
case "ApiCallDuration":
Expand Down Expand Up @@ -151,6 +158,7 @@ public void close() {}
public MetricPublisher multipartUploadMetricCollector = new MetricPublisher() {
@Override
public void publish(MetricCollection metricCollection) {
LOGGER.debug(() -> "Multi-part request metrics: " + metricCollection);
for (MetricRecord<?> metricRecord : metricCollection) {
switch (metricRecord.metric().name()) {
case "ApiCallDuration":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.opensearch.common.io.InputStreamContainer;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.repositories.s3.SocketAccess;
import org.opensearch.repositories.s3.StatsMetricPublisher;
import org.opensearch.repositories.s3.io.CheckedContainer;

import java.io.BufferedInputStream;
Expand Down Expand Up @@ -55,6 +56,7 @@ public class AsyncPartsHandler {
* @param completedParts Reference of completed parts
* @param inputStreamContainers Checksum containers
* @param statsMetricPublisher SDK metric publisher
* @return list of completable futures
* @throws IOException thrown in case of an IO error
*/
public static List<CompletableFuture<CompletedPart>> uploadParts(
Expand All @@ -66,7 +68,8 @@ public static List<CompletableFuture<CompletedPart>> uploadParts(
StreamContext streamContext,
String uploadId,
AtomicReferenceArray<CompletedPart> completedParts,
AtomicReferenceArray<CheckedContainer> inputStreamContainers
AtomicReferenceArray<CheckedContainer> inputStreamContainers,
StatsMetricPublisher statsMetricPublisher
) throws IOException {
List<CompletableFuture<CompletedPart>> futures = new ArrayList<>();
for (int partIdx = 0; partIdx < streamContext.getNumberOfParts(); partIdx++) {
Expand All @@ -77,6 +80,7 @@ public static List<CompletableFuture<CompletedPart>> uploadParts(
.partNumber(partIdx + 1)
.key(uploadRequest.getKey())
.uploadId(uploadId)
.overrideConfiguration(o -> o.addMetricPublisher(statsMetricPublisher.multipartUploadMetricCollector))
.contentLength(inputStreamContainer.getContentLength());
if (uploadRequest.doRemoteDataIntegrityCheck()) {
uploadPartRequestBuilder.checksumAlgorithm(ChecksumAlgorithm.CRC32);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,14 @@ private void uploadInParts(
handleException(returnFuture, () -> "Failed to initiate multipart upload", throwable);
} else {
log.debug(() -> "Initiated new multipart upload, uploadId: " + createMultipartUploadResponse.uploadId());
doUploadInParts(s3AsyncClient, uploadRequest, streamContext, returnFuture, createMultipartUploadResponse.uploadId());
doUploadInParts(
s3AsyncClient,
uploadRequest,
streamContext,
returnFuture,
createMultipartUploadResponse.uploadId(),
statsMetricPublisher
);
}
});
}
Expand All @@ -156,7 +163,8 @@ private void doUploadInParts(
UploadRequest uploadRequest,
StreamContext streamContext,
CompletableFuture<Void> returnFuture,
String uploadId
String uploadId,
StatsMetricPublisher statsMetricPublisher
) {

// The list of completed parts must be sorted
Expand All @@ -174,7 +182,8 @@ private void doUploadInParts(
streamContext,
uploadId,
completedParts,
inputStreamContainers
inputStreamContainers,
statsMetricPublisher
);
} catch (Exception ex) {
try {
Expand All @@ -198,7 +207,7 @@ private void doUploadInParts(
}
return null;
})
.thenCompose(ignore -> completeMultipartUpload(s3AsyncClient, uploadRequest, uploadId, completedParts))
.thenCompose(ignore -> completeMultipartUpload(s3AsyncClient, uploadRequest, uploadId, completedParts, statsMetricPublisher))
.handle(handleExceptionOrResponse(s3AsyncClient, uploadRequest, returnFuture, uploadId))
.exceptionally(throwable -> {
handleException(returnFuture, () -> "Unexpected exception occurred", throwable);
Expand Down Expand Up @@ -245,7 +254,8 @@ private CompletableFuture<CompleteMultipartUploadResponse> completeMultipartUplo
S3AsyncClient s3AsyncClient,
UploadRequest uploadRequest,
String uploadId,
AtomicReferenceArray<CompletedPart> completedParts
AtomicReferenceArray<CompletedPart> completedParts,
StatsMetricPublisher statsMetricPublisher
) {

log.debug(() -> new ParameterizedMessage("Sending completeMultipartUploadRequest, uploadId: {}", uploadId));
Expand All @@ -254,6 +264,7 @@ private CompletableFuture<CompleteMultipartUploadResponse> completeMultipartUplo
.bucket(uploadRequest.getBucket())
.key(uploadRequest.getKey())
.uploadId(uploadId)
.overrideConfiguration(o -> o.addMetricPublisher(statsMetricPublisher.multipartUploadMetricCollector))
.multipartUpload(CompletedMultipartUpload.builder().parts(parts).build())
.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,10 @@ public void testThereIsADefaultClientByDefault() {
assertThat(defaultSettings.protocol, is(Protocol.HTTPS));
assertThat(defaultSettings.proxySettings, is(ProxySettings.NO_PROXY_SETTINGS));
assertThat(defaultSettings.readTimeoutMillis, is(50 * 1000));
assertThat(defaultSettings.requestTimeoutMillis, is(120 * 1000));
assertThat(defaultSettings.requestTimeoutMillis, is(5 * 60 * 1000));
assertThat(defaultSettings.connectionTimeoutMillis, is(10 * 1000));
assertThat(defaultSettings.connectionTTLMillis, is(5 * 1000));
assertThat(defaultSettings.maxConnections, is(100));
assertThat(defaultSettings.maxConnections, is(500));
assertThat(defaultSettings.maxRetries, is(3));
assertThat(defaultSettings.throttleRetries, is(true));
}
Expand Down

0 comments on commit 6ed696f

Please sign in to comment.