7 changes: 0 additions & 7 deletions java/memory/pom.xml
@@ -20,13 +20,6 @@
<name>Arrow Memory</name>

<dependencies>

<dependency>
<groupId>com.codahale.metrics</groupId>
<artifactId>metrics-core</artifactId>
<version>3.0.1</version>
</dependency>

<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
31 changes: 2 additions & 29 deletions java/memory/src/main/java/io/netty/buffer/LargeBuffer.java
@@ -17,43 +17,16 @@
*/
package io.netty.buffer;

import java.util.concurrent.atomic.AtomicLong;

/**
* A MutableWrappedByteBuf that also maintains a metric of the number of huge buffer bytes and counts.
*/
public class LargeBuffer extends MutableWrappedByteBuf {

private final AtomicLong hugeBufferSize;
private final AtomicLong hugeBufferCount;

private final int initCap;

public LargeBuffer(ByteBuf buffer, AtomicLong hugeBufferSize, AtomicLong hugeBufferCount) {
public LargeBuffer(ByteBuf buffer) {
super(buffer);
initCap = buffer.capacity();
this.hugeBufferCount = hugeBufferCount;
this.hugeBufferSize = hugeBufferSize;
}

@Override
public ByteBuf copy(int index, int length) {
return new LargeBuffer(buffer.copy(index, length), hugeBufferSize, hugeBufferCount);
return new LargeBuffer(buffer.copy(index, length));
}

@Override
public boolean release() {
return release(1);
}

@Override
public boolean release(int decrement) {
boolean released = unwrap().release(decrement);
if (released) {
hugeBufferSize.addAndGet(-initCap);
hugeBufferCount.decrementAndGet();
}
return released;
}

}
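
A side note on the LargeBuffer change, not part of the diff: the class javadoc still mentions maintaining metrics, but after this change LargeBuffer is a plain pass-through wrapper; the byte and count bookkeeping moves into the allocator (see AccountedUnsafeDirectLittleEndian in PooledByteBufAllocatorL below). A minimal usage sketch of the simplified wrapper, with all surrounding code assumed rather than taken from this PR:

// Illustrative sketch only -- not code from this PR.
import io.netty.buffer.ByteBuf;
import io.netty.buffer.LargeBuffer;
import io.netty.buffer.UnpooledByteBufAllocator;

class LargeBufferSketch {
  static void demo() {
    // allocate an off-heap buffer larger than the pooled chunk size
    ByteBuf raw = UnpooledByteBufAllocator.DEFAULT.directBuffer(32 * 1024 * 1024);
    ByteBuf large = new LargeBuffer(raw);  // constructor no longer takes AtomicLong counters
    large.release();                       // releases the underlying buffer; no counters are touched here
  }
}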
157 changes: 73 additions & 84 deletions java/memory/src/main/java/io/netty/buffer/PooledByteBufAllocatorL.java
@@ -17,32 +17,24 @@
*/
package io.netty.buffer;

import io.netty.util.internal.StringUtil;
import static org.apache.arrow.memory.util.AssertionUtil.ASSERT_ENABLED;

import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.arrow.memory.OutOfMemoryException;

import com.codahale.metrics.Gauge;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricFilter;
import com.codahale.metrics.MetricRegistry;
import io.netty.util.internal.StringUtil;

/**
* The base allocator that we use for all of Arrow's memory management. Returns UnsafeDirectLittleEndian buffers.
*/
public class PooledByteBufAllocatorL {
private static final org.slf4j.Logger memoryLogger = org.slf4j.LoggerFactory.getLogger("drill.allocator");
private static final org.slf4j.Logger memoryLogger = org.slf4j.LoggerFactory.getLogger("arrow.allocator");

private static final int MEMORY_LOGGER_FREQUENCY_SECONDS = 60;


public static final String METRIC_PREFIX = "drill.allocator.";

private final MetricRegistry registry;
private final AtomicLong hugeBufferSize = new AtomicLong(0);
private final AtomicLong hugeBufferCount = new AtomicLong(0);
private final AtomicLong normalBufferSize = new AtomicLong(0);
@@ -51,8 +43,7 @@ public class PooledByteBufAllocatorL {
private final InnerAllocator allocator;
public final UnsafeDirectLittleEndian empty;

public PooledByteBufAllocatorL(MetricRegistry registry) {
this.registry = registry;
public PooledByteBufAllocatorL() {
allocator = new InnerAllocator();
empty = new UnsafeDirectLittleEndian(new DuplicatedByteBuf(Unpooled.EMPTY_BUFFER));
}
@@ -70,13 +61,66 @@ public int getChunkSize() {
return allocator.chunkSize;
}

private class InnerAllocator extends PooledByteBufAllocator {
public long getHugeBufferSize() {
return hugeBufferSize.get();
}

public long getHugeBufferCount() {
return hugeBufferCount.get();
}

public long getNormalBufferSize() {
return normalBufferSize.get();
}

public long getNormalBufferCount() {
return normalBufferCount.get();
}

private static class AccountedUnsafeDirectLittleEndian extends UnsafeDirectLittleEndian {
private final long initialCapacity;
private final AtomicLong count;
private final AtomicLong size;

private AccountedUnsafeDirectLittleEndian(LargeBuffer buf, AtomicLong count, AtomicLong size) {
super(buf);
this.initialCapacity = buf.capacity();
this.count = count;
this.size = size;
}

private AccountedUnsafeDirectLittleEndian(PooledUnsafeDirectByteBuf buf, AtomicLong count, AtomicLong size) {
super(buf);
this.initialCapacity = buf.capacity();
this.count = count;
this.size = size;
}

@Override
public ByteBuf copy() {
throw new UnsupportedOperationException("copy method is not supported");
}

@Override
public ByteBuf copy(int index, int length) {
throw new UnsupportedOperationException("copy method is not supported");
}

@Override
public boolean release(int decrement) {
boolean released = super.release(decrement);
if (released) {
count.decrementAndGet();
size.addAndGet(-initialCapacity);
}
return released;
}

}

private class InnerAllocator extends PooledByteBufAllocator {
private final PoolArena<ByteBuffer>[] directArenas;
private final MemoryStatusThread statusThread;
private final Histogram largeBuffersHist;
private final Histogram normalBuffersHist;
private final int chunkSize;

public InnerAllocator() {
@@ -98,50 +142,6 @@ public InnerAllocator() {
} else {
statusThread = null;
}
removeOldMetrics();

registry.register(METRIC_PREFIX + "normal.size", new Gauge<Long>() {
@Override
public Long getValue() {
return normalBufferSize.get();
}
});

registry.register(METRIC_PREFIX + "normal.count", new Gauge<Long>() {
@Override
public Long getValue() {
return normalBufferCount.get();
}
});

registry.register(METRIC_PREFIX + "huge.size", new Gauge<Long>() {
@Override
public Long getValue() {
return hugeBufferSize.get();
}
});

registry.register(METRIC_PREFIX + "huge.count", new Gauge<Long>() {
@Override
public Long getValue() {
return hugeBufferCount.get();
}
});

largeBuffersHist = registry.histogram(METRIC_PREFIX + "huge.hist");
normalBuffersHist = registry.histogram(METRIC_PREFIX + "normal.hist");

}


private synchronized void removeOldMetrics() {
registry.removeMatching(new MetricFilter() {
@Override
public boolean matches(String name, Metric metric) {
return name.startsWith("drill.allocator.");
}

});
}

private UnsafeDirectLittleEndian newDirectBufferL(int initialCapacity, int maxCapacity) {
@@ -154,27 +154,26 @@ private UnsafeDirectLittleEndian newDirectBufferL(int initialCapacity, int maxCapacity) {
// This is beyond chunk size so we'll allocate separately.
ByteBuf buf = UnpooledByteBufAllocator.DEFAULT.directBuffer(initialCapacity, maxCapacity);

hugeBufferCount.incrementAndGet();
hugeBufferSize.addAndGet(buf.capacity());
largeBuffersHist.update(buf.capacity());
// logger.debug("Allocating huge buffer of size {}", initialCapacity, new Exception());
return new UnsafeDirectLittleEndian(new LargeBuffer(buf, hugeBufferSize, hugeBufferCount));
hugeBufferCount.incrementAndGet();

// logger.debug("Allocating huge buffer of size {}", initialCapacity, new Exception());
return new AccountedUnsafeDirectLittleEndian(new LargeBuffer(buf), hugeBufferCount, hugeBufferSize);
} else {
// within chunk, use arena.
ByteBuf buf = directArena.allocate(cache, initialCapacity, maxCapacity);
if (!(buf instanceof PooledUnsafeDirectByteBuf)) {
fail();
}

normalBuffersHist.update(buf.capacity());
if (ASSERT_ENABLED) {
normalBufferSize.addAndGet(buf.capacity());
normalBufferCount.incrementAndGet();
if (!ASSERT_ENABLED) {
return new UnsafeDirectLittleEndian((PooledUnsafeDirectByteBuf) buf);
}

return new UnsafeDirectLittleEndian((PooledUnsafeDirectByteBuf) buf, normalBufferCount,
normalBufferSize);
normalBufferSize.addAndGet(buf.capacity());
normalBufferCount.incrementAndGet();

return new AccountedUnsafeDirectLittleEndian((PooledUnsafeDirectByteBuf) buf, normalBufferCount, normalBufferSize);
}

} else {
@@ -184,9 +183,10 @@ private UnsafeDirectLittleEndian newDirectBufferL(int initialCapacity, int maxCapacity) {

private UnsupportedOperationException fail() {
return new UnsupportedOperationException(
"Arrow requries that the JVM used supports access sun.misc.Unsafe. This platform didn't provide that functionality.");
"Arrow requires that the JVM used supports access sun.misc.Unsafe. This platform didn't provide that functionality.");
}

@Override
public UnsafeDirectLittleEndian directBuffer(int initialCapacity, int maxCapacity) {
if (initialCapacity == 0 && maxCapacity == 0) {
newDirectBuffer(initialCapacity, maxCapacity);
@@ -215,9 +215,8 @@ private void validate(int initialCapacity, int maxCapacity) {
private class MemoryStatusThread extends Thread {

public MemoryStatusThread() {
super("memory-status-logger");
super("allocation.logger");
this.setDaemon(true);
this.setName("allocation.logger");
}

@Override
@@ -229,12 +228,11 @@ public void run() {
} catch (InterruptedException e) {
return;
}

}
}

}

@Override
public String toString() {
StringBuilder buf = new StringBuilder();
buf.append(directArenas.length);
@@ -260,13 +258,4 @@ public String toString() {


}

public static final boolean ASSERT_ENABLED;

static {
boolean isAssertEnabled = false;
assert isAssertEnabled = true;
ASSERT_ENABLED = isAssertEnabled;
}

}
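
A side note, not part of the diff: with the com.codahale.metrics dependency removed, the registry gauges and histograms are replaced by plain accessors on PooledByteBufAllocatorL (getHugeBufferSize/Count and getNormalBufferSize/Count above). A hedged sketch of how a caller could poll them to reproduce the old gauge-style reporting; the helper class and method names are made up for illustration:

// Illustrative sketch only -- not code from this PR.
import io.netty.buffer.PooledByteBufAllocatorL;

class AllocatorStatsSketch {
  static String snapshot(PooledByteBufAllocatorL allocator) {
    // the same numbers the removed codahale gauges used to expose
    return String.format("huge: %d buffers / %d bytes; normal: %d buffers / %d bytes",
        allocator.getHugeBufferCount(), allocator.getHugeBufferSize(),
        allocator.getNormalBufferCount(), allocator.getNormalBufferSize());
  }
}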
java/memory/src/main/java/io/netty/buffer/UnsafeDirectLittleEndian.java
@@ -18,8 +18,6 @@

package io.netty.buffer;

import io.netty.util.internal.PlatformDependent;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -32,43 +30,33 @@
* The underlying class we use for little-endian access to memory. Is used underneath ArrowBufs to abstract away the
* Netty classes and underlying Netty memory management.
*/
public final class UnsafeDirectLittleEndian extends WrappedByteBuf {
public class UnsafeDirectLittleEndian extends WrappedByteBuf {
private static final boolean NATIVE_ORDER = ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN;
private static final AtomicLong ID_GENERATOR = new AtomicLong(0);

public final long id = ID_GENERATOR.incrementAndGet();
private final AbstractByteBuf wrapped;
private final long memoryAddress;

private final AtomicLong bufferCount;
private final AtomicLong bufferSize;
private final long initCap;

UnsafeDirectLittleEndian(DuplicatedByteBuf buf) {
this(buf, true, null, null);
this(buf, true);
}

UnsafeDirectLittleEndian(LargeBuffer buf) {
this(buf, true, null, null);
this(buf, true);
}

UnsafeDirectLittleEndian(PooledUnsafeDirectByteBuf buf, AtomicLong bufferCount, AtomicLong bufferSize) {
this(buf, true, bufferCount, bufferSize);
UnsafeDirectLittleEndian(PooledUnsafeDirectByteBuf buf) {
this(buf, true);

}

private UnsafeDirectLittleEndian(AbstractByteBuf buf, boolean fake, AtomicLong bufferCount, AtomicLong bufferSize) {
private UnsafeDirectLittleEndian(AbstractByteBuf buf, boolean fake) {
super(buf);
if (!NATIVE_ORDER || buf.order() != ByteOrder.BIG_ENDIAN) {
throw new IllegalStateException("Arrow only runs on LittleEndian systems.");
}

this.bufferCount = bufferCount;
this.bufferSize = bufferSize;

// initCap is used if we're tracking memory release. If we're in non-debug mode, we'll skip this.
this.initCap = ASSERT_ENABLED ? buf.capacity() : -1;

this.wrapped = buf;
this.memoryAddress = buf.memoryAddress();
}
@@ -244,16 +232,6 @@ public boolean release() {
return release(1);
}

@Override
public boolean release(int decrement) {
final boolean released = super.release(decrement);
if (ASSERT_ENABLED && released && bufferCount != null && bufferSize != null) {
bufferCount.decrementAndGet();
bufferSize.addAndGet(-initCap);
}
return released;
}

@Override
public int setBytes(int index, InputStream in, int length) throws IOException {
wrapped.checkIndex(index, length);
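
A closing note, not part of the diff: UnsafeDirectLittleEndian loses its final modifier and its release-time counter updates; accounting is now layered on top by the allocator's AccountedUnsafeDirectLittleEndian subclass shown earlier. Per the class javadoc, the wrapper's job is little-endian access to direct memory. A hedged usage sketch, assuming udle was obtained from a PooledByteBufAllocatorL (the allocation entry point is outside this diff) and has a capacity of at least 4 bytes:

// Illustrative sketch only -- not code from this PR.
static void littleEndianDemo(io.netty.buffer.UnsafeDirectLittleEndian udle) {
  udle.setInt(0, 0x11223344);   // absolute write through the little-endian view
  int v = udle.getInt(0);       // absolute read back through the same view
  assert v == 0x11223344;       // round-trips without any byte swapping by the caller
}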