Skip to content

Commit 047aef1

Browse files
committed
fix tests
1 parent da6e55e commit 047aef1

File tree

4 files changed

+53
-45
lines changed

4 files changed

+53
-45
lines changed

src/main/java/com/me/rocks/queue/RocksQueue.java

Lines changed: 26 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import com.me.rocks.queue.util.Bytes;
44
import org.rocksdb.ColumnFamilyHandle;
5+
import org.rocksdb.RocksDBException;
56
import org.rocksdb.RocksIterator;
67
import org.rocksdb.WriteBatch;
78
import org.slf4j.Logger;
@@ -33,8 +34,8 @@ public RocksQueue(final String queueName, final RocksStore store) {
3334
this.cfHandle = store.createColumnFamilyHandle(queueName);
3435
this.indexCfHandle = store.createColumnFamilyHandle(getIndexColumnFamilyName(queueName));
3536

36-
this.tail.set(getIndexId(TAIL));
37-
this.head.set(getIndexId(HEAD));
37+
this.tail.set(getIndexId(TAIL, 0));
38+
this.head.set(getIndexId(HEAD, 1));
3839

3940
this.tailIterator = store.newIteratorCF(cfHandle);
4041
}
@@ -47,30 +48,30 @@ private String getIndexColumnFamilyName(String queueName) {
4748
}
4849

4950
public boolean isEmpty() {
50-
return tail.get() <= head.get();
51+
return tail.get() == 0 ? true: tail.get() <= head.get();
5152
}
5253

5354
public long getSize() {
54-
return tail.get() - head.get();
55+
return tail.get() - head.get() + 1;
5556
}
5657

