Skip to content

Commit

Permalink
In processAllUnacks, change it so only the unacks from the past get r…
Browse files Browse the repository at this point in the history
…eset (Netflix#1875)

* only reset unacks from the past

* fix the test cases so that it matches the change to the code

* fix the test cases so that it matches the change to the code

* fix the test cases so that it matches the change to the code

* fix the test cases so that it matches the change to the code

* fix the test cases so that it matches the change to the code

* fix the test cases so that it matches the change to the code

* fix the test cases so that it matches the change to the code

* fix the test cases so that it matches the change to the code

* fix the test cases so that it matches the change to the code

* fix the test cases so that it matches the change to the code

* fix the test cases so that it matches the change to the code

* fix the test cases so that it matches the change to the code

* fix the test cases so that it matches the change to the code

* fix the test cases so that it matches the change to the code

Co-authored-by: u447 <rick.fishman@bcbsfl.com>
  • Loading branch information
rickfish and u447 authored Oct 13, 2020
1 parent fc37811 commit e8128f0
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -160,13 +160,13 @@ public void processAllUnacks() {
logger.trace("processAllUnacks started");


final String PROCESS_ALL_UNACKS = "UPDATE queue_message SET popped = false WHERE popped = true AND TIMESTAMPADD(SECOND,60,CURRENT_TIMESTAMP) > deliver_on";
final String PROCESS_ALL_UNACKS = "UPDATE queue_message SET popped = false WHERE popped = true AND TIMESTAMPADD(SECOND,-60,CURRENT_TIMESTAMP) > deliver_on";
executeWithTransaction(PROCESS_ALL_UNACKS, Query::executeUpdate);
}

@Override
public void processUnacks(String queueName) {
final String PROCESS_UNACKS = "UPDATE queue_message SET popped = false WHERE queue_name = ? AND popped = true AND TIMESTAMPADD(SECOND,60,CURRENT_TIMESTAMP) > deliver_on";
final String PROCESS_UNACKS = "UPDATE queue_message SET popped = false WHERE queue_name = ? AND popped = true AND TIMESTAMPADD(SECOND,-60,CURRENT_TIMESTAMP) > deliver_on";
executeWithTransaction(PROCESS_UNACKS, q -> q.addParameter(queueName).executeUpdate());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,14 +329,14 @@ public void processUnacksTest() {
Map<String, Map<String, Map<String, Long>>> details = dao.queuesDetailVerbose();
uacked = details.get(queueName).get("a").get("uacked");
assertNotNull(uacked);
assertEquals("There should be no unacked messages", uacked.longValue(), 0);
assertEquals("The messages that were polled should be unacked still", uacked.longValue(), unackedCount - 1);

Long otherUacked = details.get(otherQueueName).get("a").get("uacked");
assertNotNull(otherUacked);
assertEquals("Other queue should have unacked messages", otherUacked.longValue(), count);
assertEquals("Other queue should have all unacked messages", otherUacked.longValue(), count);

Long size = dao.queuesDetail().get(queueName);
assertNotNull(size);
assertEquals(size.longValue(), count - 1);
assertEquals(size.longValue(), count - unackedCount);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -171,13 +171,13 @@ public void processAllUnacks() {

logger.trace("processAllUnacks started");

final String PROCESS_ALL_UNACKS = "UPDATE queue_message SET popped = false WHERE popped = true AND (current_timestamp + (60 ||' seconds')::interval) > deliver_on";
final String PROCESS_ALL_UNACKS = "UPDATE queue_message SET popped = false WHERE popped = true AND (current_timestamp - (60 ||' seconds')::interval) > deliver_on";
executeWithTransaction(PROCESS_ALL_UNACKS, Query::executeUpdate);
}

@Override
public void processUnacks(String queueName) {
final String PROCESS_UNACKS = "UPDATE queue_message SET popped = false WHERE queue_name = ? AND popped = true AND (current_timestamp + (60 ||' seconds')::interval) > deliver_on";
final String PROCESS_UNACKS = "UPDATE queue_message SET popped = false WHERE queue_name = ? AND popped = true AND (current_timestamp - (60 ||' seconds')::interval) > deliver_on";
executeWithTransaction(PROCESS_UNACKS, q -> q.addParameter(queueName).executeUpdate());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,14 +341,14 @@ public void processUnacksTest() {
Map<String, Map<String, Map<String, Long>>> details = dao.queuesDetailVerbose();
uacked = details.get(queueName).get("a").get("uacked");
assertNotNull(uacked);
assertEquals("There should be no unacked messages", uacked.longValue(), 0);
assertEquals("The messages that were polled should be unacked still", uacked.longValue(), unackedCount - 1);

Long otherUacked = details.get(otherQueueName).get("a").get("uacked");
assertNotNull(otherUacked);
assertEquals("Other queue should have unacked messages", otherUacked.longValue(), count);
assertEquals("Other queue should have all unacked messages", otherUacked.longValue(), count);

Long size = dao.queuesDetail().get(queueName);
assertNotNull(size);
assertEquals(size.longValue(), count - 1);
assertEquals(size.longValue(), count - unackedCount);
}
}

0 comments on commit e8128f0

Please sign in to comment.