Skip to content

Commit

Permalink
fixing deadlock in messaging
Browse files Browse the repository at this point in the history
  • Loading branch information
sboesebeck committed Apr 11, 2019
1 parent b0b2059 commit 75de569
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 1 deletion.
8 changes: 7 additions & 1 deletion src/de/caluga/morphium/messaging/Messaging.java
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,7 @@ private void lockAndProcess(Msg obj) {
if (obj != null && obj.getLockedBy() != null && obj.getLockedBy().equals(id)) {
List<Msg> lst = new ArrayList<>();
lst.add(obj);
log.debug("locked messages: "+lst.size());
try {
processMessages(lst);
} catch (Exception e) {
Expand All @@ -477,13 +478,18 @@ private void processMessages(Iterable<Msg> messages) {
if (msg == null) continue;

if (msg.getInAnswerTo() != null && waitingForMessages.get(msg.getInAnswerTo()) != null) {
log.debug("Got a message, we are waiting for...");
//this message we were waiting for
waitingForAnswers.put((MorphiumId) msg.getInAnswerTo(), msg);
processing.remove(m.getMsgId());
morphium.delete(msg, getCollectionName());
return;
}
if (listeners.isEmpty() && listenerByName.isEmpty()) return;
if (listeners.isEmpty() && listenerByName.isEmpty()) {
updateProcessedByAndReleaseLock(msg);
log.debug("no listener registered... not processing message");
return;
}
// Query<? extends Msg> q = morphium.createQueryFor(m.getClass()).f("_id").eq(m.getMsgId());
// q.setCollectionName(getCollectionName());
// morphium.push(q, Msg.Fields.receivedBy, id);
Expand Down
45 changes: 45 additions & 0 deletions test/de/caluga/test/mongo/suite/MessagingTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -637,13 +637,58 @@ public void answeringTest() throws Exception {

}

@Test
public void answers3NodesTest() throws Exception {
Messaging m1=new Messaging(morphium, 10, false,true,10);
m1.setSenderId("m1");
Messaging m2=new Messaging(morphium, 10, false,true,10);
m2.setSenderId("m2");
Messaging mSrv=new Messaging(morphium, 10, false,true,10);
mSrv.setSenderId("Srv");

m1.start();
m2.start();
mSrv.start();

mSrv.addListenerForMessageNamed("query",(msg,m)->{
log.info("Incoming message - sending result");
Msg answer=m.createAnswerMsg();
answer.setValue("Result");
return answer;
});
Thread.sleep(1000);


for (int i=0;i<10; i++) {
Msg m = new Msg("query", "a message", "a query");
m.setExclusive(true);
log.info("Sending m1...");
Msg answer1=m1.sendAndAwaitFirstAnswer(m,1000);
assert(answer1 != null);
m = new Msg("query", "a message", "a query");
log.info("... got it. Sending m2");
Msg answer2=m2.sendAndAwaitFirstAnswer(m,1000);
assert(answer2!=null);
log.info("... got it.");
}

}

@Test
public void getAnswersTest() throws Exception {
Messaging m1=new Messaging(morphium, 10, false,true,10);
Messaging m2=new Messaging(morphium, 10, false,true,10);
Messaging mTst=new Messaging(morphium, 10, false,true,10);

m1.start();
m2.start();
mTst.start();


mTst.addListenerForMessageNamed("somethign else",(msg,m)->{
log.info("incoming message??");
return null;
});

m2.addListenerForMessageNamed("question",(msg,m)->{
Msg answer=m.createAnswerMsg();
Expand Down

0 comments on commit 75de569

Please sign in to comment.