Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

In processAllUnacks, change it so only the unacks from the past get reset #1875

Merged
merged 15 commits into from
Oct 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -160,13 +160,13 @@ public void processAllUnacks() {
logger.trace("processAllUnacks started");


final String PROCESS_ALL_UNACKS = "UPDATE queue_message SET popped = false WHERE popped = true AND TIMESTAMPADD(SECOND,60,CURRENT_TIMESTAMP) > deliver_on";
final String PROCESS_ALL_UNACKS = "UPDATE queue_message SET popped = false WHERE popped = true AND TIMESTAMPADD(SECOND,-60,CURRENT_TIMESTAMP) > deliver_on";
executeWithTransaction(PROCESS_ALL_UNACKS, Query::executeUpdate);
}

@Override
public void processUnacks(String queueName) {
final String PROCESS_UNACKS = "UPDATE queue_message SET popped = false WHERE queue_name = ? AND popped = true AND TIMESTAMPADD(SECOND,60,CURRENT_TIMESTAMP) > deliver_on";
final String PROCESS_UNACKS = "UPDATE queue_message SET popped = false WHERE queue_name = ? AND popped = true AND TIMESTAMPADD(SECOND,-60,CURRENT_TIMESTAMP) > deliver_on";
executeWithTransaction(PROCESS_UNACKS, q -> q.addParameter(queueName).executeUpdate());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,14 +301,14 @@ public void processUnacksTest() {
Map<String, Map<String, Map<String, Long>>> details = dao.queuesDetailVerbose();
uacked = details.get(queueName).get("a").get("uacked");
assertNotNull(uacked);
assertEquals("There should be no unacked messages", uacked.longValue(), 0);
assertEquals("The messages that were polled should be unacked still", uacked.longValue(), unackedCount - 1);

Long otherUacked = details.get(otherQueueName).get("a").get("uacked");
assertNotNull(otherUacked);
assertEquals("Other queue should have unacked messages", otherUacked.longValue(), count);
assertEquals("Other queue should have all unacked messages", otherUacked.longValue(), count);

Long size = dao.queuesDetail().get(queueName);
assertNotNull(size);
assertEquals(size.longValue(), count - 1);
assertEquals(size.longValue(), count - unackedCount);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -171,13 +171,13 @@ public void processAllUnacks() {

logger.trace("processAllUnacks started");

final String PROCESS_ALL_UNACKS = "UPDATE queue_message SET popped = false WHERE popped = true AND (current_timestamp + (60 ||' seconds')::interval) > deliver_on";
final String PROCESS_ALL_UNACKS = "UPDATE queue_message SET popped = false WHERE popped = true AND (current_timestamp - (60 ||' seconds')::interval) > deliver_on";
executeWithTransaction(PROCESS_ALL_UNACKS, Query::executeUpdate);
}

@Override
public void processUnacks(String queueName) {
final String PROCESS_UNACKS = "UPDATE queue_message SET popped = false WHERE queue_name = ? AND popped = true AND (current_timestamp + (60 ||' seconds')::interval) > deliver_on";
final String PROCESS_UNACKS = "UPDATE queue_message SET popped = false WHERE queue_name = ? AND popped = true AND (current_timestamp - (60 ||' seconds')::interval) > deliver_on";
executeWithTransaction(PROCESS_UNACKS, q -> q.addParameter(queueName).executeUpdate());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,14 +313,14 @@ public void processUnacksTest() {
Map<String, Map<String, Map<String, Long>>> details = dao.queuesDetailVerbose();
uacked = details.get(queueName).get("a").get("uacked");
assertNotNull(uacked);
assertEquals("There should be no unacked messages", uacked.longValue(), 0);
assertEquals("The messages that were polled should be unacked still", uacked.longValue(), unackedCount - 1);

Long otherUacked = details.get(otherQueueName).get("a").get("uacked");
assertNotNull(otherUacked);
assertEquals("Other queue should have unacked messages", otherUacked.longValue(), count);
assertEquals("Other queue should have all unacked messages", otherUacked.longValue(), count);

Long size = dao.queuesDetail().get(queueName);
assertNotNull(size);
assertEquals(size.longValue(), count - 1);
assertEquals(size.longValue(), count - unackedCount);
}
}