Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Created new urgent priority threadpool for remote cluster state uploads #10685

Merged
Prev Previous commit
Next Next commit
Merge remote-tracking branch 'origin/main' into remote-state-threadpool
  • Loading branch information
shiv0408 committed Oct 19, 2023
commit 4baf8019b9cd8b57b5d28ddccf4fa3d5c78b36c4
Original file line number Diff line number Diff line change
Expand Up @@ -101,17 +101,13 @@ public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
List<ExecutorBuilder<?>> executorBuilders = new ArrayList<>();
int halfProcMaxAt5 = halfAllocatedProcessorsMaxFive(allocatedProcessors(settings));
executorBuilders.add(
new FixedExecutorBuilder(settings, URGENT_FUTURE_COMPLETION, urgentPoolCount(settings), 10_000, URGENT_FUTURE_COMPLETION)
);
executorBuilders.add(
new FixedExecutorBuilder(settings, URGENT_STREAM_READER, urgentPoolCount(settings), 10_000, URGENT_STREAM_READER)
new FixedExecutorBuilder(settings, URGENT_FUTURE_COMPLETION, urgentPoolCount(settings), 10_000, URGENT_FUTURE_COMPLETION)
);
executorBuilders.add(new ScalingExecutorBuilder(URGENT_STREAM_READER, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
executorBuilders.add(
new FixedExecutorBuilder(settings, PRIORITY_FUTURE_COMPLETION, priorityPoolCount(settings), 10_000, PRIORITY_FUTURE_COMPLETION)
);
executorBuilders.add(
new FixedExecutorBuilder(settings, PRIORITY_STREAM_READER, priorityPoolCount(settings), 10_000, PRIORITY_STREAM_READER)
);
executorBuilders.add(new ScalingExecutorBuilder(PRIORITY_STREAM_READER, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));

executorBuilders.add(new FixedExecutorBuilder(settings, FUTURE_COMPLETION, normalPoolCount(settings), 10_000, FUTURE_COMPLETION));
executorBuilders.add(new ScalingExecutorBuilder(STREAM_READER, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,9 @@ private static void uploadPart(
} else {
streamReadExecutor = executorService;
}
// Buffered stream is needed to allow mark and reset ops during IO errors so that only buffered
// data can be retried instead of retrying whole file by the application.
InputStream inputStream = new BufferedInputStream(inputStreamContainer.getInputStream(), (int) (ByteSizeUnit.MB.toBytes(1) + 1));
CompletableFuture<UploadPartResponse> uploadPartResponseFuture = SocketAccess.doPrivileged(
() -> s3AsyncClient.uploadPart(
uploadPartRequest,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,9 @@ private void uploadInOneChunk(
} else {
streamReadExecutor = executorService;
}
// Buffered stream is needed to allow mark and reset ops during IO errors so that only buffered
// data can be retried instead of retrying whole file by the application.
InputStream inputStream = new BufferedInputStream(inputStreamContainer.getInputStream(), (int) (ByteSizeUnit.MB.toBytes(1) + 1));
CompletableFuture<Void> putObjectFuture = SocketAccess.doPrivileged(
() -> s3AsyncClient.putObject(
putObjectRequestBuilder.build(),
Expand Down
You are viewing a condensed version of this merge commit. You can view the full changes here.