Skip to content

Commit d5b83ab

Browse files
authored
Merge pull request #38 from hengboy/master
Server only distributes messages to valid clients
2 parents cbcb318 + 9dadb00 commit d5b83ab

File tree

4 files changed

+26
-10
lines changed

4 files changed

+26
-10
lines changed

message-pipe-server/src/main/java/org/minbox/framework/message/pipe/server/ClientExpiredExecutor.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
11
package org.minbox.framework.message.pipe.server;
22

3-
import com.alibaba.fastjson.JSON;
43
import lombok.extern.slf4j.Slf4j;
5-
import org.minbox.framework.message.pipe.core.information.ClientInformation;
64
import org.minbox.framework.message.pipe.core.ClientStatus;
5+
import org.minbox.framework.message.pipe.core.information.ClientInformation;
76
import org.minbox.framework.message.pipe.server.config.ServerConfiguration;
87
import org.springframework.beans.factory.DisposableBean;
98
import org.springframework.beans.factory.InitializingBean;
@@ -52,16 +51,22 @@ private void startEliminateExpiredClient() {
5251
List<ClientInformation> clients = ClientManager.getAllClient();
5352
if (clients != null && clients.size() > 0) {
5453
clients.stream().forEach(client -> {
55-
long intervalSeconds = (client.getLastReportTime() - currentTime) / 1000;
54+
String clientId = ClientManager.getClientId(client.getAddress(), client.getPort());
55+
long intervalSeconds = (currentTime - client.getLastReportTime()) / 1000;
5656
if (intervalSeconds > configuration.getExpiredExcludeThresholdSeconds()
5757
&& ClientStatus.ON_LINE.equals(client.getStatus())) {
5858
client.setStatus(ClientStatus.OFF_LINE);
5959
ClientManager.updateClientInformation(client);
60-
log.debug("MessagePipe Client:{},status updated to offline.", JSON.toJSONString(client));
60+
log.info("MessagePipe Client:{},status updated to offline.", clientId);
61+
} else if (intervalSeconds <= configuration.getExpiredExcludeThresholdSeconds()
62+
&& ClientStatus.OFF_LINE.equals(client.getStatus())) {
63+
client.setStatus(ClientStatus.ON_LINE);
64+
ClientManager.updateClientInformation(client);
65+
log.info("MessagePipe Client:{},status updated to online.", clientId);
6166
}
6267
});
6368
}
64-
}, 5, 20, TimeUnit.SECONDS);
69+
}, 5, configuration.getCheckClientExpiredIntervalSeconds(), TimeUnit.SECONDS);
6570
}
6671

6772
@Override

message-pipe-server/src/main/java/org/minbox/framework/message/pipe/server/ClientManager.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import io.grpc.ManagedChannel;
44
import io.grpc.ManagedChannelBuilder;
5+
import org.minbox.framework.message.pipe.core.ClientStatus;
56
import org.minbox.framework.message.pipe.core.information.ClientInformation;
67
import org.minbox.framework.message.pipe.core.exception.MessagePipeException;
78
import org.springframework.util.ObjectUtils;
@@ -120,11 +121,16 @@ public static void bindClientToPipe(String pipeName, String clientId) {
120121
* @param pipeName message pipe name
121122
* @return The message pipe bind clients
122123
*/
123-
public static List<ClientInformation> getPipeBindClients(String pipeName) {
124+
public static List<ClientInformation> getPipeBindOnLineClients(String pipeName) {
124125
List<ClientInformation> clientInformationList = new ArrayList<>();
125126
Set<String> clientIds = PIPE_CLIENTS.get(pipeName);
126127
if (!ObjectUtils.isEmpty(clientIds)) {
127-
clientIds.stream().forEach(clientId -> clientInformationList.add(CLIENTS.get(clientId)));
128+
clientIds.stream().forEach(clientId -> {
129+
ClientInformation client = CLIENTS.get(clientId);
130+
if (ClientStatus.ON_LINE == client.getStatus()) {
131+
clientInformationList.add(client);
132+
}
133+
});
128134
}
129135
return clientInformationList;
130136
}
@@ -151,7 +157,6 @@ public static synchronized ManagedChannel establishChannel(String clientId) {
151157
}
152158

153159
/**
154-
*
155160
* @param clientId
156161
*/
157162
public static void removeChannel(String clientId) {

message-pipe-server/src/main/java/org/minbox/framework/message/pipe/server/config/ServerConfiguration.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,4 +29,8 @@ public class ServerConfiguration {
2929
* @see org.minbox.framework.message.pipe.server.ClientExpiredExecutor
3030
*/
3131
private long expiredExcludeThresholdSeconds = 30;
32+
/**
33+
* Check the client timeout interval in seconds
34+
*/
35+
private long checkClientExpiredIntervalSeconds = 10;
3236
}

message-pipe-server/src/main/java/org/minbox/framework/message/pipe/server/distribution/MessageDistributionExecutor.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ private void takeAndSend() {
8383
takeLock.lock(configuration.getLockTime().getLeaseTime(), configuration.getLockTime().getTimeUnit());
8484
if (!Thread.currentThread().isInterrupted()) {
8585
try {
86-
List<ClientInformation> clients = ClientManager.getPipeBindClients(this.pipeName);
86+
List<ClientInformation> clients = ClientManager.getPipeBindOnLineClients(this.pipeName);
8787
String queueLockName = LockNames.MESSAGE_QUEUE.format(this.pipeName);
8888
RBlockingQueue<Message> queue = redissonClient.getBlockingQueue(queueLockName);
8989
message = queue.peek();
@@ -145,7 +145,9 @@ private boolean sendMessageToClient(Message message, ClientInformation clientInf
145145
} catch (Exception e) {
146146
throw e;
147147
}
148-
log.debug("To the client: {}, sending the message is complete.", clientId);
148+
if (isSendSuccessfully) {
149+
log.debug("To the client: {}, sending the message is complete.", clientId);
150+
}
149151
return isSendSuccessfully;
150152
}
151153

0 commit comments

Comments
 (0)