57-
public long getHead() {
58+
public long getHeadIndex() {
5859
return head.get();
5960
}
6061

61-
public long getTail() {
62+
public long getTailIndex() {
6263
return tail.get();
6364
}
6465

6566
public long approximateSize() {
66-
return getIndexId(TAIL) - getIndexId(HEAD);
67+
return getIndexId(TAIL, 0) - getIndexId(HEAD, 1) + 1;
6768
}
6869

69-
private long getIndexId(byte[] key) {
70+
private long getIndexId(byte[] key, long defaultValue) {
7071
byte[] value = store.getCF(key, indexCfHandle);
7172

7273
if(value == null) {
73-
return 0;
74+
return defaultValue;
7475
}
7576

7677
return Bytes.byteToLong(value);
@@ -84,6 +85,10 @@ public long enqueue(byte[] value) {
8485
writeBatch.put(cfHandle, indexId, value);
8586
writeBatch.merge(indexCfHandle, TAIL, ONE);
8687
store.write(writeBatch);
88+
} catch (RocksDBException e) {
89+
tail.decrementAndGet();
90+
log.error("Enqueue {} fails, {}", id, e);
91+
return -1;
8792
}
8893

8994
return id;
@@ -95,7 +100,7 @@ public long enqueue(byte[] value) {
95100
*/
96101
public QueueItem dequeue() {
97102
QueueItem item = consume();
98-
removeHead(item.getKey());
103+
removeHead();
99104
return item;
100105
}
101106

@@ -109,11 +114,9 @@ public QueueItem consume() {
109114
return null;
110115
}
111116

112-
if(!tailIterator.isValid()) {
113-
log.debug("Seek to head from {}", head.get());
114-
byte[] sid = Bytes.longToByte(head.get());
115-
tailIterator.seek(sid);
116-
}
117+
log.debug("Seek to head from {}", head.get());
118+
byte[] sid = Bytes.longToByte(head.get());
119+
tailIterator.seek(sid);
117120

118121
if(!tailIterator.isValid()) {
119122
return null;
@@ -124,23 +127,26 @@ public QueueItem consume() {
124127
QueueItem item = new QueueItem();
125128
item.setKey(id);
126129
item.setValue(tailIterator.value());
127-
tailIterator.next();
128130

129131
return item;
130132
}
131133

132134
/**
133135
* remove the head from queue
134-
* @param headId
135136
* @return
136137
*/
137-
public void removeHead(long headId) {
138+
public void removeHead() {
139+
if(this.getSize() <= 0) {
140+
return;
141+
}
142+
138143
try(final WriteBatch writeBatch = new WriteBatch()) {
139-
writeBatch.remove(cfHandle, Bytes.longToByte(headId));
144+
writeBatch.remove(cfHandle, Bytes.longToByte(head.get()));
140145
writeBatch.merge(indexCfHandle, HEAD, ONE);
141146
store.write(writeBatch);
142-
143147
head.incrementAndGet();
148+
} catch (RocksDBException e) {
149+
e.printStackTrace();
144150
}
145151
}
146152

src/main/java/com/me/rocks/queue/RocksStore.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -177,11 +177,7 @@ public byte[] getCF(byte[] key, ColumnFamilyHandle cfHandle) {
177177
return value;
178178
}
179179

180-
public void write(WriteBatch writeBatch) {
181-
try {
182-
db.write(this.writeOptions, writeBatch);
183-
} catch (RocksDBException e) {
184-
log.error("Write batch into rocks db fails, {}", e);
185-
}
180+
public void write(WriteBatch writeBatch) throws RocksDBException {
181+
db.write(this.writeOptions, writeBatch);
186182
}
187183
}

src/test/java/com/me/rocks/queue/RocksQueueShould.java

Lines changed: 23 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,12 @@
88
import org.slf4j.LoggerFactory;
99

1010
import static org.hamcrest.core.Is.is;
11-
import static org.junit.Assert.assertArrayEquals;
12-
import static org.junit.Assert.assertEquals;
13-
import static org.junit.Assert.assertThat;
11+
import static org.junit.Assert.*;
1412

1513
public class RocksQueueShould extends RocksShould {
1614
private static final Logger log = LoggerFactory.getLogger(RocksQueueShould.class);
1715

1816
private RocksStore rocksStore;
19-
private String queueName = "queue_name";
2017

2118
@Before public void
2219
initialize() {
@@ -30,8 +27,10 @@ public class RocksQueueShould extends RocksShould {
3027
should_queue_init_head_and_tail() {
3128
RocksQueue queue = rocksStore.createQueue(generateQueueName());
3229

33-
assertThat(queue.getHead(), is(0L));
34-
assertThat(queue.getTail(), is(0L));
30+
assertThat(queue.getHeadIndex(), is(1L));
31+
assertThat(queue.getTailIndex(), is(0L));
32+
assertTrue(queue.isEmpty());
33+
assertThat(queue.approximateSize(), is(0L));
3534
}
3635

3736
@Test public void
@@ -42,8 +41,8 @@ public class RocksQueueShould extends RocksShould {
4241

4342
queue.enqueue(something);
4443

45-
assertThat(queue.getHead(), is(0L));
46-
assertThat(queue.getTail(), is(1L));
44+
assertThat(queue.getHeadIndex(), is(1L));
45+
assertThat(queue.getTailIndex(), is(1L));
4746

4847
queue.close();
4948
}
@@ -60,21 +59,23 @@ public class RocksQueueShould extends RocksShould {
6059
queue.enqueue(v2);
6160
assertThat(queue.approximateSize(), is(2L));
6261

63-
assertEquals(queue.getHead(), 0);
64-
assertEquals(queue.getTail(), 2);
62+
assertEquals(queue.getHeadIndex(), 1);
63+
assertEquals(queue.getTailIndex(), 2);
6564

6665
QueueItem res1 = queue.dequeue();
6766
assertArrayEquals(v1, res1.getValue());
67+
assertEquals(queue.getHeadIndex(), 2);
6868
assertThat(queue.approximateSize(), is(1L));
6969

7070
QueueItem res2 = queue.dequeue();
7171
assertArrayEquals(v2, res2.getValue());
7272
assertThat(queue.approximateSize(), is(0L));
73+
assertThat(queue.getSize(), is(0L));
7374

74-
assertEquals(queue.getTail(), 2);
75-
assertEquals(queue.getHead(), 2);
75+
assertEquals(queue.getTailIndex(), 2);
76+
assertEquals(queue.getHeadIndex(), 2);
7677

77-
log.info("queue tail is {} and head is {}", queue.getHead(), queue.getTail());
78+
log.info("queue tail is {} and head is {}", queue.getHeadIndex(), queue.getTailIndex());
7879
}
7980

8081
@Test public void
@@ -92,18 +93,23 @@ public class RocksQueueShould extends RocksShould {
9293
QueueItem consume = queue.consume();
9394
assertEquals(consume.getKey(), id_1);
9495
log.info("Consumes value = {}", Bytes.bytesToString(consume.getValue()));
95-
assertArrayEquals(consume.getValue(), v2);
96+
assertArrayEquals(consume.getValue(), v1);
9697

9798
//multiple times consumes will always return the head
9899
QueueItem consume2 = queue.consume();
99100
assertEquals(consume2.getKey(), id_1);
100-
assertArrayEquals(consume2.getValue(), v2);
101+
assertArrayEquals(consume2.getValue(), v1);
101102

102-
assertEquals(queue.getTail(), 2);
103-
assertEquals(queue.getHead(), 0);
103+
assertEquals(queue.getTailIndex(), 2);
104+
assertEquals(queue.getHeadIndex(), 1);
104105
assertEquals(queue.approximateSize(), 2);
105106
}
106107

108+
@Test public void
109+
when_poll_queue_should_reove_head() {
110+
111+
}
112+
107113
@After public void
108114
destroy() {
109115
rocksStore.close();

src/test/java/com/me/rocks/queue/RocksStoreShould.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,8 @@ public class RocksStoreShould extends RocksShould {
3131
@Test public void
3232
should_newly_created_queue_size_approximate_to_zero() {
3333
assertNotNull(queue);
34-
assertThat(queue.getHead(), is(0L));
35-
assertThat(queue.getTail(), is(0L));
34+
assertThat(queue.getHeadIndex(), is(1L));
35+
assertThat(queue.getTailIndex(), is(0L));
3636
assertThat(queue.approximateSize(), is(0L));
3737
}
3838

0 commit comments

Comments
 (0)