Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Created new urgent priority threadpool for remote cluster state uploads #10685

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ protected S3Repository createRepository(
ClusterService clusterService,
RecoverySettings recoverySettings
) {
return new S3Repository(metadata, registry, service, clusterService, recoverySettings, null, null, null, null, false) {
return new S3Repository(metadata, registry, service, clusterService, recoverySettings, null, null, null, null, null, false) {

@Override
public BlobStore blobStore() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public class AmazonAsyncS3Reference extends RefCountedReleasable<AmazonAsyncS3Wi
super("AWS_S3_CLIENT", client, () -> {
client.client().close();
client.priorityClient().close();
client.urgentClient().close();
AwsCredentialsProvider credentials = client.credentials();
if (credentials instanceof Closeable) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,19 @@
final class AmazonAsyncS3WithCredentials {
private final S3AsyncClient client;
private final S3AsyncClient priorityClient;
private final S3AsyncClient urgentClient;
private final AwsCredentialsProvider credentials;

private AmazonAsyncS3WithCredentials(
final S3AsyncClient client,
final S3AsyncClient priorityClient,
final S3AsyncClient urgentClient,
@Nullable final AwsCredentialsProvider credentials
) {
this.client = client;
this.credentials = credentials;
this.priorityClient = priorityClient;
this.urgentClient = urgentClient;
}

S3AsyncClient client() {
Expand All @@ -39,15 +42,20 @@ S3AsyncClient priorityClient() {
return priorityClient;
}

S3AsyncClient urgentClient() {
return urgentClient;
}

AwsCredentialsProvider credentials() {
return credentials;
}

static AmazonAsyncS3WithCredentials create(
final S3AsyncClient client,
final S3AsyncClient priorityClient,
final S3AsyncClient urgentClient,
@Nullable final AwsCredentialsProvider credentials
) {
return new AmazonAsyncS3WithCredentials(client, priorityClient, credentials);
return new AmazonAsyncS3WithCredentials(client, priorityClient, urgentClient, credentials);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ public synchronized void refreshAndClearCache(Map<String, S3ClientSettings> clie
*/
public AmazonAsyncS3Reference client(
RepositoryMetadata repositoryMetadata,
AsyncExecutorContainer urgentExecutorBuilder,
AsyncExecutorContainer priorityExecutorBuilder,
AsyncExecutorContainer normalExecutorBuilder
) {
Expand All @@ -119,7 +120,7 @@ public AmazonAsyncS3Reference client(
return existing;
}
final AmazonAsyncS3Reference clientReference = new AmazonAsyncS3Reference(
buildClient(clientSettings, priorityExecutorBuilder, normalExecutorBuilder)
buildClient(clientSettings, urgentExecutorBuilder, priorityExecutorBuilder, normalExecutorBuilder)
);
clientReference.incRef();
clientsCache = MapBuilder.newMapBuilder(clientsCache).put(clientSettings, clientReference).immutableMap();
Expand Down Expand Up @@ -165,6 +166,7 @@ S3ClientSettings settings(RepositoryMetadata repositoryMetadata) {
// proxy for testing
synchronized AmazonAsyncS3WithCredentials buildClient(
final S3ClientSettings clientSettings,
AsyncExecutorContainer urgentExecutorBuilder,
AsyncExecutorContainer priorityExecutorBuilder,
AsyncExecutorContainer normalExecutorBuilder
) {
Expand Down Expand Up @@ -195,6 +197,17 @@ synchronized AmazonAsyncS3WithCredentials buildClient(
builder.forcePathStyle(true);
}

builder.httpClient(buildHttpClient(clientSettings, urgentExecutorBuilder.getAsyncTransferEventLoopGroup()));
builder.asyncConfiguration(
ClientAsyncConfiguration.builder()
.advancedOption(
SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR,
urgentExecutorBuilder.getFutureCompletionExecutor()
)
.build()
);
final S3AsyncClient urgentClient = SocketAccess.doPrivileged(builder::build);

builder.httpClient(buildHttpClient(clientSettings, priorityExecutorBuilder.getAsyncTransferEventLoopGroup()));
builder.asyncConfiguration(
ClientAsyncConfiguration.builder()
Expand All @@ -217,7 +230,7 @@ synchronized AmazonAsyncS3WithCredentials buildClient(
);
final S3AsyncClient client = SocketAccess.doPrivileged(builder::build);

return AmazonAsyncS3WithCredentials.create(client, priorityClient, credentials);
return AmazonAsyncS3WithCredentials.create(client, priorityClient, urgentClient, credentials);
}

static ClientOverrideConfiguration buildOverrideConfiguration(final S3ClientSettings clientSettings) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,9 +195,14 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp
StreamContext streamContext = SocketAccess.doPrivileged(() -> writeContext.getStreamProvider(partSize));
try (AmazonAsyncS3Reference amazonS3Reference = SocketAccess.doPrivileged(blobStore::asyncClientReference)) {

S3AsyncClient s3AsyncClient = writeContext.getWritePriority() == WritePriority.HIGH
? amazonS3Reference.get().priorityClient()
: amazonS3Reference.get().client();
S3AsyncClient s3AsyncClient;
if (writeContext.getWritePriority() == WritePriority.URGENT) {
s3AsyncClient = amazonS3Reference.get().urgentClient();
} else if (writeContext.getWritePriority() == WritePriority.HIGH) {
s3AsyncClient = amazonS3Reference.get().priorityClient();
} else {
s3AsyncClient = amazonS3Reference.get().client();
}
CompletableFuture<Void> completableFuture = blobStore.getAsyncTransferManager()
.uploadObject(s3AsyncClient, uploadRequest, streamContext, blobStore.getStatsMetricPublisher());
completableFuture.whenComplete((response, throwable) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ class S3BlobStore implements BlobStore {
private final StatsMetricPublisher statsMetricPublisher = new StatsMetricPublisher();

private final AsyncTransferManager asyncTransferManager;
private final AsyncExecutorContainer urgentExecutorBuilder;
private final AsyncExecutorContainer priorityExecutorBuilder;
private final AsyncExecutorContainer normalExecutorBuilder;
private final boolean multipartUploadEnabled;
Expand All @@ -100,6 +101,7 @@ class S3BlobStore implements BlobStore {
int bulkDeletesSize,
RepositoryMetadata repositoryMetadata,
AsyncTransferManager asyncTransferManager,
AsyncExecutorContainer urgentExecutorBuilder,
AsyncExecutorContainer priorityExecutorBuilder,
AsyncExecutorContainer normalExecutorBuilder
) {
Expand All @@ -116,6 +118,7 @@ class S3BlobStore implements BlobStore {
this.asyncTransferManager = asyncTransferManager;
this.normalExecutorBuilder = normalExecutorBuilder;
this.priorityExecutorBuilder = priorityExecutorBuilder;
this.urgentExecutorBuilder = urgentExecutorBuilder;
}

@Override
Expand All @@ -139,7 +142,7 @@ public AmazonS3Reference clientReference() {
}

public AmazonAsyncS3Reference asyncClientReference() {
return s3AsyncService.client(repositoryMetadata, priorityExecutorBuilder, normalExecutorBuilder);
return s3AsyncService.client(repositoryMetadata, urgentExecutorBuilder, priorityExecutorBuilder, normalExecutorBuilder);
}

int getMaxRetries() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ class S3Repository extends MeteredBlobStoreRepository {
private final AsyncTransferManager asyncUploadUtils;
private final S3AsyncService s3AsyncService;
private final boolean multipartUploadEnabled;
private final AsyncExecutorContainer urgentExecutorBuilder;
private final AsyncExecutorContainer priorityExecutorBuilder;
private final AsyncExecutorContainer normalExecutorBuilder;
private final Path pluginConfigPath;
Expand All @@ -248,6 +249,7 @@ class S3Repository extends MeteredBlobStoreRepository {
final ClusterService clusterService,
final RecoverySettings recoverySettings,
final AsyncTransferManager asyncUploadUtils,
final AsyncExecutorContainer urgentExecutorBuilder,
final AsyncExecutorContainer priorityExecutorBuilder,
final AsyncExecutorContainer normalExecutorBuilder,
final S3AsyncService s3AsyncService,
Expand All @@ -260,6 +262,7 @@ class S3Repository extends MeteredBlobStoreRepository {
clusterService,
recoverySettings,
asyncUploadUtils,
urgentExecutorBuilder,
priorityExecutorBuilder,
normalExecutorBuilder,
s3AsyncService,
Expand All @@ -278,6 +281,7 @@ class S3Repository extends MeteredBlobStoreRepository {
final ClusterService clusterService,
final RecoverySettings recoverySettings,
final AsyncTransferManager asyncUploadUtils,
final AsyncExecutorContainer urgentExecutorBuilder,
final AsyncExecutorContainer priorityExecutorBuilder,
final AsyncExecutorContainer normalExecutorBuilder,
final S3AsyncService s3AsyncService,
Expand All @@ -290,6 +294,7 @@ class S3Repository extends MeteredBlobStoreRepository {
this.multipartUploadEnabled = multipartUploadEnabled;
this.pluginConfigPath = pluginConfigPath;
this.asyncUploadUtils = asyncUploadUtils;
this.urgentExecutorBuilder = urgentExecutorBuilder;
this.priorityExecutorBuilder = priorityExecutorBuilder;
this.normalExecutorBuilder = normalExecutorBuilder;

Expand Down Expand Up @@ -352,6 +357,7 @@ protected S3BlobStore createBlobStore() {
bulkDeletesSize,
metadata,
asyncUploadUtils,
urgentExecutorBuilder,
priorityExecutorBuilder,
normalExecutorBuilder
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@
* A plugin to add a repository type that writes to and from the AWS S3.
*/
public class S3RepositoryPlugin extends Plugin implements RepositoryPlugin, ReloadablePlugin {

private static final String URGENT_FUTURE_COMPLETION = "urgent_future_completion";
private static final String URGENT_STREAM_READER = "urgent_stream_reader";
private static final String PRIORITY_FUTURE_COMPLETION = "priority_future_completion";
private static final String PRIORITY_STREAM_READER = "priority_stream_reader";
private static final String FUTURE_COMPLETION = "future_completion";
Expand All @@ -85,6 +88,7 @@ public class S3RepositoryPlugin extends Plugin implements RepositoryPlugin, Relo

private final Path configPath;

private AsyncExecutorContainer urgentExecutorBuilder;
private AsyncExecutorContainer priorityExecutorBuilder;
private AsyncExecutorContainer normalExecutorBuilder;

Expand All @@ -96,6 +100,10 @@ public S3RepositoryPlugin(final Settings settings, final Path configPath) {
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
List<ExecutorBuilder<?>> executorBuilders = new ArrayList<>();
int halfProcMaxAt5 = halfAllocatedProcessorsMaxFive(allocatedProcessors(settings));
executorBuilders.add(
new FixedExecutorBuilder(settings, URGENT_FUTURE_COMPLETION, urgentPoolCount(settings), 10_000, URGENT_FUTURE_COMPLETION)
);
executorBuilders.add(new ScalingExecutorBuilder(URGENT_STREAM_READER, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
executorBuilders.add(
new FixedExecutorBuilder(settings, PRIORITY_FUTURE_COMPLETION, priorityPoolCount(settings), 10_000, PRIORITY_FUTURE_COMPLETION)
);
Expand Down Expand Up @@ -128,6 +136,10 @@ private static int allocatedProcessors(Settings settings) {
return OpenSearchExecutors.allocatedProcessors(settings);
}

private static int urgentPoolCount(Settings settings) {
return boundedBy((allocatedProcessors(settings) + 7) / 8, 1, 2);
}

private static int priorityPoolCount(Settings settings) {
return boundedBy((allocatedProcessors(settings) + 1) / 2, 2, 4);
}
Expand All @@ -150,8 +162,14 @@ public Collection<Object> createComponents(
final IndexNameExpressionResolver expressionResolver,
final Supplier<RepositoriesService> repositoriesServiceSupplier
) {
int urgentEventLoopThreads = urgentPoolCount(clusterService.getSettings());
int priorityEventLoopThreads = priorityPoolCount(clusterService.getSettings());
int normalEventLoopThreads = normalPoolCount(clusterService.getSettings());
this.urgentExecutorBuilder = new AsyncExecutorContainer(
threadPool.executor(URGENT_FUTURE_COMPLETION),
threadPool.executor(URGENT_STREAM_READER),
new AsyncTransferEventLoopGroup(urgentEventLoopThreads)
);
this.priorityExecutorBuilder = new AsyncExecutorContainer(
threadPool.executor(PRIORITY_FUTURE_COMPLETION),
threadPool.executor(PRIORITY_STREAM_READER),
Expand All @@ -176,7 +194,8 @@ protected S3Repository createRepository(
AsyncTransferManager asyncUploadUtils = new AsyncTransferManager(
S3Repository.PARALLEL_MULTIPART_UPLOAD_MINIMUM_PART_SIZE_SETTING.get(clusterService.getSettings()).getBytes(),
normalExecutorBuilder.getStreamReader(),
priorityExecutorBuilder.getStreamReader()
priorityExecutorBuilder.getStreamReader(),
urgentExecutorBuilder.getStreamReader()
);
return new S3Repository(
metadata,
Expand All @@ -185,6 +204,7 @@ protected S3Repository createRepository(
clusterService,
recoverySettings,
asyncUploadUtils,
urgentExecutorBuilder,
priorityExecutorBuilder,
normalExecutorBuilder,
s3AsyncService,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class AsyncPartsHandler {
* @param s3AsyncClient S3 client to use for upload
* @param executorService Thread pool for regular upload
* @param priorityExecutorService Thread pool for priority uploads
* @param urgentExecutorService Thread pool for urgent uploads
* @param uploadRequest request for upload
* @param streamContext Stream context used in supplying individual file parts
* @param uploadId Upload Id against which multi-part is being performed
Expand All @@ -60,6 +61,7 @@ public static List<CompletableFuture<CompletedPart>> uploadParts(
S3AsyncClient s3AsyncClient,
ExecutorService executorService,
ExecutorService priorityExecutorService,
ExecutorService urgentExecutorService,
UploadRequest uploadRequest,
StreamContext streamContext,
String uploadId,
Expand All @@ -83,6 +85,7 @@ public static List<CompletableFuture<CompletedPart>> uploadParts(
s3AsyncClient,
executorService,
priorityExecutorService,
urgentExecutorService,
completedParts,
inputStreamContainers,
futures,
Expand Down Expand Up @@ -129,6 +132,7 @@ private static void uploadPart(
S3AsyncClient s3AsyncClient,
ExecutorService executorService,
ExecutorService priorityExecutorService,
ExecutorService urgentExecutorService,
AtomicReferenceArray<CompletedPart> completedParts,
AtomicReferenceArray<CheckedContainer> inputStreamContainers,
List<CompletableFuture<CompletedPart>> futures,
Expand All @@ -138,9 +142,14 @@ private static void uploadPart(
) {
Integer partNumber = uploadPartRequest.partNumber();

ExecutorService streamReadExecutor = uploadRequest.getWritePriority() == WritePriority.HIGH
? priorityExecutorService
: executorService;
ExecutorService streamReadExecutor;
if (uploadRequest.getWritePriority() == WritePriority.URGENT) {
streamReadExecutor = urgentExecutorService;
} else if (uploadRequest.getWritePriority() == WritePriority.HIGH) {
streamReadExecutor = priorityExecutorService;
} else {
streamReadExecutor = executorService;
}
// Buffered stream is needed to allow mark and reset ops during IO errors so that only buffered
// data can be retried instead of retrying whole file by the application.
InputStream inputStream = new BufferedInputStream(inputStreamContainer.getInputStream(), (int) (ByteSizeUnit.MB.toBytes(1) + 1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public final class AsyncTransferManager {
private static final Logger log = LogManager.getLogger(AsyncTransferManager.class);
private final ExecutorService executorService;
private final ExecutorService priorityExecutorService;
private final ExecutorService urgentExecutorService;
private final long minimumPartSize;

/**
Expand All @@ -75,10 +76,16 @@ public final class AsyncTransferManager {
* @param executorService The stream reader {@link ExecutorService} for normal priority uploads
* @param priorityExecutorService The stream read {@link ExecutorService} for high priority uploads
*/
public AsyncTransferManager(long minimumPartSize, ExecutorService executorService, ExecutorService priorityExecutorService) {
public AsyncTransferManager(
long minimumPartSize,
ExecutorService executorService,
ExecutorService priorityExecutorService,
ExecutorService urgentExecutorService
) {
this.executorService = executorService;
this.priorityExecutorService = priorityExecutorService;
this.minimumPartSize = minimumPartSize;
this.urgentExecutorService = urgentExecutorService;
}

/**
Expand Down Expand Up @@ -162,6 +169,7 @@ private void doUploadInParts(
s3AsyncClient,
executorService,
priorityExecutorService,
urgentExecutorService,
uploadRequest,
streamContext,
uploadId,
Expand Down Expand Up @@ -308,9 +316,14 @@ private void uploadInOneChunk(
putObjectRequestBuilder.checksumAlgorithm(ChecksumAlgorithm.CRC32);
putObjectRequestBuilder.checksumCRC32(base64StringFromLong(uploadRequest.getExpectedChecksum()));
}
ExecutorService streamReadExecutor = uploadRequest.getWritePriority() == WritePriority.HIGH
? priorityExecutorService
: executorService;
ExecutorService streamReadExecutor;
if (uploadRequest.getWritePriority() == WritePriority.URGENT) {
streamReadExecutor = urgentExecutorService;
} else if (uploadRequest.getWritePriority() == WritePriority.HIGH) {
streamReadExecutor = priorityExecutorService;
} else {
streamReadExecutor = executorService;
}
// Buffered stream is needed to allow mark and reset ops during IO errors so that only buffered
// data can be retried instead of retrying whole file by the application.
InputStream inputStream = new BufferedInputStream(inputStreamContainer.getInputStream(), (int) (ByteSizeUnit.MB.toBytes(1) + 1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ protected S3Repository createRepository(
ClusterService clusterService,
RecoverySettings recoverySettings
) {
return new S3Repository(metadata, registry, service, clusterService, recoverySettings, null, null, null, null, false) {
return new S3Repository(metadata, registry, service, clusterService, recoverySettings, null, null, null, null, null, false) {
@Override
protected void assertSnapshotOrGenericThread() {
// eliminate thread name check as we create repo manually on test/main threads
Expand Down
Loading