Skip to content

Commit

Permalink
Experiment work stealing pools
Browse files Browse the repository at this point in the history
  • Loading branch information
pderop committed Nov 26, 2023
1 parent 6f350ef commit 2ef6a19
Show file tree
Hide file tree
Showing 7 changed files with 671 additions and 7 deletions.
2 changes: 2 additions & 0 deletions reactor-pool/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ task japicmp(type: JapicmpTask) {
classExcludes = [
]
methodExcludes = [
"reactor.pool.decorators.InstrumentedPoolDecorators#concurrentPools(int, java.util.function.Function)",
"reactor.pool.decorators.InstrumentedPoolDecorators#concurrentPools(int, int, int, java.util.function.BiFunction)"
]
}
check.dependsOn japicmp
Expand Down
22 changes: 16 additions & 6 deletions reactor-pool/src/main/java/reactor/pool/AbstractPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

import org.reactivestreams.Publisher;
Expand Down Expand Up @@ -390,7 +391,7 @@ static final class Borrower<POOLABLE> extends AtomicBoolean implements Scannable
static final Disposable TIMEOUT_DISPOSED = Disposables.disposed();

final CoreSubscriber<? super AbstractPooledRef<POOLABLE>> actual;
final AbstractPool<POOLABLE> pool;
final AtomicReference<AbstractPool<POOLABLE>> pool;
final Duration pendingAcquireTimeout;

