Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

MySQL queues: reduce the impact of racing by popping the queue 1 by 1 #1355

Merged
merged 1 commit into from
Nov 17, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
MySQL queues: reduce the impact of racing by popping the queue messag…
…es one by one
  • Loading branch information
ggrekhov committed Oct 15, 2019
commit be29a58735e6a394765974bb427efc060788d5a7
Original file line number Diff line number Diff line change
Expand Up @@ -242,19 +242,16 @@ private List<Message> popMessages(Connection connection, String queueName, int c
return messages;
}

final String POP_MESSAGES = "UPDATE queue_message SET popped = true WHERE queue_name = ? AND message_id IN (%s) AND popped = false";
List<Message> poppedMessages = new ArrayList<>();
for (Message message: messages) {
final String POP_MESSAGE = "UPDATE queue_message SET popped = true WHERE queue_name = ? AND message_id = ? AND popped = false";
int result = query(connection, POP_MESSAGE, q -> q.addParameter(queueName).addParameter(message.getId()).executeUpdate());

final List<String> Ids = messages.stream().map(Message::getId).collect(Collectors.toList());
final String query = String.format(POP_MESSAGES, Query.generateInBindings(messages.size()));

int result = query(connection, query, q -> q.addParameter(queueName).addParameters(Ids).executeUpdate());

if (result != messages.size()) {
String message = String.format("Could not pop all messages for given ids: %s (%d messages were popped)",
Ids, result);
throw new ApplicationException(ApplicationException.Code.BACKEND_ERROR, message);
if (result == 1) {
poppedMessages.add(message);
}
}
return messages;
return poppedMessages;
}


Expand Down