Skip to content

Commit a76ab93

Browse files
broadcast messages
1 parent db1cbc4 commit a76ab93

File tree

8 files changed

+327
-41
lines changed

8 files changed

+327
-41
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@
5656
<dependency>
5757
<groupId>org.teamapps</groupId>
5858
<artifactId>teamapps-configuration-management</artifactId>
59-
<version>0.1</version>
59+
<version>0.2-SNAPSHOT</version>
6060
<scope>compile</scope>
6161
</dependency>
6262

src/main/java/org/teamapps/cluster/core/Cluster.java

Lines changed: 162 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@
77
* Licensed under the Apache License, Version 2.0 (the "License");
88
* you may not use this file except in compliance with the License.
99
* You may obtain a copy of the License at
10-
*
10+
*
1111
* http://www.apache.org/licenses/LICENSE-2.0
12-
*
12+
*
1313
* Unless required by applicable law or agreed to in writing, software
1414
* distributed under the License is distributed on an "AS IS" BASIS,
1515
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -23,6 +23,7 @@
2323
import org.slf4j.Logger;
2424
import org.slf4j.LoggerFactory;
2525
import org.teamapps.cluster.message.protocol.*;
26+
import org.teamapps.commons.event.Event;
2627
import org.teamapps.commons.util.collections.ByKeyComparisonResult;
2728
import org.teamapps.commons.util.collections.CollectionUtil;
2829
import org.teamapps.configuration.Configuration;
@@ -45,8 +46,11 @@
4546

