Skip to content
This repository was archived by the owner on Feb 5, 2024. It is now read-only.

Commit 7be0d26

Browse files
committed
Add Announce functionality
1 parent a63df39 commit 7be0d26

10 files changed

+114
-12
lines changed

src/main/java/communication/DirectMessageReceiver.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,17 @@
11
package communication;
22

3+
import java.util.concurrent.ExecutionException;
4+
import java.util.concurrent.TimeUnit;
5+
import java.util.concurrent.TimeoutException;
6+
37
import org.apache.log4j.Logger;
48
import org.zeromq.ZMQ;
59
import org.zeromq.ZMQ.Socket;
610

711
import control.FBase;
812
import crypto.CryptoProvider.EncryptionAlgorithm;
913
import de.hasenburg.fbase.model.GetMissedMessageResponse;
14+
import exceptions.FBaseEncryptionException;
1015
import exceptions.FBaseException;
1116
import model.JSONable;
1217
import model.config.NodeConfig;
@@ -53,8 +58,10 @@ protected void interpreteReceivedEnvelope(Envelope envelope, Socket responseSock
5358

5459
envelope.getMessage().decryptFields(fBase.configuration.getPrivateKey(),
5560
EncryptionAlgorithm.RSA);
56-
envelope.getMessage().verifyMessage(requestingNode.getPublicKey(),
57-
EncryptionAlgorithm.RSA);
61+
if (!envelope.getMessage().verifyMessage(requestingNode.getPublicKey(),
62+
EncryptionAlgorithm.RSA)) {
63+
throw new FBaseEncryptionException("The message was not signed correctly");
64+
}
5865

5966
// INTERPRET MESSAGE
6067
// if slow, it might be wise to create another task which executes the processing
@@ -74,6 +81,17 @@ protected void interpreteReceivedEnvelope(Envelope envelope, Socket responseSock
7481
} catch (FBaseException e) {
7582
responseMessage.setTextualInfo("Required messageID not parseable");
7683
}
84+
} else if (Command.ANNOUNCE_OWN_NODE_CONFIGURATION_CHANGE.equals(envelope.getMessage().getCommand())) {
85+
responseMessage.setContent("true");
86+
try {
87+
boolean status = fBase.taskmanager.runAnnounceUpdateOfOwnNodeConfigurationTask().get(5, TimeUnit.SECONDS);
88+
if (status == false) {
89+
responseMessage.setContent("Announcing failed, check log of target machine");
90+
}
91+
} catch (InterruptedException | ExecutionException | TimeoutException e) {
92+
responseMessage.setContent(e.getMessage());
93+
}
94+
7795
} else {
7896
responseMessage.setTextualInfo(
7997
"Unknown command " + envelope.getMessage().getCommand().toString());

src/main/java/communication/DirectMessageSender.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import model.messages.Command;
2020
import model.messages.Envelope;
2121
import model.messages.Message;
22+
import tasks.AnnounceUpdateOfOwnNodeConfigurationTaskTest;
2223

2324
/**
2425
* Sends requests to designated receivers.
@@ -35,13 +36,22 @@ public class DirectMessageSender extends AbstractSender {
3536

3637
/**
3738
* Initializes the Message, it then can be used without further modifications.
39+
* Messages are send to a random machine of the given node configuration.
3840
*/
3941
public DirectMessageSender(NodeConfig targetNode, FBase fBase) {
4042
super(getRandomAddress(targetNode.getMachines()), targetNode.getMessagePort(), ZMQ.REQ);
4143
this.fBase = fBase;
4244
this.targetNode = targetNode;
4345
}
4446

47+
/**
48+
* Initializes the Message, it then can be used without further modifications.
49+
*/
50+
public DirectMessageSender(String targetAddress, int targetPort, FBase fBase) {
51+
super(targetAddress, targetPort, ZMQ.REQ);
52+
this.fBase = fBase;
53+
}
54+
4555
private static String getRandomAddress(List<String> machines) {
4656
int randomNum = ThreadLocalRandom.current().nextInt(0, machines.size());
4757
return "tcp://" + machines.get(randomNum);
@@ -125,6 +135,35 @@ public GetMissedMessageResponse sendGetDataRecord(MessageID messageID)
125135
}
126136
}
127137

