Skip to content

Commit

Permalink
hide queue inside MemoryBoundedLinkedBlockingQueue (airbytehq#26375)
Browse files Browse the repository at this point in the history
Hiding the actual java Queue has an inner class to avoid the chance that someone tries to use native queue methods that we haven't overridden. Good thing that I did this too, because one of the changes we made during hack dayz wasn't reflected in our current feature branch. We need to override poll(time, unit) not just poll. This PR makes sure we won't make that mistake again!
  • Loading branch information
cgardens authored and marcosmarxm committed Jun 8, 2023
1 parent c82d209 commit 5d3aba6
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,83 +12,126 @@
import java.util.concurrent.atomic.AtomicReference;
import lombok.extern.slf4j.Slf4j;

/**
* This class is meant to emulate the behavior of a LinkedBlockingQueue, but instead of being
* bounded on number of items in the queue, it is bounded by the memory it is allowed to use. The
* amount of memory it is allowed to use can be resized after it is instantiated.
* <p>
* This class intentaionally hides the underlying queue inside of it. For this class to work, it has
* to override each method on a queue that adds or removes records from the queue. The Queue
* interface has a lot of methods to override, and we don't want to spend the time overriding a lot
* of methods that won't be used. By hiding the queue, we avoid someone accidentally using a queue
* method that has not been modified. If you need access to another of the queue methods, pattern
* match adding the memory tracking as seen in {@link HiddenQueue}, and then delegate to that method
* from this top-level class.
*
* @param <E> type in the queue
*/
@Slf4j
class MemoryBoundedLinkedBlockingQueue<E> extends LinkedBlockingQueue<MemoryBoundedLinkedBlockingQueue.MemoryItem<E>> {

private final AtomicLong currentMemoryUsage;
private final AtomicLong maxMemoryUsage;
class MemoryBoundedLinkedBlockingQueue<E> {

private final AtomicReference<Instant> timeOfLastMessage;
private final HiddenQueue<E> hiddenQueue;

public MemoryBoundedLinkedBlockingQueue(final long maxMemoryUsage) {
currentMemoryUsage = new AtomicLong(0);
this.maxMemoryUsage = new AtomicLong(maxMemoryUsage);
timeOfLastMessage = new AtomicReference(null);
hiddenQueue = new HiddenQueue<>(maxMemoryUsage);
}

public long getCurrentMemoryUsage() {
return currentMemoryUsage.get();
}

public long getMaxMemoryUsage() {
return maxMemoryUsage.get();
return hiddenQueue.currentMemoryUsage.get();
}

public void addMaxMemory(final long maxMemoryUsage) {
this.maxMemoryUsage.addAndGet(maxMemoryUsage);
this.hiddenQueue.maxMemoryUsage.addAndGet(maxMemoryUsage);
}

public Optional<Instant> getTimeOfLastMessage() {
return Optional.ofNullable(timeOfLastMessage.get());
return Optional.ofNullable(hiddenQueue.timeOfLastMessage.get());
}

public int size() {
return hiddenQueue.size();
}

public boolean offer(final E e, final long itemSizeInBytes) {
final long newMemoryUsage = currentMemoryUsage.addAndGet(itemSizeInBytes);
if (newMemoryUsage <= maxMemoryUsage.get()) {
final boolean success = super.offer(new MemoryItem<>(e, itemSizeInBytes));
if (!success) {
currentMemoryUsage.addAndGet(-itemSizeInBytes);
} else {
// it succeeded!
timeOfLastMessage.set(Instant.now());
}
log.debug("offer status: {}", success);
return success;
} else {
currentMemoryUsage.addAndGet(-itemSizeInBytes);
log.debug("offer failed");
return false;
}
return hiddenQueue.offer(e, itemSizeInBytes);
}

@Override
public MemoryBoundedLinkedBlockingQueue.MemoryItem<E> take() throws InterruptedException {
final MemoryItem<E> memoryItem = super.take();
if (memoryItem != null) {
currentMemoryUsage.addAndGet(-memoryItem.size());
return memoryItem;
}
return null;
return hiddenQueue.take();
}

@Override
public MemoryBoundedLinkedBlockingQueue.MemoryItem<E> poll() {
final MemoryItem<E> memoryItem = super.poll();
if (memoryItem != null) {
currentMemoryUsage.addAndGet(-memoryItem.size());
return memoryItem;
}
return null;
return hiddenQueue.poll();
}

@Override
public MemoryBoundedLinkedBlockingQueue.MemoryItem<E> poll(final long timeout, final TimeUnit unit) throws InterruptedException {
final MemoryItem<E> memoryItem = super.poll(timeout, unit);
if (memoryItem != null) {
return hiddenQueue.poll(timeout, unit);
}

/**
* Extends LinkedBlockingQueue so that we can get a LinkedBlockingQueue bounded by memory. Hidden as
* an inner class, so it doesn't get misused, see top-level javadoc comment.
*
* @param <E>
*/
private static class HiddenQueue<E> extends LinkedBlockingQueue<MemoryBoundedLinkedBlockingQueue.MemoryItem<E>> {

private final AtomicLong currentMemoryUsage;
private final AtomicLong maxMemoryUsage;
private final AtomicReference<Instant> timeOfLastMessage;

public HiddenQueue(final long maxMemoryUsage) {
currentMemoryUsage = new AtomicLong(0);
this.maxMemoryUsage = new AtomicLong(maxMemoryUsage);
timeOfLastMessage = new AtomicReference<>(null);
}

public boolean offer(final E e, final long itemSizeInBytes) {
final long newMemoryUsage = currentMemoryUsage.addAndGet(itemSizeInBytes);
if (newMemoryUsage <= maxMemoryUsage.get()) {
final boolean success = super.offer(new MemoryItem<>(e, itemSizeInBytes));
if (!success) {
currentMemoryUsage.addAndGet(-itemSizeInBytes);
} else {
// it succeeded!
timeOfLastMessage.set(Instant.now());
}
log.debug("offer status: {}", success);
return success;
} else {
currentMemoryUsage.addAndGet(-itemSizeInBytes);
log.debug("offer failed");
return false;
}
}

@Override
public MemoryBoundedLinkedBlockingQueue.MemoryItem<E> take() throws InterruptedException {
final MemoryItem<E> memoryItem = super.take();
currentMemoryUsage.addAndGet(-memoryItem.size());
return memoryItem;
}
return null;

@Override
public MemoryBoundedLinkedBlockingQueue.MemoryItem<E> poll() {
final MemoryItem<E> memoryItem = super.poll();
if (memoryItem != null) {
currentMemoryUsage.addAndGet(-memoryItem.size());
return memoryItem;
}
return null;
}

@Override
public MemoryBoundedLinkedBlockingQueue.MemoryItem<E> poll(final long timeout, final TimeUnit unit) throws InterruptedException {
final MemoryItem<E> memoryItem = super.poll(timeout, unit);
if (memoryItem != null) {
currentMemoryUsage.addAndGet(-memoryItem.size());
return memoryItem;
}
return null;
}

}

public record MemoryItem<E> (E item, long size) {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,11 @@ void offerAndTakeShouldReturn() throws InterruptedException {

queue.offer("abc", 6);

var item = queue.take();
final var item = queue.take();

assertEquals("abc", item.item());
}

@Test
void offerAndToStreamShouldReturn() throws InterruptedException {
final MemoryBoundedLinkedBlockingQueue<String> queue = new MemoryBoundedLinkedBlockingQueue<>(1024);

queue.offer("abc", 6);
queue.offer("DEF", 6);

System.out.println(queue.size());
queue.stream().forEach(stringMemoryItem -> System.out.println(stringMemoryItem.item()));
System.out.println(queue.size());
}

@Test
void test() throws InterruptedException {
final MemoryBoundedLinkedBlockingQueue<String> queue = new MemoryBoundedLinkedBlockingQueue<>(1024);
Expand Down

0 comments on commit 5d3aba6

Please sign in to comment.