Skip to content
This repository was archived by the owner on Feb 5, 2024. It is now read-only.

Commit c9a93a6

Browse files
authored
Merge pull request #23 from OpenFogStack/announce
Announce and shutdown functionality
2 parents a63df39 + 448c51f commit c9a93a6

13 files changed

+157
-25
lines changed

src/main/java/communication/DirectMessageReceiver.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,17 @@
11
package communication;
22

3+
import java.util.concurrent.ExecutionException;
4+
import java.util.concurrent.TimeUnit;
5+
import java.util.concurrent.TimeoutException;
6+
37
import org.apache.log4j.Logger;
48
import org.zeromq.ZMQ;
59
import org.zeromq.ZMQ.Socket;
610

711
import control.FBase;
812
import crypto.CryptoProvider.EncryptionAlgorithm;
913
import de.hasenburg.fbase.model.GetMissedMessageResponse;
14+
import exceptions.FBaseEncryptionException;
1015
import exceptions.FBaseException;
1116
import model.JSONable;
1217
import model.config.NodeConfig;
@@ -53,8 +58,10 @@ protected void interpreteReceivedEnvelope(Envelope envelope, Socket responseSock
5358

5459
envelope.getMessage().decryptFields(fBase.configuration.getPrivateKey(),
5560
EncryptionAlgorithm.RSA);
56-
envelope.getMessage().verifyMessage(requestingNode.getPublicKey(),
57-
EncryptionAlgorithm.RSA);
61+
if (!envelope.getMessage().verifyMessage(requestingNode.getPublicKey(),
62+
EncryptionAlgorithm.RSA)) {
63+
throw new FBaseEncryptionException("The message was not signed correctly");
64+
}
5865

5966
// INTERPRET MESSAGE
6067
// if slow, it might be wise to create another task which executes the processing
@@ -74,6 +81,17 @@ protected void interpreteReceivedEnvelope(Envelope envelope, Socket responseSock
7481
} catch (FBaseException e) {
7582
responseMessage.setTextualInfo("Required messageID not parseable");
7683
}
84+
} else if (Command.ANNOUNCE_OWN_NODE_CONFIGURATION_CHANGE.equals(envelope.getMessage().getCommand())) {
85+
responseMessage.setContent("true");
86+
try {
87+
boolean status = fBase.taskmanager.runAnnounceUpdateOfOwnNodeConfigurationTask().get(5, TimeUnit.SECONDS);
88+
if (status == false) {
89+
responseMessage.setContent("Announcing failed, check log of target machine");
90+
}
91+
} catch (InterruptedException | ExecutionException | TimeoutException e) {
92+
responseMessage.setContent(e.getMessage());
93+
}
94+
7795
} else {
7896
responseMessage.setTextualInfo(
7997
"Unknown command " + envelope.getMessage().getCommand().toString());

src/main/java/communication/DirectMessageSender.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import model.messages.Command;
2020
import model.messages.Envelope;
2121
import model.messages.Message;
22+
import tasks.AnnounceUpdateOfOwnNodeConfigurationTaskTest;
2223

2324
/**
2425
* Sends requests to designated receivers.
@@ -35,13 +36,22 @@ public class DirectMessageSender extends AbstractSender {
3536

3637
/**
3738
* Initializes the Message, it then can be used without further modifications.
39+
* Messages are send to a random machine of the given node configuration.
3840
*/
3941
public DirectMessageSender(NodeConfig targetNode, FBase fBase) {
4042
super(getRandomAddress(targetNode.getMachines()), targetNode.getMessagePort(), ZMQ.REQ);
4143
this.fBase = fBase;
4244
this.targetNode = targetNode;
4345
}
4446

47+
/**
48+
* Initializes the Message, it then can be used without further modifications.
49+
*/
50+
public DirectMessageSender(String targetAddress, int targetPort, FBase fBase) {
51+
super(targetAddress, targetPort, ZMQ.REQ);
52+
this.fBase = fBase;
53+
}
54+
4555
private static String getRandomAddress(List<String> machines) {
4656
int randomNum = ThreadLocalRandom.current().nextInt(0, machines.size());
4757
return "tcp://" + machines.get(randomNum);
@@ -125,6 +135,35 @@ public GetMissedMessageResponse sendGetDataRecord(MessageID messageID)
125135
}
126136
}
127137