138+
/**
139+
* Ask the target machine to run {@link AnnounceUpdateOfOwnNodeConfigurationTaskTest}.
140+
* This is necessary if a new machine was added to a node, because node configuration
141+
* updates are distributed via the publisher to other nodes (and other nodes have not
142+
* subscribed to the new machine yet).
143+
*
144+
* @throws FBaseCommunicationException - if other node declines/cannot be reached
145+
*/
146+
public void sendAnnounceMeRequest() throws FBaseCommunicationException {
147+
Message m = new Message();
148+
m.setContent(Command.ANNOUNCE_OWN_NODE_CONFIGURATION_CHANGE.toString());
149+
m.setCommand(Command.ANNOUNCE_OWN_NODE_CONFIGURATION_CHANGE);
150+
try {
151+
String answer = send(createEncryptedEnvelope(m, fBase.configuration.getPublicKey()), null, null);
152+
Message response = createDecryptedMessage(answer, fBase.configuration.getPublicKey());
153+
154+
if (response.getContent().equals("true")) {
155+
logger.debug("The other machine announced me");
156+
return;
157+
} else {
158+
logger.debug("The other machine could not announce me");
159+
throw new FBaseCommunicationException("The other machine could not announce me " + response.getContent());
160+
}
161+
} catch (FBaseEncryptionException e1) {
162+
logger.error(e1.getMessage(), e1);
163+
throw new FBaseCommunicationException("Announce Request failed " + e1.getMessage());
164+
}
165+
}
166+
128167
/**
129168
* Create an envelope that is signed with the private key of the node and encrypted with
130169
* the public key of the target node. Also sets

src/main/java/control/FBase.java

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,24 @@
11
package control;
22

3+
import java.util.Iterator;
4+
import java.util.Map;
35
import java.util.concurrent.ExecutionException;
46
import java.util.concurrent.TimeUnit;
57
import java.util.concurrent.TimeoutException;
68

79
import org.apache.log4j.Logger;
10+
import org.javatuples.Pair;
811

912
import communication.DirectMessageReceiver;
13+
import communication.DirectMessageSender;
1014
import communication.MessageIdEvaluator;
1115
import communication.NamingServiceSender;
1216
import communication.Publisher;
1317
import communication.SubscriptionRegistry;
1418
import crypto.CryptoProvider.EncryptionAlgorithm;
1519
import de.hasenburg.fbase.rest.WebServer;
1620
import exceptions.FBaseCommunicationException;
21+
import exceptions.FBaseException;
1722
import exceptions.FBaseNamingServiceException;
1823
import exceptions.FBaseStorageConnectorException;
1924
import model.config.ClientConfig;
@@ -88,6 +93,7 @@ public void startup(boolean announce, boolean backgroundTasks) throws Interrupte
8893

8994
// add machine to node
9095
if (announce) {
96+
Thread.sleep(400); // make sure own heartbeat is in database
9197
announceMachineAdditionToNode();
9298
}
9399

@@ -106,15 +112,35 @@ public void startup(boolean announce, boolean backgroundTasks) throws Interrupte
106112

107113
private void announceMachineAdditionToNode() throws FBaseStorageConnectorException,
108114
FBaseCommunicationException, FBaseNamingServiceException {
109-
if (connector.heartbeats_listAll().size() <= 1) {
115+
Map<String, Pair<String, Long>> heartbeats = connector.heartbeats_listAll();
116+
if (heartbeats.size() <= 1) {
117+
logger.debug("We are the first machine of the node");
110118
// update myself (must exist before, created by another node)
111-
taskmanager.runAnnounceUpdateOfOwnNodeConfigurationTask();
119+
taskmanager.runAnnounceUpdateOfOwnNodeConfigurationTask();
120+
// TODO 2: get all keygroups in which node is either replica/trigger node (important for restart)
121+
return;
112122
}
113123

114-
115-
// TODO 1: Tell a machine that is already registered about addition (we need a one-to-one
116-
// here)
117-
124+
logger.debug("In total, the node has " + heartbeats.size() + " including myself.");
125+
Iterator<String> iterator = heartbeats.keySet().iterator();
126+
while (iterator.hasNext()) {
127+
String next = iterator.next();
128+
if (!configuration.getMachineName().equals(next)) {
129+
DirectMessageSender sender = new DirectMessageSender("tcp://" + heartbeats.get(next).getValue0(), configuration.getMessagePort(), this);
130+
try {
131+
sender.sendAnnounceMeRequest();
132+
sender.shutdown();
133+
break;
134+
} catch (FBaseException e) {
135+
logger.debug("Machine " + next + " could not announce me, trying with another one");
136+
}
137+
sender.shutdown();
138+
}
139+
// no machine left for announcing
140+
if (iterator.hasNext()) {
141+
logger.fatal("No machine could announce me, shutting down");
142+
}
143+
}
118144
}
119145

120146
public void tearDown() {

src/main/java/storageconnector/S3DBConnector.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ public void deleteBuckets() throws FBaseStorageConnectorException {
140140
break;
141141
}
142142
}
143+
logger.debug("Deleting bucket " + bucketName);
143144
s3.deleteBucket(bucketName);
144145
} catch (AmazonServiceException e) {
145146
throw new FBaseStorageConnectorException(e);

src/main/java/tasks/AnnounceUpdateOfOwnNodeConfigurationTask.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ public Boolean executeFunctionality() {
5454
+ e.getMessage());
5555
}
5656

57-
// publish to all keygroups the machine is responsible for
57+
// publish to all keygroups the node is responsible for
5858
Set<KeygroupID> responsibilities =
5959
fBase.connector.keyGroupSubscriberMachines_listAll().keySet();
6060
for (KeygroupID keygroupID : responsibilities) {

src/main/java/tasks/background/PollLatestConfigurationDataForResponsibleKeygroupsTask.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@
3535
*
3636
* TODO T: Write test for this task
3737
*
38+
* TODO 2: It might make sense to ask the naming service about all keygroups this node is responsbile for,
39+
* instead of relying on the data from the responsibility table
40+
*
3841
* @author jonathanhasenburg
3942
*
4043
*/

