Skip to content

Commit ceda23e

Browse files
committed
First draft of implementing a new blocking queue
This queue is implemented using a pair of modified unpadded MpmcUnboundedXaddArrayQueues from JCTools. One queue is used for messages, while another is used for the wait-set. Since the wait-set is a queue, we get fairness from our blocking methods. A few modifications were necessary, in order to implement a wait-set using the queue. Most importantly, the 'offer' method now returns the claimed producer index, and a pair of 'casEntry' and 'hasEntry' methods has been added, which allow blocking threads to manage their wait-set entries. The queues have also gotten a 'count' method, which allow us to implement some inspection methods that Stormpot needs.
1 parent ec3b692 commit ceda23e

14 files changed

+813
-72
lines changed

src/main/java/stormpot/internal/AllocationProcess.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717

1818
import stormpot.Poolable;
1919

20-
import java.util.concurrent.LinkedTransferQueue;
21-
2220
import static stormpot.internal.AllocationProcessMode.DIRECT;
2321
import static stormpot.internal.AllocationProcessMode.INLINE;
2422
import static stormpot.internal.AllocationProcessMode.THREADED;
@@ -35,7 +33,7 @@ public static AllocationProcess threaded() {
3533
return new AllocationProcess(THREADED) {
3634
@Override
3735
<T extends Poolable> AllocationController<T> buildAllocationController(
38-
LinkedTransferQueue<BSlot<T>> live,
36+
MpmcChunkedBlockingQueue<BSlot<T>> live,
3937
RefillPile<T> disregardPile,
4038
RefillPile<T> newAllocations,
4139
PoolBuilderImpl<T> builder,
@@ -54,7 +52,7 @@ public static AllocationProcess inline() {
5452
return new AllocationProcess(INLINE) {
5553
@Override
5654
<T extends Poolable> AllocationController<T> buildAllocationController(
57-
LinkedTransferQueue<BSlot<T>> live,
55+
MpmcChunkedBlockingQueue<BSlot<T>> live,
5856
RefillPile<T> disregardPile,
5957
RefillPile<T> newAllocations,
6058
PoolBuilderImpl<T> builder,
@@ -73,7 +71,7 @@ public static AllocationProcess direct() {
7371
return new AllocationProcess(DIRECT) {
7472
@Override
7573
<T extends Poolable> AllocationController<T> buildAllocationController(
76-
LinkedTransferQueue<BSlot<T>> live,
74+
MpmcChunkedBlockingQueue<BSlot<T>> live,
7775
RefillPile<T> disregardPile,
7876
RefillPile<T> newAllocations,
7977
PoolBuilderImpl<T> builder,
@@ -103,7 +101,7 @@ public AllocationProcessMode getMode() {
103101
}
104102

105103
abstract <T extends Poolable> AllocationController<T> buildAllocationController(
106-
LinkedTransferQueue<BSlot<T>> live,
104+
MpmcChunkedBlockingQueue<BSlot<T>> live,
107105
RefillPile<T> disregardPile,
108106
RefillPile<T> newAllocations,
109107
PoolBuilderImpl<T> builder,

src/main/java/stormpot/internal/BAllocThread.java

Lines changed: 8 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,6 @@
2424

2525
import java.util.ArrayList;
2626
import java.util.List;
27-
import java.util.concurrent.BlockingQueue;
28-
import java.util.concurrent.LinkedTransferQueue;
2927
import java.util.concurrent.TimeUnit;
3028
import java.util.concurrent.atomic.AtomicLong;
3129
import java.util.concurrent.locks.LockSupport;
@@ -47,7 +45,7 @@ public final class BAllocThread<T extends Poolable> implements Runnable {
4745
*/
4846
private static final long shutdownPauseNanos = MILLISECONDS.toNanos(10);
4947

50-
private final LinkedTransferQueue<BSlot<T>> live;
48+
private final MpmcChunkedBlockingQueue<BSlot<T>> live;
5149
private final RefillPile<T> disregardPile;
5250
private final RefillPile<T> newAllocations;
5351
private final BSlot<T> poisonPill;
@@ -56,11 +54,11 @@ public final class BAllocThread<T extends Poolable> implements Runnable {
5654
private final boolean backgroundExpirationEnabled;
5755
private final PreciseLeakDetector leakDetector;
5856
private final StackCompletion shutdownCompletion;
59-
private final BlockingQueue<BSlot<T>> dead;
57+
private final MpmcChunkedBlockingQueue<BSlot<T>> dead;
6058
private final AtomicLong poisonedSlots;
6159
private final long defaultDeadPollTimeout;
6260
private final boolean optimizeForMemory;
63-
private final LinkedTransferQueue<AllocatorSwitch<T>> switchRequests;
61+
private final MpmcChunkedBlockingQueue<AllocatorSwitch<T>> switchRequests;
6462

6563
// Single reader: this. Many writers.
6664
private volatile long targetSize;
@@ -78,7 +76,7 @@ public final class BAllocThread<T extends Poolable> implements Runnable {
7876
private long priorGenerationObjectsToReplace;
7977

8078
BAllocThread(
81-
LinkedTransferQueue<BSlot<T>> live,
79+
MpmcChunkedBlockingQueue<BSlot<T>> live,
8280
RefillPile<T> disregardPile,
8381
RefillPile<T> newAllocations,
8482
PoolBuilderImpl<T> builder,
@@ -95,11 +93,11 @@ public final class BAllocThread<T extends Poolable> implements Runnable {
9593
this.leakDetector = builder.isPreciseLeakDetectionEnabled() ?
9694
new PreciseLeakDetector() : null;
9795
this.shutdownCompletion = new StackCompletion();
98-
this.dead = new LinkedTransferQueue<>();
96+
this.dead = new MpmcChunkedBlockingQueue<>();
9997
this.poisonedSlots = new AtomicLong();
10098
this.defaultDeadPollTimeout = builder.getBackgroundExpirationCheckDelay();
10199
this.optimizeForMemory = builder.isOptimizeForReducedMemoryUsage();
102-
switchRequests = new LinkedTransferQueue<>();
100+
switchRequests = new MpmcChunkedBlockingQueue<>();
103101
this.size = 0;
104102
this.didAnythingLastIteration = true; // start out busy
105103
}
@@ -479,14 +477,8 @@ long allocatedSize() {
479477
}
480478

481479
long inUse() {
482-
long inUse = 0;
483-
long liveSize = 0;
484-
for (BSlot<T> slot: live) {
485-
liveSize++;
486-
if (slot.isClaimedOrThreadLocal()) {
487-
inUse++;
488-
}
489-
}
480+
long inUse = live.count(BSlot::isClaimedOrThreadLocal);
481+
long liveSize = live.size();
490482
return size - liveSize + inUse;
491483
}
492484
}

src/main/java/stormpot/internal/BSlot.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import java.lang.invoke.MethodHandles;
2525
import java.lang.invoke.VarHandle;
2626
import java.lang.ref.Reference;
27-
import java.util.concurrent.BlockingQueue;
2827
import java.util.concurrent.TimeUnit;
2928
import java.util.concurrent.atomic.AtomicLong;
3029

@@ -53,7 +52,7 @@ public class BSlot<T extends Poolable> implements Slot, SlotInfo<T> {
5352
@SuppressWarnings("FieldMayBeFinal")
5453
private volatile int state;
5554

56-
final BlockingQueue<BSlot<T>> live;
55+
final MpmcChunkedBlockingQueue<BSlot<T>> live;
5756
final AtomicLong poisonedSlots;
5857
long stamp;
5958
long createdNanos;
@@ -81,7 +80,7 @@ public class BSlot<T extends Poolable> implements Slot, SlotInfo<T> {
8180
* @param live The queue of live slots.
8281
* @param poisonedSlots The counter of poisoned slots.
8382
*/
84-
public BSlot(BlockingQueue<BSlot<T>> live, AtomicLong poisonedSlots) {
83+
public BSlot(MpmcChunkedBlockingQueue<BSlot<T>> live, AtomicLong poisonedSlots) {
8584
// Volatile write in the constructor: This object must be safely published,
8685
// so that we are sure that the volatile write happens-before other
8786
// threads observe the pointer to this object.

src/main/java/stormpot/internal/BSlotPadded.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
import stormpot.Poolable;
1919

20-
import java.util.concurrent.BlockingQueue;
2120
import java.util.concurrent.atomic.AtomicLong;
2221

2322
/**
@@ -42,7 +41,7 @@ public class BSlotPadded<T extends Poolable> extends BSlot<T> {
4241
* @param live The queue of live slots.
4342
* @param poisonedSlots The counter of poisoned slots.
4443
*/
45-
public BSlotPadded(BlockingQueue<BSlot<T>> live, AtomicLong poisonedSlots) {
44+
public BSlotPadded(MpmcChunkedBlockingQueue<BSlot<T>> live, AtomicLong poisonedSlots) {
4645
super(live, poisonedSlots);
4746
}
4847
}

src/main/java/stormpot/internal/BlazePool.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import java.lang.invoke.MethodHandles;
3131
import java.lang.invoke.VarHandle;
3232
import java.util.Objects;
33-
import java.util.concurrent.LinkedTransferQueue;
3433
import java.util.concurrent.TimeUnit;
3534

3635
/**
@@ -71,7 +70,7 @@ public final class BlazePool<T extends Poolable> implements Pool<T>, ManagedPool
7170
private static final Exception SHUTDOWN_POISON = new PoisonException("Stormpot Poison: Shutdown");
7271
static final Exception EXPLICIT_EXPIRE_POISON = new PoisonException("Stormpot Poison: Expired");
7372

74-
private final LinkedTransferQueue<BSlot<T>> live;
73+
private final MpmcChunkedBlockingQueue<BSlot<T>> live;
7574
private final RefillPile<T> disregardPile;
7675
private final RefillPile<T> newAllocations;
7776
private final AllocationController<T> allocator;
@@ -96,7 +95,7 @@ public final class BlazePool<T extends Poolable> implements Pool<T>, ManagedPool
9695
* @param factory The allocation process that builds the {@link AllocationController} used by this pool.
9796
*/
9897
public BlazePool(PoolBuilderImpl<T> builder, AllocationProcess factory) {
99-
live = new LinkedTransferQueue<>();
98+
live = new MpmcChunkedBlockingQueue<>();
10099
disregardPile = new RefillPile<>(live);
101100
newAllocations = new RefillPile<>(live);
102101
optimizeForMemory = builder.isOptimizeForReducedMemoryUsage();

src/main/java/stormpot/internal/DirectAllocationController.java

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import stormpot.PoolBuilder;
2121
import stormpot.Poolable;
2222

23-
import java.util.concurrent.LinkedTransferQueue;
2423
import java.util.concurrent.TimeUnit;
2524
import java.util.concurrent.atomic.AtomicLong;
2625

@@ -29,15 +28,15 @@
2928
* @param <T> The concrete poolable type.
3029
*/
3130
public final class DirectAllocationController<T extends Poolable> extends AllocationController<T> {
32-
private final LinkedTransferQueue<BSlot<T>> live;
31+
private final MpmcChunkedBlockingQueue<BSlot<T>> live;
3332
private final BSlot<T> poisonPill;
3433
private final long size;
3534
private final AtomicLong shutdownState;
3635
private final AtomicLong poisonedSlots;
3736
private final StackCompletion shutdownCompletion;
3837

3938
DirectAllocationController(
40-
LinkedTransferQueue<BSlot<T>> live,
39+
MpmcChunkedBlockingQueue<BSlot<T>> live,
4140
RefillPile<T> disregardPile,
4241
PoolBuilder<T> builder,
4342
BSlot<T> poisonPill) {
@@ -141,14 +140,8 @@ public long allocatedSize() {
141140

142141
@Override
143142
long inUse() {
144-
long inUse = 0;
145-
long liveSize = 0;
146-
for (BSlot<T> slot: live) {
147-
liveSize++;
148-
if (slot.isClaimedOrThreadLocal()) {
149-
inUse++;
150-
}
151-
}
143+
long inUse = live.count(BSlot::isClaimedOrThreadLocal);
144+
long liveSize = live.size();
152145
return size - liveSize + inUse;
153146
}
154147
}

src/main/java/stormpot/internal/InlineAllocationController.java

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import java.lang.invoke.VarHandle;
2727
import java.util.ArrayList;
2828
import java.util.List;
29-
import java.util.concurrent.LinkedTransferQueue;
3029
import java.util.concurrent.TimeUnit;
3130
import java.util.concurrent.atomic.AtomicLong;
3231

@@ -54,7 +53,7 @@ public final class InlineAllocationController<T extends Poolable> extends Alloca
5453
}
5554
}
5655

57-
private final LinkedTransferQueue<BSlot<T>> live;
56+
private final MpmcChunkedBlockingQueue<BSlot<T>> live;
5857
private final RefillPile<T> disregardPile;
5958
private final RefillPile<T> newAllocations;
6059
private final BSlot<T> poisonPill;
@@ -63,7 +62,7 @@ public final class InlineAllocationController<T extends Poolable> extends Alloca
6362
private final PreciseLeakDetector leakDetector;
6463
private final boolean optimizeForMemory;
6564
private final StackCompletion shutdownCompletion;
66-
private final LinkedTransferQueue<AllocatorSwitch<T>> switchRequests;
65+
private final MpmcChunkedBlockingQueue<AllocatorSwitch<T>> switchRequests;
6766

6867
private volatile long targetSize;
6968
@SuppressWarnings("unused") // Assigned via VarHandle.
@@ -79,7 +78,7 @@ public final class InlineAllocationController<T extends Poolable> extends Alloca
7978
private long priorGenerationObjectsToReplace;
8079

8180
InlineAllocationController(
82-
LinkedTransferQueue<BSlot<T>> live,
81+
MpmcChunkedBlockingQueue<BSlot<T>> live,
8382
RefillPile<T> disregardPile,
8483
RefillPile<T> newAllocations,
8584
PoolBuilderImpl<T> builder,
@@ -94,7 +93,7 @@ public final class InlineAllocationController<T extends Poolable> extends Alloca
9493
optimizeForMemory = builder.isOptimizeForReducedMemoryUsage();
9594
leakDetector = builder.isPreciseLeakDetectionEnabled() ?
9695
new PreciseLeakDetector() : null;
97-
switchRequests = new LinkedTransferQueue<>();
96+
switchRequests = new MpmcChunkedBlockingQueue<>();
9897
setTargetSize(builder.getSize());
9998
shutdownCompletion = new StackCompletion(this::shutdownCompletion);
10099
}
@@ -475,14 +474,8 @@ public long allocatedSize() {
475474

476475
@Override
477476
long inUse() {
478-
long inUse = 0;
479-
long liveSize = 0;
480-
for (BSlot<T> slot: live) {
481-
liveSize++;
482-
if (slot.isClaimedOrThreadLocal()) {
483-
inUse++;
484-
}
485-
}
477+
long inUse = live.count(BSlot::isClaimedOrThreadLocal);
478+
long liveSize = live.size();
486479
return size - liveSize + inUse;
487480
}
488481
}

0 commit comments

Comments
 (0)