Skip to content
This repository was archived by the owner on Feb 5, 2024. It is now read-only.

Announce and shutdown functionality #23

Merged
merged 2 commits into from
Oct 12, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 20 additions & 2 deletions src/main/java/communication/DirectMessageReceiver.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
package communication;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.log4j.Logger;
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.Socket;

import control.FBase;
import crypto.CryptoProvider.EncryptionAlgorithm;
import de.hasenburg.fbase.model.GetMissedMessageResponse;
import exceptions.FBaseEncryptionException;
import exceptions.FBaseException;
import model.JSONable;
import model.config.NodeConfig;
Expand Down Expand Up @@ -53,8 +58,10 @@ protected void interpreteReceivedEnvelope(Envelope envelope, Socket responseSock

envelope.getMessage().decryptFields(fBase.configuration.getPrivateKey(),
EncryptionAlgorithm.RSA);
envelope.getMessage().verifyMessage(requestingNode.getPublicKey(),
EncryptionAlgorithm.RSA);
if (!envelope.getMessage().verifyMessage(requestingNode.getPublicKey(),
EncryptionAlgorithm.RSA)) {
throw new FBaseEncryptionException("The message was not signed correctly");
}

// INTERPRET MESSAGE
// if slow, it might be wise to create another task which executes the processing
Expand All @@ -74,6 +81,17 @@ protected void interpreteReceivedEnvelope(Envelope envelope, Socket responseSock
} catch (FBaseException e) {
responseMessage.setTextualInfo("Required messageID not parseable");
}
} else if (Command.ANNOUNCE_OWN_NODE_CONFIGURATION_CHANGE.equals(envelope.getMessage().getCommand())) {
responseMessage.setContent("true");
try {
boolean status = fBase.taskmanager.runAnnounceUpdateOfOwnNodeConfigurationTask().get(5, TimeUnit.SECONDS);
if (status == false) {
responseMessage.setContent("Announcing failed, check log of target machine");
}
} catch (InterruptedException | ExecutionException | TimeoutException e) {
responseMessage.setContent(e.getMessage());
}

} else {
responseMessage.setTextualInfo(
"Unknown command " + envelope.getMessage().getCommand().toString());
Expand Down
39 changes: 39 additions & 0 deletions src/main/java/communication/DirectMessageSender.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import model.messages.Command;
import model.messages.Envelope;
import model.messages.Message;
import tasks.AnnounceUpdateOfOwnNodeConfigurationTaskTest;

/**
* Sends requests to designated receivers.
Expand All @@ -35,13 +36,22 @@ public class DirectMessageSender extends AbstractSender {

/**
* Initializes the Message, it then can be used without further modifications.
* Messages are send to a random machine of the given node configuration.
*/
public DirectMessageSender(NodeConfig targetNode, FBase fBase) {
super(getRandomAddress(targetNode.getMachines()), targetNode.getMessagePort(), ZMQ.REQ);
this.fBase = fBase;
this.targetNode = targetNode;
}

/**
* Initializes the Message, it then can be used without further modifications.
*/
public DirectMessageSender(String targetAddress, int targetPort, FBase fBase) {
super(targetAddress, targetPort, ZMQ.REQ);
this.fBase = fBase;
}

private static String getRandomAddress(List<String> machines) {
int randomNum = ThreadLocalRandom.current().nextInt(0, machines.size());
return "tcp://" + machines.get(randomNum);
Expand Down Expand Up @@ -125,6 +135,35 @@ public GetMissedMessageResponse sendGetDataRecord(MessageID messageID)
}
}

/**
* Ask the target machine to run {@link AnnounceUpdateOfOwnNodeConfigurationTaskTest}.
* This is necessary if a new machine was added to a node, because node configuration
* updates are distributed via the publisher to other nodes (and other nodes have not
* subscribed to the new machine yet).
*
* @throws FBaseCommunicationException - if other node declines/cannot be reached
*/
public void sendAnnounceMeRequest() throws FBaseCommunicationException {
Message m = new Message();
m.setContent(Command.ANNOUNCE_OWN_NODE_CONFIGURATION_CHANGE.toString());
m.setCommand(Command.ANNOUNCE_OWN_NODE_CONFIGURATION_CHANGE);
try {
String answer = send(createEncryptedEnvelope(m, fBase.configuration.getPublicKey()), null, null);
Message response = createDecryptedMessage(answer, fBase.configuration.getPublicKey());

if (response.getContent().equals("true")) {
logger.debug("The other machine announced me");
return;
} else {
logger.debug("The other machine could not announce me");
throw new FBaseCommunicationException("The other machine could not announce me " + response.getContent());
}
} catch (FBaseEncryptionException e1) {
logger.error(e1.getMessage(), e1);
throw new FBaseCommunicationException("Announce Request failed " + e1.getMessage());
}
}

/**
* Create an envelope that is signed with the private key of the node and encrypted with
* the public key of the target node. Also sets
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/communication/MessageIdEvaluator.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
* he finds a gap, he will create a directMessageSender and ask the related node about the send
* data.
*
* TODO 2: clean ID storage from time to time
*
* @author jonathanhasenburg
*
*/
Expand Down
81 changes: 63 additions & 18 deletions src/main/java/control/FBase.java
Original file line number Diff line number Diff line change
@@ -1,19 +1,27 @@
package control;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.log4j.Logger;
import org.javatuples.Pair;

