Skip to content

Commit

Permalink
Merge branch 'develop'
Browse files Browse the repository at this point in the history
  • Loading branch information
sboesebeck committed Apr 18, 2019
2 parents 280a179 + faa1e47 commit 9d779a4
Show file tree
Hide file tree
Showing 3 changed files with 164 additions and 15 deletions.
89 changes: 76 additions & 13 deletions src/de/caluga/morphium/messaging/Messaging.java
Original file line number Diff line number Diff line change
Expand Up @@ -238,15 +238,33 @@ public void run() {
try {
if (evt == null || evt.getOperationType() == null) return running;
if (evt.getOperationType().equals("insert")) {
// if (log.isDebugEnabled())
// log.debug(getSenderId() + ": incoming message: " + evt.getFullDocument().get("_id") + " inAnswerTo: " + evt.getFullDocument().get("in_answer_to"));
//insert => new Message
// log.debug("New message incoming");
Msg obj = morphium.getMapper().deserialize(Msg.class, evt.getFullDocument());
if (obj.getInAnswerTo() != null && waitingForMessages.containsKey(obj.getInAnswerTo())) {
if (log.isDebugEnabled())
log.debug("processing answer " + obj.getMsgId() + " in answer to " + obj.getInAnswerTo());
List<Msg> lst = new ArrayList<>();
lst.add(obj);
try {
processMessages(lst);
} catch (Exception e) {
log.error("Error during message processing ", e);
}
return running;
}
if (listenerByName.get(obj.getName()) == null && listeners.size() == 0) {
//ignoring incoming message, we do not have listener for
return running;
}
if (obj.getSender().equals(id) || (obj.getProcessedBy() != null && obj.getProcessedBy().contains(id)) || (obj.getRecipient() != null && !obj.getRecipient().equals(id))) {
//ignoring my own messages
return running;
}
if (pauseMessages.containsKey(obj.getName())) {
log.debug("Not processing message - processing paused for " + obj.getName());
if (log.isDebugEnabled())
log.debug("Not processing message - processing paused for " + obj.getName());
return running;
}
//do not process messages, that are exclusive, but already processed or not for me / all
Expand Down Expand Up @@ -282,8 +300,30 @@ public void run() {
// if (((Map<String,Object>)data.get("o")).get("$set")!=null){
// //there is a set-update
// }

if (evt.getFullDocument() != null && evt.getFullDocument().get("_id") != null) {
Msg obj = morphium.findById(Msg.class, new MorphiumId(evt.getFullDocument().get("_id").toString()), getCollectionName());
if (obj == null) {
return running; //was deleted?
}
if (obj.getInAnswerTo() != null && waitingForMessages.containsKey(obj.getInAnswerTo())) {
if (obj.isExclusive()) {
lockAndProcess(obj);
} else {
List<Msg> lst = new ArrayList<>();
lst.add(obj);

try {
processMessages(lst);
} catch (Exception e) {
log.error("Error during message processing ", e);
}
}
}
if (listenerByName.get(obj.getName()) == null && listeners.size() == 0) {
if (obj.getInAnswerTo() == null || !waitingForMessages.containsKey(obj.getInAnswerTo()))
return running;
}
if (obj != null && obj.isExclusive() && obj.getLockedBy() == null && !pauseMessages.containsKey(obj.getName()) && (obj.getRecipient() == null || obj.getRecipient().equals(id))) {
log.debug("Update of msg - trying to lock");
// locking
Expand Down Expand Up @@ -389,6 +429,13 @@ private MorphiumIterator<Msg> findMessages(String name, boolean multiple) {
or1.f(Msg.Fields.name).nin(pausedMessagesKeys);
or2.f(Msg.Fields.name).nin(pausedMessagesKeys);
}
if (listeners.isEmpty() && !listenerByName.isEmpty()) {
or1.f(Msg.Fields.name).in(listenerByName.keySet());
or2.f(Msg.Fields.name).in(listenerByName.keySet());
} else if (listenerByName.isEmpty() && listeners.isEmpty()) {
return q.q().f(Msg.Fields.msgId).eq("123445").asIterable();
}

}
ArrayList<MorphiumId> processingIds = new ArrayList<>(processing);
if (!processing.isEmpty()) {
Expand Down Expand Up @@ -458,7 +505,8 @@ private void lockAndProcess(Msg obj) {
if (obj != null && obj.getLockedBy() != null && obj.getLockedBy().equals(id)) {
List<Msg> lst = new ArrayList<>();
lst.add(obj);
log.debug("locked messages: " + lst.size());
if (log.isDebugEnabled())
log.debug("locked messages: " + lst.size());
try {
processMessages(lst);
} catch (Exception e) {
Expand All @@ -479,7 +527,8 @@ private void processMessages(Iterable<Msg> messages) {

//noinspection SuspiciousMethodCalls
if (msg.getInAnswerTo() != null && waitingForMessages.get(msg.getInAnswerTo()) != null) {
log.debug("Got a message, we are waiting for...");
if (log.isDebugEnabled())
log.debug(getSenderId() + ": Got a message, we are waiting for...");
//this message we were waiting for
waitingForAnswers.put((MorphiumId) msg.getInAnswerTo(), msg);
processing.remove(m.getMsgId());
Expand All @@ -488,7 +537,7 @@ private void processMessages(Iterable<Msg> messages) {
}
if (listeners.isEmpty() && listenerByName.isEmpty()) {
updateProcessedByAndReleaseLock(msg);
log.debug("no listener registered... not processing message");
log.error(getSenderId() + ": should not be here. not processing message, as no listeners are defined " + msg.getMsgId());
return;
}
// Query<? extends Msg> q = morphium.createQueryFor(m.getClass()).f("_id").eq(m.getMsgId());
Expand Down Expand Up @@ -522,7 +571,8 @@ private void processMessages(Iterable<Msg> messages) {

if (msg.getTtl() < System.currentTimeMillis() - msg.getTimestamp()) {
//Delete outdated msg!
log.debug("Found outdated message - deleting it!");
if (log.isDebugEnabled())
log.debug(getSenderId() + ": Found outdated message - deleting it!");
morphium.delete(msg, getCollectionName());
processing.remove(m.getMsgId());
return;
Expand All @@ -535,7 +585,8 @@ private void processMessages(Iterable<Msg> messages) {
lst.addAll(listenerByName.get(msg.getName()));
}
if (lst.isEmpty()) {
log.debug("Message did not have a listener registered");
if (log.isDebugEnabled())
log.debug(getSenderId() + ": Message did not have a listener registered");
wasProcessed = true;
}
for (MessageListener l : lst) {
Expand All @@ -547,6 +598,11 @@ private void processMessages(Iterable<Msg> messages) {
}
if (answer != null) {
msg.sendAnswer(Messaging.this, answer);
if (log.isDebugEnabled())
log.debug("sent answer to " + answer.getInAnswerTo() + " recipient: " + answer.getRecipient() + " id: " + answer.getMsgId());
if (answer.getRecipient() == null) {
log.error("Recipeient of answer is null?!?!");
}
}
} catch (MessageRejectedException mre) {
log.warn("Message was rejected by listener", mre);
Expand All @@ -572,21 +628,23 @@ private void processMessages(Iterable<Msg> messages) {
}
}
}
if (!wasProcessed && !lst.isEmpty()) {
if (!wasProcessed && !wasRejected) {
// msg.addAdditional("Processing of message failed by "+getSenderId()+": "+t.getMessage());
log.error("message was not processed");
} else if (wasRejected) {
log.debug("Message rejected");
}

// if (msg.getType().equals(MsgType.SINGLE)) {
// //removing it
// morphium.delete(msg, getCollectionName());
// }
//updating it to be processed by others...
if ((msg.getLockedBy() != null && msg.getLockedBy().equals("ALL")) || (msg.getRecipient()!=null && msg.getRecipient().equals(id) && msg.getInAnswerTo() != null)) {
if ((msg.getLockedBy() != null && msg.getLockedBy().equals("ALL")) || (msg.getRecipient() != null && msg.getRecipient().equals(id) && msg.getInAnswerTo() != null)) {
updateProcessedByAndReleaseLock(msg);
} else {
//Exclusive message
morphium.delete(msg,getCollectionName());
morphium.delete(msg, getCollectionName());
// msg.addProcessedId(id);
// msg.setLockedBy(null);
// msg.setLocked(0);
Expand Down Expand Up @@ -725,11 +783,13 @@ public void terminate() {
running = false;
if (decouplePool != null) {
int sz = decouplePool.shutdownNow().size();
log.debug("Shutting down with " + sz + " runnables still scheduled");
if (log.isDebugEnabled())
log.debug("Shutting down with " + sz + " runnables still scheduled");
}
if (threadPool != null) {
int sz = threadPool.shutdownNow().size();
log.debug("Shutting down with " + sz + " runnables still pending in pool");
if (log.isDebugEnabled())
log.debug("Shutting down with " + sz + " runnables still pending in pool");
}
if (changeStreamMonitor != null) changeStreamMonitor.stop();
sendMessageToSelf(new Msg("info", "going down", "now"));
Expand Down Expand Up @@ -836,7 +896,7 @@ private void sendMessageToSelf(Msg m, boolean async) {
m.setSender("self");
m.setRecipient(id);
m.setSenderHost(hostname);
morphium.storeNoCache(m,getCollectionName());
morphium.storeNoCache(m, getCollectionName());
}

public boolean isAutoAnswer() {
Expand Down Expand Up @@ -880,6 +940,9 @@ public <T extends Msg> T sendAndAwaitFirstAnswer(T theMessage, long timeoutInMs)
}
Thread.yield();
}
if (log.isDebugEnabled()) {
log.debug("got message after: " + (System.currentTimeMillis() - start) + "ms");
}
waitingForMessages.remove(theMessage.getMsgId());
return (T) waitingForAnswers.remove(theMessage.getMsgId());
}
Expand Down
6 changes: 4 additions & 2 deletions src/de/caluga/morphium/messaging/Msg.java
Original file line number Diff line number Diff line change
Expand Up @@ -362,9 +362,11 @@ public Msg createAnswerMsg() {

public void sendAnswer(Messaging messaging, Msg m) {
m.setInAnswerTo(this.msgId);
m.addRecipient(this.getSender());
//m.addRecipient(this.getSender());
m.setRecipient(this.getSender());
m.setDeleteAt(new Date(System.currentTimeMillis() + m.getTtl()));
messaging.queueMessage(m);
m.setMsgId(new MorphiumId());
messaging.storeMessage(m);
}


Expand Down
84 changes: 84 additions & 0 deletions test/de/caluga/test/mongo/suite/MessagingTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,90 @@ public void answeringTest() throws Exception {

}


@Test
public void answerExclusiveMessagesTest() throws Exception {
Messaging m1 = new Messaging(morphium, 10, false, true, 10);
m1.setSenderId("m1");
Messaging m2 = new Messaging(morphium, 10, false, true, 10);
m2.setSenderId("m2");
Messaging m3 = new Messaging(morphium, 10, false, true, 10);
m3.setSenderId("m3");
m1.start();
m2.start();
m3.start();

m3.addListenerForMessageNamed("test", (msg, m) -> {
log.info("INcoming message");
return m.createAnswerMsg();
});

Msg m = new Msg("test", "important", "value");
m.setExclusive(true);
Msg answer = m1.sendAndAwaitFirstAnswer(m, 6000);
Thread.sleep(500);
assert (answer != null);
assert (answer.getProcessedBy().size() == 1);
assert (answer.getProcessedBy().contains("m3"));
}


@Test
public void ignoringMessagesTest() throws Exception {
Messaging m1 = new Messaging(morphium, 10, false, true, 10);
m1.setSenderId("m1");
Messaging m2 = new Messaging(morphium, 10, false, true, 10);
m2.setSenderId("m2");
m1.start();
m2.start();

Msg m = new Msg("test", "ignore me please", "value");
m1.storeMessage(m);
Thread.sleep(1000);
m = morphium.reread(m);
assert (m.getProcessedBy().size() == 1) : "wrong number of proccessed by entries: " + m.getProcessedBy().size();
}

@Test
public void severalMessagingsTest() throws Exception {
Messaging m1 = new Messaging(morphium, 10, false, true, 10);
m1.setSenderId("m1");
Messaging m2 = new Messaging(morphium, 10, false, true, 10);
m2.setSenderId("m2");
Messaging m3 = new Messaging(morphium, 10, false, true, 10);
m3.setSenderId("m3");
m1.start();
m2.start();
m3.start();

m3.addListenerForMessageNamed("test", (msg, m) -> {
//log.info("Got message: "+m.getName());
log.info("Sending answer for " + m.getMsgId());
return new Msg("test", "answer", "value", 600000);
});

procCounter.set(0);
for (int i = 0; i < 180; i++) {
new Thread() {
public void run() {
Msg m = new Msg("test", "nothing", "value");
m.setTtl(60000000);
Msg a = m1.sendAndAwaitFirstAnswer(m, 6000);
assert (a != null);
procCounter.incrementAndGet();
}
}.start();

}
while (procCounter.get() < 150) {
Thread.yield();
}

}




@Test
public void answers3NodesTest() throws Exception {
Messaging m1 = new Messaging(morphium, 10, false, true, 10);
Expand Down

0 comments on commit 9d779a4

Please sign in to comment.