Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,20 @@
/**
* An allocation listener being notified for allocation/deallocation
* <p>
* It is expected to be called from multiple threads and as such,
* provider should take care of making the implementation thread-safe
* It might be called from multiple threads if the allocator hierarchy shares a listener, in which
* case, the provider should take care of making the implementation thread-safe.
*/
public interface AllocationListener {

public static final AllocationListener NOOP = new AllocationListener() {
@Override
public void onAllocation(long size) {
}

@Override
public boolean onFailedAllocation(long size, Accountant.AllocationOutcome outcome) {
return false;
}
};

/**
Expand All @@ -39,4 +44,15 @@ public void onAllocation(long size) {
*/
void onAllocation(long size);

/**
* Called whenever an allocation failed, giving the caller a chance to create some space in the allocator
* (either by freeing some resource, or by changing the limit), and, if successful, allowing the allocator
* to retry the allocation.
*
* @param size the buffer size that was being allocated
* @param outcome the outcome of the failed allocation. Carries information of what failed
* @return true, if the allocation can be retried; false if the allocation should fail
*/
boolean onFailedAllocation(long size, Accountant.AllocationOutcome outcome);

}
Original file line number Diff line number Diff line change
Expand Up @@ -56,28 +56,21 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato
private final HistoricalLog historicalLog;
private volatile boolean isClosed = false; // the allocator has been closed

/**
* Initialize an allocator
* @param parentAllocator parent allocator. null if defining a root allocator
* @param listener listener callback. Must be non-null -- use {@link AllocationListener#NOOP} if no listener
* desired
* @param name name of this allocator
* @param initReservation initial reservation. Cannot be modified after construction
* @param maxAllocation limit. Allocations past the limit fail. Can be modified after construction
*/
protected BaseAllocator(
final AllocationListener listener,
final String name,
final long initReservation,
final long maxAllocation) throws OutOfMemoryException {
this(listener, null, name, initReservation, maxAllocation);
}

protected BaseAllocator(
final BaseAllocator parentAllocator,
final String name,
final long initReservation,
final long maxAllocation) throws OutOfMemoryException {
this(parentAllocator.listener, parentAllocator, name, initReservation, maxAllocation);
}

