From 75de569b16a35eede55393757083dfa83d36b368 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stephan=20B=C3=B6sebeck?= Date: Thu, 11 Apr 2019 21:56:36 +0200 Subject: [PATCH] fixing deadlock in messaging --- .../caluga/morphium/messaging/Messaging.java | 8 +++- .../test/mongo/suite/MessagingTest.java | 45 +++++++++++++++++++ 2 files changed, 52 insertions(+), 1 deletion(-) diff --git a/src/de/caluga/morphium/messaging/Messaging.java b/src/de/caluga/morphium/messaging/Messaging.java index 263ae86a3..914aa63f5 100644 --- a/src/de/caluga/morphium/messaging/Messaging.java +++ b/src/de/caluga/morphium/messaging/Messaging.java @@ -458,6 +458,7 @@ private void lockAndProcess(Msg obj) { if (obj != null && obj.getLockedBy() != null && obj.getLockedBy().equals(id)) { List lst = new ArrayList<>(); lst.add(obj); + log.debug("locked messages: "+lst.size()); try { processMessages(lst); } catch (Exception e) { @@ -477,13 +478,18 @@ private void processMessages(Iterable messages) { if (msg == null) continue; if (msg.getInAnswerTo() != null && waitingForMessages.get(msg.getInAnswerTo()) != null) { + log.debug("Got a message, we are waiting for..."); //this message we were waiting for waitingForAnswers.put((MorphiumId) msg.getInAnswerTo(), msg); processing.remove(m.getMsgId()); morphium.delete(msg, getCollectionName()); return; } - if (listeners.isEmpty() && listenerByName.isEmpty()) return; + if (listeners.isEmpty() && listenerByName.isEmpty()) { + updateProcessedByAndReleaseLock(msg); + log.debug("no listener registered... not processing message"); + return; + } // Query q = morphium.createQueryFor(m.getClass()).f("_id").eq(m.getMsgId()); // q.setCollectionName(getCollectionName()); // morphium.push(q, Msg.Fields.receivedBy, id); diff --git a/test/de/caluga/test/mongo/suite/MessagingTest.java b/test/de/caluga/test/mongo/suite/MessagingTest.java index fdec004b9..781b7a8e4 100644 --- a/test/de/caluga/test/mongo/suite/MessagingTest.java +++ b/test/de/caluga/test/mongo/suite/MessagingTest.java @@ -637,13 +637,58 @@ public void answeringTest() throws Exception { } + @Test + public void answers3NodesTest() throws Exception { + Messaging m1=new Messaging(morphium, 10, false,true,10); + m1.setSenderId("m1"); + Messaging m2=new Messaging(morphium, 10, false,true,10); + m2.setSenderId("m2"); + Messaging mSrv=new Messaging(morphium, 10, false,true,10); + mSrv.setSenderId("Srv"); + + m1.start(); + m2.start(); + mSrv.start(); + + mSrv.addListenerForMessageNamed("query",(msg,m)->{ + log.info("Incoming message - sending result"); + Msg answer=m.createAnswerMsg(); + answer.setValue("Result"); + return answer; + }); + Thread.sleep(1000); + + + for (int i=0;i<10; i++) { + Msg m = new Msg("query", "a message", "a query"); + m.setExclusive(true); + log.info("Sending m1..."); + Msg answer1=m1.sendAndAwaitFirstAnswer(m,1000); + assert(answer1 != null); + m = new Msg("query", "a message", "a query"); + log.info("... got it. Sending m2"); + Msg answer2=m2.sendAndAwaitFirstAnswer(m,1000); + assert(answer2!=null); + log.info("... got it."); + } + + } + @Test public void getAnswersTest() throws Exception { Messaging m1=new Messaging(morphium, 10, false,true,10); Messaging m2=new Messaging(morphium, 10, false,true,10); + Messaging mTst=new Messaging(morphium, 10, false,true,10); m1.start(); m2.start(); + mTst.start(); + + + mTst.addListenerForMessageNamed("somethign else",(msg,m)->{ + log.info("incoming message??"); + return null; + }); m2.addListenerForMessageNamed("question",(msg,m)->{ Msg answer=m.createAnswerMsg();