Skip to content
This repository was archived by the owner on Feb 5, 2024. It is now read-only.

Commit 199cd17

Browse files
committed
Add code for: [2] No responsibility detected
1 parent 83ffb04 commit 199cd17

File tree

5 files changed

+104
-5
lines changed

5 files changed

+104
-5
lines changed

src/main/java/control/FBase.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,9 @@ public void startup(boolean announce) throws InterruptedException, ExecutionExce
9595
taskmanager.startBackgroundPollLatesConfigurationDataForResponsibleKeygroupsTask(0);
9696
taskmanager.startBackgroundCheckKeygroupConfigurationsOnUpdatesTask(0);
9797
taskmanager.startDetectMissingHeartbeatsTask(0, 0);
98+
taskmanager.startBackgroundDetectMissingResponsibility(0);
9899

100+
Thread.sleep(50);
99101
logger.info("FBase started, all background tasks up and running.");
100102
}
101103

src/main/java/tasks/TaskManager.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import model.messages.Envelope;
1818
import tasks.background.CheckKeygroupConfigurationsOnUpdatesTask;
1919
import tasks.background.DetectMissingHeartbeats;
20+
import tasks.background.DetectMissingResponsibility;
2021
import tasks.background.PollLatestConfigurationDataForResponsibleKeygroupsTask;
2122
import tasks.background.PutHeartbeatTask;
2223

@@ -40,7 +41,7 @@ public enum TaskName {
4041
PROCESS_MESSAGE_WITH_UNKNOWN_ENCRYPTION, CHECK_NAMING_SERVICE_CONFIGURATION_DATA,
4142
B_POLL_LATEST_CONFIGURATION_DATA_FOR_RESPONSIBLE_KEYGROUPS, B_PUT_HEARTBEAT,
4243
B_DETECT_MISSING_HEARTBEATS, REMOVE_MACHINE_FROM_NODE,
43-
ANNOUNCE_UPDATE_OF_OWN_NODE_CONFIGURATION
44+
ANNOUNCE_UPDATE_OF_OWN_NODE_CONFIGURATION, B_DETECT_MISSING_RESPONSIBILITY
4445
}
4546

4647
public void storeHistory() {
@@ -141,7 +142,7 @@ public Future<Boolean> runRemoveMachineFromNodeTask(String machineName) {
141142
Future<Boolean> future = pool.submit(new RemoveMachineFromNodeTask(machineName, fBase));
142143
return future;
143144
}
144-
145+
145146
public Future<Boolean> runAnnounceUpdateOfOwnNodeConfigurationTask() {
146147
Future<Boolean> future = pool.submit(new AnnounceUpdateOfOwnNodeConfigurationTask(fBase));
147148
return future;
@@ -176,4 +177,9 @@ public Future<Boolean> startDetectMissingHeartbeatsTask(int checkInterval,
176177
return future;
177178
}
178179

180+
public Future<Boolean> startBackgroundDetectMissingResponsibility(int interval) {
181+
Future<Boolean> future = pool.submit(new DetectMissingResponsibility(fBase, interval));
182+
return future;
183+
}
184+
179185
}

src/main/java/tasks/background/DetectMissingHeartbeats.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@
1616

1717
/**
1818
*
19-
* This background task stores heartbeats inside the node database for this machine.
19+
* This background task detects missing heartbeats of other machines and removes them from the
20+
* node if detected.
2021
*
2122
* @author jonathanhasenburg
2223
*
@@ -30,11 +31,11 @@ public class DetectMissingHeartbeats extends Task<Boolean> {
3031
}
3132

3233
/**
33-
* Creates a new {@link DetectMissingHeartbeats}. If pulse <= 0, the default is used (10
34+
* Creates a new {@link DetectMissingHeartbeats}. If checkInterval <= 0, the default is used (10
3435
* sec). If toleratedMissingHeartbeat <= 0, the default is used (100 sec).
3536
*
3637
* @param fBase
37-
* @param pulse - the interval to put heartbeats in milliseconds
38+
* @param checkInterval - the interval to put heartbeats in milliseconds
3839
* @param toleratedMissingHeartbeat - the maximum number of milliseconds tolerated for a
3940
* heartbeat to not have been updated
4041
*/
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
package tasks.background;
2+
3+
import java.util.Map;
4+
5+
import org.apache.log4j.Level;
6+
import org.apache.log4j.Logger;
7+
import org.javatuples.Pair;
8+
9+
import control.FBase;
10+
import exceptions.FBaseException;
11+
import exceptions.FBaseStorageConnectorException;
12+
import model.data.KeygroupID;
13+
import tasks.Task;
14+
import tasks.TaskManager.TaskName;
15+
16+
/**
17+
*
18+
* This background task detects keygroups which do not have a responsiblity assigned
19+
*
20+
* @author jonathanhasenburg
21+
*
22+
*/
23+
public class DetectMissingResponsibility extends Task<Boolean> {
24+
25+
private static Logger logger = Logger.getLogger(DetectMissingResponsibility.class.getName());
26+
27+
static {
28+
logger.setLevel(Level.INFO);
29+
}
30+
31+
/**
32+
* Creates a new {@link DetectMissingResponsibility}. If checkInterval <= 0, the default
33+
* is used (10 sec).
34+
*
35+
* @param fBase
36+
* @param checkInterval - the interval to put heartbeats in milliseconds
37+
*/
38+
public DetectMissingResponsibility(FBase fBase, int checkInterval) {
39+
super(TaskName.B_DETECT_MISSING_RESPONSIBILITY, fBase);
40+
if (checkInterval > 0) {
41+
this.checkInterval = checkInterval;
42+
}
43+
44+
}
45+
46+
private int checkInterval = 10000;
47+
48+
@Override
49+
public Boolean executeFunctionality() {
50+
51+
while (!Thread.currentThread().isInterrupted()) {
52+
logger.info("Looking for missing responsibilities");
53+
54+
try {
55+
Map<KeygroupID, Pair<String, Integer>> responsibilities =
56+
fBase.connector.keyGroupSubscriberMachines_listAll();
57+
58+
for (KeygroupID keygroupID : responsibilities.keySet()) {
59+
if (responsibilities.get(keygroupID).getValue0() == null) {
60+
try {
61+
fBase.taskmanager.runUpdateKeygroupSubscriptionsTask(
62+
fBase.connector.keygroupConfig_get(keygroupID));
63+
} catch (FBaseException e) {
64+
logger.error(
65+
"Could not update responsibilites for keygroup " + keygroupID);
66+
}
67+
}
68+
}
69+
70+
} catch (FBaseStorageConnectorException e1) {
71+
logger.error("Could not read heartbeats, going back to sleep", e1);
72+
}
73+
74+
try {
75+
Thread.sleep(checkInterval);
76+
} catch (InterruptedException e) {
77+
Thread.currentThread().interrupt();
78+
logger.error("Background task has been interrupted");
79+
}
80+
}
81+
logger.info("Stopping task, because Thread was interrupted");
82+
return true;
83+
}
84+
85+
}

src/test/java/scenario/SubscriptionUpdateProcessScenario.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,5 +49,10 @@ public void tearDown() throws Exception {
4949
public void testMissingHeartbeatDetected() {
5050
fail("Not yet implemented");
5151
}
52+
53+
@Test
54+
public void testNoResponsibilityDetected() {
55+
fail("Not yet implemented");
56+
}
5257

5358
}

0 commit comments

Comments
 (0)