4647
public class Cluster implements ClusterServiceRegistry {
4748
private static final Logger LOGGER = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
48-
4949
public static final String CLUSTER_SERVICE = "clusterService";
50+
51+
public final Event<ClusterNodeData> onLeaderAvailable = new Event<>();
52+
public final Event<List<ClusterNodeData>> onAvailableNodesChange = new Event<>();
53+
5054
private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);
5155
private final ClusterNodeData localNode;
5256
private final Map<String, ClusterNode> clusterNodeMap = new ConcurrentHashMap<>();
@@ -57,6 +61,7 @@ public class Cluster implements ClusterServiceRegistry {
5761
private final File tempDir;
5862
private ClusterConfig clusterConfig;
5963
private boolean active = true;
64+
private ClusterNodeData leaderNode;
6065

6166
private ThreadPoolExecutor taskExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
6267
60L, TimeUnit.SECONDS,
@@ -89,9 +94,14 @@ private Cluster(ClusterConfig clusterConfig) {
8994
localNode = new ClusterNodeData()
9095
.setNodeId(clusterConfig.getNodeId() != null && !clusterConfig.getNodeId().isBlank() ? clusterConfig.getNodeId() : UUID.randomUUID().toString())
9196
.setHost(clusterConfig.getHost())
92-
.setPort(clusterConfig.getPort());
97+
.setPort(clusterConfig.getPort())
98+
.setLeaderNode(clusterConfig.isLeaderNode());
99+
if (clusterConfig.isLeaderNode()) {
100+
leaderNode = localNode;
101+
onLeaderAvailable.fire(leaderNode);
102+
}
93103
tempDir = createTempDirSave();
94-
LOGGER.info("Cluster node [{}]: started", localNode.getNodeId());
104+
LOGGER.info("Cluster node [{}]: started {}", localNode.getNodeId(), clusterConfig.isLeaderNode() ? "as leader node" : "");
95105
startServerSocket(localNode);
96106
if (clusterConfig.getPeerNodes() != null) {
97107
clusterConfig.getPeerNodes().stream()
@@ -130,17 +140,28 @@ private void startServerSocket(ClusterNodeData localNode) {
130140

131141
protected synchronized ClusterConnectionResult handleConnectionRequest(ClusterConnectionRequest request, ClusterConnection connection) {
132142
ClusterNodeData remoteNode = request.getLocalNode();
143+
LOGGER.info("Cluster node [{}]: connection requested from: {}, {}", localNode.getNodeId(), request.getLocalNode().getNodeId(), request.getLocalNode().getHost());
133144
String[] nodeServices = request.getLocalServices();
134145
ClusterConnectionResult connectionResult = new ClusterConnectionResult().setLocalNode(localNode);
146+
if (request.getLeaderNode() != null) {
147+
if (leaderNode != null && !leaderNode.getNodeId().equals(request.getLeaderNode().getNodeId())) {
148+
LOGGER.error("Cluster node [{}]: error: connection requested denied from {}, {} - different leader node: {} vs {}", localNode.getNodeId(), request.getLocalNode().getNodeId(), request.getLocalNode().getHost(), leaderNode.getNodeId(), request.getLocalNode().getNodeId());
149+
return connectionResult
150+
.setAccepted(false);
151+
} else if (leaderNode == null) {
152+
leaderNode = remoteNode;
153+
sendLeaderNodeUpdateToPeers();
154+
onLeaderAvailable.fire(leaderNode);
155+
LOGGER.info("Cluster node [{}]: new leader node: {}", localNode.getNodeId(), request.getLeaderNode().getNodeId());
156+
}
157+
}
135158
ClusterNode clusterNode = clusterNodeMap.get(remoteNode.getNodeId());
136-
List<ClusterNode> existingPeerNodes = new ArrayList<>(clusterNodeMap.values())
159+
List<ClusterNodeData> knownPeers = new ArrayList<>(clusterNodeMap.values())
137160
.stream()
138-
.filter(node -> !node.getNodeData().getNodeId().equals(localNode.getNodeId()))
139-
.filter(node -> node.getNodeData().getPort() > 0)
140-
.toList();
141-
List<ClusterNodeData> knownPeers = existingPeerNodes.stream()
142161
.map(ClusterNode::getNodeData)
143-
.collect(Collectors.toList());
162+
.filter(nodeData -> !nodeData.getNodeId().equals(localNode.getNodeId()))
163+
.filter(nodeData -> nodeData.getPort() > 0)
164+
.toList();
144165

145166
if (clusterNode == null || !clusterNode.isConnected()) {
146167
if (clusterNode == null) {
@@ -156,6 +177,15 @@ protected synchronized ClusterConnectionResult handleConnectionRequest(ClusterCo
156177
sendMessageToPeerNodes(new ClusterNewPeerInfo().setNewPeer(remoteNode), remoteNode);
157178
}
158179

180+
request.getKnownPeers().stream()
181+
.filter(peer -> peer.getHost() != null)
182+
.filter(peer -> peer.getPort() > 0)
183+
.filter(peer -> !clusterNodeMap.containsKey(peer.getNodeId()))
184+
.filter(peer -> !localNode.getNodeId().equals(peer.getNodeId()))
185+
.forEach(this::connectNode);
186+
187+
handleAvailableClusterNodesChanged();
188+
159189
return connectionResult
160190
.setAccepted(true)
161191
.setKnownPeers(knownPeers)
@@ -171,6 +201,17 @@ protected synchronized void handleConnectionResult(ClusterConnectionResult resul
171201
if (result.isAccepted()) {
172202
LOGGER.info("Cluster node [{}]: connection request accepted from: {}, {}", localNode.getNodeId(), result.getLocalNode().getNodeId(), result.getLocalNode().getHost());
173203
ClusterNode clusterNode = clusterNodeMap.get(remoteNode.getNodeId());
204+
if (result.getLeaderNode() != null) {
205+
if (leaderNode == null) {
206+
leaderNode = result.getLeaderNode();
207+
sendLeaderNodeUpdateToPeers();
208+
onLeaderAvailable.fire(leaderNode);
209+
LOGGER.info("Cluster node [{}]: new leader node: {}", localNode.getNodeId(), result.getLeaderNode().getNodeId());
210+
} else if (!leaderNode.getNodeId().equals(result.getLocalNode().getNodeId())) {
211+
LOGGER.error("Cluster node [{}]: error: connection result denied from {}, {} - different leader node: {} vs {}", localNode.getNodeId(), result.getLocalNode().getNodeId(), result.getLocalNode().getHost(), leaderNode.getNodeId(), result.getLocalNode().getNodeId());
212+
return;
213+
}
214+
}
174215
if (clusterNode == null) {
175216
clusterNode = new ClusterNode(this, remoteNode, connection);
176217
clusterNodeMap.put(remoteNode.getNodeId(), clusterNode);
@@ -195,6 +236,7 @@ protected synchronized void handleConnectionResult(ClusterConnectionResult resul
195236
sendMessageToPeerNodes(servicesUpdate);
196237
}
197238
}
239+
handleAvailableClusterNodesChanged();
198240
} else {
199241
LOGGER.info("Cluster node [{}]: connection request denied from: {}, {}", localNode.getNodeId(), result.getLocalNode().getNodeId(), result.getLocalNode().getHost());
200242
}
@@ -257,6 +299,15 @@ protected void handleClusterNewPeerInfo(ClusterNewPeerInfo newPeerInfo, ClusterN
257299
}
258300
}
259301

302+
protected void handleClusterNewLeaderInfo(ClusterNewLeaderInfo newLeaderInfo, ClusterNode clusterNode) {
303+
if (leaderNode == null) {
304+
leaderNode = newLeaderInfo.getLeaderNode();
305+
sendLeaderNodeUpdateToPeers();
306+
onLeaderAvailable.fire(leaderNode);
307+
LOGGER.info("Cluster node [{}]: new leader node: {}", localNode.getNodeId(), newLeaderInfo.getLeaderNode().getNodeId());
308+
}
309+
}
310+
260311

261312
protected void handleClusterAvailableServicesUpdate(ClusterAvailableServicesUpdate availableServicesUpdate, ClusterNode clusterNode) {
262313
updateClusterNodeServices(clusterNode, availableServicesUpdate.getServices());
@@ -266,6 +317,20 @@ protected synchronized void handleDisconnect(ClusterNode clusterNode) {
266317
pendingServiceRequestsMap.values().stream()
267318
.filter(clusterTask -> Objects.equals(clusterTask.getProcessingNodeId(), clusterNode.getNodeData().getNodeId()))
268319
.forEach(this::executeClusterTask);
320+
handleAvailableClusterNodesChanged();
321+
}
322+
323+
324+
private void handleAvailableClusterNodesChanged() {
325+
List<ClusterNodeData> availableNodes = getAvailablePeerNodes();
326+
onAvailableNodesChange.fire(availableNodes);
327+
}
328+
329+
private List<ClusterNodeData> getAvailablePeerNodes() {
330+
return clusterNodeMap.values().stream()
331+
.filter(ClusterNode::isConnected)
332+
.map(ClusterNode::getNodeData)
333+
.toList();
269334
}
270335

271336
private void handleConfigUpdate(ClusterConfig config) {
@@ -290,17 +355,53 @@ private synchronized void updateClusterNodeServices(ClusterNode clusterNode, Str
290355
}
291356

292357
protected void connectNode(ClusterNodeData peerNode) {
293-
new ClusterConnection(this, peerNode, new ArrayList<>(localServices.keySet()));
358+
List<ClusterNodeData> knownPeers = new ArrayList<>(clusterNodeMap.values())
359+
.stream()
360+
.map(ClusterNode::getNodeData)
361+
.filter(nodeData -> !nodeData.getNodeId().equals(localNode.getNodeId()))
362+
.filter(nodeData -> nodeData.getPort() > 0)
363+
.toList();
364+
String[] services = localServices.keySet().toArray(new String[0]);
365+
ClusterConnectionRequest clusterConnectionRequest = new ClusterConnectionRequest()
366+
.setLocalNode(localNode)
367+
.setLocalServices(services)
368+
.setLeaderNode(leaderNode)
369+
.setKnownPeers(knownPeers);
370+
new ClusterConnection(this, peerNode, clusterConnectionRequest);
294371
}
295372

296373
private synchronized void sendMessageToPeerNodes(Message message, ClusterNodeData... excludingNodes) {
297374
Set<String> excludeSet = excludingNodes == null ? new HashSet<>() : Arrays.stream(excludingNodes).map(ClusterNodeData::getNodeId).collect(Collectors.toSet());
298-
List<ClusterNode> peerNodes = clusterNodeMap.values().stream().filter(node -> node.isConnected() && !excludeSet.contains(node.getNodeData().getNodeId())).collect(Collectors.toList());
375+
List<ClusterNode> peerNodes = clusterNodeMap.values()
376+
.stream()
377+
.filter(node -> node.isConnected() && !excludeSet.contains(node.getNodeData().getNodeId()))
378+
.toList();
299379
LOGGER.info("Cluster node [{}]: send to peer nodes: {}, message: {}", localNode.getNodeId(), peerNodes.size(), message.getMessageDefUuid());
300380
peerNodes.forEach(node -> node.writeMessage(message));
301381
}
302382

303383

384+
private synchronized void sendLeaderNodeUpdateToPeers() {
385+
List<ClusterNode> peerNodes = clusterNodeMap.values()
386+
.stream()
387+
.toList();
388+
ClusterNewLeaderInfo clusterNewLeaderInfo = new ClusterNewLeaderInfo().setLeaderNode(leaderNode);
389+
peerNodes.forEach(node -> node.writeMessage(clusterNewLeaderInfo));
390+
}
391+
392+
public void sendMessage(String nodeId, Message message) {
393+
ClusterNode clusterNode = clusterNodeMap.get(nodeId);
394+
if (clusterNode != null) {
395+
clusterNode.writeMessage(message);
396+
}
397+
}
398+
399+
public void sendMessage(List<String> nodeIds, Message message) {
400+
for (String nodeId : nodeIds) {
401+
sendMessage(nodeId, message);
402+
}
403+
}
404+
304405
@Override
305406
public void registerService(AbstractClusterService clusterService) {
306407
LOGGER.info("Cluster node [{}]: register local service: {}", localNode.getNodeId(), clusterService.getServiceName());
@@ -323,8 +424,13 @@ public boolean isServiceAvailable(String serviceName) {
323424

324425
@Override
325426
public <REQUEST extends Message, RESPONSE extends Message> RESPONSE executeServiceMethod(String serviceName, String method, REQUEST request, PojoObjectDecoder<RESPONSE> responseDecoder) {
326-
LOGGER.info("Cluster node: {} - execute service method {}/{}", localNode.getNodeId(), serviceName, method);
327-
ClusterTask clusterTask = new ClusterTask(serviceName, method, request);
427+
return executeServiceMethod(null, serviceName, method, request, responseDecoder);
428+
}
429+
430+
@Override
431+
public <REQUEST extends Message, RESPONSE extends Message> RESPONSE executeServiceMethod(String clusterNodeId, String serviceName, String method, REQUEST request, PojoObjectDecoder<RESPONSE> responseDecoder) {
432+
LOGGER.info("Cluster node: {} - execute service method {}/{}" + (clusterNodeId != null ? ", on node {}" : ""), localNode.getNodeId(), serviceName, method, clusterNodeId);
433+
ClusterTask clusterTask = new ClusterTask(serviceName, method, request, clusterNodeId);
328434
pendingServiceRequestsMap.put(clusterTask.getTaskId(), clusterTask);
329435
while (!clusterTask.isFinished()) {
330436
clusterTask.startProcessing();
@@ -345,6 +451,29 @@ public <REQUEST extends Message, RESPONSE extends Message> RESPONSE executeServi
345451
throw new RuntimeException("Error: execute cluster service method failed:" + serviceName + ", " + method);
346452
}
347453

454+
@Override
455+
public <MESSAGE extends Message> void executeServiceBroadcast(String serviceName, String method, MESSAGE message) {
456+
LOGGER.info("Cluster node: {} - execute service broadcast method {}/{}", localNode.getNodeId(), serviceName, method);
457+
List<ClusterNode> clusterNodes = nodesByServiceName.get(serviceName);
458+
if (clusterNodes != null) {
459+
ClusterServiceBroadcastMessage broadcastMessage = new ClusterServiceBroadcastMessage()
460+
.setServiceName(serviceName)
461+
.setMethodName(method)
462+
.setMessage(message);
463+
clusterNodes.forEach(node -> node.writeMessage(broadcastMessage));
464+
}
465+
}
466+
467+
public void handleServiceBroadcastMessage(ClusterServiceBroadcastMessage broadcastMessage, ClusterNode clusterNode) {
468+
String serviceName = broadcastMessage.getServiceName();
469+
String method = broadcastMessage.getMethodName();
470+
LOGGER.info("Cluster node [{}]: handle broadcast message from {}: {}/{}", localNode.getNodeId(), clusterNode.getNodeData().getNodeId(), serviceName, method);
471+
AbstractClusterService clusterService = localServices.get(serviceName);
472+
if (clusterService != null) {
473+
taskExecutor.execute(() -> clusterService.handleMessage(method, broadcastMessage.getMessage()));
474+
}
475+
}
476+
348477

349478
private void executeClusterTask(ClusterTask clusterTask) {
350479
if (clusterTask.isRetryLimitReached()) {
@@ -357,7 +486,16 @@ private void executeClusterTask(ClusterTask clusterTask) {
357486
clusterTask.addExecutionAttempt();
358487

359488
AbstractClusterService localService = localServices.get(clusterTask.getServiceName());
360-
ClusterNode clusterNode = getBestServiceNode(clusterTask.getServiceName());
489+
ClusterNode clusterNode;
490+
if (clusterTask.isFixedServiceNode()) {
491+
if (localNode.getNodeId().equals(clusterTask.getFixedServiceNodeId())) {
492+
clusterNode = null;
493+
} else {
494+
clusterNode = clusterNodeMap.get(clusterTask.getFixedServiceNodeId());
495+
}
496+
} else {
497+
clusterNode = getBestServiceNode(clusterTask.getServiceName());
498+
}
361499
if (localService != null && clusterNode != null) {
362500
if (getActiveTasks() <= clusterNode.getActiveTasks()) {
363501
runLocalClusterTask(localService, clusterTask);
@@ -488,4 +626,12 @@ public synchronized List<String> getClusterNodeServices(ClusterNode clusterNode)
488626
List<String> services = servicesByNode.get(clusterNode);
489627
return new ArrayList<>(services == null ? Collections.emptyList() : services);
490628
}
629+
630+
public ClusterNodeData getLeaderNode() {
631+
return leaderNode;
632+
}
633+
634+
public boolean isLeaderNode() {
635+
return leaderNode != null && leaderNode.getNodeId().equals(localNode.getNodeId());
636+
}
491637
}

src/main/java/org/teamapps/cluster/core/ClusterConnection.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import java.io.*;
3030
import java.lang.invoke.MethodHandles;
3131
import java.net.Socket;
32-
import java.util.List;
3332
import java.util.UUID;
3433
import java.util.concurrent.ArrayBlockingQueue;
3534
import java.util.concurrent.TimeUnit;
@@ -69,21 +68,18 @@ public ClusterConnection(Cluster cluster, Socket socket) {
6968
startWriterThread();
7069
}
7170

72-
public ClusterConnection(Cluster cluster, ClusterNodeData peerNode, List<String> localServices) {
71+
public ClusterConnection(Cluster cluster, ClusterNodeData peerNode, ClusterConnectionRequest clusterConnectionRequest) {
7372
this.cluster = cluster;
7473
this.aesCipher = new AesCipher(cluster.getClusterConfig().getClusterSecret());
7574
this.remoteHostAddress = peerNode;
7675
this.tempDir = cluster.getTempDir();
7776
this.incomingConnection = false;
7877
connect(peerNode);
7978
if (connected) {
80-
String[] localServicesArray = localServices.isEmpty() ? null : localServices.toArray(localServices.toArray(new String[0]));
8179
startReaderThread();
8280
startWriterThread();
83-
ClusterConnectionRequest clusterConnectionRequest = new ClusterConnectionRequest()
84-
.setLocalNode(cluster.getLocalNode())
85-
.setLocalServices(localServicesArray);
8681
writeDirectMessage(clusterConnectionRequest);
82+
8783
}
8884
}
8985

@@ -118,13 +114,15 @@ private void startReaderThread() {
118114
byte[] data = new byte[messageSize];
119115
dataInputStream.readFully(data);
120116
byte[] messageData = aesCipher.decrypt(data);
121-
Message message = new Message(messageData);
117+
Message message = new Message(messageData, this);
122118
switch (message.getMessageDefUuid()) {
123119
case ClusterServiceMethodRequest.OBJECT_UUID -> handleClusterServiceMethodRequest(ClusterServiceMethodRequest.remap(message));
124120
case ClusterServiceMethodResult.OBJECT_UUID -> handleClusterServiceMethodResult(ClusterServiceMethodResult.remap(message));
121+
case ClusterServiceBroadcastMessage.OBJECT_UUID -> handleClusterServiceBroadcastMessage(ClusterServiceBroadcastMessage.remap(message));
125122
case ClusterMessageFilePart.OBJECT_UUID -> handleClusterMessageFilePart(ClusterMessageFilePart.remap(message));
126123
case ClusterAvailableServicesUpdate.OBJECT_UUID -> handleClusterAvailableServicesUpdate(ClusterAvailableServicesUpdate.remap(message));
127124
case ClusterNewPeerInfo.OBJECT_UUID -> handleClusterNewPeerInfo(ClusterNewPeerInfo.remap(message));
125+
case ClusterNewLeaderInfo.OBJECT_UUID -> handleClusterNewLeaderInfo(ClusterNewLeaderInfo.remap(message));
128126
case ClusterConnectionRequest.OBJECT_UUID -> handleClusterConnectionRequest(ClusterConnectionRequest.remap(message));
129127
case ClusterConnectionResult.OBJECT_UUID -> handleClusterConnectionResult(ClusterConnectionResult.remap(message));
130128
case ClusterNodeShutDownInfo.OBJECT_UUID -> {
@@ -149,6 +147,13 @@ private void startReaderThread() {
149147
thread.start();
150148
}
151149

150+
private void handleClusterServiceBroadcastMessage(ClusterServiceBroadcastMessage broadcastMessage) {
151+
cluster.handleServiceBroadcastMessage(broadcastMessage, clusterNode);
152+
}
153+
154+
private void handleClusterNewLeaderInfo(ClusterNewLeaderInfo newLeaderInfo) {
155+
cluster.handleClusterNewLeaderInfo(newLeaderInfo, clusterNode);
156+
}
152157

153158
private void handleClusterNewPeerInfo(ClusterNewPeerInfo newPeerInfo) {
154159
cluster.handleClusterNewPeerInfo(newPeerInfo, clusterNode);

0 commit comments

Comments
 (0)