From 76bf84f63d0c5bc680b3e1fc60282be0de441621 Mon Sep 17 00:00:00 2001 From: Yingsheng Tian <58492046+Yingsheng-eroad@users.noreply.github.com> Date: Fri, 9 Oct 2020 10:45:16 +1300 Subject: [PATCH] implemented QueueDAO.containsMessage() for mysql and postgres (#1899) --- .../conductor/dao/mysql/MySQLQueueDAO.java | 7 +++++ .../dao/mysql/MySQLQueueDAOTest.java | 28 +++++++++++++++++++ .../dao/postgres/PostgresQueueDAO.java | 7 +++++ .../dao/postgres/PostgresQueueDAOTest.java | 28 +++++++++++++++++++ 4 files changed, 70 insertions(+) diff --git a/mysql-persistence/src/main/java/com/netflix/conductor/dao/mysql/MySQLQueueDAO.java b/mysql-persistence/src/main/java/com/netflix/conductor/dao/mysql/MySQLQueueDAO.java index 2d863fa799..4391c80aa0 100644 --- a/mysql-persistence/src/main/java/com/netflix/conductor/dao/mysql/MySQLQueueDAO.java +++ b/mysql-persistence/src/main/java/com/netflix/conductor/dao/mysql/MySQLQueueDAO.java @@ -268,4 +268,11 @@ private void createQueueIfNotExists(Connection connection, String queueName) { execute(connection, CREATE_QUEUE, q -> q.addParameter(queueName).executeUpdate()); } } + + @Override + public boolean containsMessage(String queueName, String messageId) { + final String EXISTS_QUEUE = "SELECT EXISTS(SELECT 1 FROM queue_message WHERE queue_name = ? AND message_id = ? )"; + boolean exists = queryWithTransaction(EXISTS_QUEUE, q -> q.addParameter(queueName).addParameter(messageId).exists()); + return exists; + } } diff --git a/mysql-persistence/src/test/java/com/netflix/conductor/dao/mysql/MySQLQueueDAOTest.java b/mysql-persistence/src/test/java/com/netflix/conductor/dao/mysql/MySQLQueueDAOTest.java index bce2f5bdcf..5c865634a2 100644 --- a/mysql-persistence/src/test/java/com/netflix/conductor/dao/mysql/MySQLQueueDAOTest.java +++ b/mysql-persistence/src/test/java/com/netflix/conductor/dao/mysql/MySQLQueueDAOTest.java @@ -104,6 +104,7 @@ public void complexQueueTest() { for(int i = 0; i < 10; i++) { String messageId = "msg" + i; + assertTrue(dao.containsMessage(queueName, messageId)); dao.remove(queueName, messageId); } @@ -119,6 +120,33 @@ public void complexQueueTest() { assertEquals(0, size); } + /** + * Test fix for https://github.com/Netflix/conductor/issues/1892 + * + * */ + @Test + public void containsMessageTest() { + String queueName = "TestQueue"; + long offsetTimeInSecond = 0; + + for(int i = 0; i < 10; i++) { + String messageId = "msg" + i; + dao.push(queueName, messageId, offsetTimeInSecond); + } + int size = dao.getSize(queueName); + assertEquals(10, size); + + for(int i = 0; i < 10; i++) { + String messageId = "msg" + i; + assertTrue(dao.containsMessage(queueName, messageId)); + dao.remove(queueName, messageId); + } + for(int i = 0; i < 10; i++) { + String messageId = "msg" + i; + assertFalse(dao.containsMessage(queueName, messageId)); + } + } + /** * Test fix for https://github.com/Netflix/conductor/issues/399 * @since 1.8.2-rc5 diff --git a/postgres-persistence/src/main/java/com/netflix/conductor/dao/postgres/PostgresQueueDAO.java b/postgres-persistence/src/main/java/com/netflix/conductor/dao/postgres/PostgresQueueDAO.java index a5df2fe6b8..255833fb52 100644 --- a/postgres-persistence/src/main/java/com/netflix/conductor/dao/postgres/PostgresQueueDAO.java +++ b/postgres-persistence/src/main/java/com/netflix/conductor/dao/postgres/PostgresQueueDAO.java @@ -276,4 +276,11 @@ private void createQueueIfNotExists(Connection connection, String queueName) { execute(connection, CREATE_QUEUE, q -> q.addParameter(queueName).executeUpdate()); } } + + @Override + public boolean containsMessage(String queueName, String messageId) { + final String EXISTS_QUEUE = "SELECT EXISTS(SELECT 1 FROM queue_message WHERE queue_name = ? AND message_id = ? )"; + boolean exists = queryWithTransaction(EXISTS_QUEUE, q -> q.addParameter(queueName).addParameter(messageId).exists()); + return exists; + } } diff --git a/postgres-persistence/src/test/java/com/netflix/conductor/dao/postgres/PostgresQueueDAOTest.java b/postgres-persistence/src/test/java/com/netflix/conductor/dao/postgres/PostgresQueueDAOTest.java index 3481386b99..4359ba22e7 100644 --- a/postgres-persistence/src/test/java/com/netflix/conductor/dao/postgres/PostgresQueueDAOTest.java +++ b/postgres-persistence/src/test/java/com/netflix/conductor/dao/postgres/PostgresQueueDAOTest.java @@ -116,6 +116,7 @@ public void complexQueueTest() { for(int i = 0; i < 10; i++) { String messageId = "msg" + i; + assertTrue(dao.containsMessage(queueName, messageId)); dao.remove(queueName, messageId); } @@ -186,6 +187,33 @@ public void pollMessagesTest() { } } + + /** + * Test fix for https://github.com/Netflix/conductor/issues/1892 + * + * */ + @Test + public void containsMessageTest() { + String queueName = "TestQueue"; + long offsetTimeInSecond = 0; + + for(int i = 0; i < 10; i++) { + String messageId = "msg" + i; + dao.push(queueName, messageId, offsetTimeInSecond); + } + int size = dao.getSize(queueName); + assertEquals(10, size); + + for(int i = 0; i < 10; i++) { + String messageId = "msg" + i; + assertTrue(dao.containsMessage(queueName, messageId)); + dao.remove(queueName, messageId); + } + for(int i = 0; i < 10; i++) { + String messageId = "msg" + i; + assertFalse(dao.containsMessage(queueName, messageId)); + } + } /** * Test fix for https://github.com/Netflix/conductor/issues/448 * @since 1.8.2-rc5