Skip to content

Commit

Permalink
implemented QueueDAO.containsMessage() for mysql and postgres (Netfli…
Browse files Browse the repository at this point in the history
  • Loading branch information
Yingsheng-eroad authored Oct 8, 2020
1 parent 75db6af commit 76bf84f
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -268,4 +268,11 @@ private void createQueueIfNotExists(Connection connection, String queueName) {
execute(connection, CREATE_QUEUE, q -> q.addParameter(queueName).executeUpdate());
}
}

@Override
public boolean containsMessage(String queueName, String messageId) {
final String EXISTS_QUEUE = "SELECT EXISTS(SELECT 1 FROM queue_message WHERE queue_name = ? AND message_id = ? )";
boolean exists = queryWithTransaction(EXISTS_QUEUE, q -> q.addParameter(queueName).addParameter(messageId).exists());
return exists;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ public void complexQueueTest() {

for(int i = 0; i < 10; i++) {
String messageId = "msg" + i;
assertTrue(dao.containsMessage(queueName, messageId));
dao.remove(queueName, messageId);
}

Expand All @@ -119,6 +120,33 @@ public void complexQueueTest() {
assertEquals(0, size);
}

/**
* Test fix for https://github.com/Netflix/conductor/issues/1892
*
* */
@Test
public void containsMessageTest() {
String queueName = "TestQueue";
long offsetTimeInSecond = 0;

for(int i = 0; i < 10; i++) {
String messageId = "msg" + i;
dao.push(queueName, messageId, offsetTimeInSecond);
}
int size = dao.getSize(queueName);
assertEquals(10, size);

for(int i = 0; i < 10; i++) {
String messageId = "msg" + i;
assertTrue(dao.containsMessage(queueName, messageId));
dao.remove(queueName, messageId);
}
for(int i = 0; i < 10; i++) {
String messageId = "msg" + i;
assertFalse(dao.containsMessage(queueName, messageId));
}
}

/**
* Test fix for https://github.com/Netflix/conductor/issues/399
* @since 1.8.2-rc5
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,4 +276,11 @@ private void createQueueIfNotExists(Connection connection, String queueName) {
execute(connection, CREATE_QUEUE, q -> q.addParameter(queueName).executeUpdate());
}
}

@Override
public boolean containsMessage(String queueName, String messageId) {
final String EXISTS_QUEUE = "SELECT EXISTS(SELECT 1 FROM queue_message WHERE queue_name = ? AND message_id = ? )";
boolean exists = queryWithTransaction(EXISTS_QUEUE, q -> q.addParameter(queueName).addParameter(messageId).exists());
return exists;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ public void complexQueueTest() {

for(int i = 0; i < 10; i++) {
String messageId = "msg" + i;
assertTrue(dao.containsMessage(queueName, messageId));
dao.remove(queueName, messageId);
}

Expand Down Expand Up @@ -186,6 +187,33 @@ public void pollMessagesTest() {
}
}


/**
* Test fix for https://github.com/Netflix/conductor/issues/1892
*
* */
@Test
public void containsMessageTest() {
String queueName = "TestQueue";
long offsetTimeInSecond = 0;

for(int i = 0; i < 10; i++) {
String messageId = "msg" + i;
dao.push(queueName, messageId, offsetTimeInSecond);
}
int size = dao.getSize(queueName);
assertEquals(10, size);

for(int i = 0; i < 10; i++) {
String messageId = "msg" + i;
assertTrue(dao.containsMessage(queueName, messageId));
dao.remove(queueName, messageId);
}
for(int i = 0; i < 10; i++) {
String messageId = "msg" + i;
assertFalse(dao.containsMessage(queueName, messageId));
}
}
/**
* Test fix for https://github.com/Netflix/conductor/issues/448
* @since 1.8.2-rc5
Expand Down

0 comments on commit 76bf84f

Please sign in to comment.