src/test/java/communication/NamingServiceSenderTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public class NamingServiceSenderTest {
4848
private static ExecutorService executor;
4949
private static FBase fbase;
5050
private static final String ownNodeConfigJSONPath =
51-
"src/test/resources/NamingServiceSenderTest_NodeConfig.json";
51+
"src/test/resources/NamingServiceSenderTest_NodeConfig1.json";
5252

5353
private static NamingServiceSender localSender = null;
5454
private static NamingServiceSender nsSender = null;

src/test/java/tasks/FBaseFactory.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ public class FBaseFactory {
1313
private static final String BASIC_PROPERTIES_FILE = "FBaseFactory_Basic";
1414
private static final String NAMING_SERVICE_PROPERTIES_FILE = "FBaseFactory_NamingService";
1515

16-
1716
public static FBase basic(int instanceNumber, boolean announce, boolean backgroundTasks)
1817
throws FBaseStorageConnectorException, InterruptedException, ExecutionException,
1918
TimeoutException, FBaseCommunicationException, FBaseNamingServiceException {

src/test/resources/NamingServiceSenderTest_NodeConfig.json renamed to src/test/resources/NamingServiceSenderTest_NodeConfig1.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,5 +12,5 @@
1212
"messagePort": 6001,
1313
"restPort": 8081,
1414
"location": "Everywhere",
15-
"description": "Machine specified in JUnit tests"
15+
"description": "Machine specified in JUnit tests (Node 1)"
1616
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
{
2+
"version": 1,
3+
"nodeID": {
4+
"nodeID": "N2"
5+
},
6+
"publicKey": "MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAqMCldz9e44B0I6V916WUX5tHlw0S4M+zUniAxeRq3rHFOtlKA+ARI8MW85sDz8MAMT51HxhcXOc09L76s0Nh2eWYdvFkaTH66BH2/d6QBIjmPOj8ZWM3t+U6CzqhN9wXqNKLj19Aop+cy4KqhfstiQpAb33803Wto/Zm+Gt3XQEpH/IRs5fgrnp73I4LDenosz5t/Xjo3DMG1+w9fx9ccB+BUikgFPHVDM5bHVT3KZG/9F3Qh4sYNXKWgNuLGpn2v6dVEAFZO/QM4UzJES7O0vHt9Dis4XQloCXqQaZ3FpCd8fQV7xnNO33gdyIJZyjEv8O5ucSuVbddsr+63cQ76QIDAQAB",
7+
"encryptionAlgorithm": "RSA",
8+
"machines": [
9+
"m1"
10+
],
11+
"publisherPort": 7002,
12+
"messagePort": 6002,
13+
"restPort": 8082,
14+
"location": "Everywhere",
15+
"description": "Configuration fitting for Node 2"
16+
}

0 commit comments

Comments
 (0)