Skip to content

Commit

Permalink
fixing tests
Browse files Browse the repository at this point in the history
  • Loading branch information
sboesebeck committed Nov 5, 2018
1 parent d195c41 commit 6d38d62
Showing 1 changed file with 31 additions and 13 deletions.
44 changes: 31 additions & 13 deletions src/de/caluga/morphium/driver/inmem/InMemoryDriver.java
Original file line number Diff line number Diff line change
Expand Up @@ -519,13 +519,29 @@ public MorphiumCursor initIteration(String db, String collection, Map<String, Ob
public void watch(String db, int timeout, boolean fullDocumentOnUpdate, DriverTailableIterationCallback cb) throws MorphiumDriverException {
watchersByDb.putIfAbsent(db, new Vector<>());
watchersByDb.get(db).add(cb);

//simulate blocking
while (isConnected()) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
}
}

@Override
public void watch(String db, String collection, int timeout, boolean fullDocumentOnUpdate, DriverTailableIterationCallback cb) throws MorphiumDriverException {
String key = db + "." + collection;
watchersByDb.putIfAbsent(key, new Vector<>());
watchersByDb.get(key).add(cb);
//simulate blocking
while (isConnected()) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

@Override
Expand Down Expand Up @@ -894,26 +910,28 @@ public Map<String, Object> update(String db, String collection, Map<String, Obje
private void notifyWatchers(String db, String collection, String op, Map doc) {
List<DriverTailableIterationCallback> w = null;
if (watchersByDb.containsKey(db)) {
w = watchersByDb.get(db);
w = new ArrayList<>(watchersByDb.get(db));
} else if (collection != null && watchersByDb.containsKey(db + "." + collection)) {
w = watchersByDb.get(db + "." + collection);
w = new ArrayList<>(watchersByDb.get(db + "." + collection));
}
if (w == null) {
return;
}
Map<String, Object> data = new HashMap<>();
data.put("fullDocument", doc);
data.put("operationType", op);
Map m = Utils.getMap("db", db);
m.put("coll", collection);
data.put("ns", m);

long tx = txn.incrementAndGet();
data.put("txnNumber", tx);
data.put("clusterTime", System.currentTimeMillis());
if (doc != null) {
data.put("documentKey", Utils.getMap("_id", doc.get("_id")));
}
for (DriverTailableIterationCallback cb : w) {
Map<String, Object> data = new HashMap<>();
data.put("fullDocument", doc);
data.put("operationType", op);
Map m = Utils.getMap("db", db);
m.put("coll", collection);
data.put("ns", m);
data.put("txnNumber", tx);
data.put("clusterTime", System.currentTimeMillis());
if (doc != null) {
data.put("documentKey", Utils.getMap("_id", doc.get("_id")));
}

try {
cb.incomingData(data, System.currentTimeMillis());
} catch (Exception e) {
Expand Down

0 comments on commit 6d38d62

Please sign in to comment.