138+
/**
139+
* Ask the target machine to run {@link AnnounceUpdateOfOwnNodeConfigurationTaskTest}.
140+
* This is necessary if a new machine was added to a node, because node configuration
141+
* updates are distributed via the publisher to other nodes (and other nodes have not
142+
* subscribed to the new machine yet).
143+
*
144+
* @throws FBaseCommunicationException - if other node declines/cannot be reached
145+
*/
146+
public void sendAnnounceMeRequest() throws FBaseCommunicationException {
147+
Message m = new Message();
148+
m.setContent(Command.ANNOUNCE_OWN_NODE_CONFIGURATION_CHANGE.toString());
149+
m.setCommand(Command.ANNOUNCE_OWN_NODE_CONFIGURATION_CHANGE);
150+
try {
151+
String answer = send(createEncryptedEnvelope(m, fBase.configuration.getPublicKey()), null, null);
152+
Message response = createDecryptedMessage(answer, fBase.configuration.getPublicKey());
153+
154+
if (response.getContent().equals("true")) {
155+
logger.debug("The other machine announced me");
156+
return;
157+
} else {
158+
logger.debug("The other machine could not announce me");
159+
throw new FBaseCommunicationException("The other machine could not announce me " + response.getContent());
160+
}
161+
} catch (FBaseEncryptionException e1) {
162+
logger.error(e1.getMessage(), e1);
163+
throw new FBaseCommunicationException("Announce Request failed " + e1.getMessage());
164+
}
165+
}
166+
128167
/**
129168
* Create an envelope that is signed with the private key of the node and encrypted with
130169
* the public key of the target node. Also sets

src/main/java/communication/MessageIdEvaluator.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
* he finds a gap, he will create a directMessageSender and ask the related node about the send
3232
* data.
3333
*
34+
* TODO 2: clean ID storage from time to time
35+
*
3436
* @author jonathanhasenburg
3537
*
3638
*/

src/main/java/control/FBase.java

Lines changed: 63 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,27 @@
11
package control;
22

3+
import java.util.ArrayList;
4+
import java.util.Iterator;
5+
import java.util.List;
6+
import java.util.Map;
37
import java.util.concurrent.ExecutionException;
8+
import java.util.concurrent.Future;
49
import java.util.concurrent.TimeUnit;
510
import java.util.concurrent.TimeoutException;
611

712
import org.apache.log4j.Logger;
13+
import org.javatuples.Pair;
814