import communication.DirectMessageReceiver;
import communication.DirectMessageSender;
import communication.MessageIdEvaluator;
import communication.NamingServiceSender;
import communication.Publisher;
import communication.SubscriptionRegistry;
import crypto.CryptoProvider.EncryptionAlgorithm;
import de.hasenburg.fbase.rest.WebServer;
import exceptions.FBaseCommunicationException;
import exceptions.FBaseException;
import exceptions.FBaseNamingServiceException;
import exceptions.FBaseStorageConnectorException;
import model.config.ClientConfig;
Expand All @@ -32,6 +40,10 @@
/**
* Main control class of a FBase machine.
*
* TODO 2: Missing functionality: <br>
* * clean up of data records that are stored longer than needed<br>
* * clean up of message history of outgoing messages
*
* @author jonathanhasenburg
*
*/
Expand All @@ -50,6 +62,8 @@ public class FBase {
public MessageIdEvaluator messageIdEvaluator = null;
private WebServer server = null;

private List<Future<Boolean>> backgroundTaskList = new ArrayList<>();

public FBase(String configName) {
configuration = new Configuration(configName);
}
Expand All @@ -58,8 +72,7 @@ public void startup(boolean announce, boolean backgroundTasks) throws Interrupte
ExecutionException, TimeoutException, FBaseStorageConnectorException,
FBaseCommunicationException, FBaseNamingServiceException {
if (Connector.S3.equals(configuration.getDatabaseConnector())) {
connector =
new S3DBConnector(configuration.getNodeID());
connector = new S3DBConnector(configuration.getNodeID());
} else {
connector = new OnHeapDBConnector(configuration.getNodeID());
}
Expand All @@ -84,20 +97,23 @@ public void startup(boolean announce, boolean backgroundTasks) throws Interrupte
messageIdEvaluator.startup();

// start putting heartbeats (pulse 0 = default)
taskmanager.startBackgroundPutHeartbeatTask(0);
backgroundTaskList.add(taskmanager.startBackgroundPutHeartbeatTask(0));

// add machine to node
if (announce) {
Thread.sleep(400); // make sure own heartbeat is in database
announceMachineAdditionToNode();
}

// start other background tasks (interval 0 = default)
if (backgroundTasks) {
taskmanager.startBackgroundPollLatesConfigurationDataForResponsibleKeygroupsTask(0);
taskmanager.startBackgroundCheckKeygroupConfigurationsOnUpdatesTask(0);
taskmanager.startDetectMissingHeartbeatsTask(0, 0);
taskmanager.startBackgroundDetectMissingResponsibility(0);
taskmanager.startBackgroundDetectLostResponsibility(0);
backgroundTaskList.add(taskmanager
.startBackgroundPollLatesConfigurationDataForResponsibleKeygroupsTask(0));
backgroundTaskList
.add(taskmanager.startBackgroundCheckKeygroupConfigurationsOnUpdatesTask(0));
backgroundTaskList.add(taskmanager.startDetectMissingHeartbeatsTask(0, 0));
backgroundTaskList.add(taskmanager.startBackgroundDetectMissingResponsibility(0));
backgroundTaskList.add(taskmanager.startBackgroundDetectLostResponsibility(0));
}

Thread.sleep(50);
Expand All @@ -106,32 +122,61 @@ public void startup(boolean announce, boolean backgroundTasks) throws Interrupte

private void announceMachineAdditionToNode() throws FBaseStorageConnectorException,
FBaseCommunicationException, FBaseNamingServiceException {
if (connector.heartbeats_listAll().size() <= 1) {
Map<String, Pair<String, Long>> heartbeats = connector.heartbeats_listAll();
if (heartbeats.size() <= 1) {
logger.debug("We are the first machine of the node");
// update myself (must exist before, created by another node)
taskmanager.runAnnounceUpdateOfOwnNodeConfigurationTask();
// TODO 2: get all keygroups in which node is either replica/trigger node from
// naming service (important for restart)
return;
}


// TODO 1: Tell a machine that is already registered about addition (we need a one-to-one
// here)

logger.debug("In total, the node has " + heartbeats.size() + " including myself.");
Iterator<String> iterator = heartbeats.keySet().iterator();
while (iterator.hasNext()) {
String next = iterator.next();
if (!configuration.getMachineName().equals(next)) {
DirectMessageSender sender =
new DirectMessageSender("tcp://" + heartbeats.get(next).getValue0(),
configuration.getMessagePort(), this);
try {
sender.sendAnnounceMeRequest();
sender.shutdown();
break;
} catch (FBaseException e) {
logger.debug(
"Machine " + next + " could not announce me, trying with another one");
}
sender.shutdown();
}
// no machine left for announcing
if (iterator.hasNext()) {
logger.fatal("No machine could announce me, shutting down");
}
}
}

public void tearDown() {
// TODO 1: stop all background tasks
logger.info("Stopping background tasks");
for (Future<Boolean> backgroundTask : backgroundTaskList) {
backgroundTask.cancel(true);
}
if (server != null) {
server.stopServer();
}
subscriptionRegistry.deleteAllData();
messageIdEvaluator.tearDown();
publisher.shutdown();
namingServiceSender.shutdown();
messageIdEvaluator.tearDown();
directMessageReceiver.stopReception();
if (server != null) {
server.stopServer();
}
taskmanager.tearDown();
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.exit(0);
}

public void fillWithData() throws FBaseStorageConnectorException, InterruptedException,
Expand Down
7 changes: 6 additions & 1 deletion src/main/java/control/Starter.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package control;

import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

Expand All @@ -11,7 +12,7 @@ public class Starter {

public static void main(String[] args)
throws FBaseStorageConnectorException, InterruptedException, ExecutionException,
TimeoutException, FBaseCommunicationException, FBaseNamingServiceException {
TimeoutException, FBaseCommunicationException, FBaseNamingServiceException, IOException {
FBase fbase;
if (args.length == 1) {
fbase = new FBase(args[0]);
Expand All @@ -20,6 +21,10 @@ public static void main(String[] args)
}
fbase.startup(true, true); // TODO 2: parse from args
//fbase.fillWithData();

System.out.println("FBase started, press any key to stop.");
System.in.read();
fbase.tearDown();
}

}
1 change: 1 addition & 0 deletions src/main/java/storageconnector/S3DBConnector.java
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ public void deleteBuckets() throws FBaseStorageConnectorException {
break;
}
}
logger.debug("Deleting bucket " + bucketName);
s3.deleteBucket(bucketName);
} catch (AmazonServiceException e) {
throw new FBaseStorageConnectorException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public Boolean executeFunctionality() {
+ e.getMessage());
}

// publish to all keygroups the machine is responsible for
// publish to all keygroups the node is responsible for
Set<KeygroupID> responsibilities =
fBase.connector.keyGroupSubscriberMachines_listAll().keySet();
for (KeygroupID keygroupID : responsibilities) {
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/tasks/TaskManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ public enum TaskName {
B_DETECT_LOST_RESPONSIBILITY
}

public void tearDown() {
pool.shutdownNow();
}

public void storeHistory() {
storingHistory = true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@
*
* TODO T: Write test for this task
*
* TODO 2: It might make sense to ask the naming service about all keygroups this node is responsbile for,
* instead of relying on the data from the responsibility table
*
* @author jonathanhasenburg
*
*/
Expand Down
2 changes: 1 addition & 1 deletion src/test/java/communication/NamingServiceSenderTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public class NamingServiceSenderTest {
private static ExecutorService executor;
private static FBase fbase;
private static final String ownNodeConfigJSONPath =
"src/test/resources/NamingServiceSenderTest_NodeConfig.json";
"src/test/resources/NamingServiceSenderTest_NodeConfig1.json";

private static NamingServiceSender localSender = null;
private static NamingServiceSender nsSender = null;
Expand Down
1 change: 0 additions & 1 deletion src/test/java/tasks/FBaseFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ public class FBaseFactory {
private static final String BASIC_PROPERTIES_FILE = "FBaseFactory_Basic";
private static final String NAMING_SERVICE_PROPERTIES_FILE = "FBaseFactory_NamingService";


public static FBase basic(int instanceNumber, boolean announce, boolean backgroundTasks)
throws FBaseStorageConnectorException, InterruptedException, ExecutionException,
TimeoutException, FBaseCommunicationException, FBaseNamingServiceException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@
"messagePort": 6001,
"restPort": 8081,
"location": "Everywhere",
"description": "Machine specified in JUnit tests"
"description": "Machine specified in JUnit tests (Node 1)"
}
16 changes: 16 additions & 0 deletions src/test/resources/NamingServiceSenderTest_NodeConfig2.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{
"version": 1,
"nodeID": {
"nodeID": "N2"
},
"publicKey": "MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAqMCldz9e44B0I6V916WUX5tHlw0S4M+zUniAxeRq3rHFOtlKA+ARI8MW85sDz8MAMT51HxhcXOc09L76s0Nh2eWYdvFkaTH66BH2/d6QBIjmPOj8ZWM3t+U6CzqhN9wXqNKLj19Aop+cy4KqhfstiQpAb33803Wto/Zm+Gt3XQEpH/IRs5fgrnp73I4LDenosz5t/Xjo3DMG1+w9fx9ccB+BUikgFPHVDM5bHVT3KZG/9F3Qh4sYNXKWgNuLGpn2v6dVEAFZO/QM4UzJES7O0vHt9Dis4XQloCXqQaZ3FpCd8fQV7xnNO33gdyIJZyjEv8O5ucSuVbddsr+63cQ76QIDAQAB",
"encryptionAlgorithm": "RSA",
"machines": [
"m1"
],
"publisherPort": 7002,
"messagePort": 6002,
"restPort": 8082,
"location": "Everywhere",
"description": "Configuration fitting for Node 2"
}