diff --git a/mysql-persistence/src/main/java/com/netflix/conductor/dao/mysql/MySQLQueueDAO.java b/mysql-persistence/src/main/java/com/netflix/conductor/dao/mysql/MySQLQueueDAO.java index 4391c80aa0..86bea44329 100644 --- a/mysql-persistence/src/main/java/com/netflix/conductor/dao/mysql/MySQLQueueDAO.java +++ b/mysql-persistence/src/main/java/com/netflix/conductor/dao/mysql/MySQLQueueDAO.java @@ -160,13 +160,13 @@ public void processAllUnacks() { logger.trace("processAllUnacks started"); - final String PROCESS_ALL_UNACKS = "UPDATE queue_message SET popped = false WHERE popped = true AND TIMESTAMPADD(SECOND,60,CURRENT_TIMESTAMP) > deliver_on"; + final String PROCESS_ALL_UNACKS = "UPDATE queue_message SET popped = false WHERE popped = true AND TIMESTAMPADD(SECOND,-60,CURRENT_TIMESTAMP) > deliver_on"; executeWithTransaction(PROCESS_ALL_UNACKS, Query::executeUpdate); } @Override public void processUnacks(String queueName) { - final String PROCESS_UNACKS = "UPDATE queue_message SET popped = false WHERE queue_name = ? AND popped = true AND TIMESTAMPADD(SECOND,60,CURRENT_TIMESTAMP) > deliver_on"; + final String PROCESS_UNACKS = "UPDATE queue_message SET popped = false WHERE queue_name = ? AND popped = true AND TIMESTAMPADD(SECOND,-60,CURRENT_TIMESTAMP) > deliver_on"; executeWithTransaction(PROCESS_UNACKS, q -> q.addParameter(queueName).executeUpdate()); } diff --git a/mysql-persistence/src/test/java/com/netflix/conductor/dao/mysql/MySQLQueueDAOTest.java b/mysql-persistence/src/test/java/com/netflix/conductor/dao/mysql/MySQLQueueDAOTest.java index 5c865634a2..5af2d9868a 100644 --- a/mysql-persistence/src/test/java/com/netflix/conductor/dao/mysql/MySQLQueueDAOTest.java +++ b/mysql-persistence/src/test/java/com/netflix/conductor/dao/mysql/MySQLQueueDAOTest.java @@ -329,14 +329,14 @@ public void processUnacksTest() { Map>> details = dao.queuesDetailVerbose(); uacked = details.get(queueName).get("a").get("uacked"); assertNotNull(uacked); - assertEquals("There should be no unacked messages", uacked.longValue(), 0); + assertEquals("The messages that were polled should be unacked still", uacked.longValue(), unackedCount - 1); Long otherUacked = details.get(otherQueueName).get("a").get("uacked"); assertNotNull(otherUacked); - assertEquals("Other queue should have unacked messages", otherUacked.longValue(), count); + assertEquals("Other queue should have all unacked messages", otherUacked.longValue(), count); Long size = dao.queuesDetail().get(queueName); assertNotNull(size); - assertEquals(size.longValue(), count - 1); + assertEquals(size.longValue(), count - unackedCount); } } diff --git a/postgres-persistence/src/main/java/com/netflix/conductor/dao/postgres/PostgresQueueDAO.java b/postgres-persistence/src/main/java/com/netflix/conductor/dao/postgres/PostgresQueueDAO.java index 255833fb52..227cfc54e7 100644 --- a/postgres-persistence/src/main/java/com/netflix/conductor/dao/postgres/PostgresQueueDAO.java +++ b/postgres-persistence/src/main/java/com/netflix/conductor/dao/postgres/PostgresQueueDAO.java @@ -171,13 +171,13 @@ public void processAllUnacks() { logger.trace("processAllUnacks started"); - final String PROCESS_ALL_UNACKS = "UPDATE queue_message SET popped = false WHERE popped = true AND (current_timestamp + (60 ||' seconds')::interval) > deliver_on"; + final String PROCESS_ALL_UNACKS = "UPDATE queue_message SET popped = false WHERE popped = true AND (current_timestamp - (60 ||' seconds')::interval) > deliver_on"; executeWithTransaction(PROCESS_ALL_UNACKS, Query::executeUpdate); } @Override public void processUnacks(String queueName) { - final String PROCESS_UNACKS = "UPDATE queue_message SET popped = false WHERE queue_name = ? AND popped = true AND (current_timestamp + (60 ||' seconds')::interval) > deliver_on"; + final String PROCESS_UNACKS = "UPDATE queue_message SET popped = false WHERE queue_name = ? AND popped = true AND (current_timestamp - (60 ||' seconds')::interval) > deliver_on"; executeWithTransaction(PROCESS_UNACKS, q -> q.addParameter(queueName).executeUpdate()); } diff --git a/postgres-persistence/src/test/java/com/netflix/conductor/dao/postgres/PostgresQueueDAOTest.java b/postgres-persistence/src/test/java/com/netflix/conductor/dao/postgres/PostgresQueueDAOTest.java index 4359ba22e7..cdbcc39c38 100644 --- a/postgres-persistence/src/test/java/com/netflix/conductor/dao/postgres/PostgresQueueDAOTest.java +++ b/postgres-persistence/src/test/java/com/netflix/conductor/dao/postgres/PostgresQueueDAOTest.java @@ -341,14 +341,14 @@ public void processUnacksTest() { Map>> details = dao.queuesDetailVerbose(); uacked = details.get(queueName).get("a").get("uacked"); assertNotNull(uacked); - assertEquals("There should be no unacked messages", uacked.longValue(), 0); + assertEquals("The messages that were polled should be unacked still", uacked.longValue(), unackedCount - 1); Long otherUacked = details.get(otherQueueName).get("a").get("uacked"); assertNotNull(otherUacked); - assertEquals("Other queue should have unacked messages", otherUacked.longValue(), count); + assertEquals("Other queue should have all unacked messages", otherUacked.longValue(), count); Long size = dao.queuesDetail().get(queueName); assertNotNull(size); - assertEquals(size.longValue(), count - 1); + assertEquals(size.longValue(), count - unackedCount); } }