Skip to content
This repository was archived by the owner on Feb 5, 2024. It is now read-only.

Commit 1cd0498

Browse files
authored
Merge pull request #22 from OpenFogStack/subscriptionManagement
Add Rest-Client and update subscription management to thesis version Some tests are missing and S3 Connector needs to be adjusted
2 parents ae1979b + 6169ba9 commit 1cd0498

File tree

61 files changed

+2869
-1695
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

61 files changed

+2869
-1695
lines changed
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
# Implementation Subscription Management
2+
3+
## Events
4+
5+
### [1] Missing heartbeats detected
6+
7+
- [x] Background task that stores own heartbeats in the node database
8+
- [x] Background task to detect missing heartbeats
9+
- [x] Detected: run RemoveMachineFromNodeTask
10+
11+
### [2] No responsibility detected
12+
- [x] Background task to detect missing responsibilities
13+
- [x] Detected: run UpdateKeygroupSubscriptionsTask
14+
15+
### [3] Lost responsibility detected
16+
17+
- [x] Background task to detect lost responsibilities for each keygroup for which subscriptions exist
18+
- [x] Detected: run UpdateKeygroupSubscriptionsTask
19+
20+
### [4] Client updates/deletes keygroup via node
21+
22+
- [x] Add client keygroup update/delete functionality incl. naming service validation
23+
- [x] Naming service approves: run UpdateKeygroupConfigTask
24+
25+
### [5] Not interpretable message
26+
27+
- [x] Add encryption exception handling to subscriber
28+
- [x] Catched: get newest keygroup config version from naming service
29+
- [x] Run UpdateKeygroupConfigTask
30+
31+
### [6] Periodic configuration update
32+
33+
- [x] Background task that periodically polls all responsible keygroup configurations and node configurations for nodes present in the keygroups
34+
- [x] Keygroup: run UpdateKeygroupConfigTask
35+
- [x] Node: UpdateForeignNodeConfigTask
36+
37+
### [7] Recognize foreign keygroup update
38+
Needed if [4] is performed by other node
39+
40+
- [x] Add background task that checks whether version of keygroup config changed
41+
- [x] Changed: Run UpdateKeygroupSubscriptionsTask
42+
43+
### [8] Subscriber receives configuration update
44+
45+
- [x] Add configuration update handling to subscriber
46+
- [x] Run UpdateKeygroupConfigTask or UpdateForeignNodeConfigTask
47+
48+
## Tasks (not background)
49+
50+
### [D] UpdateKeygroupSubscriptionsTask
51+
52+
1. Remove all subscriptions for a given keygroup
53+
2. Create new subscriptions if all `true`:
54+
- node still apart of the keygroup
55+
- keygroup not tombstoned
56+
- no machine responsible yet/I am responsible
57+
3. Update responsibility table
58+
59+
### [B] UpdateKeygroupConfigTask
60+
Only run if version differs
61+
62+
1. Put configuration in node database
63+
2. Run UpdateKeygroupSubscriptionsTask
64+
3. Publish config to all subscribers if started by [4]
65+
66+
### [C] UpdateForeignNodeConfigTask
67+
Only run if the machines changed
68+
69+
1. Put configuration in node database
70+
2. Get all keygroups in which node participates
71+
* Run UpdateKeygroupSubscriptionsTask for keygroup
72+
73+
### [A] RemoveMachineFromNodeTask
74+
75+
1. Remove machine from responsibility table
76+
2. Remove machine from heartbeats table
77+
3. Rebuild nodeconfig and send to naming service
78+
4. Rebuild nodeconfig and publish to all subscribers for all keygroups
79+
5. UpdateKeygroupSubscriptionsTask
80+
81+
### AddMachineToNodeTask
82+
Not used here, is used by startup functionality
83+
84+
1. Put heartbeat into heartbeats
85+
2. Rebuild nodeconfig and send to naming service
86+
3. Rebuild nodeconfig and publish to all subscribers for all keygroups (must be done by another node!)

README.md

Lines changed: 11 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -9,44 +9,23 @@ For some of the tests, a running naming service is required. The naming service
99
# Current ToDos / Missing Functionality
1010

