Skip to content

Fix for applyForEach implementation on BigQueueImpl when gc is called #2

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/main/java/org/kairosdb/bigqueue/BigQueueImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ public void applyForEach(ItemIterator iterator) throws IOException {
}

long index = this.queueFrontIndex.get();
for (long i = index; i < this.innerArray.size(); i++) {
for (long i = index; i < this.innerArray.getHeadIndex(); i++) {
iterator.forEach(this.innerArray.get(i));
}
} finally {
Expand Down
58 changes: 58 additions & 0 deletions src/test/java/org/kairosdb/bigqueue/BigQueueUnitTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ public void testApplyForEachDoNotChangeTheQueue() throws Exception {

assertEquals(3, bigQueue.size());
assertEquals(bigQueue.size(), dii.getCount());
assertEquals("1, 2, 3, ", dii.toString());

assertArrayEquals("1".getBytes(), bigQueue.dequeue());
assertArrayEquals("2".getBytes(), bigQueue.dequeue());
Expand All @@ -147,6 +148,63 @@ public void testApplyForEachDoNotChangeTheQueue() throws Exception {
assertEquals(0, bigQueue.size());
}

/**
* This test a simple lifecycle test where we call gc as soon as we dequeue.
* First enqueue 3 messages onto the queue.
* Then we dequeue it
* Add one more messages
* Dequeue it.
*
* Throughout the lifecycle we test the appleForEach to make sure we still get back what is on the queue.
*
* This test the gc call to make sure when the this.arrayTailIndex.get() is reset to the position before the last
* dequeue position that we still can reach all the items in the queue.
* @throws Exception if something goes wrong
*/
@Test
public void testApplyForEachTestGCLifeCycle() throws Exception {
bigQueue = new BigQueueImpl(testDir, "testApplyForEachTestGC", BigArrayImpl.MINIMUM_DATA_PAGE_SIZE);
bigQueue.enqueue("1".getBytes());
bigQueue.enqueue("2".getBytes());
bigQueue.enqueue("3".getBytes());

DefaultItemIterator dii = new DefaultItemIterator();
bigQueue.applyForEach(dii);
System.out.println("[" + dii.getCount() + "] " + dii.toString());

assertEquals(3, bigQueue.size());
assertEquals(bigQueue.size(), dii.getCount());
assertEquals("1, 2, 3, ", dii.toString());

assertArrayEquals("1".getBytes(), bigQueue.dequeue());
bigQueue.gc();
assertArrayEquals("2".getBytes(), bigQueue.dequeue());
bigQueue.gc();
assertArrayEquals("3".getBytes(), bigQueue.dequeue());
bigQueue.gc();

assertEquals(0, bigQueue.size());

// Send one more data
DefaultItemIterator dii2 = new DefaultItemIterator();
bigQueue.enqueue("1".getBytes());
bigQueue.applyForEach(dii2);
System.out.println("[" + dii2.getCount() + "] " + dii2.toString());
assertEquals(1, bigQueue.size());
assertEquals(bigQueue.size(), dii2.getCount());
assertEquals("1, ", dii2.toString());
assertArrayEquals("1".getBytes(), bigQueue.dequeue());
bigQueue.gc();

// Check nothing on the applyForEach
DefaultItemIterator dii3 = new DefaultItemIterator();
bigQueue.applyForEach(dii3);
System.out.println("[" + dii3.getCount() + "] " + dii3.toString());
assertEquals(0, bigQueue.size());
assertEquals(bigQueue.size(), dii3.getCount());
assertEquals("", dii3.toString()); // nothing to return
}

@Test
public void concurrentApplyForEachTest() throws Exception {
bigQueue = new BigQueueImpl(testDir, "concurrentApplyForEachTest", BigArrayImpl.MINIMUM_DATA_PAGE_SIZE );
Expand Down