Skip to content
This repository was archived by the owner on Feb 5, 2024. It is now read-only.

Commit 6169ba9

Browse files
committed
Add code for: [3] Lost responsibility detected
1 parent eac8786 commit 6169ba9

File tree

6 files changed

+116
-6
lines changed

6 files changed

+116
-6
lines changed

ImplementationSubscriptionManagement.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,13 @@
1414

1515
### [3] Lost responsibility detected
1616

17-
- [ ] Background task to detect lost responsibilities for each keygroup for which subscriptions exist
18-
- [ ] Detected: run UpdateKeygroupSubscriptionsTask
17+
- [x] Background task to detect lost responsibilities for each keygroup for which subscriptions exist
18+
- [x] Detected: run UpdateKeygroupSubscriptionsTask
1919

2020
### [4] Client updates/deletes keygroup via node
2121

22-
- [ ] Add client keygroup update/delete functionality incl. naming service validation
23-
- [ ] Naming service approves: run UpdateKeygroupConfigTask
22+
- [x] Add client keygroup update/delete functionality incl. naming service validation
23+
- [x] Naming service approves: run UpdateKeygroupConfigTask
2424

2525
### [5] Not interpretable message
2626

src/main/java/communication/SubscriptionRegistry.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@
22

33
import java.util.ArrayList;
44
import java.util.HashMap;
5+
import java.util.HashSet;
56
import java.util.Map;
7+
import java.util.Set;
68

79
import org.apache.log4j.Logger;
810

@@ -87,6 +89,10 @@ public synchronized void unsubscribeFromKeygroup(KeygroupID keygroupID) {
8789
s.stopReception();
8890
}
8991
}
92+
93+
public synchronized Set<KeygroupID> getSubscribedKeygroups() {
94+
return new HashSet<>(activeSubscriptions.keySet());
95+
}
9096