915
import communication.DirectMessageReceiver;
16+
import communication.DirectMessageSender;
1017
import communication.MessageIdEvaluator;
1118
import communication.NamingServiceSender;
1219
import communication.Publisher;
1320
import communication.SubscriptionRegistry;
1421
import crypto.CryptoProvider.EncryptionAlgorithm;
1522
import de.hasenburg.fbase.rest.WebServer;
1623
import exceptions.FBaseCommunicationException;
24+
import exceptions.FBaseException;
1725
import exceptions.FBaseNamingServiceException;
1826
import exceptions.FBaseStorageConnectorException;
1927
import model.config.ClientConfig;
@@ -32,6 +40,10 @@
3240
/**
3341
* Main control class of a FBase machine.
3442
*
43+
* TODO 2: Missing functionality: <br>
44+
* * clean up of data records that are stored longer than needed<br>
45+
* * clean up of message history of outgoing messages
46+
*
3547
* @author jonathanhasenburg
3648
*
3749
*/
@@ -50,6 +62,8 @@ public class FBase {
5062
public MessageIdEvaluator messageIdEvaluator = null;
5163
private WebServer server = null;
5264

65+
private List<Future<Boolean>> backgroundTaskList = new ArrayList<>();
66+
5367
public FBase(String configName) {
5468
configuration = new Configuration(configName);
5569
}
@@ -58,8 +72,7 @@ public void startup(boolean announce, boolean backgroundTasks) throws Interrupte
5872
ExecutionException, TimeoutException, FBaseStorageConnectorException,
5973
FBaseCommunicationException, FBaseNamingServiceException {
6074
if (Connector.S3.equals(configuration.getDatabaseConnector())) {
61-
connector =
62-
new S3DBConnector(configuration.getNodeID());
75+
connector = new S3DBConnector(configuration.getNodeID());
6376
} else {
6477
connector = new OnHeapDBConnector(configuration.getNodeID());
6578
}
@@ -84,20 +97,23 @@ public void startup(boolean announce, boolean backgroundTasks) throws Interrupte
8497
messageIdEvaluator.startup();
8598

8699
// start putting heartbeats (pulse 0 = default)
87-
taskmanager.startBackgroundPutHeartbeatTask(0);
100+
backgroundTaskList.add(taskmanager.startBackgroundPutHeartbeatTask(0));
88101

89102
// add machine to node
90103
if (announce) {
104+
Thread.sleep(400); // make sure own heartbeat is in database
91105
announceMachineAdditionToNode();
92106
}
93107

94108
// start other background tasks (interval 0 = default)
95109
if (backgroundTasks) {
96-
taskmanager.startBackgroundPollLatesConfigurationDataForResponsibleKeygroupsTask(0);
97-
taskmanager.startBackgroundCheckKeygroupConfigurationsOnUpdatesTask(0);
98-
taskmanager.startDetectMissingHeartbeatsTask(0, 0);
99-
taskmanager.startBackgroundDetectMissingResponsibility(0);
100-
taskmanager.startBackgroundDetectLostResponsibility(0);
110+
backgroundTaskList.add(taskmanager
111+
.startBackgroundPollLatesConfigurationDataForResponsibleKeygroupsTask(0));
112+
backgroundTaskList
113+
.add(taskmanager.startBackgroundCheckKeygroupConfigurationsOnUpdatesTask(0));
114+
backgroundTaskList.add(taskmanager.startDetectMissingHeartbeatsTask(0, 0));
115+
backgroundTaskList.add(taskmanager.startBackgroundDetectMissingResponsibility(0));
116+
backgroundTaskList.add(taskmanager.startBackgroundDetectLostResponsibility(0));
101117
}
102118

103119
Thread.sleep(50);
@@ -106,32 +122,61 @@ public void startup(boolean announce, boolean backgroundTasks) throws Interrupte
106122

107123
private void announceMachineAdditionToNode() throws FBaseStorageConnectorException,
108124
FBaseCommunicationException, FBaseNamingServiceException {
109-
if (connector.heartbeats_listAll().size() <= 1) {
125+
Map<String, Pair<String, Long>> heartbeats = connector.heartbeats_listAll();
126+
if (heartbeats.size() <= 1) {
127+
logger.debug("We are the first machine of the node");
110128
// update myself (must exist before, created by another node)
111129
taskmanager.runAnnounceUpdateOfOwnNodeConfigurationTask();
130+
// TODO 2: get all keygroups in which node is either replica/trigger node from
131+
// naming service (important for restart)
132+
return;
112133
}
113-
114-
115-
// TODO 1: Tell a machine that is already registered about addition (we need a one-to-one
116-
// here)
117134

135+
logger.debug("In total, the node has " + heartbeats.size() + " including myself.");
136+
Iterator<String> iterator = heartbeats.keySet().iterator();
137+
while (iterator.hasNext()) {
138+
String next = iterator.next();
139+
if (!configuration.getMachineName().equals(next)) {
140+
DirectMessageSender sender =
141+
new DirectMessageSender("tcp://" + heartbeats.get(next).getValue0(),
142+
configuration.getMessagePort(), this);
143+
try {
144+
sender.sendAnnounceMeRequest();
145+
sender.shutdown();
146+
break;
147+
} catch (FBaseException e) {
148+
logger.debug(
149+
"Machine " + next + " could not announce me, trying with another one");
150+
}
151+
sender.shutdown();
152+
}
153+
// no machine left for announcing
154+
if (iterator.hasNext()) {
155+
logger.fatal("No machine could announce me, shutting down");
156+
}
157+
}
118158
}
119159

120160
public void tearDown() {
121-
// TODO 1: stop all background tasks
161+
logger.info("Stopping background tasks");
162+
for (Future<Boolean> backgroundTask : backgroundTaskList) {
163+
backgroundTask.cancel(true);
164+
}
165+
if (server != null) {
166+
server.stopServer();
167+
}
122168
subscriptionRegistry.deleteAllData();
169+
messageIdEvaluator.tearDown();
123170
publisher.shutdown();
124171
namingServiceSender.shutdown();
125-
messageIdEvaluator.tearDown();
126172
directMessageReceiver.stopReception();
127-
if (server != null) {
128-
server.stopServer();
129-
}
173+
taskmanager.tearDown();
130174
try {
131175
Thread.sleep(500);
132176
} catch (InterruptedException e) {
133177
e.printStackTrace();
134178
}
179+
System.exit(0);
135180
}
136181

137182
public void fillWithData() throws FBaseStorageConnectorException, InterruptedException,

src/main/java/control/Starter.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package control;
22

3+
import java.io.IOException;
34
import java.util.concurrent.ExecutionException;
45
import java.util.concurrent.TimeoutException;
56

@@ -11,7 +12,7 @@ public class Starter {
1112

1213
public static void main(String[] args)
1314
throws FBaseStorageConnectorException, InterruptedException, ExecutionException,
14-
TimeoutException, FBaseCommunicationException, FBaseNamingServiceException {
15+
TimeoutException, FBaseCommunicationException, FBaseNamingServiceException, IOException {
1516
FBase fbase;
1617
if (args.length == 1) {
1718
fbase = new FBase(args[0]);
@@ -20,6 +21,10 @@ public static void main(String[] args)
2021
}
2122
fbase.startup(true, true); // TODO 2: parse from args
2223
//fbase.fillWithData();
24+
25+
System.out.println("FBase started, press any key to stop.");
26+
System.in.read();
27+
fbase.tearDown();
2328
}
2429

2530
}

src/main/java/storageconnector/S3DBConnector.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ public void deleteBuckets() throws FBaseStorageConnectorException {
140140
break;
141141
}
142142
}
143+
logger.debug("Deleting bucket " + bucketName);
143144
s3.deleteBucket(bucketName);
144145
} catch (AmazonServiceException e) {
145146
throw new FBaseStorageConnectorException(e);

src/main/java/tasks/AnnounceUpdateOfOwnNodeConfigurationTask.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ public Boolean executeFunctionality() {
5454
+ e.getMessage());
5555
}
5656

57-
// publish to all keygroups the machine is responsible for
57+
// publish to all keygroups the node is responsible for
5858
Set<KeygroupID> responsibilities =
5959
fBase.connector.keyGroupSubscriberMachines_listAll().keySet();
6060
for (KeygroupID keygroupID : responsibilities) {

src/main/java/tasks/TaskManager.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,10 @@ public enum TaskName {
4646
B_DETECT_LOST_RESPONSIBILITY
4747
}
4848

49+
public void tearDown() {
50+
pool.shutdownNow();
51+
}
52+
4953
public void storeHistory() {
5054
storingHistory = true;
5155
}

src/main/java/tasks/background/PollLatestConfigurationDataForResponsibleKeygroupsTask.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@
3535
*
3636
* TODO T: Write test for this task
3737
*
38+
* TODO 2: It might make sense to ask the naming service about all keygroups this node is responsbile for,
39+
* instead of relying on the data from the responsibility table
40+
*
3841
* @author jonathanhasenburg
3942
*
4043
*/

src/test/java/communication/NamingServiceSenderTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public class NamingServiceSenderTest {
4848
private static ExecutorService executor;
4949
private static FBase fbase;
5050
private static final String ownNodeConfigJSONPath =
51-
"src/test/resources/NamingServiceSenderTest_NodeConfig.json";
51+
"src/test/resources/NamingServiceSenderTest_NodeConfig1.json";
5252

5353
private static NamingServiceSender localSender = null;
5454
private static NamingServiceSender nsSender = null;

src/test/java/tasks/FBaseFactory.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ public class FBaseFactory {
1313
private static final String BASIC_PROPERTIES_FILE = "FBaseFactory_Basic";
1414
private static final String NAMING_SERVICE_PROPERTIES_FILE = "FBaseFactory_NamingService";
1515

16-
1716
public static FBase basic(int instanceNumber, boolean announce, boolean backgroundTasks)
1817
throws FBaseStorageConnectorException, InterruptedException, ExecutionException,
1918
TimeoutException, FBaseCommunicationException, FBaseNamingServiceException {

src/test/resources/NamingServiceSenderTest_NodeConfig.json renamed to src/test/resources/NamingServiceSenderTest_NodeConfig1.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,5 +12,5 @@
1212
"messagePort": 6001,
1313
"restPort": 8081,
1414
"location": "Everywhere",
15-
"description": "Machine specified in JUnit tests"
15+
"description": "Machine specified in JUnit tests (Node 1)"
1616
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
{
2+
"version": 1,
3+
"nodeID": {
4+
"nodeID": "N2"
5+
},
6+
"publicKey": "MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAqMCldz9e44B0I6V916WUX5tHlw0S4M+zUniAxeRq3rHFOtlKA+ARI8MW85sDz8MAMT51HxhcXOc09L76s0Nh2eWYdvFkaTH66BH2/d6QBIjmPOj8ZWM3t+U6CzqhN9wXqNKLj19Aop+cy4KqhfstiQpAb33803Wto/Zm+Gt3XQEpH/IRs5fgrnp73I4LDenosz5t/Xjo3DMG1+w9fx9ccB+BUikgFPHVDM5bHVT3KZG/9F3Qh4sYNXKWgNuLGpn2v6dVEAFZO/QM4UzJES7O0vHt9Dis4XQloCXqQaZ3FpCd8fQV7xnNO33gdyIJZyjEv8O5ucSuVbddsr+63cQ76QIDAQAB",
7+
"encryptionAlgorithm": "RSA",
8+
"machines": [
9+
"m1"
10+
],
11+
"publisherPort": 7002,
12+
"messagePort": 6002,
13+
"restPort": 8082,
14+
"location": "Everywhere",
15+
"description": "Configuration fitting for Node 2"
16+
}

0 commit comments

Comments
 (0)