Skip to content
This repository was archived by the owner on Feb 5, 2024. It is now read-only.

Commit e8a208e

Browse files
committed
Fix s3 db connector
1 parent 879d156 commit e8a208e

File tree

7 files changed

+71
-25
lines changed

7 files changed

+71
-25
lines changed

ImplementationSubscriptionManagement.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
# Implementation Subscription Management
22

3+
Numbers are not as written in thesis
4+
35
## Events
46

57
### [1] Missing heartbeats detected

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ For some of the tests, a running naming service is required. The naming service
2121

2222
### Handling Missed Messages
2323
- [x] Add message history size to machine config (node specific), cleanup config files
24-
- [ ] (Add message history cleanup functionality (on receiver and sender side)), thesis states not implemented
24+
- [ ] (Add message history cleanup functionality (on receiver and sender side), not implemented in thesis)
2525

2626
## Controlling FBase with Clients
2727
- [ ] Enable encryption and authentication

src/main/java/control/FBase.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,6 @@ public class FBase {
5252

5353
public FBase(String configName) {
5454
configuration = new Configuration(configName);
55-
publisher = new Publisher("tcp://0.0.0.0", configuration.getPublisherPort());
5655
}
5756

5857
public void startup(boolean announce, boolean backgroundTasks) throws InterruptedException,
@@ -71,6 +70,8 @@ public void startup(boolean announce, boolean backgroundTasks) throws Interrupte
7170
server = new WebServer(this);
7271
server.startServer();
7372
}
73+
publisher = new Publisher("tcp://0.0.0.0", configuration.getPublisherPort());
74+
7475
namingServiceSender = new NamingServiceSender(configuration.getNamingServiceAddress(),
7576
configuration.getNamingServicePort(), this);
7677

src/main/java/de/hasenburg/fbase/rest/jersey/NodesResource.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,6 @@
2222
* operations.
2323
*
2424
*
25-
* TODO C <br>
26-
* Do not return the whole config (e.g. keys)
27-
*
2825
* @author jonathanhasenburg
2926
*
3027
*/

src/main/java/storageconnector/AbstractDBConnector.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
import java.util.List;
77
import java.util.Map;
8+
import java.util.Random;
89
import java.util.Set;
910

1011
import org.javatuples.Pair;
@@ -416,5 +417,18 @@ public boolean keygroup_delete(String app, String tenant, String group)
416417
throws FBaseStorageConnectorException {
417418
return keygroup_delete(new KeygroupID(app, tenant, group));
418419
}
420+
421+
protected String generateNameString(int size) {
422+
String possibleCharacterString = "abcdefghijklmnopqrstuvwxyz";
423+
char[] chars = possibleCharacterString.toCharArray();
424+
StringBuilder builder = new StringBuilder();
425+
Random random = new Random();
426+
for (int i = 0; i < size; i++) {
427+
char c = chars[random.nextInt(chars.length)];
428+
builder.append(c);
429+
}
430+
String output = builder.toString();
431+
return output;
432+
}
419433

420434
}