9197
/**
9298
* Subscriptions for the given keygroupID exist, a value for the given key is present.

src/main/java/control/FBase.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ public void startup(boolean announce) throws InterruptedException, ExecutionExce
9696
taskmanager.startBackgroundCheckKeygroupConfigurationsOnUpdatesTask(0);
9797
taskmanager.startDetectMissingHeartbeatsTask(0, 0);
9898
taskmanager.startBackgroundDetectMissingResponsibility(0);
99+
taskmanager.startBackgroundDetectLostResponsibility(0);
99100

100101
Thread.sleep(50);
101102
logger.info("FBase started, all background tasks up and running.");

src/main/java/tasks/TaskManager.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import model.data.DataRecord;
1717
import model.messages.Envelope;
1818
import tasks.background.CheckKeygroupConfigurationsOnUpdatesTask;
19+
import tasks.background.DetectLostResponsibility;
1920
import tasks.background.DetectMissingHeartbeats;
2021
import tasks.background.DetectMissingResponsibility;
2122
import tasks.background.PollLatestConfigurationDataForResponsibleKeygroupsTask;
@@ -41,7 +42,8 @@ public enum TaskName {
4142
PROCESS_MESSAGE_WITH_UNKNOWN_ENCRYPTION, CHECK_NAMING_SERVICE_CONFIGURATION_DATA,
4243
B_POLL_LATEST_CONFIGURATION_DATA_FOR_RESPONSIBLE_KEYGROUPS, B_PUT_HEARTBEAT,
4344
B_DETECT_MISSING_HEARTBEATS, REMOVE_MACHINE_FROM_NODE,
44-
ANNOUNCE_UPDATE_OF_OWN_NODE_CONFIGURATION, B_DETECT_MISSING_RESPONSIBILITY
45+
ANNOUNCE_UPDATE_OF_OWN_NODE_CONFIGURATION, B_DETECT_MISSING_RESPONSIBILITY,
46+
B_DETECT_LOST_RESPONSIBILITY
4547
}
4648

4749
public void storeHistory() {
@@ -182,4 +184,9 @@ public Future<Boolean> startBackgroundDetectMissingResponsibility(int interval)
182184
return future;
183185
}
184186

187+
public Future<Boolean> startBackgroundDetectLostResponsibility(int interval) {
188+
Future<Boolean> future = pool.submit(new DetectLostResponsibility(fBase, interval));
189+
return future;
190+
}
191+
185192
}
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
package tasks.background;
2+
3+
import java.util.Map;
4+
import java.util.Set;
5+
6+
import org.apache.log4j.Level;
7+
import org.apache.log4j.Logger;
8+
import org.javatuples.Pair;
9+
10+
import control.FBase;
11+
import exceptions.FBaseException;
12+
import exceptions.FBaseStorageConnectorException;
13+
import model.config.KeygroupConfig;
14+
import model.data.KeygroupID;
15+
import tasks.Task;
16+
import tasks.TaskManager.TaskName;
17+
18+
/**
19+
*
20+
* This background task detects if a keygroup a machine created subscriptions for is not in
21+
* the machine's responsibility anymore
22+
*
23+
* @author jonathanhasenburg
24+
*
25+
*/
26+
public class DetectLostResponsibility extends Task<Boolean> {
27+
28+
private static Logger logger = Logger.getLogger(DetectLostResponsibility.class.getName());
29+
30+
static {
31+
logger.setLevel(Level.INFO);
32+
}
33+
34+
/**
35+
* Creates a new {@link DetectLostResponsibility}. If checkInterval <= 0, the default is
36+
* used (10 sec).
37+
*
38+
* @param fBase
39+
* @param checkInterval - the interval to put heartbeats in milliseconds
40+
*/
41+
public DetectLostResponsibility(FBase fBase, int checkInterval) {
42+
super(TaskName.B_DETECT_LOST_RESPONSIBILITY, fBase);
43+
if (checkInterval > 0) {
44+
this.checkInterval = checkInterval;
45+
}
46+
47+
}
48+
49+
private int checkInterval = 10000;
50+
51+
@Override
52+
public Boolean executeFunctionality() {
53+
54+
while (!Thread.currentThread().isInterrupted()) {
55+
logger.info("Looking for lost responsibilities");
56+
57+
try {
58+
Set<KeygroupID> subscribedKeygroups =
59+
fBase.subscriptionRegistry.getSubscribedKeygroups();
60+
61+
Map<KeygroupID, Pair<String, Integer>> responsibilities =
62+
fBase.connector.keyGroupSubscriberMachines_listAll();
63+
64+
for (KeygroupID keygroupID : subscribedKeygroups) {
65+
try {
66+
Pair<String, Integer> pair = responsibilities.get(keygroupID);
67+
if (pair == null
68+
|| !pair.getValue0().equals(fBase.configuration.getMachineName())) {
69+
logger.info("Lost responsibility for keygroup " + keygroupID);
70+
KeygroupConfig config = fBase.connector.keygroupConfig_get(keygroupID);
71+
fBase.taskmanager.runUpdateKeygroupSubscriptionsTask(config);
72+
}
73+
} catch (FBaseException e) {
74+
logger.error("Could not run the subscription update process for keygroup "
75+
+ keygroupID, e);
76+
}
77+
}
78+
79+
} catch (FBaseStorageConnectorException e1) {
80+
logger.error(
81+
"Could not read data to detect lost responsibilities, going back to sleep",
82+
e1);
83+
}
84+
85+
try {
86+
Thread.sleep(checkInterval);
87+
} catch (InterruptedException e) {
88+
Thread.currentThread().interrupt();
89+
logger.error("Background task has been interrupted");
90+
}
91+
}
92+
logger.info("Stopping task, because Thread was interrupted");
93+
return true;
94+
}
95+
96+
}

src/main/java/tasks/background/DetectMissingResponsibility.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ public Boolean executeFunctionality() {
6868
}
6969

7070
} catch (FBaseStorageConnectorException e1) {
71-
logger.error("Could not read heartbeats, going back to sleep", e1);
71+
logger.error("Could not read responsibilities, going back to sleep", e1);
7272
}
7373

7474
try {

0 commit comments

Comments
 (0)