diff --git a/reactor-pool/build.gradle b/reactor-pool/build.gradle
index eb07557f..7a3551fd 100644
--- a/reactor-pool/build.gradle
+++ b/reactor-pool/build.gradle
@@ -127,6 +127,8 @@ task japicmp(type: JapicmpTask) {
 	classExcludes = [
 	]
 	methodExcludes = [
+		"reactor.pool.decorators.InstrumentedPoolDecorators#concurrentPools(int, java.util.function.Function)",
+		"reactor.pool.decorators.InstrumentedPoolDecorators#concurrentPools(int, int, int, java.util.function.BiFunction)"
 	]
 }
 check.dependsOn japicmp
diff --git a/reactor-pool/src/main/java/reactor/pool/AbstractPool.java b/reactor-pool/src/main/java/reactor/pool/AbstractPool.java
index 249a3581..b009635b 100644
--- a/reactor-pool/src/main/java/reactor/pool/AbstractPool.java
+++ b/reactor-pool/src/main/java/reactor/pool/AbstractPool.java
@@ -22,6 +22,7 @@
 import java.time.Duration;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 
 import org.reactivestreams.Publisher;
@@ -390,7 +391,7 @@ static final class Borrower<POOLABLE> extends AtomicBoolean implements Scannable
 		static final Disposable TIMEOUT_DISPOSED = Disposables.disposed();
 
 		final CoreSubscriber<? super AbstractPooledRef<POOLABLE>> actual;
-		final AbstractPool<POOLABLE> pool;
+		final AtomicReference<AbstractPool<POOLABLE>> pool;
 		final Duration pendingAcquireTimeout;
 
 		long pendingAcquireStart;
@@ -400,7 +401,7 @@ static final class Borrower<POOLABLE> extends AtomicBoolean implements Scannable
 				 AbstractPool<POOLABLE> pool,
 				 Duration pendingAcquireTimeout) {
 			this.actual = actual;
-			this.pool = pool;
+			this.pool = new AtomicReference<>(pool);
 			this.pendingAcquireTimeout = pendingAcquireTimeout;
 			this.timeoutTask = TIMEOUT_DISPOSED;
 		}
@@ -414,7 +415,7 @@ public void run() {
 			if (Borrower.this.compareAndSet(false, true)) {
 				// this is failure, a timeout was observed
 				stopPendingCountdown(false);
-				pool.cancelAcquire(Borrower.this);
+				pool().cancelAcquire(Borrower.this);
 				actual.onError(new PoolAcquireTimeoutException(pendingAcquireTimeout));
 			}
 		}
@@ -423,13 +424,13 @@ public void request(long n) {
 			if (Operators.validate(n)) {
 				//start the countdown
-
+				AbstractPool<POOLABLE> pool = pool();
 				boolean noIdle = pool.idleSize() == 0;
 				boolean noPermits = pool.poolConfig.allocationStrategy().estimatePermitCount() == 0;
 
 				if (!pendingAcquireTimeout.isZero() && noIdle && noPermits) {
 					pendingAcquireStart = pool.clock.millis();
-					timeoutTask = this.pool.config().pendingAcquireTimer().apply(this, pendingAcquireTimeout);
+					timeoutTask = pool.config().pendingAcquireTimer().apply(this, pendingAcquireTimeout);
 				}
 				//doAcquire should interrupt the countdown if there is either an available
 				//resource or the pool can allocate one
@@ -442,6 +443,7 @@ public void request(long n) {
 		 */
 		void stopPendingCountdown(boolean success) {
 			if (!timeoutTask.isDisposed()) {
+				AbstractPool<POOLABLE> pool = pool();
 				if (success) {
 					pool.metricsRecorder.recordPendingSuccessAndLatency(pool.clock.millis() - pendingAcquireStart);
 				} else {
@@ -454,7 +456,7 @@ void stopPendingCountdown(boolean success) {
 		@Override
 		public void cancel() {
 			set(true);
-			pool.cancelAcquire(this);
+			pool().cancelAcquire(this);
 			stopPendingCountdown(true); // this is not failure, the subscription was canceled
 		}
 
@@ -493,6 +495,14 @@ void fail(Throwable error) {
 
 		public String toString() {
"Borrower(cancelled)" : "Borrower"; } + + AbstractPool pool() { + return pool.get(); + } + + void setPool(AbstractPool replace) { + pool.set(replace); + } } } diff --git a/reactor-pool/src/main/java/reactor/pool/InstrumentedPool.java b/reactor-pool/src/main/java/reactor/pool/InstrumentedPool.java index 6eb41b3b..c7bc839b 100644 --- a/reactor-pool/src/main/java/reactor/pool/InstrumentedPool.java +++ b/reactor-pool/src/main/java/reactor/pool/InstrumentedPool.java @@ -31,6 +31,16 @@ public interface InstrumentedPool extends Pool { */ PoolMetrics metrics(); + /** + * Steal some pending borrowers from another pool. + * + * @param fromPool another pool to steal resources from + * @return true if some borrowers have been moved from fromPool into this pool instance + */ + default boolean steal(InstrumentedPool fromPool) { + return false; + } + /** * An object that can be used to get live information about a {@link Pool}, suitable * for gauge metrics. @@ -136,5 +146,14 @@ default boolean isInactiveForMoreThan(Duration duration) { * @return the maximum number of pending acquire that can be enqueued by this {@link Pool} */ int getMaxPendingAcquireSize(); + + /** + * Get the number of borrowers steal count (only if the Pool supports work stealing). + * + * @return the number of Pool steal count + */ + default long stealCount() { + return 0; + } } } diff --git a/reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java b/reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java index 4b937bf8..57e9c276 100644 --- a/reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java +++ b/reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java @@ -121,6 +121,24 @@ public Mono> acquire(Duration timeout) { timeout); //the mono is unknown to the pool until requested } + @Override + public boolean steal(InstrumentedPool pool) { + SimpleDequePool other = (SimpleDequePool) pool; + + if (!other.isDisposed()) { + ConcurrentLinkedDeque> q = other.pending; + Borrower b = other.pendingPoll(q); + if (b != null && !b.get()) { + // TODO check race conditions when timer expires or subscription is cancelled concurrently ! + b.setPool(this); + doAcquire(b); + return true; + } + } + + return false; + } + @Override public int acquiredSize() { return acquired; diff --git a/reactor-pool/src/main/java/reactor/pool/decorators/InstrumentedPoolDecorators.java b/reactor-pool/src/main/java/reactor/pool/decorators/InstrumentedPoolDecorators.java index 63c25848..309d70b8 100644 --- a/reactor-pool/src/main/java/reactor/pool/decorators/InstrumentedPoolDecorators.java +++ b/reactor-pool/src/main/java/reactor/pool/decorators/InstrumentedPoolDecorators.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021-2022 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2021-2023 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -17,9 +17,13 @@
 package reactor.pool.decorators;
 
 import java.time.Duration;
+import java.util.concurrent.Executor;
+import java.util.function.BiFunction;
 import java.util.function.Function;
+import java.util.stream.IntStream;
 
 import reactor.pool.InstrumentedPool;
+import reactor.util.function.Tuple2;
 
 /**
  * Utility class to expose various {@link InstrumentedPool} decorators, which can also be used
@@ -42,6 +46,70 @@ public static <T> GracefulShutdownInstrumentedPool<T> gracefulShutdown(Instrumen
 		return new GracefulShutdownInstrumentedPool<>(pool);
 	}
 
+	/**
+	 * Creates a pool composed of multiple sub pools, each managing a portion of resources. Resource acquisitions will
+	 * be concurrently distributed across sub pools using sub pool executors, in a work stealing style.
+	 *
+	 * @param size The number of sub pools to create.
+	 * @param poolFactory A factory method creating sub pools. This Function takes the index of the sub pool to
+	 *                    create and returns a Tuple2 consisting of an InstrumentedPool and a dedicated single-thread
+	 *                    Executor associated with that pool. Executors must not be shared between sub pools.
+	 * @param <T> the type of resources in the pool
+	 * @return a decorated concurrent InstrumentedPool that will distribute resource acquisitions across all sub pools
+	 *         in a work stealing way, using dedicated sub pool executors
+	 */
+	public static <T> InstrumentedPool<T> concurrentPools(int size,
+			Function<Integer, Tuple2<InstrumentedPool<T>, Executor>> poolFactory) {
+		@SuppressWarnings("unchecked")
+		Tuple2<InstrumentedPool<T>, Executor>[] subPools = IntStream.range(0, size)
+				.mapToObj(poolFactory::apply)
+				.toArray(Tuple2[]::new);
+		return new WorkStealingPool<>(subPools);
+	}
+
+	/**
+	 * Creates a pool composed of multiple sub pools, each managing a portion of resources. Resource acquisitions will
+	 * be concurrently distributed across sub pools using sub pool executors, in a work stealing style.
+	 *
+	 * @param size The number of sub pools to create.
+	 * @param min The minimum number of resources to be kept by all sub pools collectively.
+	 * @param max The maximum number of resources that can be allocated by all sub pools collectively.
+	 * @param poolFactory A method that constructs sub pools. It takes adjusted min/max parameters specific to each
+	 *                    sub pool to create and returns a Tuple2 comprising an InstrumentedPool and a dedicated
+	 *                    single-thread Executor exclusively associated with that pool (Executors must not be shared
+	 *                    between sub pools). The adjusted min/max parameters ensure an equitable resource
+	 *                    distribution among the sub pools.
+	 * <p>

+ * Example:
+ * Suppose you're creating 3 sub pools with a global min=10 and max=100. The poolFactory is invoked 3 times, each + * time with adjusted min/max parameters to ensure fair distribution: + *

    + *
  • First invocation with min=4, max=34
  • + *
  • Second invocation: min=3, max=33
  • + *
  • Third invocation: min=3, max=33
  • + *
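+	 * <p>
+	 * A minimal usage sketch (the {@code Connection} resource type is illustrative; as required above, each sub
+	 * pool gets its own dedicated single-thread executor):
+	 * <pre>{@code
+	 * List<ExecutorService> executors = IntStream.range(0, 4)
+	 *         .mapToObj(i -> Executors.newSingleThreadExecutor())
+	 *         .collect(Collectors.toList());
+	 * Iterator<ExecutorService> execsIter = executors.iterator();
+	 *
+	 * InstrumentedPool<Connection> pool = InstrumentedPoolDecorators.concurrentPools(4, 10, 100,
+	 *         (min, max) -> Tuples.of(
+	 *                 PoolBuilder.from(Mono.fromSupplier(Connection::new)).sizeBetween(min, max).buildPool(),
+	 *                 execsIter.next()));
+	 * }</pre>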
+	 *
+	 * @param <T> the type of resources in the pool
+	 * @return a decorated concurrent InstrumentedPool which handles resource acquisitions concurrently across all
+	 *         sub pools, utilizing dedicated sub pool executors
+	 */
+	public static <T> InstrumentedPool<T> concurrentPools(int size, int min, int max,
+			BiFunction<Integer, Integer, Tuple2<InstrumentedPool<T>, Executor>> poolFactory) {
+		@SuppressWarnings("unchecked")
+		Tuple2<InstrumentedPool<T>, Executor>[] subPools = IntStream.range(0, size)
+				.mapToObj(poolIndex -> {
+					int minDiv = min / size;
+					int minMod = min % size;
+					int maxDiv = max / size;
+					int maxMod = max % size;
+					return poolFactory.apply(poolIndex < minMod ? minDiv + 1 : minDiv, poolIndex < maxMod ? maxDiv + 1 : maxDiv);
+				})
+				.toArray(Tuple2[]::new);
+		return new WorkStealingPool<>(subPools);
+	}
+
 	private InstrumentedPoolDecorators() { }
 }
diff --git a/reactor-pool/src/main/java/reactor/pool/decorators/WorkStealingPool.java b/reactor-pool/src/main/java/reactor/pool/decorators/WorkStealingPool.java
new file mode 100644
index 00000000..32478dc5
--- /dev/null
+++ b/reactor-pool/src/main/java/reactor/pool/decorators/WorkStealingPool.java
@@ -0,0 +1,407 @@
+/*
+ * Copyright (c) 2023 VMware Inc. or its affiliates, All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package reactor.pool.decorators;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.pool.InstrumentedPool;
+import reactor.pool.Pool;
+import reactor.pool.PoolConfig;
+import reactor.pool.PooledRef;
+import reactor.util.Logger;
+import reactor.util.Loggers;
+import reactor.util.function.Tuple2;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.LongAdder;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+/**
+ * The WorkStealingPool class represents a pool of pools (InstrumentedPool instances), with acquisition task stealing
+ * and borrower stealing between pools. It is suitable for scenarios where acquisition tasks
+ * can be distributed among multiple executors to maximize throughput and resource utilization, while preventing the
+ * SimpleDequePool drainLoop method from running forever under very high load, when there are permanently hundreds of
+ * thousands of pending borrowers.
+ * <p>

+ * Each pool is assigned to a unique executor, that is not shared among pools. + *

+ * When a resource is acquired: + *

    + *
  • a random pool is selected, using Power Of Two Choices, where two pool are randomly selected and the one which + * has less pending borrowers is then selected
  • + *
  • once a pool is selected, the acquisition is then scheduled in the associated (uniq, not shared) pool executor + *
  • + *
  • when an executor has finished to handle its acquisition tasks, it tries to steal acquisition tasks from other + * pools. it also tries to steal some borrowers from any other pools that do have some pending borrowers
  • + *
+ *

+ * <p>
+ * Unlike the ForkJoinPool, this class doesn't create any daemon threads; it is up to
+ * you to provide an Executor for each pool instance (one single executor must be solely assigned to a single pool).
+ */
+public final class WorkStealingPool<T> implements InstrumentedPool<T>, InstrumentedPool.PoolMetrics {
+
+	private static final Logger log = Loggers.getLogger(WorkStealingPool.class);
+
+	private final Worker<T>[] workers;
+
+	private final InstrumentedPool<T>[] pools;
+
+	private final ThreadLocal<Worker<T>> currentWorker = ThreadLocal.withInitial(() -> null);
+
+	private final LongAdder stealCount = new LongAdder();
+
+	@SuppressWarnings("unchecked")
+	public WorkStealingPool(Tuple2<InstrumentedPool<T>, Executor>[] pools) {
+		this.workers = IntStream.range(0, pools.length)
+				.mapToObj(i -> new Worker<>(this, i, pools[i].getT1(), pools[i].getT2()))
+				.peek(worker -> worker.exec.execute(() -> currentWorker.set(worker)))
+				.toArray(Worker[]::new);
+
+		this.pools = Stream.of(pools)
+				.map(Tuple2::getT1)
+				.toArray(InstrumentedPool[]::new);
+	}
+
+	public Mono<PooledRef<T>> acquire() {
+		return acquire(Duration.ZERO);
+	}
+
+	public Mono<PooledRef<T>> acquire(Duration timeout) {
+		return Mono.create(sink -> execute(pool -> pool.acquire(timeout).subscribe(sink::success, sink::error)));
+	}
+
+	public Mono<Integer> warmup() {
+		AtomicInteger remaining = new AtomicInteger(pools.length);
+		AtomicInteger count = new AtomicInteger();
+		return Mono.create(sink ->
+				Stream.of(pools).forEach(pool -> pool.warmup().subscribe(result -> {
+					count.addAndGet(result);
+					if (remaining.decrementAndGet() == 0) {
+						sink.success(count.get());
+					}
+				})));
+	}
+
+	public PoolConfig<T> config() {
+		// TODO there is no config currently for this pool decorator, so what config to return ?
+		// FIXME For now, return the config of the first pool, that's probably not a good thing.
+		return pools[0].config();
+	}
+
+	public Mono<Void> disposeLater() {
+		// flatMap subscribes to each sub pool's disposeLater (merely mapping would leave the inner Monos
+		// un-subscribed, so the sub pools would never actually be disposed)
+		return Flux.fromArray(pools)
+				.flatMap(Pool::disposeLater)
+				.then();
+	}
+
+	public PoolMetrics metrics() {
+		return this;
+	}
+
+	// Metrics (TODO should be reworked using shared LongAdders ...)
+
+	@Override
+	public int acquiredSize() {
+		return Stream.of(pools).mapToInt(p -> p.metrics().acquiredSize()).sum();
+	}
+
+	@Override
+	public int allocatedSize() {
+		return Stream.of(pools).mapToInt(p -> p.metrics().allocatedSize()).sum();
+	}
+
+	@Override
+	public int idleSize() {
+		return Stream.of(pools).mapToInt(p -> p.metrics().idleSize()).sum();
+	}
+
+	@Override
+	public int pendingAcquireSize() {
+		return Stream.of(pools).mapToInt(p -> p.metrics().pendingAcquireSize()).sum();
+	}
+
+	@Override
+	public long secondsSinceLastInteraction() {
+		return Stream.of(pools).mapToLong(p -> p.metrics().secondsSinceLastInteraction()).sum();
+	}
+
+	@Override
+	public int getMaxAllocatedSize() {
+		return Stream.of(pools).mapToInt(p -> p.metrics().getMaxAllocatedSize()).sum();
+	}
+
+	@Override
+	public int getMaxPendingAcquireSize() {
+		return Stream.of(pools).mapToInt(p -> p.metrics().getMaxPendingAcquireSize()).sum();
+	}
+
+	@Override
+	public boolean isInactiveForMoreThan(Duration duration) {
+		for (InstrumentedPool<T> pool : pools) {
+			if (pool.metrics().isInactiveForMoreThan(duration)) {
+				return true;
+			}
+		}
+		return false;
+	}
+
+	public List<InstrumentedPool<T>> getPools() {
+		return Collections.unmodifiableList(Arrays.asList(pools));
+	}
+
+	public long stealCount() {
+		return stealCount.sum();
+	}
+
+	void execute(Task<T> task) {
+		Worker<T> currWorker = currentWorker.get();
+		Worker<T> worker = currWorker == null ? nextWorker() : currWorker;
+		worker.execute(task);
+		worker.signalWork();
+	}
+
+	private Worker<T> nextWorker() {
+		int workersCount = workers.length;
+
+		if (workersCount < 2) {
+			return workers[0];
+		}
+
+		// Select next worker using power of two choices
+		int index = ThreadLocalRandom.current().nextInt(0, workersCount);
+		int index2 = ThreadLocalRandom.current().nextInt(0, workersCount);
+		if (workers[index].pool.metrics().pendingAcquireSize() > workers[index2].pool.metrics().pendingAcquireSize()) {
+			// worker at index is detected to be busy, select the other one from index2
+			index = index2;
+		}
+		return workers[index];
+	}
+
+	private interface Task<T> {
+		void run(InstrumentedPool<T> context);
+	}
+
+	private static class SignalWork<T> implements Task<T> {
+		private final int workerIndex;
+
+		public SignalWork(int workerIndex) {
+			this.workerIndex = workerIndex;
+		}
+
+		@Override
+		public void run(InstrumentedPool<T> ctx) {
+		}
+	}
+
+	private static final class Worker<T> implements Runnable {
+		final WorkStealingPool<T> scheduler;
+
+		final AtomicInteger tasksScheduled = new AtomicInteger();
+
+		final ConcurrentLinkedDeque<Task<T>> queue;
+
+		final int workerIndex;
+
+		final InstrumentedPool<T> pool;
+
+		final Executor exec;
+
+		final AtomicInteger signals = new AtomicInteger();
+
+		public Worker(WorkStealingPool<T> scheduler, int workerIndex, InstrumentedPool<T> pool, Executor workerExecutor) {
+			this.scheduler = scheduler;
+			this.workerIndex = workerIndex;
+			this.queue = new ConcurrentLinkedDeque<>();
+			this.pool = pool;
+			this.exec = workerExecutor;
+		}
+
+		int getIndex() {
+			return workerIndex;
+		}
+
+		InstrumentedPool<T> pool() {
+			return pool;
+		}
+
+		void execute(Task<T> task) {
+			queue.add(task);
+			if (tasksScheduled.getAndIncrement() == 0) {
+				exec.execute(this);
+			}
+		}
+
+		@Override
+		public void run() {
+			ConcurrentLinkedDeque<Task<T>> queue = this.queue;
+			AtomicInteger tasksScheduled = this.tasksScheduled;
+			AtomicInteger signals = this.signals;
+			int hint = -1;
+
+			try {
+				Task<T> task;
+
+				do {
+					task = queue.poll();
+					if (task instanceof SignalWork) {
+						hint = hint == -1 ? ((SignalWork<?>) task).workerIndex : hint;
+						continue;
+					}
+					if (task != null) {
+						// if task is null, it means it has been stolen
+						runTask(task);
+					}
+				} while (tasksScheduled.decrementAndGet() > 0);
+
+				if (pool.metrics().pendingAcquireSize() > 0) {
+					signalWork();
+				}
+
+				// Try to steal some borrowers from all workers.
+				stealBorrowers(hint);
+
+				// Try to steal some acquisition tasks from any signaling worker, and all other workers.
+				stealAcquireTasks(hint);
+			} finally {
+				int currentSignals = signals.get();
+				if (!signals.compareAndSet(currentSignals, 0)) {
+					// Someone signaled us right now, avoid potential signal misses and reschedule a signal now
+					execute(new SignalWork<>(-1));
+				}
+			}
+		}
+
+		private void signalWork() {
+			Worker<T>[] workers = scheduler.workers;
+			int workerIndex = this.workerIndex;
+
+			int workersCount = scheduler.workers.length;
+
+			if (workersCount > 1) {
+				int index;
+				ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+				while ((index = rnd.nextInt(workersCount)) == workerIndex)
+					;
+				if (workers[index].signals.getAndIncrement() == 0) {
+					workers[index].execute(new SignalWork<>(workerIndex));
+				}
+			}
+		}
+
+		private void runTask(Task<T> task) {
+			try {
+				task.run(pool);
+			} catch (Throwable t) {
+				log.warn("Exception caught while running worker task", t);
+			}
+		}
+
+		private void stealAcquireTasks(int hint) {
+			Worker<T>[] workers = scheduler.workers;
+			LongAdder stealCount = scheduler.stealCount;
+			int myIndex = workerIndex;
+			Task<T> work;
+			Worker<T> worker;
+			int index = hint == -1 ? 0 : hint;
+
+			if (pool.metrics().pendingAcquireSize() > 0) {
+				return;
+			}
+
+			int workersCount = scheduler.workers.length;
+
+			for (int i = 0; i < workersCount && workersCount > 1; i++) {
+				worker = workers[index];
+
+				if (worker.getIndex() != myIndex) {
+					while ((work = worker.steal()) != null) {
+						stealCount.increment();
+						runTask(work);
+						if (pool.metrics().pendingAcquireSize() > 0) {
+							return;
+						}
+					}
+				}
+
+				if ((++index) == workersCount) {
+					index = 0;
+				}
+			}
+		}
+
+		/**
+		 * If this executor's pool doesn't have any pending borrowers (meaning it has no acquisition backlog),
+		 * try to steal any pending borrowers from the other executors' pools.
+		 */
+		private void stealBorrowers(int hint) {
+			InstrumentedPool<T> currPool = pool;
+			InstrumentedPool<T> otherPool;
+			Worker<T>[] workers = scheduler.workers;
+			ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+			int workersCount = workers.length;
+
+			if (workersCount > 1) {
+				if (hint == -1) {
+					while ((hint = rnd.nextInt(workersCount)) == getIndex())
+						;
+				}
+
+				while (currPool.metrics().pendingAcquireSize() == 0) {
+					int steals = 0;
+
+					// hint advances each turn so that every other worker's pool is examined,
+					// wrapping around via the check at the top of the loop body
+					for (int i = 0; i < workersCount; i++, hint++) {
+						if (currPool.metrics().pendingAcquireSize() > 0) {
+							return;
+						}
+
+						if (hint >= workersCount) {
+							hint = 0;
+						}
+
+						if (hint == getIndex()) {
+							continue;
+						}
+
+						otherPool = workers[hint].pool();
+
+						if (otherPool.metrics().pendingAcquireSize() > 0 && currPool.steal(otherPool)) {
+							scheduler.stealCount.add(1);
+							steals++;
+						}
+					}
+
+					if (steals == 0) {
+						return;
+					}
+				}
+			}
+		}
+
+		private Task<T> steal() {
+			return queue.pollFirst();
+		}
+	}
+}
diff --git a/reactor-pool/src/test/java/reactor/pool/decorators/WorkStealingPoolTest.java b/reactor-pool/src/test/java/reactor/pool/decorators/WorkStealingPoolTest.java
new file mode 100644
index 00000000..5bd66804
--- /dev/null
+++ b/reactor-pool/src/test/java/reactor/pool/decorators/WorkStealingPoolTest.java
@@ -0,0 +1,140 @@
+/*
+ * Copyright (c) 2023 VMware Inc. or its affiliates, All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package reactor.pool.decorators;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.pool.*;
+import reactor.test.StepVerifier;
+import reactor.util.Logger;
+import reactor.util.Loggers;
+import reactor.util.function.Tuples;
+
+import java.time.Duration;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
+
+class WorkStealingPoolTest {
+
+	static final int POOLS = Runtime.getRuntime().availableProcessors();
+	static final int LOOPS = 1_000_000;
+	static final int POOL_MAX_SIZE = 100;
+	InstrumentedPool<PiCalculator> pool;
+	List<ExecutorService> execs;
+
+	static final class PiCalculator {
+		public double calculatePi(int terms) {
+			double pi = 0.0;
+			for (int i = 0; i < terms; i++) {
+				double term = 1.0 / (2 * i + 1);
+				if (i % 2 == 0) {
+					pi += term;
+				} else {
+					pi -= term;
+				}
+			}
+			return pi * 4.0;
+		}
+	}
+
+	@BeforeEach
+	void setUp() {
+		execs = IntStream.range(0, POOLS).mapToObj(i -> Executors.newSingleThreadExecutor()).collect(Collectors.toList());
+		Iterator<ExecutorService> execsIter = execs.iterator();
+		AtomicInteger index = new AtomicInteger();
+		Mono<PiCalculator> allocator = Mono.defer(() -> Mono.just(new PiCalculator()));
+		pool = InstrumentedPoolDecorators.concurrentPools(POOLS, 0, POOL_MAX_SIZE, (min, max) ->
+				Tuples.of(PoolBuilder.from(allocator).sizeBetween(min, max).buildPool(), execsIter.next()));
+	}
+
+	@AfterEach
+	void tearDown() {
+		pool.disposeLater().block(Duration.ofSeconds(10));
+		execs.forEach(exec -> {
+			try {
+				exec.shutdown();
+				if (!exec.awaitTermination(10, TimeUnit.SECONDS)) {
+					throw new RuntimeException("Could not terminate executor timely.");
+				}
+			} catch (InterruptedException e) {
+				Thread.currentThread().interrupt();
+			}
+		});
+	}
+
+	@Test
+	void smokeTest() throws InterruptedException {
+		CountDownLatch latch = new CountDownLatch(1);
+		pool.withPoolable(piCalculator -> Mono.just(piCalculator.calculatePi(3000)))
+		    .subscribe(pi -> latch.countDown());
+
+		if (!latch.await(30, TimeUnit.SECONDS)) {
+			fail("could not acquire resource timely");
+		}
+
+		assertThat(pool.metrics().allocatedSize()).isEqualTo(1);
+		assertThat(pool.metrics().idleSize()).isEqualTo(1);
+	}
+
+	final static Logger log = Loggers.getLogger(WorkStealingPoolTest.class);
+
+	@Test
+	void timeoutTest() throws InterruptedException {
+		Iterator<ExecutorService> execsIter = execs.iterator();
+		Mono<String> allocator = Mono.defer(() -> Mono.just("foo"));
+		InstrumentedPool<String> pool = InstrumentedPoolDecorators.concurrentPools(2, 1, 2,
+				(min, max) -> Tuples.of(PoolBuilder.from(allocator).sizeBetween(min, max).buildPool(), execsIter.next()));
+
+		// Allocate two resources (this should work, but since allocation is done asynchronously, use generous
+		// timeouts, otherwise the test may fail if the system is slow, like in the CI).
+		PooledRef<String> pooledRef1 = pool.acquire(Duration.ofMillis(1000)).block(Duration.ofSeconds(3));
+		assertThat(pooledRef1).isNotNull();
+		PooledRef<String> pooledRef2 = pool.acquire(Duration.ofMillis(1000)).block(Duration.ofSeconds(3));
+		assertThat(pooledRef2).isNotNull();
+
+		// error: the third acquisition should time out
+		pool.acquire(Duration.ofMillis(1))
+		    .as(StepVerifier::create)
+		    .expectError(PoolAcquireTimeoutException.class)
+		    .verify(Duration.ofSeconds(1));
+
+		pooledRef1.release().block(Duration.ofSeconds(1));
+		pooledRef2.release().block(Duration.ofSeconds(1));
+	}
+
+	@Test
+	void workStealingPoolBench() throws InterruptedException {
+		CountDownLatch latch = new CountDownLatch(LOOPS);
+		Flux.range(0, LOOPS)
+		    .flatMap(i -> pool.withPoolable(piCalculator -> Mono.just(piCalculator.calculatePi(3000))), LOOPS)
+		    .doOnNext(pi -> latch.countDown())
+		    .subscribe();
+
+		assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue();
+		assertThat(pool.metrics().stealCount()).isGreaterThan(0);
+	}
+}