Skip to content

Fix KV Stores for same message in multiple groups #8737

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2021 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -27,6 +27,7 @@
import java.util.stream.Stream;

import org.springframework.jmx.export.annotation.ManagedAttribute;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;

Expand All @@ -45,7 +46,7 @@ public abstract class AbstractKeyValueMessageStore extends AbstractMessageGroupS

protected static final String MESSAGE_KEY_PREFIX = "MESSAGE_";

protected static final String MESSAGE_GROUP_KEY_PREFIX = "MESSAGE_GROUP_";
protected static final String MESSAGE_GROUP_KEY_PREFIX = "GROUP_OF_MESSAGES_";

private final String messagePrefix;

Expand All @@ -57,9 +58,9 @@ protected AbstractKeyValueMessageStore() {

/**
* Construct an instance based on the provided prefix for keys to distinguish between
* different store instances in the same target key-value data base. Defaults to an
* different store instances in the same target key-value database. Defaults to an
* empty string - no prefix. The actual prefix for messages is
* {@code prefix + MESSAGE_}; for message groups - {@code prefix + MESSAGE_GROUP_}
* {@code prefix + MESSAGE_}; for message groups - {@code prefix + GROUP_OF_MESSAGES_}
* @param prefix the prefix to use
* @since 4.3.12
*/
Expand All @@ -71,18 +72,18 @@ protected AbstractKeyValueMessageStore(String prefix) {

/**
* Return the configured prefix for message keys to distinguish between different
* store instances in the same target key-value data base. Defaults to the
* store instances in the same target key-value database. Defaults to the
* {@value MESSAGE_KEY_PREFIX} - without a custom prefix.
* @return the prefix for keys
* @since 4.3.12
*/
protected String getMessagePrefix() {
public String getMessagePrefix() {
return this.messagePrefix;
}

/**
* Return the configured prefix for message group keys to distinguish between
* different store instances in the same target key-value data base. Defaults to the
* different store instances in the same target key-value database. Defaults to the
* {@value MESSAGE_GROUP_KEY_PREFIX} - without custom prefix.
* @return the prefix for keys
* @since 4.3.12
Expand Down Expand Up @@ -140,10 +141,15 @@ public <T> Message<T> addMessage(Message<T> message) {
}

protected void doAddMessage(Message<?> message) {
doAddMessage(message, null);
}

protected void doAddMessage(Message<?> message, @Nullable Object groupId) {
Assert.notNull(message, "'message' must not be null");
UUID messageId = message.getHeaders().getId();
Assert.notNull(messageId, "Cannot store messages without an ID header");
doStoreIfAbsent(this.messagePrefix + messageId, new MessageHolder(message));
String messageKey = this.messagePrefix + (groupId != null ? groupId.toString() + '_' : "") + messageId;
doStoreIfAbsent(messageKey, new MessageHolder(message));
}

@Override
Expand All @@ -165,7 +171,6 @@ public long getMessageCount() {
return (messageIds != null) ? messageIds.size() : 0;
}


// MessageGroupStore methods

/**
Expand Down Expand Up @@ -211,7 +216,7 @@ public void addMessagesToGroup(Object groupId, Message<?>... messages) {
}

for (Message<?> message : messages) {
doAddMessage(message);
doAddMessage(message, groupId);
if (metadata != null) {
metadata.add(message.getHeaders().getId());
}
Expand Down Expand Up @@ -253,7 +258,7 @@ public void removeMessagesFromGroup(Object groupId, Collection<Message<?>> messa

List<Object> messageIds = new ArrayList<>();
for (UUID id : ids) {
messageIds.add(this.messagePrefix + id);
messageIds.add(this.messagePrefix + groupId + '_' + id);
}

doRemoveAll(messageIds);
Expand Down Expand Up @@ -288,7 +293,7 @@ public void removeMessageGroup(Object groupId) {
List<Object> messageIds =
messageGroupMetadata.getMessageIds()
.stream()
.map(id -> this.messagePrefix + id)
.map(id -> this.messagePrefix + groupId + '_' + id)
.collect(Collectors.toList());

doRemoveAll(messageIds);
Expand Down Expand Up @@ -326,32 +331,55 @@ public Message<?> pollMessageFromGroup(Object groupId) {
groupMetadata.remove(firstId);
groupMetadata.setLastModified(System.currentTimeMillis());
doStore(this.groupPrefix + groupId, groupMetadata);
return removeMessage(firstId);
return removeMessageFromGroup(firstId, groupId);
}
}
return null;
}

private Message<?> removeMessageFromGroup(UUID id, Object groupId) {
Assert.notNull(id, "'id' must not be null");
Object object = doRemove(this.messagePrefix + groupId + '_' + id);
if (object != null) {
return extractMessage(object);
}
else {
return null;
}
}

@Override
public Message<?> getOneMessageFromGroup(Object groupId) {
MessageGroupMetadata groupMetadata = getGroupMetadata(groupId);
if (groupMetadata != null) {
UUID messageId = groupMetadata.firstId();
if (messageId != null) {
return getMessage(messageId);
return getMessageFromGroup(messageId, groupId);
}
}
return null;
}

@Nullable
private Message<?> getMessageFromGroup(UUID messageId, Object groupId) {
Assert.notNull(messageId, "'messageId' must not be null");
Object object = doRetrieve(this.messagePrefix + groupId + '_' + messageId);
if (object != null) {
return extractMessage(object);
}
else {
return null;
}
}

@Override
public Collection<Message<?>> getMessagesForGroup(Object groupId) {
MessageGroupMetadata groupMetadata = getGroupMetadata(groupId);
ArrayList<Message<?>> messages = new ArrayList<>();
if (groupMetadata != null) {
Iterator<UUID> messageIds = groupMetadata.messageIdIterator();
while (messageIds.hasNext()) {
messages.add(getMessage(messageIds.next()));
messages.add(getMessageFromGroup(messageIds.next(), groupId));
}
}
return messages;
Expand All @@ -362,7 +390,7 @@ public Stream<Message<?>> streamMessagesForGroup(Object groupId) {
return getGroupMetadata(groupId)
.getMessageIds()
.stream()
.map(this::getMessage);
.map((messageId) -> getMessageFromGroup(messageId, groupId));
}

@Override
Expand All @@ -376,8 +404,8 @@ public Iterator<MessageGroup> iterator() {

private Collection<String> normalizeKeys(Collection<String> keys) {
Set<String> normalizedKeys = new HashSet<>();
for (Object key : keys) {
String strKey = (String) key;
for (String key : keys) {
String strKey = key;
if (strKey.startsWith(this.groupPrefix)) {
strKey = strKey.replace(this.groupPrefix, "");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2022 the original author or authors.
* Copyright 2017-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2022 the original author or authors.
* Copyright 2017-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -23,10 +23,10 @@
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.map.IMap;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.history.MessageHistory;
Expand All @@ -50,26 +50,25 @@ public class HazelcastMessageStoreTests {

private static IMap<Object, Object> map;

@BeforeClass
@BeforeAll
public static void init() {
instance = Hazelcast.newHazelcastInstance();
map = instance.getMap("customTestsMessageStore");
store = new HazelcastMessageStore(map);
}

@AfterClass
@AfterAll
public static void destroy() {
instance.shutdown();
}

@Before
@BeforeEach
public void clean() {
map.clear();
}

@Test
public void testWithMessageHistory() {

Message<?> message = new GenericMessage<>("Hello");
DirectChannel fooChannel = new DirectChannel();
fooChannel.setBeanName("fooChannel");
Expand Down Expand Up @@ -107,7 +106,6 @@ public void testAddAndRemoveMessagesFromMessageGroup() {

@Test
public void addAndGetMessage() {

Message<?> message = MessageBuilder.withPayload("test").build();
store.addMessage(message);
Message<?> retrieved = store.getMessage(message.getHeaders().getId());
Expand Down Expand Up @@ -145,4 +143,34 @@ public void messageStoreIterator() {
assertThat(groupCount).isEqualTo(1);
}

@Test
public void sameMessageInTwoGroupsNotRemovedByFirstGroup() {
GenericMessage<String> testMessage = new GenericMessage<>("test data");

store.addMessageToGroup("1", testMessage);
store.addMessageToGroup("2", testMessage);

store.removeMessageGroup("1");

assertThat(store.getMessageCount()).isEqualTo(1);

store.removeMessageGroup("2");

assertThat(store.getMessageCount()).isEqualTo(0);
}

@Test
public void removeMessagesFromGroupDontRemoveSameMessageInOtherGroup() {
GenericMessage<String> testMessage = new GenericMessage<>("test data");

store.addMessageToGroup("1", testMessage);
store.addMessageToGroup("2", testMessage);

store.removeMessagesFromGroup("1", testMessage);

assertThat(store.getMessageCount()).isEqualTo(1);
assertThat(store.messageGroupSize("1")).isEqualTo(0);
assertThat(store.messageGroupSize("2")).isEqualTo(1);
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2013-2022 the original author or authors.
* Copyright 2013-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -101,7 +101,8 @@ public <T> Message<T> addMessage(Message<T> message) {
@Override
public Message<?> removeMessage(UUID id) {
Assert.notNull(id, "'id' must not be null");
Query query = Query.query(Criteria.where(MessageDocumentFields.MESSAGE_ID).is(id));
Query query = Query.query(Criteria.where(MessageDocumentFields.MESSAGE_ID).is(id)
.and(MessageDocumentFields.GROUP_ID).exists(false));
MessageDocument document = getMongoTemplate().findAndRemove(query, MessageDocument.class, this.collectionName);
return (document != null) ? document.getMessage() : null;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -258,14 +258,15 @@ public MessageMetadata getMessageMetadata(UUID id) {
@Override
@ManagedAttribute
public long getMessageCount() {
return this.template.getCollection(this.collectionName).countDocuments();
Query query = Query.query(Criteria.where("headers.id").exists(true).and(GROUP_ID_KEY).exists(false));
return this.template.getCollection(this.collectionName).countDocuments(query.getQueryObject());
}

@Override
public Message<?> removeMessage(UUID id) {
Assert.notNull(id, "'id' must not be null");
MessageWrapper messageWrapper =
this.template.findAndRemove(whereMessageIdIs(id), MessageWrapper.class, this.collectionName);
Query query = Query.query(Criteria.where("headers.id").is(id).and(GROUP_ID_KEY).exists(false));
MessageWrapper messageWrapper = this.template.findAndRemove(query, MessageWrapper.class, this.collectionName);
return (messageWrapper != null ? messageWrapper.getMessage() : null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,28 @@ void testWithMessageHistory() {
.containsEntry("type", "channel");
}

@Test
public void removeMessageDoesntRemoveSameMessageInTheGroup() {
GenericMessage<String> testMessage = new GenericMessage<>("test data");

MessageGroupStore store = getMessageGroupStore();

store.addMessageToGroup("1", testMessage);

MessageStore messageStore = (MessageStore) store;

messageStore.removeMessage(testMessage.getHeaders().getId());

assertThat(messageStore.getMessageCount()).isEqualTo(0);
assertThat(store.getMessageCountForAllMessageGroups()).isEqualTo(1);
assertThat(store.messageGroupSize("1")).isEqualTo(1);

store.removeMessageGroup("1");

assertThat(store.getMessageCountForAllMessageGroups()).isEqualTo(0);
assertThat(store.messageGroupSize("1")).isEqualTo(0);
}

protected abstract MessageGroupStore getMessageGroupStore();

protected abstract MessageStore getMessageStore();
Expand Down
Loading