Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added EvictingBlockingQueue implementation. Closes #3882 #5417

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
/*
* Copyright (C) 2012 The Guava Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.common.util.concurrent;

import java.util.AbstractList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import com.google.common.annotations.GwtIncompatible;
import com.google.common.collect.ImmutableList;
import com.google.common.testing.NullPointerTester;
import com.google.common.testing.SerializableTester;

import junit.framework.TestCase;

/**
* Tests for {@link EvictingBlockingQueue}.
*
* @author Noam Greenshtain
*/
//@GwtCompatible(emulated = true)
public class EvictingBlockingQueueTest extends TestCase {
public void testCreateWithNegativeSize() throws Exception {
try {
EvictingBlockingQueue.create(-1);
fail();
} catch (IllegalArgumentException expected) {
}
}

public void testCreateWithZeroSize() throws Exception {
try {
EvictingBlockingQueue.create(0);
fail();
} catch (IllegalArgumentException expected) {
}
}

public void testRemainingCapacity_maxSize1() {
EvictingBlockingQueue<String> queue = EvictingBlockingQueue.create(1);
assertEquals(1, queue.remainingCapacity());
queue.add("hi");
assertEquals(0, queue.remainingCapacity());
}

public void testRemainingCapacity_maxSize3() {
EvictingBlockingQueue<String> queue = EvictingBlockingQueue.create(3);
assertEquals(3, queue.remainingCapacity());
queue.add("hi");
assertEquals(2, queue.remainingCapacity());
queue.add("hi");
assertEquals(1, queue.remainingCapacity());
queue.add("hi");
assertEquals(0, queue.remainingCapacity());
}

public void testEvictingAfterOne() throws Exception {
EvictingBlockingQueue<String> queue = EvictingBlockingQueue.create(1);
assertEquals(0, queue.size());
assertEquals(1, queue.remainingCapacity());

assertTrue(queue.add("hi"));
assertEquals("hi", queue.element());
assertEquals("hi", queue.peek());
assertEquals(1, queue.size());
assertEquals(0, queue.remainingCapacity());

assertTrue(queue.add("there"));
assertEquals("there", queue.element());
assertEquals("there", queue.peek());
assertEquals(1, queue.size());
assertEquals(0, queue.remainingCapacity());

assertEquals("there", queue.remove());
assertEquals(0, queue.size());
assertEquals(1, queue.remainingCapacity());
}

public void testEvictingAfterThree() throws Exception {
EvictingBlockingQueue<String> queue = EvictingBlockingQueue.create(3);
assertEquals(0, queue.size());
assertEquals(3, queue.remainingCapacity());

assertTrue(queue.add("one"));
assertTrue(queue.offer("two",Long.valueOf(1),TimeUnit.SECONDS));
assertTrue(queue.offer("three"));
assertEquals("one", queue.element());
assertEquals("one", queue.peek());
assertEquals(3, queue.size());
assertEquals(0, queue.remainingCapacity());

assertTrue(queue.add("four"));
assertEquals("two", queue.element());
assertEquals("two", queue.peek());
assertEquals(3, queue.size());
assertEquals(0, queue.remainingCapacity());

assertEquals("two", queue.remove());
assertEquals(2, queue.size());
assertEquals(1, queue.remainingCapacity());
}

public void testAddAll() throws Exception {
EvictingBlockingQueue<String> queue = EvictingBlockingQueue.create(3);
assertEquals(0, queue.size());
assertEquals(3, queue.remainingCapacity());

assertTrue(queue.addAll(ImmutableList.of("one", "two", "three")));
assertEquals("one", queue.element());
assertEquals("one", queue.peek());
assertEquals(3, queue.size());
assertEquals(0, queue.remainingCapacity());

assertTrue(queue.addAll(ImmutableList.of("four")));
assertEquals("two", queue.element());
assertEquals("two", queue.peek());
assertEquals(3, queue.size());
assertEquals(0, queue.remainingCapacity());

assertEquals("two", queue.remove());
assertEquals(2, queue.size());
assertEquals(1, queue.remainingCapacity());
}

public void testAddAll_largeList() {
final List<String> list = ImmutableList.of("one", "two", "three", "four", "five");
List<String> misbehavingList = new AbstractList<String>() {
@Override
public int size() {
return list.size();
}

@Override
public String get(int index) {
if (index < 2) {
throw new AssertionError();
}
return list.get(index);
}
};

EvictingBlockingQueue<String> queue = EvictingBlockingQueue.create(3);
assertTrue(queue.addAll(misbehavingList));

assertEquals("three", queue.remove());
assertEquals("four", queue.remove());
assertEquals("five", queue.remove());
assertTrue(queue.isEmpty());
}

@GwtIncompatible // NullPointerTester
public void testNullPointerExceptions() {
NullPointerTester tester = new NullPointerTester();
tester.testAllPublicStaticMethods(EvictingBlockingQueue.class);
tester.testAllPublicConstructors(EvictingBlockingQueue.class);
EvictingBlockingQueue<String> queue = EvictingBlockingQueue.create(5);
// The queue must be non-empty so it throws a NPE correctly
queue.add("one");
tester.testAllPublicInstanceMethods(queue);
}

public void testSerialization() {
EvictingBlockingQueue<String> original = EvictingBlockingQueue.create(5);
original.add("one");
original.add("two");
original.add("three");

EvictingBlockingQueue<String> copy = SerializableTester.reserialize(original);
assertEquals(copy.maxSize, original.maxSize);
assertEquals("one", copy.remove());
assertEquals("two", copy.remove());
assertEquals("three", copy.remove());
assertTrue(copy.isEmpty());
}
}
178 changes: 178 additions & 0 deletions guava/src/com/google/common/util/concurrent/EvictingBlockingQueue.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
/*
* Copyright (C) 2010 The Guava Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package com.google.common.util.concurrent;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;

import com.google.common.annotations.GwtIncompatible;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import java.io.Serializable;
import java.util.Collection;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;


/**
* A thread-safe implementation of the EvictingQueue data structure. It holds an
* ArrayBlockingQueue as the delegate which performers most of the BlockingQueue
* interface API. It overrides the methods that adds new objects to the data
* structure in order to implement the Evicting feature.
*
* <p>
* An evicting blocking queue must be configured with a maximum size. Each time
* an element is added to a full queue, the queue automatically removes its head
* element. This is different from conventional bounded queues, which either
* block or reject new elements when full.
*
* <p>
* This class does not accept null elements.
*
* @author Noam Greenshtain
*
*/
@GwtIncompatible
public final class EvictingBlockingQueue<E> extends ForwardingBlockingQueue<E> implements Serializable {

private final BlockingQueue<E> delegate;

private final Integer lock;

@VisibleForTesting
final int maxSize;

private EvictingBlockingQueue(int maxSize) {
checkArgument(maxSize >= 1, "maxSize (%s) must >= 1", maxSize);
this.delegate = new ArrayBlockingQueue<E>(maxSize);
this.maxSize = maxSize;
this.lock = Integer.valueOf(0);
}

/**
* Creates and returns a new EvictingBlockingQueue that will hold up to
* {@code maxSize} elements.
*
*/
public static <E> EvictingBlockingQueue<E> create(int maxSize) {
return new EvictingBlockingQueue<E>(maxSize);
}

@Override
protected BlockingQueue<E> delegate() {
return this.delegate;
}

/**
* Adds the given element to this queue. If the queue is currently full, the
* element at the head of the queue is evicted to make room.
*
* @return {@code true} always
*/
@Override
public void put(E e) {
add(e);
}

/**
* Adds the given element to this queue. If the queue is currently full, the
* element at the head of the queue is evicted to make room.
*
* @return {@code true} always
*/
@Override
@CanIgnoreReturnValue
public boolean offer(E e) {
return add(e);
}

/**
* Adds the given element to this queue. If the queue is currently full, the
* element at the head of the queue is evicted to make room.
*
* @return {@code true} always
*/
@Override
@CanIgnoreReturnValue
public boolean offer(E e, long timeout, @Nullable TimeUnit unit) {
return add(e); // Timeout is useless in an EvictingQueue feature.
}

/**
* Adds the given element to this queue. If the queue is currently full, the
* element at the head of the queue is evicted to make room. This operation is
* synchronized due to BlockingQueue semantics.
*
* @return {@code true} always
*/
@Override
@CanIgnoreReturnValue
public boolean add(E e) {
checkNotNull(e); // check before removing
synchronized (lock) {
if (size() == maxSize) {
delegate.remove();
}
delegate.add(e);
return true;
}
}

@Override
@CanIgnoreReturnValue
public boolean addAll(Collection<? extends E> collection) {
int size = collection.size();
synchronized (lock) {
if (size >= maxSize) {
clear();
return Iterables.addAll(this, Iterables.skip(collection, size - maxSize));
}
return standardAddAll(collection);
}
}

@Override
public boolean remove(Object o) {
checkNotNull(o);
synchronized (lock) {
return this.delegate.remove(o);
}
}

@Override
public E remove() {
synchronized (lock) {
return this.delegate.remove();
}
}

@Override
public E poll() {
synchronized (lock) {
return this.delegate.poll();
}
}

@Override
public boolean contains(Object object) {
synchronized (lock) {
return delegate().contains(checkNotNull(object));
}
}

}