Skip to content

Commit 7b4bd23

Browse files
authored
Resolver Thread management (#1541)
Rework thread management. High level changes: * introduce simple `SmartExecutor` to hide underlying things; offer minimal surface just enough for consumers * make 3 consumers use them with try-with-resources consistently (basic connector, metadata resolver, BF collector) * on Java 21 use virtual threads, as all 3 consumer executes very IO bound tasks (ideal contenders) Details: * consumer basic connector logic simplified; instead of going directly for "direct" executor, use one single method, and perform direct executions as needed (as it had already all in place already); basic connector retains the ability to "direct execute" tasks when appropriate (as it knows ahead the task count) * all 3 consumers use session cached `SmartExecutor` instances (basic connector exceptions for direct execution explained above) Consequences: the parameters * for basic connector: `aether.connector.basic.downstreamThreads`, `aether.connector.basic.upstreamThreads` and `aether.connector.basic.threads` (they may be suffixed with `.repoId`) * for BF collector `aether.dependencyCollector.bf.threads` * for metadata resolver `aether.metadataResolver.threads` are from now on **global and reflect the reality for maven session**, in a way they are now truly aplied per-Maven session (limits now represent true per-session limits). Before, this was not the case, and Maven was able to fully "step over" these limits: consider BF collector, it created always "local" thread pool with size 5. But alas, Maven 4 builds project models already in parallel (!), hence, BF collector (to resolve models for model builder) was already called from several threads, and each thread created "local" pool for it's own use. This _may imply_ that pool sizes _may need a revisit_, as they _may_ be too conservative now (or they were just not truly applied, as intended). Fixes #1537
1 parent a65529b commit 7b4bd23

File tree

8 files changed

+453
-85
lines changed

8 files changed

+453
-85
lines changed

maven-resolver-connector-basic/src/main/java/org/eclipse/aether/connector/basic/BasicRepositoryConnector.java

Lines changed: 28 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import java.util.List;
2929
import java.util.Map;
3030
import java.util.concurrent.ConcurrentHashMap;
31-
import java.util.concurrent.Executor;
3231
import java.util.concurrent.atomic.AtomicBoolean;
3332

3433
import org.eclipse.aether.RepositorySystemSession;
@@ -62,8 +61,9 @@
6261
import org.eclipse.aether.transfer.TransferResource;
6362
import org.eclipse.aether.util.ConfigUtils;
6463
import org.eclipse.aether.util.FileUtils;
65-
import org.eclipse.aether.util.concurrency.ExecutorUtils;
6664
import org.eclipse.aether.util.concurrency.RunnableErrorForwarder;
65+
import org.eclipse.aether.util.concurrency.SmartExecutor;
66+
import org.eclipse.aether.util.concurrency.SmartExecutorUtils;
6767
import org.slf4j.Logger;
6868
import org.slf4j.LoggerFactory;
6969

@@ -109,7 +109,7 @@ final class BasicRepositoryConnector implements RepositoryConnector {
109109

110110
private final boolean persistedChecksums;
111111

112-
private final ConcurrentHashMap<Boolean, Executor> executors;
112+
private final ConcurrentHashMap<Boolean, SmartExecutor> executors;
113113

114114
private final AtomicBoolean closed;
115115

@@ -144,13 +144,13 @@ final class BasicRepositoryConnector implements RepositoryConnector {
144144
this.executors = new ConcurrentHashMap<>();
145145
this.closed = new AtomicBoolean(false);
146146

147-
maxUpstreamThreads = ExecutorUtils.threadCount(
147+
maxUpstreamThreads = ConfigUtils.getInteger(
148148
session,
149149
DEFAULT_THREADS,
150150
CONFIG_PROP_UPSTREAM_THREADS + "." + repository.getId(),
151151
CONFIG_PROP_UPSTREAM_THREADS,
152152
CONFIG_PROP_THREADS);
153-
maxDownstreamThreads = ExecutorUtils.threadCount(
153+
maxDownstreamThreads = ConfigUtils.getInteger(
154154
session,
155155
DEFAULT_THREADS,
156156
CONFIG_PROP_DOWNSTREAM_THREADS + "." + repository.getId(),
@@ -166,25 +166,28 @@ final class BasicRepositoryConnector implements RepositoryConnector {
166166
ConfigUtils.getBoolean(session, DEFAULT_PERSISTED_CHECKSUMS, CONFIG_PROP_PERSISTED_CHECKSUMS);
167167
}
168168

169-
private Executor getExecutor(boolean downstream, int tasks) {
169+
/**
170+
* Returns {@link SmartExecutor} to execute tasks with or {@code null} if "direct execution" is more appropriate.
171+
*/
172+
private SmartExecutor getExecutor(boolean downstream, int tasks) {
170173
int maxThreads = downstream ? maxDownstreamThreads : maxUpstreamThreads;
171-
if (maxThreads <= 1) {
172-
return ExecutorUtils.DIRECT_EXECUTOR;
173-
}
174-
if (tasks <= 1) {
175-
return ExecutorUtils.DIRECT_EXECUTOR;
174+
if (maxThreads <= 1 || tasks <= 1) {
175+
// direct and do not cache it
176+
return null;
176177
}
178+
// we intentionally ignore tasks here as deployer may invoke several times connector with different count of
179+
// payloads; so maximize it
177180
return executors.computeIfAbsent(
178181
downstream,
179-
k -> ExecutorUtils.threadPool(
180-
maxThreads, getClass().getSimpleName() + '-' + repository.getHost() + '-'));
182+
k -> SmartExecutorUtils.smartExecutor(
183+
session, null, maxThreads, getClass().getSimpleName() + '-' + repository.getHost() + '-'));
181184
}
182185

183186
@Override
184187
public void close() {
185188
if (closed.compareAndSet(false, true)) {
186-
for (Executor executor : executors.values()) {
187-
ExecutorUtils.shutdown(executor);
189+
for (SmartExecutor executor : executors.values()) {
190+
executor.close();
188191
}
189192
transporter.close();
190193
}
@@ -205,7 +208,7 @@ public void get(
205208
Collection<? extends ArtifactDownload> safeArtifactDownloads = safe(artifactDownloads);
206209
Collection<? extends MetadataDownload> safeMetadataDownloads = safe(metadataDownloads);
207210

208-
Executor executor = getExecutor(true, safeArtifactDownloads.size() + safeMetadataDownloads.size());
211+
SmartExecutor executor = getExecutor(true, safeArtifactDownloads.size() + safeMetadataDownloads.size());
209212
RunnableErrorForwarder errorForwarder = new RunnableErrorForwarder();
210213
List<ChecksumAlgorithmFactory> checksumAlgorithmFactories = layout.getChecksumAlgorithmFactories();
211214

@@ -232,11 +235,11 @@ public void get(
232235
checksumLocations,
233236
null,
234237
listener);
235-
if (first) {
238+
if (executor == null || first) {
236239
task.run();
237240
first = false;
238241
} else {
239-
executor.execute(errorForwarder.wrap(task));
242+
executor.submit(errorForwarder.wrap(task));
240243
}
241244
}
242245

@@ -277,11 +280,11 @@ public void get(
277280
providedChecksums,
278281
listener);
279282
}
280-
if (first) {
283+
if (executor == null || first) {
281284
task.run();
282285
first = false;
283286
} else {
284-
executor.execute(errorForwarder.wrap(task));
287+
executor.submit(errorForwarder.wrap(task));
285288
}
286289
}
287290

@@ -297,7 +300,7 @@ public void put(
297300
Collection<? extends ArtifactUpload> safeArtifactUploads = safe(artifactUploads);
298301
Collection<? extends MetadataUpload> safeMetadataUploads = safe(metadataUploads);
299302

300-
Executor executor =
303+
SmartExecutor executor =
301304
getExecutor(false, parallelPut ? safeArtifactUploads.size() + safeMetadataUploads.size() : 1);
302305
RunnableErrorForwarder errorForwarder = new RunnableErrorForwarder();
303306

@@ -314,11 +317,11 @@ public void put(
314317
layout.getChecksumLocations(transfer.getArtifact(), true, location);
315318

316319
Runnable task = new PutTaskRunner(location, transfer.getPath(), checksumLocations, listener);
317-
if (first) {
320+
if (executor == null || first) {
318321
task.run();
319322
first = false;
320323
} else {
321-
executor.execute(errorForwarder.wrap(task));
324+
executor.submit(errorForwarder.wrap(task));
322325
}
323326
}
324327

@@ -336,11 +339,11 @@ public void put(
336339
layout.getChecksumLocations(transfer.getMetadata(), true, location);
337340

338341
Runnable task = new PutTaskRunner(location, transfer.getPath(), checksumLocations, listener);
339-
if (first) {
342+
if (executor == null || first) {
340343
task.run();
341344
first = false;
342345
} else {
343-
executor.execute(errorForwarder.wrap(task));
346+
executor.submit(errorForwarder.wrap(task));
344347
}
345348
}
346349

maven-resolver-impl/src/main/java/org/eclipse/aether/internal/impl/DefaultMetadataResolver.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
import java.util.HashMap;
3333
import java.util.List;
3434
import java.util.Map;
35-
import java.util.concurrent.Executor;
3635

3736
import org.eclipse.aether.ConfigurationProperties;
3837
import org.eclipse.aether.RepositoryEvent;
@@ -68,8 +67,10 @@
6867
import org.eclipse.aether.transfer.MetadataTransferException;
6968
import org.eclipse.aether.transfer.NoRepositoryConnectorException;
7069
import org.eclipse.aether.transfer.RepositoryOfflineException;
71-
import org.eclipse.aether.util.concurrency.ExecutorUtils;
70+
import org.eclipse.aether.util.ConfigUtils;
7271
import org.eclipse.aether.util.concurrency.RunnableErrorForwarder;
72+
import org.eclipse.aether.util.concurrency.SmartExecutor;
73+
import org.eclipse.aether.util.concurrency.SmartExecutorUtils;
7374

7475
import static java.util.Objects.requireNonNull;
7576

@@ -305,17 +306,18 @@ private List<MetadataResult> resolve(
305306
}
306307

307308
if (!tasks.isEmpty()) {
308-
int threads = ExecutorUtils.threadCount(session, DEFAULT_THREADS, CONFIG_PROP_THREADS);
309-
Executor executor = ExecutorUtils.executor(
310-
Math.min(tasks.size(), threads), getClass().getSimpleName() + '-');
311-
try {
309+
try (SmartExecutor executor = SmartExecutorUtils.smartExecutor(
310+
session,
311+
null, // we want global executor
312+
ConfigUtils.getInteger(session, DEFAULT_THREADS, CONFIG_PROP_THREADS),
313+
getClass().getSimpleName() + "-")) {
312314
RunnableErrorForwarder errorForwarder = new RunnableErrorForwarder();
313315

314316
for (ResolveTask task : tasks) {
315317
metadataDownloading(
316318
task.session, task.trace, task.request.getMetadata(), task.request.getRepository());
317319

318-
executor.execute(errorForwarder.wrap(task));
320+
executor.submit(errorForwarder.wrap(task));
319321
}
320322

321323
errorForwarder.await();
@@ -340,8 +342,6 @@ private List<MetadataResult> resolve(
340342

341343
task.result.setException(task.exception);
342344
}
343-
} finally {
344-
ExecutorUtils.shutdown(executor);
345345
}
346346
for (ResolveTask task : tasks) {
347347
Metadata metadata = task.request.getMetadata();

maven-resolver-impl/src/main/java/org/eclipse/aether/internal/impl/collect/bf/BfDependencyCollector.java

Lines changed: 23 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,8 @@
3333
import java.util.Queue;
3434
import java.util.Set;
3535
import java.util.concurrent.Callable;
36+
import java.util.concurrent.CompletableFuture;
3637
import java.util.concurrent.ConcurrentHashMap;
37-
import java.util.concurrent.ExecutionException;
38-
import java.util.concurrent.ExecutorService;
39-
import java.util.concurrent.Future;
40-
import java.util.concurrent.TimeUnit;
41-
import java.util.concurrent.TimeoutException;
4238
import java.util.concurrent.atomic.AtomicReference;
4339
import java.util.stream.Collectors;
4440
import java.util.stream.Stream;
@@ -72,7 +68,8 @@
7268
import org.eclipse.aether.spi.artifact.decorator.ArtifactDecoratorFactory;
7369
import org.eclipse.aether.util.ConfigUtils;
7470
import org.eclipse.aether.util.artifact.ArtifactIdUtils;
75-
import org.eclipse.aether.util.concurrency.ExecutorUtils;
71+
import org.eclipse.aether.util.concurrency.SmartExecutor;
72+
import org.eclipse.aether.util.concurrency.SmartExecutorUtils;
7673
import org.eclipse.aether.util.graph.manager.DependencyManagerUtils;
7774
import org.eclipse.aether.version.Version;
7875

@@ -150,8 +147,6 @@ protected void doCollectDependencies(
150147
Results results)
151148
throws DependencyCollectionException {
152149
boolean useSkip = ConfigUtils.getBoolean(session, DEFAULT_SKIPPER, CONFIG_PROP_SKIPPER);
153-
int nThreads = ExecutorUtils.threadCount(session, DEFAULT_THREADS, CONFIG_PROP_THREADS);
154-
logger.debug("Using thread pool with {} threads to resolve descriptors.", nThreads);
155150

156151
if (useSkip) {
157152
logger.debug("Collector skip mode enabled");
@@ -160,7 +155,12 @@ protected void doCollectDependencies(
160155
try (DependencyResolutionSkipper skipper = useSkip
161156
? DependencyResolutionSkipper.defaultSkipper()
162157
: DependencyResolutionSkipper.neverSkipper();
163-
ParallelDescriptorResolver parallelDescriptorResolver = new ParallelDescriptorResolver(nThreads)) {
158+
ParallelDescriptorResolver parallelDescriptorResolver =
159+
new ParallelDescriptorResolver(SmartExecutorUtils.smartExecutor(
160+
session,
161+
null, // we don't know ahead of time; we want global executor
162+
ConfigUtils.getInteger(session, DEFAULT_THREADS, CONFIG_PROP_THREADS),
163+
getClass().getSimpleName() + "-"))) {
164164
Args args = new Args(session, pool, context, versionContext, request, skipper, parallelDescriptorResolver);
165165

166166
DependencySelector rootDepSelector = session.getDependencySelector() != null
@@ -230,7 +230,8 @@ private void processDependency(
230230
boolean traverse =
231231
!noDescriptor && (context.depTraverser == null || context.depTraverser.traverseDependency(dependency));
232232

233-
Future<DescriptorResolutionResult> resolutionResultFuture = args.resolver.find(dependency.getArtifact());
233+
CompletableFuture<DescriptorResolutionResult> resolutionResultFuture =
234+
args.resolver.find(dependency.getArtifact());
234235
DescriptorResolutionResult resolutionResult;
235236
VersionRangeResult rangeResult;
236237
try {
@@ -470,65 +471,36 @@ private ArtifactDescriptorResult resolveDescriptorForVersion(
470471
}
471472

472473
static class ParallelDescriptorResolver implements Closeable {
473-
private final ExecutorService executorService;
474+
private final SmartExecutor smartExecutor;
474475

475476
/**
476477
* Artifact ID -> Future of DescriptorResolutionResult
477478
*/
478-
private final Map<String, Future<DescriptorResolutionResult>> results = new ConcurrentHashMap<>(256);
479+
private final Map<String, CompletableFuture<DescriptorResolutionResult>> results = new ConcurrentHashMap<>(256);
479480

480-
ParallelDescriptorResolver(int threads) {
481-
this.executorService = ExecutorUtils.threadPool(threads, getClass().getSimpleName() + "-");
481+
ParallelDescriptorResolver(SmartExecutor smartExecutor) {
482+
this.smartExecutor = smartExecutor;
482483
}
483484

484485
void resolveDescriptors(Artifact artifact, Callable<DescriptorResolutionResult> callable) {
485-
results.computeIfAbsent(ArtifactIdUtils.toId(artifact), key -> this.executorService.submit(callable));
486+
results.computeIfAbsent(ArtifactIdUtils.toId(artifact), key -> smartExecutor.submit(callable));
486487
}
487488

488489
void cacheVersionRangeDescriptor(Artifact artifact, DescriptorResolutionResult resolutionResult) {
489-
results.computeIfAbsent(ArtifactIdUtils.toId(artifact), key -> new DoneFuture<>(resolutionResult));
490+
results.computeIfAbsent(ArtifactIdUtils.toId(artifact), key -> {
491+
CompletableFuture<DescriptorResolutionResult> future = new CompletableFuture<>();
492+
future.complete(resolutionResult);
493+
return future;
494+
});
490495
}
491496

492-
Future<DescriptorResolutionResult> find(Artifact artifact) {
497+
CompletableFuture<DescriptorResolutionResult> find(Artifact artifact) {
493498
return results.get(ArtifactIdUtils.toId(artifact));
494499
}
495500

496501
@Override
497502
public void close() {
498-
executorService.shutdown();
499-
}
500-
}
501-
502-
static class DoneFuture<V> implements Future<V> {
503-
private final V v;
504-
505-
DoneFuture(V v) {
506-
this.v = v;
507-
}
508-
509-
@Override
510-
public boolean cancel(boolean mayInterruptIfRunning) {
511-
return false;
512-
}
513-
514-
@Override
515-
public boolean isCancelled() {
516-
return false;
517-
}
518-
519-
@Override
520-
public boolean isDone() {
521-
return true;
522-
}
523-
524-
@Override
525-
public V get() throws InterruptedException, ExecutionException {
526-
return v;
527-
}
528-
529-
@Override
530-
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
531-
return v;
503+
smartExecutor.close();
532504
}
533505
}
534506

maven-resolver-util/pom.xml

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,26 @@
6161
<groupId>com.github.siom79.japicmp</groupId>
6262
<artifactId>japicmp-maven-plugin</artifactId>
6363
</plugin>
64+
<plugin>
65+
<groupId>org.apache.maven.plugins</groupId>
66+
<artifactId>maven-compiler-plugin</artifactId>
67+
<executions>
68+
<execution>
69+
<id>java21</id>
70+
<goals>
71+
<goal>compile</goal>
72+
</goals>
73+
<phase>compile</phase>
74+
<configuration>
75+
<release>21</release>
76+
<compileSourceRoots>
77+
<compileSourceRoot>${project.basedir}/src/main/java21</compileSourceRoot>
78+
</compileSourceRoots>
79+
<multiReleaseOutput>true</multiReleaseOutput>
80+
</configuration>
81+
</execution>
82+
</executions>
83+
</plugin>
6484
</plugins>
6585
</build>
6686
</project>

maven-resolver-util/src/main/java/org/eclipse/aether/util/concurrency/ExecutorUtils.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,9 @@
3131
* Utilities for executors and sizing them.
3232
*
3333
* @since 1.9.5
34+
* @deprecated For removal. Nothing is using this class within Resolver.
3435
*/
36+
@Deprecated
3537
public final class ExecutorUtils {
3638
/**
3739
* Shared instance of "direct executor".

0 commit comments

Comments
 (0)