1818import io .netty .util .ByteProcessor ;
1919import io .netty .util .NettyRuntime ;
2020import io .netty .util .ReferenceCounted ;
21+ import io .netty .util .concurrent .FastThreadLocal ;
2122import io .netty .util .concurrent .FastThreadLocalThread ;
2223import io .netty .util .internal .ObjectPool ;
2324import io .netty .util .internal .ObjectUtil ;
2425import io .netty .util .internal .PlatformDependent ;
2526import io .netty .util .internal .SuppressJava6Requirement ;
2627import io .netty .util .internal .SystemPropertyUtil ;
28+ import io .netty .util .internal .ThreadExecutorMap ;
2729import io .netty .util .internal .UnstableApi ;
2830
2931import java .io .IOException ;
3739import java .nio .channels .ScatteringByteChannel ;
3840import java .util .Arrays ;
3941import java .util .Queue ;
42+ import java .util .Set ;
4043import java .util .concurrent .ConcurrentLinkedQueue ;
44+ import java .util .concurrent .CopyOnWriteArraySet ;
4145import java .util .concurrent .atomic .AtomicLong ;
4246import java .util .concurrent .atomic .AtomicReferenceFieldUpdater ;
4347import java .util .concurrent .locks .StampedLock ;
7377@ SuppressJava6Requirement (reason = "Guarded by version check" )
7478@ UnstableApi
7579final class AdaptivePoolingAllocator {
80+
/**
 * Selects which threads are given their own thread-local {@code Magazine} by the allocator constructor.
 * The thread-local magazine is only consulted by {@code allocate} when the calling thread is a
 * {@code FastThreadLocalThread}.
 */
81+ enum MagazineCaching {
// A thread-local magazine is created only if ThreadExecutorMap.currentExecutor() is non-null
// on the thread at the time the thread-local value is initialized.
82+ EventLoopThreads ,
// A thread-local magazine is created regardless of whether the thread has a current executor.
83+ FastThreadLocalThreads ,
// No thread-local is created at all; allocations always go through the shared magazines array.
84+ None
85+ }
86+
// Bound for the retry loop in allocate(): the loop stops once its expansion counter exceeds
// this value, or as soon as tryExpandMagazines(...) returns false.
87+ private static final int EXPANSION_ATTEMPTS = 3 ;
// Length of the shared Magazine array that the constructor creates and populates.
88+ private static final int INITIAL_MAGAZINES = 4 ;
7689 private static final int RETIRE_CAPACITY = 4 * 1024 ;
7790 private static final int MIN_CHUNK_SIZE = 128 * 1024 ;
7891 private static final int MAX_STRIPES = NettyRuntime .availableProcessors () * 2 ;
@@ -97,20 +110,55 @@ final class AdaptivePoolingAllocator {
97110 private static final int CENTRAL_QUEUE_CAPACITY = SystemPropertyUtil .getInt (
98111 "io.netty.allocator.centralQueueCapacity" , NettyRuntime .availableProcessors ());
99112
// Sentinel returned by the thread-local's initialValue() for threads that do not get their own
// magazine. It is compared by identity (== / !=), never cast to Magazine.
113+ private static final Object NO_MAGAZINE = Boolean .TRUE ;
114+
100115 private final ChunkAllocator chunkAllocator ;
101116 private final Queue <ChunkByteBuf > centralQueue ;
102117 private final StampedLock magazineExpandLock ;
// Shared magazines, used by allocate() when no thread-local magazine applies to the caller.
103118 private volatile Magazine [] magazines ;
// Holds either a Magazine or NO_MAGAZINE per thread; null when MagazineCaching.None was requested.
119+ private final FastThreadLocal <Object > threadLocalMagazine ;
// Thread-local magazines currently registered (added in initialValue(), removed in onRemoval());
// iterated by usedMemory(). Null when MagazineCaching.None was requested.
120+ private final Set <Magazine > liveCachedMagazines ;
104121
105- AdaptivePoolingAllocator (ChunkAllocator chunkAllocator ) {
122+ AdaptivePoolingAllocator (ChunkAllocator chunkAllocator , MagazineCaching magazineCaching ) {
123+ ObjectUtil .checkNotNull (chunkAllocator , "chunkAllocator" );
124+ ObjectUtil .checkNotNull (magazineCaching , "magazineCaching" );
106125 this .chunkAllocator = chunkAllocator ;
107126 if (javaVersion () < 8 ) {
108127 // The implementation uses StampedLock, which was introduced in Java 8.
109128 throw new IllegalStateException ("This allocator require Java 8 or newer." );
110129 }
111130 centralQueue = ObjectUtil .checkNotNull (createSharedChunkQueue (), "centralQueue" );
112131 magazineExpandLock = new StampedLock ();
113- Magazine [] mags = new Magazine [4 ];
132+ if (magazineCaching != MagazineCaching .None ) {
133+ assert magazineCaching == MagazineCaching .EventLoopThreads ||
134+ magazineCaching == MagazineCaching .FastThreadLocalThreads ;
135+ final boolean cachedMagazinesNonEventLoopThreads =
136+ magazineCaching == MagazineCaching .FastThreadLocalThreads ;
137+ final Set <Magazine > liveMagazines = new CopyOnWriteArraySet <Magazine >();
138+ threadLocalMagazine = new FastThreadLocal <Object >() {
/**
 * Produces the per-thread value for the magazine thread-local.
 *
 * @return a new non-shareable {@code Magazine} (also registered in {@code liveMagazines}) when
 *         caching applies to this thread, otherwise the {@code NO_MAGAZINE} sentinel.
 */
139+ @ Override
140+ protected Object initialValue () {
// Caching applies if non-event-loop threads are included, or if this thread has a current executor.
141+ if (cachedMagazinesNonEventLoopThreads || ThreadExecutorMap .currentExecutor () != null ) {
// 'false' marks the magazine as not shareable (see the AllocationStatistics 'shareable' flag).
142+ Magazine mag = new Magazine (AdaptivePoolingAllocator .this , false );
// Register so that usedMemory() can include this magazine in its sum.
143+ liveMagazines .add (mag );
144+ return mag ;
145+ }
146+ return NO_MAGAZINE ;
147+ }
148+
/**
 * Unregisters a thread's magazine from {@code liveMagazines} when its thread-local value is removed.
 * The {@code NO_MAGAZINE} sentinel was never registered, so it is ignored.
 * NOTE(review): only the set membership is dropped here; whether the magazine's own resources
 * need releasing at this point cannot be determined from this view — confirm.
 *
 * @param value the removed thread-local value: a {@code Magazine} or {@code NO_MAGAZINE}
 */
149+ @ Override
150+ protected void onRemoval (final Object value ) throws Exception {
151+ if (value != NO_MAGAZINE ) {
152+ liveMagazines .remove (value );
153+ }
154+ }
155+ };
156+ liveCachedMagazines = liveMagazines ;
157+ } else {
158+ threadLocalMagazine = null ;
159+ liveCachedMagazines = null ;
160+ }
161+ Magazine [] mags = new Magazine [INITIAL_MAGAZINES ];
114162 for (int i = 0 ; i < mags .length ; i ++) {
115163 mags [i ] = new Magazine (this );
116164 }
@@ -158,8 +206,15 @@ ByteBuf allocate(int size, int maxCapacity) {
158206 }
159207
160208 private AdaptiveByteBuf allocate (int size , int maxCapacity , Thread currentThread , AdaptiveByteBuf buf ) {
161- long threadId = currentThread .getId ();
162209 int sizeBucket = AllocationStatistics .sizeBucket (size ); // Compute outside of Magazine lock for better ILP.
210+ FastThreadLocal <Object > threadLocalMagazine = this .threadLocalMagazine ;
211+ if (threadLocalMagazine != null && currentThread instanceof FastThreadLocalThread ) {
212+ Object mag = threadLocalMagazine .get ();
213+ if (mag != NO_MAGAZINE ) {
214+ return ((Magazine ) mag ).allocate (size , sizeBucket , maxCapacity , buf );
215+ }
216+ }
217+ long threadId = currentThread .getId ();
163218 Magazine [] mags ;
164219 int expansions = 0 ;
165220 do {
@@ -178,7 +233,7 @@ private AdaptiveByteBuf allocate(int size, int maxCapacity, Thread currentThread
178233 }
179234 }
180235 expansions ++;
181- } while (expansions <= 3 && tryExpandMagazines (mags .length ));
236+ } while (expansions <= EXPANSION_ATTEMPTS && tryExpandMagazines (mags .length ));
182237 return null ;
183238 }
184239
@@ -204,6 +259,11 @@ long usedMemory() {
204259 for (Magazine magazine : magazines ) {
205260 sum += magazine .usedMemory .get ();
206261 }
262+ if (liveCachedMagazines != null ) {
263+ for (Magazine magazine : liveCachedMagazines ) {
264+ sum += magazine .usedMemory .get ();
265+ }
266+ }
207267 return sum ;
208268 }
209269
@@ -247,6 +307,7 @@ private static class AllocationStatistics extends StampedLock {
247307 private static final int HISTO_MAX_BUCKET_MASK = HISTO_BUCKET_COUNT - 1 ;
248308
249309 protected final AdaptivePoolingAllocator parent ;
310+ private final boolean shareable ;
250311 private final short [][] histos = {
251312 new short [HISTO_BUCKET_COUNT ], new short [HISTO_BUCKET_COUNT ],
252313 new short [HISTO_BUCKET_COUNT ], new short [HISTO_BUCKET_COUNT ],
@@ -260,8 +321,9 @@ private static class AllocationStatistics extends StampedLock {
260321 private volatile int sharedPrefChunkSize = MIN_CHUNK_SIZE ;
261322 protected volatile int localPrefChunkSize = MIN_CHUNK_SIZE ;
262323
/**
 * Creates the statistics holder.
 *
 * @param parent    the owning allocator, stored in {@code parent}
 * @param shareable stored in {@code shareable}; when {@code false}, {@code rotateHistograms}
 *                  does not fold in the {@code localPrefChunkSize} of {@code parent.magazines}
 */
263- private AllocationStatistics (AdaptivePoolingAllocator parent ) {
324+ private AllocationStatistics (AdaptivePoolingAllocator parent , boolean shareable ) {
264325 this .parent = parent ;
326+ this .shareable = shareable ;
265327 }
266328
267329 protected void recordAllocationSize (int bucket ) {
@@ -300,8 +362,10 @@ private void rotateHistograms() {
300362 int percentileSize = 1 << sizeBucket + HISTO_MIN_BUCKET_SHIFT ;
301363 int prefChunkSize = Math .max (percentileSize * BUFS_PER_CHUNK , MIN_CHUNK_SIZE );
302364 localPrefChunkSize = prefChunkSize ;
303- for (Magazine mag : parent .magazines ) {
304- prefChunkSize = Math .max (prefChunkSize , mag .localPrefChunkSize );
365+ if (shareable ) {
366+ for (Magazine mag : parent .magazines ) {
367+ prefChunkSize = Math .max (prefChunkSize , mag .localPrefChunkSize );
368+ }
305369 }
306370 if (sharedPrefChunkSize != prefChunkSize ) {
307371 // Preferred chunk size changed. Increase check frequency.
@@ -344,7 +408,11 @@ private static final class Magazine extends AllocationStatistics {
344408 private final AtomicLong usedMemory ;
345409
/**
 * Creates a shareable magazine; delegates to the two-argument constructor with {@code shareable = true}.
 * This is the form the allocator constructor uses to fill the shared magazines array.
 *
 * @param parent the owning allocator
 */
346410 Magazine (AdaptivePoolingAllocator parent ) {
347- super (parent );
411+ this (parent , true );
412+ }
413+
/**
 * Creates a magazine with an explicit shareable flag and a fresh {@code usedMemory} counter.
 * The thread-local magazines created in the allocator constructor pass {@code shareable = false}.
 *
 * @param parent    the owning allocator, forwarded to {@code AllocationStatistics}
 * @param shareable forwarded to {@code AllocationStatistics}
 */
414+ Magazine (AdaptivePoolingAllocator parent , boolean shareable ) {
415+ super (parent , shareable );
348416 usedMemory = new AtomicLong ();
349417 }
350418
0 commit comments