Skip to content
This repository was archived by the owner on Feb 5, 2024. It is now read-only.

Commit 83ffb04

Browse files
committed
Add code for: [1] Missing heartbeats detected
1 parent 2c38897 commit 83ffb04

14 files changed

+523
-39
lines changed

ImplementationSubscriptionManagement.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@
44

55
### [1] Missing heartbeats detected
66

7-
- [ ] Background task that stores own heartbeats in the node database
8-
- [ ] Background task to detect missing heartbeats
9-
- [ ] Detected: run RemoveMachineFromNodeTask
7+
- [x] Background task that stores own heartbeats in the node database
8+
- [x] Background task to detect missing heartbeats
9+
- [x] Detected: run RemoveMachineFromNodeTask
1010

1111
### [2] No responsibility detected
1212
- [ ] Background task to detect missing responsibilities

src/main/java/communication/Publisher.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,17 +26,18 @@ public Publisher(String address, int port) {
2626
}
2727

2828
/**
29-
* Publishes a new envelope to all subscribers and encrypts the message with the given secret and
30-
* algorithm
29+
* Publishes a new envelope to all subscribers and encrypts the message with the given
30+
* secret and algorithm
3131
*
3232
* @param envelope - the published envelope
3333
* @param secret - the secret used for encryption
3434
* @param algorithm - the algorithm used for encryption
3535
* @return null
36-
* @throws FBaseEncryptionException
36+
* @throws FBaseEncryptionException
3737
*/
3838
@Override
39-
public String send(Envelope envelope, String secret, EncryptionAlgorithm algorithm) throws FBaseEncryptionException {
39+
public String send(Envelope envelope, String secret, EncryptionAlgorithm algorithm)
40+
throws FBaseEncryptionException {
4041
logger.debug("Publishing envelope with namespace " + envelope.getConfigID().getID());
4142
envelope.getMessage().encryptFields(secret, algorithm);
4243
sender.sendMore(envelope.getConfigID().getID());

src/main/java/control/FBase.java

Lines changed: 15 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,10 @@
11
package control;
22

3-
import java.util.Map;
43
import java.util.concurrent.ExecutionException;
54
import java.util.concurrent.TimeUnit;
65
import java.util.concurrent.TimeoutException;
76

87
import org.apache.log4j.Logger;
9-
import org.javatuples.Pair;
108

119
import communication.DirectMessageReceiver;
1210
import communication.MessageIdEvaluator;
@@ -20,7 +18,6 @@
2018
import exceptions.FBaseStorageConnectorException;
2119
import model.config.ClientConfig;
2220
import model.config.KeygroupConfig;
23-
import model.config.NodeConfig;
2421
import model.data.ClientID;
2522
import model.data.DataIdentifier;
2623
import model.data.DataRecord;
@@ -58,7 +55,7 @@ public FBase(String configName) {
5855
publisher = new Publisher("tcp://0.0.0.0", configuration.getPublisherPort());
5956
}
6057

61-
public void startup(boolean tellEveryone) throws InterruptedException, ExecutionException,
58+
public void startup(boolean announce) throws InterruptedException, ExecutionException,
6259
TimeoutException, FBaseStorageConnectorException, FBaseCommunicationException,
6360
FBaseNamingServiceException {
6461
if (Connector.S3.equals(configuration.getDatabaseConnector())) {
@@ -85,39 +82,28 @@ public void startup(boolean tellEveryone) throws InterruptedException, Execution
8582
subscriptionRegistry = new SubscriptionRegistry(this);
8683
messageIdEvaluator = new MessageIdEvaluator(this);
8784
messageIdEvaluator.startup();
88-
85+
86+
// start putting heartbeats (pulse 0 = default)
87+
taskmanager.startBackgroundPutHeartbeatTask(0);
88+
8989
// add machine to node
90-
addMachineToNodeConfiguration(tellEveryone);
91-
92-
// start background tasks (interval 0 = default)
90+
if (announce) {
91+
announceMachineAdditionToNode();
92+
}
93+
94+
// start other background tasks (interval 0 = default)
9395
taskmanager.startBackgroundPollLatesConfigurationDataForResponsibleKeygroupsTask(0);
9496
taskmanager.startBackgroundCheckKeygroupConfigurationsOnUpdatesTask(0);
97+
taskmanager.startDetectMissingHeartbeatsTask(0, 0);
9598

99+
logger.info("FBase started, all background tasks up and running.");
96100
}
97101

98-
private void addMachineToNodeConfiguration(boolean tellEveryone)
102+
private void announceMachineAdditionToNode()
99103
throws FBaseStorageConnectorException, FBaseCommunicationException,
100104
FBaseNamingServiceException {
101-
NodeConfig nodeConfig = configuration.buildNodeConfigBasedOnData();
102-
// add other machines based on heartbeats (I am not included here, because I did not
103-
// report a heartbeat yet)
104-
Map<String, Pair<String, Long>> heartbeats = connector.heartbeats_listAll();
105-
for (Pair<String, Long> address : heartbeats.values()) {
106-
nodeConfig.addMachine(address.getValue0());
107-
}
108-
109-
if (tellEveryone) {
110-
// only updates are possible, because a node cannot create itself, only other
111-
// nodes can
112-
try {
113-
namingServiceSender.sendNodeConfigUpdate(nodeConfig);
114-
logger.info("Updated node configuration at the namingservice");
115-
} catch (FBaseCommunicationException e) {
116-
logger.info("Could not update node configuration at the naming service: "
117-
+ e.getMessage());
118-
}
119-
// TODO 1: Needs to be published to other nodes, but how?
120-
}
105+
106+
// TODO 1: Tell a node that is already registered about addition (we need a one-to-one here)
121107

122108
}
123109

src/main/java/storageconnector/AbstractDBConnector.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,18 @@ public abstract void heartbeats_update(String machine, String address)
294294
*/
295295
public abstract Map<String, Pair<String, Long>> heartbeats_listAll()
296296
throws FBaseStorageConnectorException;
297+
298+
/**
299+
* HEARTBEATS<br>
300+
* <br>
301+
* removes a machine from the heartbeats table.
302+
*
303+
* @param machine - the machine's name
304+
* @return true if the item no longer exists after the method call, false otherwise
305+
* @throws FBaseStorageConnectorException when the operation fails
306+
*/
307+
public abstract boolean heartbeats_remove(String machine)
308+
throws FBaseStorageConnectorException;
297309

298310
/**
299311
* MESSAGEHISTORY<br>

src/main/java/storageconnector/OnHeapDBConnector.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -327,6 +327,12 @@ public Map<String, Pair<String, Long>> heartbeats_listAll()
327327
throws FBaseStorageConnectorException {
328328
return new HashMap<>(heartbeats);
329329
}
330+
331+
@Override
332+
public boolean heartbeats_remove(String machine) throws FBaseStorageConnectorException {
333+
heartbeats.remove(machine);
334+
return true;
335+
}
330336

331337
@Override
332338
public MessageID messageHistory_getNextMessageID() throws FBaseStorageConnectorException {
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
package tasks;
2+
3+
import java.util.Map;
4+
import java.util.Set;
5+
6+
import org.apache.log4j.Logger;
7+
import org.javatuples.Pair;
8+
9+
import control.FBase;
10+
import exceptions.FBaseCommunicationException;
11+
import exceptions.FBaseException;
12+
import exceptions.FBaseNamingServiceException;
13+
import exceptions.FBaseStorageConnectorException;
14+
import model.JSONable;
15+
import model.config.KeygroupConfig;
16+
import model.config.NodeConfig;
17+
import model.data.KeygroupID;
18+
import model.messages.Command;
19+
import model.messages.Envelope;
20+
import model.messages.Message;
21+
import tasks.TaskManager.TaskName;
22+
23+
class AnnounceUpdateOfOwnNodeConfigurationTask extends Task<Boolean> {
24+
25+
private static Logger logger =
26+
Logger.getLogger(AnnounceUpdateOfOwnNodeConfigurationTask.class.getName());
27+
28+
public AnnounceUpdateOfOwnNodeConfigurationTask(FBase fBase) {
29+
super(TaskName.ANNOUNCE_UPDATE_OF_OWN_NODE_CONFIGURATION, fBase);
30+
}
31+
32+
@Override
33+
public Boolean executeFunctionality() {
34+
35+
try {
36+
// build node config
37+
NodeConfig nodeConfig = fBase.configuration.buildNodeConfigBasedOnData();
38+
// add other machines based on heartbeats
39+
Map<String, Pair<String, Long>> heartbeats = fBase.connector.heartbeats_listAll();
40+
for (Pair<String, Long> address : heartbeats.values()) {
41+
if (!address.getValue0().equals(fBase.configuration.getMachineIPAddress())) {
42+
nodeConfig.addMachine(address.getValue0());
43+
}
44+
}
45+
46+
// naming service
47+
try {
48+
// only updates are possible, because a node cannot create itself, only other
49+
// nodes can
50+
fBase.namingServiceSender.sendNodeConfigUpdate(nodeConfig);
51+
logger.info("Updated node configuration at the namingservice");
52+
} catch (FBaseCommunicationException e) {
53+
logger.info("Could not update node configuration at the naming service: "
54+
+ e.getMessage());
55+
}
56+
57+
// publish to all keygroups the machine is responsible for
58+
Set<KeygroupID> responsibilities =
59+
fBase.connector.keyGroupSubscriberMachines_listAll().keySet();
60+
for (KeygroupID keygroupID : responsibilities) {
61+
try {
62+
// get config
63+
KeygroupConfig config = fBase.configAccessHelper.keygroupConfig_get(keygroupID);
64+
// publish
65+
Message m = new Message();
66+
m.setCommand(Command.UPDATE_FOREIGN_NODE_CONFIG);
67+
m.setContent(JSONable.toJSON(nodeConfig));
68+
Envelope e = new Envelope(config.getKeygroupID(), m);
69+
fBase.publisher.send(e, config.getEncryptionSecret(),
70+
config.getEncryptionAlgorithm());
71+
} catch (FBaseException e) {
72+
logger.error(
73+
"Could not publish update to recipents of keygroupID " + keygroupID);
74+
}
75+
}
76+
77+
} catch (FBaseStorageConnectorException | FBaseNamingServiceException e) {
78+
logger.error("Could not announce update to own node configuration", e);
79+
return false;
80+
}
81+
82+
return true;
83+
}
84+
85+
}
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
package tasks;
2+
3+
import java.util.ArrayList;
4+
import java.util.List;
5+
import java.util.Map;
6+
7+
import org.apache.log4j.Logger;
8+
import org.javatuples.Pair;
9+
10+
import control.FBase;
11+
import exceptions.FBaseException;
12+
import exceptions.FBaseStorageConnectorException;
13+
import model.config.KeygroupConfig;
14+
import model.data.KeygroupID;
15+
import tasks.TaskManager.TaskName;
16+
17+
class RemoveMachineFromNodeTask extends Task<Boolean> {
18+
19+
private static Logger logger = Logger.getLogger(RemoveMachineFromNodeTask.class.getName());
20+
21+
private String machineName = "";
22+
23+
public RemoveMachineFromNodeTask(String machineName, FBase fBase) {
24+
super(TaskName.REMOVE_MACHINE_FROM_NODE, fBase);
25+
this.machineName = machineName;
26+
}
27+
28+
@Override
29+
public Boolean executeFunctionality() throws FBaseException {
30+
logger.info("Removing machine " + machineName + " from node");
31+
32+
Map<KeygroupID, Pair<String, Integer>> responsibilities =
33+
fBase.connector.keyGroupSubscriberMachines_listAll();
34+
35+
List<KeygroupID> responsibleKeygroups = new ArrayList<>();
36+
boolean removalWithoutProblems = true;
37+
for (KeygroupID keygroupID : responsibilities.keySet()) {
38+
if (machineName.equals(responsibilities.get(keygroupID).getValue0())) {
39+
responsibleKeygroups.add(keygroupID);
40+
// remove from responsibility table
41+
try {
42+
fBase.connector.keyGroupSubscriberMachines_put(keygroupID, null);
43+
} catch (FBaseStorageConnectorException e) {
44+
logger.error("Could not remove responsibility of " + machineName + " for "
45+
+ keygroupID);
46+
removalWithoutProblems = false;
47+
}
48+
}
49+
}
50+
51+
// remove from heartbeats table if all responsibilites have been removed
52+
// keep inside if problem so that others can try again
53+
if (removalWithoutProblems) {
54+
fBase.connector.heartbeats_remove(machineName);
55+
}
56+
57+
// move code to separate task, that cannot executed more than once on the same machine
58+
fBase.taskmanager.runAnnounceUpdateOfOwnNodeConfigurationTask();
59+
60+
// get keygroup configurations and run updatesubscriptions
61+
for (KeygroupID keygroupID : responsibleKeygroups) {
62+
KeygroupConfig config = fBase.connector.keygroupConfig_get(keygroupID);
63+
fBase.taskmanager.runUpdateKeygroupSubscriptionsTask(config);
64+
}
65+
66+
return true;
67+
}
68+
69+
}

src/main/java/tasks/TaskManager.java

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@
1616
import model.data.DataRecord;
1717
import model.messages.Envelope;
1818
import tasks.background.CheckKeygroupConfigurationsOnUpdatesTask;
19+
import tasks.background.DetectMissingHeartbeats;
1920
import tasks.background.PollLatestConfigurationDataForResponsibleKeygroupsTask;
21+
import tasks.background.PutHeartbeatTask;
2022

2123
public class TaskManager {
2224

@@ -36,7 +38,9 @@ public enum TaskName {
3638
LOG, SLEEP, UPDATE_KEYGROUP_CONFIG, UPDATE_KEYGROUP_SUBSCRIPTIONS, PUT_DATA_RECORD,
3739
DELETE_DATA_RECORD, UPDATE_FOREIGN_NODE_CONFIG, B_CHECK_KEYGROUP_CONFIGURATIONS_ON_UPDATES,
3840
PROCESS_MESSAGE_WITH_UNKNOWN_ENCRYPTION, CHECK_NAMING_SERVICE_CONFIGURATION_DATA,
39-
B_POLL_LATEST_CONFIGURATION_DATA_FOR_RESPONSIBLE_KEYGROUPS
41+
B_POLL_LATEST_CONFIGURATION_DATA_FOR_RESPONSIBLE_KEYGROUPS, B_PUT_HEARTBEAT,
42+
B_DETECT_MISSING_HEARTBEATS, REMOVE_MACHINE_FROM_NODE,
43+
ANNOUNCE_UPDATE_OF_OWN_NODE_CONFIGURATION
4044
}
4145

4246
public void storeHistory() {
@@ -133,6 +137,16 @@ public Future<Boolean> runProcessMessageWithUnknownEncryptionTask(Envelope envel
133137
return future;
134138
}
135139

140+
public Future<Boolean> runRemoveMachineFromNodeTask(String machineName) {
141+
Future<Boolean> future = pool.submit(new RemoveMachineFromNodeTask(machineName, fBase));
142+
return future;
143+
}
144+
145+
public Future<Boolean> runAnnounceUpdateOfOwnNodeConfigurationTask() {
146+
Future<Boolean> future = pool.submit(new AnnounceUpdateOfOwnNodeConfigurationTask(fBase));
147+
return future;
148+
}
149+
136150
/*
137151
* ------ Background Initiators ------
138152
*/
@@ -142,12 +156,24 @@ public Future<Boolean> startBackgroundCheckKeygroupConfigurationsOnUpdatesTask(i
142156
pool.submit(new CheckKeygroupConfigurationsOnUpdatesTask(fBase, interval));
143157
return future;
144158
}
145-
159+
146160
public Future<Boolean> startBackgroundPollLatesConfigurationDataForResponsibleKeygroupsTask(
147161
int checkInterval) {
148162
Future<Boolean> future = pool.submit(
149163
new PollLatestConfigurationDataForResponsibleKeygroupsTask(fBase, checkInterval));
150164
return future;
151165
}
152166

167+
public Future<Boolean> startBackgroundPutHeartbeatTask(int pulse) {
168+
Future<Boolean> future = pool.submit(new PutHeartbeatTask(fBase, pulse));
169+
return future;
170+
}
171+
172+
public Future<Boolean> startDetectMissingHeartbeatsTask(int checkInterval,
173+
int toleratedMissingHeartbeat) {
174+
Future<Boolean> future = pool.submit(
175+
new DetectMissingHeartbeats(fBase, checkInterval, toleratedMissingHeartbeat));
176+
return future;
177+
}
178+
153179
}

0 commit comments

Comments
 (0)