Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(RedisMessageStore): RedisMessageStore add lock #9680

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,12 @@
* @author Gary Russell
* @author Artem Bilan
* @author Ngoc Nhan
* @author Youbin Wu
*
* @since 2.1
*/
public abstract class AbstractKeyValueMessageStore extends AbstractMessageGroupStore implements MessageStore {

private static final String GROUP_ID_MUST_NOT_BE_NULL = "'groupId' must not be null";

protected static final String MESSAGE_KEY_PREFIX = "MESSAGE_";

protected static final String MESSAGE_GROUP_KEY_PREFIX = "GROUP_OF_MESSAGES_";
Expand Down Expand Up @@ -206,7 +205,7 @@ public MessageGroupMetadata getGroupMetadata(Object groupId) {
}

@Override
public void addMessagesToGroup(Object groupId, Message<?>... messages) {
protected void doAddMessagesToGroup(Object groupId, Message<?>... messages) {
Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL);
Assert.notNull(messages, "'messages' must not be null");

Expand Down Expand Up @@ -240,7 +239,7 @@ public void addMessagesToGroup(Object groupId, Message<?>... messages) {
}

@Override
public void removeMessagesFromGroup(Object groupId, Collection<Message<?>> messages) {
protected void doRemoveMessagesFromGroup(Object groupId, Collection<Message<?>> messages) {
Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL);
Assert.notNull(messages, "'messages' must not be null");

Expand Down Expand Up @@ -283,7 +282,7 @@ public Message<?> getMessageFromGroup(Object groupId, UUID messageId) {
}

@Override
public boolean removeMessageFromGroupById(Object groupId, UUID messageId) {
protected boolean doRemoveMessageFromGroupById(Object groupId, UUID messageId) {
Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL);
Assert.notNull(messageId, "'messageId' must not be null");
Object mgm = doRetrieve(this.groupPrefix + groupId);
Expand All @@ -305,7 +304,7 @@ public boolean removeMessageFromGroupById(Object groupId, UUID messageId) {
}

@Override
public void completeGroup(Object groupId) {
protected void doCompleteGroup(Object groupId) {
Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL);
MessageGroupMetadata metadata = getGroupMetadata(groupId);
if (metadata != null) {
Expand All @@ -319,7 +318,7 @@ public void completeGroup(Object groupId) {
* Remove the MessageGroup with the provided group ID.
*/
@Override
public void removeMessageGroup(Object groupId) {
protected void doRemoveMessageGroup(Object groupId) {
Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL);
Object mgm = doRemove(this.groupPrefix + groupId);
if (mgm != null) {
Expand All @@ -337,7 +336,7 @@ public void removeMessageGroup(Object groupId) {
}

@Override
public void setGroupCondition(Object groupId, String condition) {
protected void doSetGroupCondition(Object groupId, String condition) {
MessageGroupMetadata metadata = getGroupMetadata(groupId);
if (metadata != null) {
metadata.setCondition(condition);
Expand All @@ -346,7 +345,7 @@ public void setGroupCondition(Object groupId, String condition) {
}

@Override
public void setLastReleasedSequenceNumberForGroup(Object groupId, int sequenceNumber) {
protected void doSetLastReleasedSequenceNumberForGroup(Object groupId, int sequenceNumber) {
Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL);
MessageGroupMetadata metadata = getGroupMetadata(groupId);
if (metadata == null) {
Expand All @@ -359,7 +358,7 @@ public void setLastReleasedSequenceNumberForGroup(Object groupId, int sequenceNu
}

@Override
public Message<?> pollMessageFromGroup(Object groupId) {
protected Message<?> doPollMessageFromGroup(Object groupId) {
MessageGroupMetadata groupMetadata = getGroupMetadata(groupId);
if (groupMetadata != null) {
UUID firstId = groupMetadata.firstId();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2023 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,30 +19,42 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.UUID;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.integration.support.locks.DefaultLockRegistry;
import org.springframework.integration.support.locks.LockRegistry;
import org.springframework.integration.util.CheckedCallable;
import org.springframework.integration.util.CheckedRunnable;
import org.springframework.jmx.export.annotation.ManagedAttribute;
import org.springframework.jmx.export.annotation.ManagedOperation;
import org.springframework.jmx.export.annotation.ManagedResource;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException;
import org.springframework.util.Assert;

/**
* @author Dave Syer
* @author Oleg Zhurakousky
* @author Gary Russell
* @author Artem Bilan
* @author Christian Tzolov
* @author Youbin Wu
*
* @since 2.0
*/
@ManagedResource
public abstract class AbstractMessageGroupStore extends AbstractBatchingMessageGroupStore
implements MessageGroupStore, Iterable<MessageGroup> {

protected static final String INTERRUPTED_WHILE_OBTAINING_LOCK = "Interrupted while obtaining lock";

protected static final String GROUP_ID_MUST_NOT_BE_NULL = "'groupId' must not be null";

protected final Log logger = LogFactory.getLog(getClass()); // NOSONAR final

private final Lock lock = new ReentrantLock();
Expand All @@ -56,11 +68,15 @@ public abstract class AbstractMessageGroupStore extends AbstractBatchingMessageG

private boolean timeoutOnIdle;

protected LockRegistry lockRegistry;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This must be private and whenever we use outside of this class has to be replaced by the protected getLockRegistry().

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right, even LockRegister does not need to be exposed to subclasses. But SimpleMessageStore is special, it needs lockRegister and has its own semaphore mechanism. I hope you can handle the follow-up work, because I feel there will be many special considerations in it. I will not update this PR again

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please, confirm that you are OK that I'll pick it up whenever you have left.

I'll do that, thought, in new year, when I switch to 6.5 already.

Happy holidays!


protected AbstractMessageGroupStore() {
this.lockRegistry = new DefaultLockRegistry();
}

protected AbstractMessageGroupStore(boolean lazyLoadMessageGroups) {
this.lazyLoadMessageGroups = lazyLoadMessageGroups;
this.lockRegistry = new DefaultLockRegistry();
}

@Override
Expand Down Expand Up @@ -109,6 +125,16 @@ public void setLazyLoadMessageGroups(boolean lazyLoadMessageGroups) {
this.lazyLoadMessageGroups = lazyLoadMessageGroups;
}

/**
* Specify the type of the {@link LockRegistry} to ensure atomic operations
* @param lockRegistry lockRegistryType
* @since 6.5
*/
public void setLockRegistry(LockRegistry lockRegistry) {
Assert.notNull(lockRegistry, "The LockRegistry cannot be null");
this.lockRegistry = lockRegistry;
}

@Override
public void registerMessageGroupExpiryCallback(MessageGroupCallback callback) {
if (callback instanceof UniqueExpiryCallback) {
Expand Down Expand Up @@ -195,12 +221,98 @@ public void removeMessagesFromGroup(Object key, Message<?>... messages) {
removeMessagesFromGroup(key, Arrays.asList(messages));
}

@Override
public void removeMessagesFromGroup(Object key, Collection<Message<?>> messages) {
Assert.notNull(key, GROUP_ID_MUST_NOT_BE_NULL);
executeLocked(key, () -> doRemoveMessagesFromGroup(key, messages));
}

protected abstract void doRemoveMessagesFromGroup(Object key, Collection<Message<?>> messages);

@Override
public void addMessagesToGroup(Object groupId, Message<?>... messages) {
Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL);
executeLocked(groupId, () -> doAddMessagesToGroup(groupId, messages));
}

protected abstract void doAddMessagesToGroup(Object groupId, Message<?>... messages);

@Override
public MessageGroup addMessageToGroup(Object groupId, Message<?> message) {
addMessagesToGroup(groupId, message);
return getMessageGroup(groupId);
}

@Override
public void removeMessageGroup(Object groupId) {
Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL);
executeLocked(groupId, () -> doRemoveMessageGroup(groupId));
}

protected abstract void doRemoveMessageGroup(Object groupId);

@Override
public boolean removeMessageFromGroupById(Object groupId, UUID messageId) {
Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL);
return executeLocked(groupId, () -> doRemoveMessageFromGroupById(groupId, messageId));
}

protected boolean doRemoveMessageFromGroupById(Object groupId, UUID messageId) {
throw new UnsupportedOperationException("Not supported for this store");
}

@Override
public void setLastReleasedSequenceNumberForGroup(Object groupId, int sequenceNumber) {
Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL);
executeLocked(groupId, () -> doSetLastReleasedSequenceNumberForGroup(groupId, sequenceNumber));
}

protected abstract void doSetLastReleasedSequenceNumberForGroup(Object groupId, int sequenceNumber);

@Override
public void completeGroup(Object groupId) {
Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL);
executeLocked(groupId, () -> doCompleteGroup(groupId));
}

protected abstract void doCompleteGroup(Object groupId);

@Override
public void setGroupCondition(Object groupId, String condition) {
Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL);
executeLocked(groupId, () -> doSetGroupCondition(groupId, condition));
}

protected abstract void doSetGroupCondition(Object groupId, String condition);

@Override
public Message<?> pollMessageFromGroup(Object groupId) {
Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL);
return executeLocked(groupId, () -> doPollMessageFromGroup(groupId));
}

protected abstract Message<?> doPollMessageFromGroup(Object groupId);

protected <T, E extends RuntimeException> T executeLocked(Object groupId, CheckedCallable<T, E> runnable) {
try {
return this.lockRegistry.executeLocked(groupId, runnable);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new MessagingException(INTERRUPTED_WHILE_OBTAINING_LOCK, e);
}
}

protected <E extends RuntimeException> void executeLocked(Object groupId, CheckedRunnable<E> runnable) {
try {
this.lockRegistry.executeLocked(groupId, runnable);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new MessagingException(INTERRUPTED_WHILE_OBTAINING_LOCK, e);
}
}

private void expire(MessageGroup group) {

RuntimeException exception = null;
Expand Down
Loading
Loading