1717 */
1818package io .netty .buffer ;
1919
20- import io . netty . util .internal . StringUtil ;
20+ import static org . apache . arrow . memory . util .AssertionUtil . ASSERT_ENABLED ;
2121
2222import java .lang .reflect .Field ;
2323import java .nio .ByteBuffer ;
2424import java .util .concurrent .atomic .AtomicLong ;
2525
2626import org .apache .arrow .memory .OutOfMemoryException ;
2727
28- import com .codahale .metrics .Gauge ;
29- import com .codahale .metrics .Histogram ;
30- import com .codahale .metrics .Metric ;
31- import com .codahale .metrics .MetricFilter ;
32- import com .codahale .metrics .MetricRegistry ;
28+ import io .netty .util .internal .StringUtil ;
3329
3430/**
3531 * The base allocator that we use for all of Arrow's memory management. Returns UnsafeDirectLittleEndian buffers.
3632 */
3733public class PooledByteBufAllocatorL {
38- private static final org .slf4j .Logger memoryLogger = org .slf4j .LoggerFactory .getLogger ("drill .allocator" );
34+ private static final org .slf4j .Logger memoryLogger = org .slf4j .LoggerFactory .getLogger ("arrow .allocator" );
3935
4036 private static final int MEMORY_LOGGER_FREQUENCY_SECONDS = 60 ;
4137
42-
43- public static final String METRIC_PREFIX = "drill.allocator." ;
44-
45- private final MetricRegistry registry ;
4638 private final AtomicLong hugeBufferSize = new AtomicLong (0 );
4739 private final AtomicLong hugeBufferCount = new AtomicLong (0 );
4840 private final AtomicLong normalBufferSize = new AtomicLong (0 );
@@ -51,8 +43,7 @@ public class PooledByteBufAllocatorL {
5143 private final InnerAllocator allocator ;
5244 public final UnsafeDirectLittleEndian empty ;
5345
54- public PooledByteBufAllocatorL (MetricRegistry registry ) {
55- this .registry = registry ;
46+ public PooledByteBufAllocatorL () {
5647 allocator = new InnerAllocator ();
5748 empty = new UnsafeDirectLittleEndian (new DuplicatedByteBuf (Unpooled .EMPTY_BUFFER ));
5849 }
@@ -70,13 +61,66 @@ public int getChunkSize() {
7061 return allocator .chunkSize ;
7162 }
7263
73- private class InnerAllocator extends PooledByteBufAllocator {
64+ public long getHugeBufferSize () {
65+ return hugeBufferSize .get ();
66+ }
7467
68+ public long getHugeBufferCount () {
69+ return hugeBufferCount .get ();
70+ }
7571
72+ public long getNormalBufferSize () {
73+ return normalBufferSize .get ();
74+ }
75+
76+ public long getNormalBufferCount () {
77+ return normalBufferSize .get ();
78+ }
79+
80+ private static class AccountedUnsafeDirectLittleEndian extends UnsafeDirectLittleEndian {
81+ private final long initialCapacity ;
82+ private final AtomicLong count ;
83+ private final AtomicLong size ;
84+
85+ private AccountedUnsafeDirectLittleEndian (LargeBuffer buf , AtomicLong count , AtomicLong size ) {
86+ super (buf );
87+ this .initialCapacity = buf .capacity ();
88+ this .count = count ;
89+ this .size = size ;
90+ }
91+
92+ private AccountedUnsafeDirectLittleEndian (PooledUnsafeDirectByteBuf buf , AtomicLong count , AtomicLong size ) {
93+ super (buf );
94+ this .initialCapacity = buf .capacity ();
95+ this .count = count ;
96+ this .size = size ;
97+ }
98+
99+ @ Override
100+ public ByteBuf copy () {
101+ throw new UnsupportedOperationException ("copy method is not supported" );
102+ }
103+
104+ @ Override
105+ public ByteBuf copy (int index , int length ) {
106+ throw new UnsupportedOperationException ("copy method is not supported" );
107+ }
108+
109+ @ Override
110+ public boolean release (int decrement ) {
111+ boolean released = super .release (decrement );
112+ if (released ) {
113+ count .decrementAndGet ();
114+ size .addAndGet (-initialCapacity );
115+ }
116+ return released ;
117+ }
118+
119+ }
120+
121+ private class InnerAllocator extends PooledByteBufAllocator {
76122 private final PoolArena <ByteBuffer >[] directArenas ;
77123 private final MemoryStatusThread statusThread ;
78- private final Histogram largeBuffersHist ;
79- private final Histogram normalBuffersHist ;
80124 private final int chunkSize ;
81125
82126 public InnerAllocator () {
@@ -98,50 +142,6 @@ public InnerAllocator() {
98142 } else {
99143 statusThread = null ;
100144 }
101- removeOldMetrics ();
102-
103- registry .register (METRIC_PREFIX + "normal.size" , new Gauge <Long >() {
104- @ Override
105- public Long getValue () {
106- return normalBufferSize .get ();
107- }
108- });
109-
110- registry .register (METRIC_PREFIX + "normal.count" , new Gauge <Long >() {
111- @ Override
112- public Long getValue () {
113- return normalBufferCount .get ();
114- }
115- });
116-
117- registry .register (METRIC_PREFIX + "huge.size" , new Gauge <Long >() {
118- @ Override
119- public Long getValue () {
120- return hugeBufferSize .get ();
121- }
122- });
123-
124- registry .register (METRIC_PREFIX + "huge.count" , new Gauge <Long >() {
125- @ Override
126- public Long getValue () {
127- return hugeBufferCount .get ();
128- }
129- });
130-
131- largeBuffersHist = registry .histogram (METRIC_PREFIX + "huge.hist" );
132- normalBuffersHist = registry .histogram (METRIC_PREFIX + "normal.hist" );
133-
134- }
135-
136-
137- private synchronized void removeOldMetrics () {
138- registry .removeMatching (new MetricFilter () {
139- @ Override
140- public boolean matches (String name , Metric metric ) {
141- return name .startsWith ("drill.allocator." );
142- }
143-
144- });
145145 }
146146
147147 private UnsafeDirectLittleEndian newDirectBufferL (int initialCapacity , int maxCapacity ) {
@@ -154,27 +154,26 @@ private UnsafeDirectLittleEndian newDirectBufferL(int initialCapacity, int maxCa
154154 // This is beyond chunk size so we'll allocate separately.
155155 ByteBuf buf = UnpooledByteBufAllocator .DEFAULT .directBuffer (initialCapacity , maxCapacity );
156156
157- hugeBufferCount .incrementAndGet ();
158157 hugeBufferSize .addAndGet (buf .capacity ());
159- largeBuffersHist .update (buf .capacity ());
160- // logger.debug("Allocating huge buffer of size {}", initialCapacity, new Exception());
161- return new UnsafeDirectLittleEndian (new LargeBuffer (buf , hugeBufferSize , hugeBufferCount ));
158+ hugeBufferCount .incrementAndGet ();
162159
160+ // logger.debug("Allocating huge buffer of size {}", initialCapacity, new Exception());
161+ return new AccountedUnsafeDirectLittleEndian (new LargeBuffer (buf ), hugeBufferCount , hugeBufferSize );
163162 } else {
164163 // within chunk, use arena.
165164 ByteBuf buf = directArena .allocate (cache , initialCapacity , maxCapacity );
166165 if (!(buf instanceof PooledUnsafeDirectByteBuf )) {
167166 fail ();
168167 }
169168
170- normalBuffersHist .update (buf .capacity ());
171- if (ASSERT_ENABLED ) {
172- normalBufferSize .addAndGet (buf .capacity ());
173- normalBufferCount .incrementAndGet ();
169+ if (!ASSERT_ENABLED ) {
170+ return new UnsafeDirectLittleEndian ((PooledUnsafeDirectByteBuf ) buf );
174171 }
175172
176- return new UnsafeDirectLittleEndian ((PooledUnsafeDirectByteBuf ) buf , normalBufferCount ,
177- normalBufferSize );
173+ normalBufferSize .addAndGet (buf .capacity ());
174+ normalBufferCount .incrementAndGet ();
175+
176+ return new AccountedUnsafeDirectLittleEndian ((PooledUnsafeDirectByteBuf ) buf , normalBufferCount , normalBufferSize );
178177 }
179178
180179 } else {
@@ -184,9 +183,10 @@ private UnsafeDirectLittleEndian newDirectBufferL(int initialCapacity, int maxCa
184183
185184 private UnsupportedOperationException fail () {
186185 return new UnsupportedOperationException (
187- "Arrow requries that the JVM used supports access sun.misc.Unsafe. This platform didn't provide that functionality." );
186+ "Arrow requires that the JVM used supports access sun.misc.Unsafe. This platform didn't provide that functionality." );
188187 }
189188
189+ @ Override
190190 public UnsafeDirectLittleEndian directBuffer (int initialCapacity , int maxCapacity ) {
191191 if (initialCapacity == 0 && maxCapacity == 0 ) {
192192 newDirectBuffer (initialCapacity , maxCapacity );
@@ -215,9 +215,8 @@ private void validate(int initialCapacity, int maxCapacity) {
215215 private class MemoryStatusThread extends Thread {
216216
217217 public MemoryStatusThread () {
218- super ("memory-status- logger" );
218+ super ("allocation. logger" );
219219 this .setDaemon (true );
220- this .setName ("allocation.logger" );
221220 }
222221
223222 @ Override
@@ -229,12 +228,11 @@ public void run() {
229228 } catch (InterruptedException e ) {
230229 return ;
231230 }
232-
233231 }
234232 }
235-
236233 }
237234
235+ @ Override
238236 public String toString () {
239237 StringBuilder buf = new StringBuilder ();
240238 buf .append (directArenas .length );
@@ -260,13 +258,4 @@ public String toString() {
260258
261259
262260 }
263-
264- public static final boolean ASSERT_ENABLED ;
265-
266- static {
267- boolean isAssertEnabled = false ;
268- assert isAssertEnabled = true ;
269- ASSERT_ENABLED = isAssertEnabled ;
270- }
271-
272261}
0 commit comments