src/main/java/storageconnector/S3DBConnector.java

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ public S3DBConnector(NodeID nodeID, String bucketName) {
7373
this(nodeID);
7474
this.bucketPrefix = bucketName;
7575
}
76-
76+
7777
/**
7878
* Mainly used for tests.
7979
*
@@ -86,7 +86,7 @@ public void setNodeIDAndMachineName(NodeID nodeID, String machineName) {
8686
}
8787

8888
@Override
89-
public void dbConnection_initiate() throws FBaseStorageConnectorException {
89+
public String dbConnection_initiate() throws FBaseStorageConnectorException {
9090
try {
9191
s3 = AmazonS3ClientBuilder.defaultClient();
9292
} catch (SdkClientException e) {
@@ -105,9 +105,19 @@ public void dbConnection_initiate() throws FBaseStorageConnectorException {
105105
}
106106
}
107107
}
108-
// TODO S3: set this.machinename to generated name
108+
109+
// read all machine names
110+
Map<String, Pair<String, Long>> heartbeats_listAll = heartbeats_listAll();
111+
112+
// generate names until we found one that is not used, yet.
113+
while (true) {
114+
String name = generateNameString(5);
115+
if (!heartbeats_listAll.containsKey(name)) {
116+
this.machineName = name;
117+
return this.machineName;
118+
}
119+
}
109120

110-
return this.machineName;
111121
}
112122

113123
public void deleteBuckets() throws FBaseStorageConnectorException {
@@ -544,19 +554,20 @@ private String getHeartbeatsPath(String machine) {
544554
}
545555

546556
@Override
547-
public void heartbeats_update(String machine) throws FBaseStorageConnectorException {
557+
public void heartbeats_update(String machine, String address)
558+
throws FBaseStorageConnectorException {
548559
try {
549560
s3.putObject(getHeartbeatBucketName(), getHeartbeatsPath(machine),
550-
Long.toString(System.currentTimeMillis()));
561+
Long.toString(System.currentTimeMillis()) + "\n" + address);
551562
} catch (AmazonServiceException e) {
552563
throw new FBaseStorageConnectorException(e);
553564
}
554565
}
555566

556567
@Override
557-
public Map<String, Long> heartbeats_listAll() throws FBaseStorageConnectorException {
568+
public Map<String, Pair<String, Long>> heartbeats_listAll() throws FBaseStorageConnectorException {
558569
try {
559-
Map<String, Long> heartbeats = new HashMap<>();
570+
Map<String, Pair<String, Long>> heartbeats = new HashMap<>();
560571
ObjectListing ol = s3.listObjects(getHeartbeatBucketName());
561572
while (true) {
562573
List<S3ObjectSummary> objects = ol.getObjectSummaries();
@@ -565,7 +576,8 @@ public Map<String, Long> heartbeats_listAll() throws FBaseStorageConnectorExcept
565576
.getObject(getHeartbeatBucketName(), os.getKey()).getObjectContent()));
566577
try {
567578
Long time = Long.parseLong(reader.readLine());
568-
heartbeats.put(os.getKey(), time);
579+
String address = reader.readLine();
580+
heartbeats.put(os.getKey(), new Pair<String, Long>(address, time));
569581
} catch (NumberFormatException | IOException e) {
570582
logger.error("Cannot parse time from " + os.getKey(), e);
571583
}
@@ -582,6 +594,16 @@ public Map<String, Long> heartbeats_listAll() throws FBaseStorageConnectorExcept
582594
throw new FBaseStorageConnectorException(e);
583595
}
584596
}
597+
598+
@Override
599+
public boolean heartbeats_remove(String machine) throws FBaseStorageConnectorException {
600+
try {
601+
s3.deleteObject(getHeartbeatBucketName(), getHeartbeatsPath(machine));
602+
return true;
603+
} catch (AmazonServiceException e) {
604+
throw new FBaseStorageConnectorException(e);
605+
}
606+
}
585607

586608
private String getMessageBucketName() {
587609
return bucketPrefix + suffixMap.get(Suffix.MESSAGE_HISTORY);

src/test/java/storageconnector/S3DBConnectorTest.java

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,9 @@ public static void tearDownAfterClass() throws Exception {
5050

5151
@Before
5252
public void setUp() throws Exception {
53-
connector = new S3DBConnector(new NodeID("N1"),
54-
"de.hasenburg.fbase.s3dbconnector-testbucket");
55-
connector.dbConnection_initiate();
53+
connector =
54+
new S3DBConnector(new NodeID("N1"), "de.hasenburg.fbase.s3dbconnector-testbucket");
55+
logger.debug("Machine name: " + connector.dbConnection_initiate());
5656
keygroupID1 = new KeygroupID("smartlight", "h1", "lightning");
5757
keygroupID2 = new KeygroupID("smartlight", "h1", "brightness");
5858
}
@@ -235,26 +235,36 @@ public void testHeartbeats() throws FBaseStorageConnectorException, InterruptedE
235235

236236
// store data
237237
Long time = System.currentTimeMillis();
238-
connector.heartbeats_update("M1");
239-
connector.heartbeats_update("M2");
238+
connector.heartbeats_update("M1", "addressM1");
239+
connector.heartbeats_update("M2", "addressM2");
240240

241241
// check map
242-
Map<String, Long> heartbeats = connector.heartbeats_listAll();
242+
Map<String, Pair<String, Long>> heartbeats = connector.heartbeats_listAll();
243243
assertEquals(2, heartbeats.keySet().size());
244-
assertTrue(heartbeats.get("M1") >= time && heartbeats.get("M1") < (time + 1000));
245-
assertTrue(heartbeats.get("M2") >= time && heartbeats.get("M2") < (time + 1000));
244+
assertTrue(heartbeats.get("M1").getValue1() >= time
245+
&& heartbeats.get("M1").getValue1() < (time + 1000));
246+
assertTrue(heartbeats.get("M2").getValue1() >= time
247+
&& heartbeats.get("M2").getValue1() < (time + 1000));
248+
249+
assertEquals("addressM1", heartbeats.get("M1").getValue0());
250+
assertEquals("addressM2", heartbeats.get("M2").getValue0());
246251

247252
Thread.sleep(1000);
248-
connector.heartbeats_update("M1");
253+
connector.heartbeats_update("M1", "addressMX");
249254
heartbeats = connector.heartbeats_listAll();
250-
assertTrue(heartbeats.get("M1") >= time + 1000 && heartbeats.get("M1") < (time + 2000));
255+
assertTrue(heartbeats.get("M1").getValue1() >= time + 1000
256+
&& heartbeats.get("M1").getValue1() < (time + 2000));
257+
258+
assertEquals("addressMX", heartbeats.get("M1").getValue0());
251259

252260
logger.debug("Finished testHeartbeats.");
253261
}
254262

255263
@Test
256264
public void testMessageHistory() throws InterruptedException, FBaseException {
257-
logger.debug("-------Starting testMessageHistory-------");
265+
logger.debug("-------Starting testMessageHistory-------");
266+
connector.setNodeIDAndMachineName(new NodeID("N1"), "M1");
267+
258268
MessageID mID1 = new MessageID();
259269
mID1.setMessageIDString("N1/M1/1");
260270
MessageID mID2 = new MessageID();

0 commit comments

Comments
 (0)