1111
## Startup
12-
- [ ] A machineName should be dynamically created on Startup
13-
- [ ] Background task that checks whether node config contains myself (might have been overwritten by removal of machine due to heartbeats)
14-
- [ ] Add background task that checks whether node config only contains nodes of machines that have heartbeats (overwritten by starting node)
12+
- [ ] A machineName should be dynamically created on Startup (clean up configs)
13+
- [x] Run AddMachineToNodeTask (ended up not being a task)
1514

16-
## General Node Database
17-
- [x] Add a connector that supports multi-machine nodes
18-
- [x] Add versions to all configurations so that a node/machine can identify updates
19-
20-
## Publish/Subscribe
21-
- [x] Add capabilities to process messages that cannot be encrypted
22-
23-
### Subscription Management
24-
- [x] Add background task that checks whether any of the keygroups I am responsible for have been updated by another machine (CheckKeygroupConfigurationsOnUpdatesTask) #11
25-
- [x] Instead of unsubscribing/subscribing, each keygroup config update should lead to a complete reset of subscriptions
26-
- [ ] Make subscription management code as defined in thesis (event detection)
27-
28-
### Heartbeats
29-
- [ ] Add background task that stores own heartbeats in the node database #11
30-
- [ ] Add background task that checks other machine's heartbeats and removes machines from a node if they did not respond to long #11 (run subscription management and clean heartbeat table/node config after started subscriptions)
15+
## Subscription Management incl. Heartbeats
16+
- See ImplementationSubscriptionManagement.md
3117

3218
## One to One Communication
33-
- [x] Rebuild asymmetric encryption so that it uses a symmetric approach for the actual data
3419
- [ ] Add one to one communication for datarecords
3520

36-
### Naming Service based Management
37-
- [x] Add keygroup configuration control methods to sender
38-
- [x] Add response processing to all methods
39-
- [x] Add node management methods (e.g. to update a node configuration when a machine is added/removed)
40-
- [x] Add a background task that periodically polls the naming service about the newest configurations
41-
4221
### Handling Missed Messages
43-
- [ ] Add message history size to machine config (node specific)
44-
- [x] Messaging data needs to be stored after each data related publish
45-
- [x] Add sender/receiver capabilities for missed messages
46-
- [x] Write and test logic that uses functionality
22+
- [ ] Add message history size to machine config (node specific), cleanup config files
4723
- [ ] (Add message history cleanup functionality (on receiver and sender side)), thesis states not implemented
4824

4925
## Controlling FBase with Clients
50-
- [ ] Enable asymmetric encryption
51-
- [ ] Identify and add missing control methods
52-
- [ ] Keygroup updates/deletions from client are forwarded to other nodes
26+
- [ ] Enable encryption and authentication
27+
- [ ] Method to instruct machine to update all configurations with naming service data
28+
- [x] Keygroup C, R, D
29+
- [x] Keygroup all update methods
30+
- [X] Client C, D
31+
- [X] DataRecord P, R, D, List

pom.xml

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,14 +60,34 @@
6060
</dependency>
6161
<dependency>
6262
<groupId>org.eclipse.jetty</groupId>
63-
<artifactId>jetty-security</artifactId>
63+
<artifactId>jetty-servlet</artifactId>
6464
<version>9.4.6.v20170531</version>
6565
</dependency>
6666
<dependency>
6767
<groupId>org.eclipse.jetty</groupId>
68-
<artifactId>jetty-servlet</artifactId>
68+
<artifactId>jetty-util</artifactId>
6969
<version>9.4.6.v20170531</version>
7070
</dependency>
71+
<dependency>
72+
<groupId>org.glassfish.jersey.core</groupId>
73+
<artifactId>jersey-server</artifactId>
74+
<version>2.26</version>
75+
</dependency>
76+
<dependency>
77+
<groupId>org.glassfish.jersey.containers</groupId>
78+
<artifactId>jersey-container-servlet-core</artifactId>
79+
<version>2.26</version>
80+
</dependency>
81+
<dependency>
82+
<groupId>org.glassfish.jersey.containers</groupId>
83+
<artifactId>jersey-container-jetty-http</artifactId>
84+
<version>2.26</version>
85+
</dependency>
86+
<dependency>
87+
<groupId>org.glassfish.jersey.inject</groupId>
88+
<artifactId>jersey-hk2</artifactId>
89+
<version>2.26</version>
90+
</dependency>
7191
<!-- Client -->
7292
<dependency>
7393
<groupId>com.mashape.unirest</groupId>