long pendingAcquireStart;
Expand All @@ -400,7 +401,7 @@ static final class Borrower<POOLABLE> extends AtomicBoolean implements Scannable
AbstractPool<POOLABLE> pool,
Duration pendingAcquireTimeout) {
this.actual = actual;
this.pool = pool;
this.pool = new AtomicReference<>(pool);
this.pendingAcquireTimeout = pendingAcquireTimeout;
this.timeoutTask = TIMEOUT_DISPOSED;
}
Expand All @@ -414,7 +415,7 @@ public void run() {
if (Borrower.this.compareAndSet(false, true)) {
// this is failure, a timeout was observed
stopPendingCountdown(false);
pool.cancelAcquire(Borrower.this);
pool().cancelAcquire(Borrower.this);
actual.onError(new PoolAcquireTimeoutException(pendingAcquireTimeout));
}
}
Expand All @@ -423,13 +424,13 @@ public void run() {
public void request(long n) {
if (Operators.validate(n)) {
//start the countdown

AbstractPool<POOLABLE> pool = pool();
boolean noIdle = pool.idleSize() == 0;
boolean noPermits = pool.poolConfig.allocationStrategy().estimatePermitCount() == 0;

if (!pendingAcquireTimeout.isZero() && noIdle && noPermits) {
pendingAcquireStart = pool.clock.millis();
timeoutTask = this.pool.config().pendingAcquireTimer().apply(this, pendingAcquireTimeout);
timeoutTask = pool.config().pendingAcquireTimer().apply(this, pendingAcquireTimeout);
}
//doAcquire should interrupt the countdown if there is either an available
//resource or the pool can allocate one
Expand All @@ -442,6 +443,7 @@ public void request(long n) {
*/
void stopPendingCountdown(boolean success) {
if (!timeoutTask.isDisposed()) {
AbstractPool<POOLABLE> pool = pool();
if (success) {
pool.metricsRecorder.recordPendingSuccessAndLatency(pool.clock.millis() - pendingAcquireStart);
} else {
Expand All @@ -454,7 +456,7 @@ void stopPendingCountdown(boolean success) {
@Override
public void cancel() {
set(true);
pool.cancelAcquire(this);
pool().cancelAcquire(this);
stopPendingCountdown(true); // this is not failure, the subscription was canceled
}

Expand Down Expand Up @@ -493,6 +495,14 @@ void fail(Throwable error) {
public String toString() {
return get() ? "Borrower(cancelled)" : "Borrower";
}

AbstractPool<POOLABLE> pool() {
return pool.get();
}

void setPool(AbstractPool<POOLABLE> replace) {
pool.set(replace);
}
}

}
19 changes: 19 additions & 0 deletions reactor-pool/src/main/java/reactor/pool/InstrumentedPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,16 @@ public interface InstrumentedPool<POOLABLE> extends Pool<POOLABLE> {
*/
PoolMetrics metrics();

/**
* Steal some pending borrowers from another pool.
*
* @param fromPool another pool to steal resources from
* @return true if some borrowers have been moved from <code>fromPool</code> into this pool instance
*/
default boolean steal(InstrumentedPool<POOLABLE> fromPool) {
return false;
}

/**
* An object that can be used to get live information about a {@link Pool}, suitable
* for gauge metrics.
Expand Down Expand Up @@ -136,5 +146,14 @@ default boolean isInactiveForMoreThan(Duration duration) {
* @return the maximum number of pending acquire that can be enqueued by this {@link Pool}
*/
int getMaxPendingAcquireSize();

/**
* Get the number of borrowers steal count (only if the Pool supports work stealing).
*
* @return the number of Pool steal count
*/
default long stealCount() {
return 0;
}
}
}
18 changes: 18 additions & 0 deletions reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,24 @@ public Mono<PooledRef<POOLABLE>> acquire(Duration timeout) {
timeout); //the mono is unknown to the pool until requested
}

@Override
public boolean steal(InstrumentedPool<POOLABLE> pool) {
SimpleDequePool<POOLABLE> other = (SimpleDequePool<POOLABLE>) pool;

if (!other.isDisposed()) {
ConcurrentLinkedDeque<Borrower<POOLABLE>> q = other.pending;
Borrower<POOLABLE> b = other.pendingPoll(q);
if (b != null && !b.get()) {
// TODO check race conditions when timer expires or subscription is cancelled concurrently !
b.setPool(this);
doAcquire(b);
return true;
}
}

return false;
}

@Override
public int acquiredSize() {
return acquired;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021-2022 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2021-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,9 +17,13 @@
package reactor.pool.decorators;

import java.time.Duration;
import java.util.concurrent.Executor;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.IntStream;

import reactor.pool.InstrumentedPool;
import reactor.util.function.Tuple2;

/**
* Utility class to expose various {@link InstrumentedPool} decorators, which can also be used
Expand All @@ -42,6 +46,70 @@ public static <T> GracefulShutdownInstrumentedPool<T> gracefulShutdown(Instrumen
return new GracefulShutdownInstrumentedPool<>(pool);
}

/**
* Creates a pool composed of multiple sub pools, each managing a portion of resources. Resource acquisitions will
* be concurrently distributed across sub pools using sub pool executors, in a work stealing style.
*
* @param size The number of sub pools to create.
* @param poolFactory A factory method creating sub pools called with the index of the sub pool to create.
* This BiFunction takes the index of the pool to create and returns a Tuple2 consisting of an
* InstrumentedPool and a dedicated single-thread Executor associated with that pool. Executors
* must not be shared between sub pools.
* @param <T> the type of resources in the pool
* @return a decorated concurrent InstrumentedPool that will distribute resource acquisitions across all sub pools in a work
* stealing way, using dedicated sub pool executors
*/
public static <T> InstrumentedPool<T> concurrentPools(int size,
Function<Integer, Tuple2<InstrumentedPool<T>, Executor>> poolFactory) {
@SuppressWarnings("unchecked")
Tuple2<InstrumentedPool<T>, Executor>[] subPools = IntStream.range(0, size)
.mapToObj(poolFactory::apply)
.toArray(Tuple2[]::new);
return new WorkStealingPool<>(subPools);
}

/**
* Creates a pool composed of multiple sub pools, each managing a portion of resources. Resource acquisitions will
* be concurrently distributed across sub pools using sub pool executors, in a work stealing style.
*
* @param size The number of sub pools to create.
* @param min The minimum number of resources to be kept by all sub pools collectively.
* @param max The maximum number of resources that can be allocated by all sub pools collectively.
* @param poolFactory A method that constructs sub pools. It takes adjusted min/max parameters specific to each sub
* pool to create and returns a Tuple2 comprising an InstrumentedPool and a dedicated
* single-thread Executor exclusively associated with that pool (Executors must not be shared
* between sub pools). The adjusted parameters ensure equitable resource distribution among sub
* pools. Remember, the key here is that the poolFactory is invoked for each sub pool creation
* with adjusted min/max parameters to maintain fairness across the pools.
* <p>
* <strong>Example:</strong><br>
* Suppose you're creating 3 sub pools with a global min=10 and max=100. The poolFactory is invoked 3 times, each
* time with adjusted min/max parameters to ensure fair distribution:
* <ul>
* <li>First invocation with min=4, max=34</li>
* <li>Second invocation: min=3, max=33</li>
* <li>Third invocation: min=3, max=33</li>
* </ul>
*
* @param <T> the type of resources in the pool
* @return a decorated concurrent InstrumentedPool which handles resource acquisitions concurrently across all sub pools
* utilizing dedicated sub pool executors.
*/
public static <T> InstrumentedPool<T> concurrentPools(int size, int min, int max,
BiFunction<Integer, Integer, Tuple2<InstrumentedPool<T>, Executor>> poolFactory) {
@SuppressWarnings("unchecked")
Tuple2<InstrumentedPool<T>, Executor>[] subPools = IntStream.range(0, size)
.mapToObj(poolIndex -> {
int minDiv = min / size;
int minMod = min % size;
int maxDiv = max / size;
int maxMod = max % size;
return poolFactory.apply(poolIndex < minMod ? minDiv + 1 : minDiv, poolIndex < maxMod ? maxDiv + 1 : maxDiv);
})
.toArray(Tuple2[]::new);
return new WorkStealingPool<>(subPools);
}

private InstrumentedPoolDecorators() { }

}
Loading

0 comments on commit 2ef6a19

Please sign in to comment.