private BaseAllocator(
final AllocationListener listener,
final BaseAllocator parentAllocator,
final String name,
final long initReservation,
final long maxAllocation) throws OutOfMemoryException {
final BaseAllocator parentAllocator,
final AllocationListener listener,
final String name,
final long initReservation,
final long maxAllocation) throws OutOfMemoryException {
super(parentAllocator, initReservation, maxAllocation);

this.listener = listener;
Expand Down Expand Up @@ -276,7 +269,13 @@ public ArrowBuf buffer(final int initialRequestSize, BufferManager manager) {
: initialRequestSize;
AllocationOutcome outcome = this.allocateBytes(actualRequestSize);
if (!outcome.isOk()) {
throw new OutOfMemoryException(createErrorMsg(this, actualRequestSize, initialRequestSize));
if (listener.onFailedAllocation(actualRequestSize, outcome)) {
// Second try, in case the listener can do something about it
outcome = this.allocateBytes(actualRequestSize);
}
if (!outcome.isOk()) {
throw new OutOfMemoryException(createErrorMsg(this, actualRequestSize, initialRequestSize));
}
}

boolean success = false;
Expand Down Expand Up @@ -333,9 +332,18 @@ public BufferAllocator newChildAllocator(
final String name,
final long initReservation,
final long maxAllocation) {
return newChildAllocator(name, this.listener, initReservation, maxAllocation);
}

@Override
public BufferAllocator newChildAllocator(
final String name,
final AllocationListener listener,
final long initReservation,
final long maxAllocation) {
assertOpen();

final ChildAllocator childAllocator = new ChildAllocator(this, name, initReservation,
final ChildAllocator childAllocator = new ChildAllocator(listener, this, name, initReservation,
maxAllocation);

if (DEBUG) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,17 @@ public interface BufferAllocator extends AutoCloseable {
*/
public BufferAllocator newChildAllocator(String name, long initReservation, long maxAllocation);

/**
* Create a new child allocator.
*
* @param name the name of the allocator.
* @param listener allocation listener for the newly created child
* @param initReservation the initial space reservation (obtained from this allocator)
* @param maxAllocation maximum amount of space the new allocator can allocate
* @return the new allocator, or null if it can't be created
*/
public BufferAllocator newChildAllocator(String name, AllocationListener listener, long initReservation, long maxAllocation);

/**
* Close and release all buffers generated from this buffer pool.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class ChildAllocator extends BaseAllocator {
/**
* Constructor.
*
* @param listener Allocation listener to be used in this child
* @param parentAllocator parent allocator -- the one creating this child
* @param name the name of this child allocator
* @param initReservation initial amount of space to reserve (obtained from the parent)
Expand All @@ -41,11 +42,12 @@ class ChildAllocator extends BaseAllocator {
* allocation policy in force, even less memory may be available
*/
ChildAllocator(
AllocationListener listener,
BaseAllocator parentAllocator,
String name,
long initReservation,
long maxAllocation) {
super(parentAllocator, name, initReservation, maxAllocation);
super(parentAllocator, listener, name, initReservation, maxAllocation);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public RootAllocator(final long limit) {
}

public RootAllocator(final AllocationListener listener, final long limit) {
super(listener, "ROOT", 0, limit);
super(null, listener, "ROOT", 0, limit);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,122 @@ public void testRootAllocator_createChildDontClose() throws Exception {
}
}

private static void allocateAndFree(final BufferAllocator allocator) {
// Allocation listener
// It counts the number of times it has been invoked, and how much memory allocation it has seen
// When set to 'expand on fail', it attempts to expand the associated allocator's limit
private static final class TestAllocationListener implements AllocationListener {
private int numCalls;
private long totalMem;
private boolean expandOnFail;
BufferAllocator expandAlloc;
long expandLimit;

TestAllocationListener() {
this.numCalls = 0;
this.totalMem = 0;
this.expandOnFail = false;
this.expandAlloc = null;
this.expandLimit = 0;
}

@Override
public void onAllocation(long size) {
numCalls++;
totalMem += size;
}

@Override
public boolean onFailedAllocation(long size, Accountant.AllocationOutcome outcome) {
if (expandOnFail) {
expandAlloc.setLimit(expandLimit);
return true;
}
return false;
}

void setExpandOnFail(BufferAllocator expandAlloc, long expandLimit) {
this.expandOnFail = true;
this.expandAlloc = expandAlloc;
this.expandLimit = expandLimit;
}

int getNumCalls() {
return numCalls;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you should be able to do this count number of calls monitoring using Jmockit's APIs rather than custom building something.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True, and in fact I had it like that initially. However, I found I was repeating code again and again in the mocks, and it was much easier to just build a custom listener.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack

}

long getTotalMem() {
return totalMem;
}
}

@Test
public void testRootAllocator_listeners() throws Exception {
TestAllocationListener l1 = new TestAllocationListener();
assertEquals(0, l1.getNumCalls());
assertEquals(0, l1.getTotalMem());
TestAllocationListener l2 = new TestAllocationListener();
assertEquals(0, l2.getNumCalls());
assertEquals(0, l2.getTotalMem());
// root and first-level child share the first listener
// second-level and third-level child share the second listener
try (final RootAllocator rootAllocator = new RootAllocator(l1, MAX_ALLOCATION)) {
try (final BufferAllocator c1 = rootAllocator.newChildAllocator("c1", 0, MAX_ALLOCATION)) {
final ArrowBuf buf1 = c1.buffer(16);
assertNotNull("allocation failed", buf1);
assertEquals(1, l1.getNumCalls());
assertEquals(16, l1.getTotalMem());
buf1.release();
try (final BufferAllocator c2 = c1.newChildAllocator("c2", l2, 0, MAX_ALLOCATION)) {
final ArrowBuf buf2 = c2.buffer(32);
assertNotNull("allocation failed", buf2);
assertEquals(1, l1.getNumCalls());
assertEquals(16, l1.getTotalMem());
assertEquals(1, l2.getNumCalls());
assertEquals(32, l2.getTotalMem());
buf2.release();
try (final BufferAllocator c3 = c2.newChildAllocator("c3", 0, MAX_ALLOCATION)) {
final ArrowBuf buf3 = c3.buffer(64);
assertNotNull("allocation failed", buf3);
assertEquals(1, l1.getNumCalls());
assertEquals(16, l1.getTotalMem());
assertEquals(2, l2.getNumCalls());
assertEquals(32 + 64, l2.getTotalMem());
buf3.release();
}
}
}
}
}

@Test
public void testRootAllocator_listenerAllocationFail() throws Exception {
TestAllocationListener l1 = new TestAllocationListener();
assertEquals(0, l1.getNumCalls());
assertEquals(0, l1.getTotalMem());
// Test attempts to allocate too much from a child whose limit is set to half of the max allocation
// The listener's callback triggers, expanding the child allocator's limit, so then the allocation succeeds
try (final RootAllocator rootAllocator = new RootAllocator(MAX_ALLOCATION)) {
try (final BufferAllocator c1 = rootAllocator.newChildAllocator("c1", l1,0, MAX_ALLOCATION / 2)) {
try {
c1.buffer(MAX_ALLOCATION);
fail("allocated memory beyond max allowed");
} catch (OutOfMemoryException e) {
// expected
}
assertEquals(0, l1.getNumCalls());
assertEquals(0, l1.getTotalMem());

l1.setExpandOnFail(c1, MAX_ALLOCATION);
ArrowBuf arrowBuf = c1.buffer(MAX_ALLOCATION);
assertNotNull("allocation failed", arrowBuf);
assertEquals(1, l1.getNumCalls());
assertEquals(MAX_ALLOCATION, l1.getTotalMem());
arrowBuf.release();
}
}
}

private static void allocateAndFree(final BufferAllocator allocator) {
final ArrowBuf arrowBuf = allocator.buffer(512);
assertNotNull("allocation failed", arrowBuf);
arrowBuf.release();
Expand Down