Skip to content

Commit c715a13

Browse files
committed
JAVA-448: Tailable Cursor and awaitdata
1 parent d99cd45 commit c715a13

File tree

4 files changed

+96
-31
lines changed

4 files changed

+96
-31
lines changed

src/main/com/mongodb/DBApiLayer.java

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -407,15 +407,31 @@ public DBObject next(){
407407
}
408408

409409
public boolean hasNext(){
410-
while ( true ){
411-
if ( _cur.hasNext() )
412-
return true;
413-
410+
boolean hasNext = _cur.hasNext();
411+
while ( !hasNext ) {
414412
if ( ! _curResult.hasGetMore( _options ) )
415413
return false;
416414

417415
_advance();
416+
hasNext = _cur.hasNext();
417+
418+
if (!hasNext) {
419+
if ( ( _options & Bytes.QUERYOPTION_AWAITDATA ) == 0 ) {
420+
// dont block waiting for data if no await
421+
return false;
422+
} else {
423+
// if await, driver should block until data is available
424+
// if server does not support await, driver must sleep to avoid busy loop
425+
if ((_curResult._flags & Bytes.RESULTFLAG_AWAITCAPABLE) == 0) {
426+
try {
427+
Thread.sleep(500);
428+
} catch (Exception e) {
429+
}
430+
}
431+
}
432+
}
418433
}
434+
return hasNext;
419435
}
420436

421437
private void _advance(){

src/main/com/mongodb/DBCursor.java

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -450,17 +450,7 @@ private DBObject _next()
450450

451451
_check();
452452

453-
_cur = null;
454-
if ((_options & Bytes.QUERYOPTION_TAILABLE) > 0) {
455-
try {
456-
_cur = _it.next();
457-
} catch (NoSuchElementException e){
458-
// Hacky way of handling this but should be OK for now...
459-
return null;
460-
}
461-
} else {
462-
_cur = _it.next();
463-
}
453+
_cur = _it.next();
464454
_num++;
465455

466456
if ( _keysWanted != null && _keysWanted.keySet().size() > 0 ){
@@ -744,7 +734,8 @@ public String toString() {
744734
if (addr != null)
745735
sb.append(", addr=").append(addr);
746736

747-
sb.append(", readPreference=").append( _readPref.toString() );
737+
if (_readPref != null)
738+
sb.append(", readPreference=").append( _readPref.toString() );
748739
return sb.toString();
749740
}
750741

src/main/com/mongodb/Response.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -95,11 +95,7 @@ public boolean hasGetMore( int queryOptions ){
9595
if ( ( queryOptions & Bytes.QUERYOPTION_TAILABLE ) == 0 )
9696
return false;
9797

98-
// have a tailable cursor
99-
100-
if ( ( _flags & Bytes.RESULTFLAG_AWAITCAPABLE ) > 0 && ( queryOptions & Bytes.QUERYOPTION_AWAITDATA ) > 0 )
101-
return true;
102-
98+
// have a tailable cursor, it is always possible to call get more
10399
return true;
104100
}
105101

src/test/com/mongodb/DBCursorTest.java

Lines changed: 72 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import java.io.IOException;
2020
import java.util.Iterator;
2121

22+
import java.util.logging.Level;
23+
import java.util.logging.Logger;
2224
import org.testng.annotations.Test;
2325

2426
import com.mongodb.util.TestCase;
@@ -80,20 +82,80 @@ public void testOptions() {
8082
assertEquals(0, dbCursor.getOptions());
8183
}
8284

83-
@Test
85+
// @Test
86+
// public void testTailable() {
87+
// DBCollection c = _db.createCollection( "tailableTest", new BasicDBObject( "capped", true ).append( "size", 10000));
88+
// DBCursor cursor = c.find( ).addOption( Bytes.QUERYOPTION_TAILABLE );
89+
//
90+
// long start = System.currentTimeMillis();
91+
// System.err.println( "[ " + start + " ] Has Next?" + cursor.hasNext());
92+
// cursor.next();
93+
// long end = System.currentTimeMillis();
94+
// System.err.println( "[ " + end + " ] Tailable next returned." );
95+
// assertLess(start - end, 100);
96+
// c.drop();
97+
// }
98+
99+
@Test//(enabled = false)
84100
public void testTailable() {
85-
DBCollection c = _db.createCollection( "tailableTest", new BasicDBObject( "capped", true ).append( "size", 10000));
86-
DBCursor cursor = c.find( ).addOption( Bytes.QUERYOPTION_TAILABLE );
87-
88-
long start = System.currentTimeMillis();
89-
System.err.println( "[ " + start + " ] Has Next?" + cursor.hasNext());
90-
cursor.next();
91-
long end = System.currentTimeMillis();
92-
System.err.println( "[ " + end + " ] Tailable next returned." );
93-
assertLess(start - end, 100);
101+
DBCollection c = _db.getCollection("tail1");
94102
c.drop();
103+
_db.createCollection("tail1", new BasicDBObject("capped", true).append("size", 10000));
104+
for (int i = 0; i < 10; i++) {
105+
c.save(new BasicDBObject("x", i));
106+
}
107+
108+
DBCursor cur = c.find().sort(new BasicDBObject("$natural", 1)).addOption(Bytes.QUERYOPTION_TAILABLE);
109+
110+
while (cur.hasNext()) {
111+
cur.next();
112+
//do nothing...
113+
}
114+
115+
assert (!cur.hasNext());
116+
c.save(new BasicDBObject("x", 12));
117+
assert (cur.hasNext());
118+
assertNotNull(cur.next());
119+
assert (!cur.hasNext());
95120
}
96121

122+
@Test//(enabled = false)
123+
public void testTailableAwait() {
124+
DBCollection c = _db.getCollection("tail1");
125+
c.drop();
126+
_db.createCollection("tail1", new BasicDBObject("capped", true).append("size", 10000));
127+
for (int i = 0; i < 10; i++) {
128+
c.save(new BasicDBObject("x", i));
129+
}
130+
131+
final DBCursor cur = c.find().sort(new BasicDBObject("$natural", 1)).addOption(Bytes.QUERYOPTION_TAILABLE | Bytes.QUERYOPTION_AWAITDATA);
132+
Thread t = new Thread(new Runnable() {
133+
public void run() {
134+
// the following call will block on the last hasNext
135+
int i = 0;
136+
while (cur.hasNext()) {
137+
cur.next();
138+
if (++i > 10)
139+
break;
140+
}
141+
}
142+
});
143+
t.start();
144+
145+
try {
146+
Thread.sleep(5000);
147+
} catch (InterruptedException ex) {
148+
}
149+
assert (t.isAlive());
150+
151+
// this doc should unblock thread
152+
c.save(new BasicDBObject("x", 12));
153+
try {
154+
t.join(100);
155+
} catch (InterruptedException ex) {
156+
}
157+
}
158+
97159
@Test
98160
public void testBig2(){
99161
DBCollection c = _db.getCollection("big2");

0 commit comments

Comments
 (0)