Skip to content

🎨 重构MessagePipe,添加线程间协作 #73

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@
import org.minbox.framework.message.pipe.core.Message;
import org.minbox.framework.message.pipe.core.exception.MessagePipeException;
import org.minbox.framework.message.pipe.server.config.MessagePipeConfiguration;
import org.minbox.framework.message.pipe.server.exception.ExceptionHandler;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.util.ObjectUtils;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

Expand All @@ -28,16 +29,45 @@ public class MessagePipe {
*/
@Getter
private String name;
private String queueName;
private AtomicInteger lastMessageCount = new AtomicInteger(0);
private AtomicLong lastProcessTimeMillis = new AtomicLong(System.currentTimeMillis());
/**
* The Redis blocking queue bound to the current message pipeline
*/
@Getter
private RBlockingQueue<Message> queue;
/**
* The redisson client instance
*
* @see RBlockingQueue
* @see RLock
*/
@Getter
private RedissonClient redissonClient;
/**
* The queue name in redis
*/
private String queueName;
/**
* The name of the lock used when putting the message
*/
private String putLockName;
/**
* The lock name used when taking the message
*/
private String takeLockName;
/**
* The last processing message millis
* <p>
* The default values is {@link System#currentTimeMillis()}
*/
private AtomicLong lastProcessTimeMillis = new AtomicLong(System.currentTimeMillis());
/**
* Whether the message monitoring method is being executed
*/
private boolean runningHandleAll = false;
/**
* Is the add data method being executed
*/
private boolean transfer = false;
/**
* The {@link MessagePipe} configuration
*/
Expand All @@ -49,7 +79,10 @@ public MessagePipe(String name,
MessagePipeConfiguration configuration) {
this.name = name;
this.queueName = LockNames.MESSAGE_QUEUE.format(this.name);
this.putLockName = LockNames.PUT_MESSAGE.format(this.name);
this.takeLockName = LockNames.TAKE_MESSAGE.format(this.name);
this.redissonClient = redissonClient;
this.queue = redissonClient.getBlockingQueue(this.queueName);
this.configuration = configuration;
if (this.name == null || this.name.trim().length() == 0) {
throw new MessagePipeException("The MessagePipe name is required,cannot be null.");
Expand All @@ -67,58 +100,103 @@ public MessagePipe(String name,
*
* @param message The {@link Message} instance
*/
public void put(Message message) {
String putLockName = LockNames.PUT_MESSAGE.format(this.name);
public synchronized void putLast(Message message) {
this.transfer = true;
RLock putLock = redissonClient.getLock(putLockName);
putLock.lock();
if (!Thread.currentThread().isInterrupted()) {
try {
String queueLockName = LockNames.MESSAGE_QUEUE.format(this.name);
RBlockingQueue<Message> queue = redissonClient.getBlockingQueue(queueLockName);
try {
MessagePipeConfiguration.LockTime lockTime = configuration.getLockTime();
if (putLock.tryLock(lockTime.getWaitTime(), lockTime.getLeaseTime(), lockTime.getTimeUnit())) {
boolean addSuccess = queue.offer(message);
lastMessageCount.set(queue.size());
if (!addSuccess) {
throw new MessagePipeException("Unsuccessful when writing the message to the queue.");
}
} catch (Exception e) {
configuration.getExceptionHandler().handleException(e, message);
} finally {
putLock.unlock();
}
} catch (Exception e) {
this.doHandleException(e, message);
} finally {
this.transfer = false;
putLock.unlock();
notifyAll();
}
}

/**
* Lock processing the first message
* Processing first message
*
* @param function Logical method of processing messages
* @param function Logical method of processing first message in {@link MessagePipe}
*/
public void lockHandleTheFirst(Function<Message, Boolean> function) {
Message message = null;
String takeLockName = LockNames.TAKE_MESSAGE.format(this.name);
public synchronized void handleFirst(Function<Message, Boolean> function) {
while (transfer || runningHandleAll) {
try {
wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error(e.getMessage(), e);
}
}
Message current = null;
Long currentTimeMillis = System.currentTimeMillis();
RLock takeLock = redissonClient.getLock(takeLockName);
log.debug("lock:" + takeLock.toString() + ",interrupted:" + Thread.currentThread().isInterrupted()
+ ",hold:" + takeLock.isHeldByCurrentThread() + ",threadId:" + Thread.currentThread().getId());
try {
MessagePipeConfiguration.LockTime lockTime = configuration.getLockTime();
if (takeLock.tryLock(lockTime.getWaitTime(), lockTime.getLeaseTime(), lockTime.getTimeUnit())) {
log.debug("Thread:{}, acquired lock.", Thread.currentThread().getId());
RBlockingQueue<Message> queue = redissonClient.getBlockingQueue(this.queueName);
this.lastMessageCount.set(queue.size());
message = queue.peek();
boolean isExecutionSuccessfully = message != null ? function.apply(message) : false;
if (isExecutionSuccessfully) {
Long currentTimeMillis = System.currentTimeMillis();
this.lastProcessTimeMillis.set(currentTimeMillis);
queue.poll();
// Take first message
current = this.peek();
if (ObjectUtils.isEmpty(current)) {
log.warn("Message pipeline: {}, no message to be processed was found.", name);
return;
}
boolean executionResult = function.apply(current);
if (!executionResult) {
throw new MessagePipeException("MessagePipe [" + name + "] , Handle message exception, message content: " +
new String(current.getBody()));
}
// Remove first message
this.poll();
}
} catch (Exception e) {
configuration.getExceptionHandler().handleException(e, message);
this.doHandleException(e, current);
} finally {
if (!this.checkClientIsShutdown() && takeLock.isLocked() && takeLock.isHeldByCurrentThread()) {
takeLock.unlock();
lastProcessTimeMillis.set(currentTimeMillis);
transfer = true;
takeLock.unlock();
notifyAll();
}
}

/**
* Process messages sequentially until all processing is complete
*
* @param function Logical method of processing messages in a loop
*/
public synchronized void handleToLast(Function<Message, Boolean> function) {
runningHandleAll = true;
RLock takeLock = redissonClient.getLock(takeLockName);
Message current = null;
try {
MessagePipeConfiguration.LockTime lockTime = configuration.getLockTime();
if (takeLock.tryLock(lockTime.getWaitTime(), lockTime.getLeaseTime(), lockTime.getTimeUnit())) {
while (queue.size() > 0) {
// Take first message
current = this.peek();
boolean executionResult = function.apply(current);
if (!executionResult) {
throw new MessagePipeException("Handle message exception, message content: " +
new String(current.getBody()));
}
// Remove first message
this.poll();
}
}
} catch (Exception e) {
this.doHandleException(e, current);
} finally {
Long currentTimeMillis = System.currentTimeMillis();
lastProcessTimeMillis.set(currentTimeMillis);
transfer = true;
runningHandleAll = false;
takeLock.unlock();
notifyAll();
}
}

Expand All @@ -131,7 +209,6 @@ public void lockHandleTheFirst(Function<Message, Boolean> function) {
public Message peek() {
Message message = null;
if (!this.checkClientIsShutdown()) {
RBlockingQueue<Message> queue = redissonClient.getBlockingQueue(queueName);
message = queue.peek();
}
return message;
Expand All @@ -146,7 +223,6 @@ public Message peek() {
public Message poll() {
Message message = null;
if (!this.checkClientIsShutdown()) {
RBlockingQueue<Message> queue = redissonClient.getBlockingQueue(queueName);
message = queue.poll();
}
return message;
Expand All @@ -160,30 +236,20 @@ public Message poll() {
public int size() {
int messageSize = 0;
if (!this.checkClientIsShutdown()) {
RBlockingQueue<Message> queue = redissonClient.getBlockingQueue(queueName);
messageSize = queue.size();
this.lastMessageCount.set(messageSize);
messageSize = this.queue.size();
}
return messageSize;
}

/**
* Get last invoke {@link #lockHandleTheFirst} method time millis
* Get last invoke {@link #handleFirst}、{@link #handleToLast} method time millis
*
* @return Last call time,{@link java.util.concurrent.TimeUnit#MILLISECONDS}
*/
public Long getLastProcessTimeMillis() {
return this.lastProcessTimeMillis.get();
}

/**
*
* @return
*/
public int getLastMessageCount() {
return this.lastMessageCount.get();
}

/**
* Check whether the redisson client has been shutdown
*
Expand All @@ -193,4 +259,17 @@ private boolean checkClientIsShutdown() {
return redissonClient.isShutdown() || redissonClient.isShuttingDown();
}

/**
* Execution processing exception
*
* @param e The {@link Exception} instance
* @param current {@link Message} instance being processed
*/
private void doHandleException(Exception e, Message current) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
ExceptionHandler exceptionHandler = configuration.getExceptionHandler();
exceptionHandler.handleException(e, current);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,14 @@ public MessageDistributionExecutor(MessagePipe messagePipe,
* Waiting for process new messages
* <p>
* After discovering a new message from the message pipeline, perform distribution to the client
* Take the value of {@link MessagePipe#getLastMessageCount()}
* Take the value of {@link MessagePipe#size()}
* as the judgment condition for processing the distribution message
*/
public void waitProcessing() {
while (true) {
try {
synchronized (this) {
while (this.messagePipe.getLastMessageCount() > 0) {
while (this.messagePipe.size() > 0) {
try {
this.takeAndSend();
}
Expand Down Expand Up @@ -93,7 +93,7 @@ private void takeAndSend() {
if (ObjectUtils.isEmpty(client)) {
throw new MessagePipeException("Message Pipe: " + this.pipeName + ", no healthy clients were found.");
}
this.messagePipe.lockHandleTheFirst(message -> sendMessageToClient(message, client));
this.messagePipe.handleFirst(message -> sendMessageToClient(message, client));
}

/**
Expand Down