src/main/java/client/Client.java

Lines changed: 50 additions & 109 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,13 @@
11
package client;
22

3-
import java.io.IOException;
4-
import java.util.List;
5-
63
import org.apache.log4j.Logger;
74

8-
import com.fasterxml.jackson.databind.ObjectMapper;
9-
import com.mashape.unirest.http.HttpResponse;
10-
import com.mashape.unirest.http.Unirest;
115
import com.mashape.unirest.http.exceptions.UnirestException;
126

13-
import model.JSONable;
7+
import exceptions.FBaseEncryptionException;
148
import model.data.DataIdentifier;
159
import model.data.DataRecord;
1610
import model.data.KeygroupID;
17-
import model.messages.Message;
1811

1912
/**
2013
* A client implemantation which allows the usage of the FBase rest interface
@@ -25,111 +18,59 @@
2518
public class Client {
2619

2720
private static Logger logger = Logger.getLogger(Client.class.getName());
28-
29-
public static void main(String[] args) throws UnirestException {
30-
Client c = new Client();
31-
DataRecord record = new DataRecord();
32-
DataIdentifier dataIdentifier = new DataIdentifier("smartlight", "h1", "brightness", "M-1");
33-
record.setDataIdentifier(new DataIdentifier(dataIdentifier.getKeygroupID(), "M-4"));
34-
Message m = c.runPutRecordRequest("http://localhost", 8080, record);
35-
logger.info(JSONable.toJSON(m));
36-
37-
DataRecord record2 = c.runGetRecordRequest("http://localhost", 8080, dataIdentifier);
38-
logger.info(JSONable.toJSON(record2));
39-
40-
List<String> list = c.runGetListRecordRequest("http://localhost", 8080,
41-
dataIdentifier.getKeygroupID());
42-
list.stream().forEach(i -> logger.info(i));
21+
22+
public void getScenario(String address, int port) throws UnirestException {
23+
KeygroupID keygroupID = new KeygroupID("smartlight", "h1", "brightness");
24+
DataIdentifier dataID = new DataIdentifier(keygroupID, "M-1");
25+
26+
DataRecord newRecord = new DataRecord(new DataIdentifier(keygroupID, "M-2"), null);
27+
newRecord.setValueWithoutKey("Example value");
28+
29+
RecordRequest records = new RecordRequest(address, port);
30+
31+
logger.info("Got M-1: " + records.getDataRecord(dataID));
32+
logger.info("Put M-2: " + records.putDataRecord(newRecord));
33+
logger.info("Got M-2: " + records.getDataRecord(newRecord.getDataIdentifier()));
34+
logger.info("List: " + records.listDataRecords(keygroupID));
35+
logger.info("Deleted M-2: " + records.deleteDataRecord(newRecord.getDataIdentifier()));
36+
logger.info("Got M-2: " + records.getDataRecord(newRecord.getDataIdentifier()));
37+
logger.info("List: " + records.listDataRecords(keygroupID));
4338
}
44-
45-
public Client() {
39+
40+
public static void main(String[] args) throws UnirestException, FBaseEncryptionException {
41+
String address = "localhost";
42+
int port = 8081;
43+
44+
Client c = new Client();
45+
c.getScenario(address, port);
4646
}
4747

48-
public DataRecord runGetRecordRequest(String address, int port, DataIdentifier dataIdentifier)
49-
throws UnirestException {
50-
DataRecord record = null;
51-
try {
52-
String target = address + ":" + port + "/record";
53-
logger.info("Running get request targeting " + target);
54-
HttpResponse<String> response = Unirest.get(target)
55-
.queryString("dataIdentifier", dataIdentifier).asString();
56-
if (response.getStatus() == 200) {
57-
logger.info("Status = 200");
58-
Message m = JSONable.fromJSON(response.getBody(), Message.class);
59-
// Insert decryption here if needed
60-
record = JSONable.fromJSON(m.getContent(), DataRecord.class);
61-
} else {
62-
logger.info("Status = " + response.getStatus());
63-
}
64-
} catch (UnirestException e) {
65-
e.printStackTrace();
66-
}
6748

68-
return record;
69-
}
7049

71-
@SuppressWarnings("unchecked")
72-
public List<String> runGetListRecordRequest(String address, int port, KeygroupID keygroupID) {
73-
List<String> list = null;
74-
try {
75-
String target = address + ":" + port + "/record/list";
76-
logger.info("Running get request targeting " + target);
77-
HttpResponse<String> response = Unirest.get(target)
78-
.queryString("keygroupID", keygroupID).asString();
79-
if (response.getStatus() == 200) {
80-
logger.info("Status = 200");
81-
Message m = JSONable.fromJSON(response.getBody(), Message.class);
82-
// Insert decryption here if needed
83-
ObjectMapper mapper = new ObjectMapper();
84-
list = mapper.readValue(m.getContent(), List.class);
85-
} else {
86-
logger.info("Status = " + response.getStatus());
87-
}
88-
} catch (UnirestException | IOException e) {
89-
e.printStackTrace();
90-
}
91-
return list;
92-
}
50+
// public boolean keygroupConfig_create(String address, int port, KeygroupConfig keygroupConfig)
51+
// throws UnirestException, FBaseEncryptionException {
52+
//
53+
// // prepare
54+
// String target = address + ":" + port + "/keygroupConfig";
55+
// logger.info("Running post request targeting " + target);
56+
//
57+
// // create and sign request message
58+
// Message requestM = new Message();
59+
// requestM.setContent(JSONable.toJSON(keygroupConfig));
60+
// requestM.signMessage(privateKey, algorithm);
61+
//
62+
// // send message
63+
// HttpResponse<String> response = Unirest.post(target).header("accept", "application/json")
64+
// .queryString("clientID", clientID.getID()).body(JSONable.toJSON(requestM))
65+
// .asString();
66+
//
67+
// // process response
68+
// if (response.getStatus() == 200) {
69+
// return true;
70+
// } else {
71+
// logger.error("Status = " + response.getStatus());
72+
// return false;
73+
// }
74+
// }
9375

94-
public Message runPutRecordRequest(String address, int port, DataRecord record) {
95-
Message m = null;
96-
try {
97-
String target = address + ":" + port + "/record";
98-
logger.info("Running put request targeting " + target);
99-
HttpResponse<String> response = Unirest.put(target).header("accept", "application/json")
100-
// insert decryption here if needed
101-
.queryString("keygroupID", record.getKeygroupID().getID()).body(JSONable.toJSON(record))
102-
.asString();
103-
if (response.getStatus() == 200) {
104-
m = JSONable.fromJSON(response.getBody(), Message.class);
105-
} else {
106-
logger.info("Status = " + response.getStatus());
107-
}
108-
} catch (UnirestException e) {
109-
e.printStackTrace();
110-
}
111-
return m;
112-
}
113-
114-
public Message runDeleteRecordRequest(String address, int port, DataIdentifier identifier) {
115-
Message m = null;
116-
try {
117-
String target = address + ":" + port + "/record";
118-
logger.info("Running delete request targeting " + target);
119-
HttpResponse<String> response = Unirest.delete(target)
120-
.header("accept", "application/json")
121-
.queryString("keygroupID", identifier.getKeygroupID().getID())
122-
.body(JSONable.toJSON(identifier)) // insert encryption here if needed
123-
.asString();
124-
if (response.getStatus() == 200) {
125-
m = JSONable.fromJSON(response.getBody(), Message.class);
126-
} else {
127-
logger.info("Status = " + response.getStatus());
128-
}
129-
} catch (UnirestException e) {
130-
e.printStackTrace();
131-
}
132-
return m;
133-
}
134-
13576
}

0 commit comments

Comments
 (0)