Skip to content

Commit

Permalink
Use the key/value library provided by cava
Browse files Browse the repository at this point in the history
  • Loading branch information
atoulme committed Jun 25, 2018
1 parent 0083603 commit a408859
Show file tree
Hide file tree
Showing 18 changed files with 152 additions and 216 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ repositories {

dependencies {
// cava
compile 'net.consensys.cava:cava:0.2.0-CB2B7C-snapshot'
compile 'net.consensys.cava:cava:0.2.0-BB058A-snapshot'

// vertx
compile 'io.vertx:vertx-core:3.5.2'
Expand Down
21 changes: 14 additions & 7 deletions src/main/java/net/consensys/orion/api/storage/Storage.java
Original file line number Diff line number Diff line change
@@ -1,25 +1,32 @@
package net.consensys.orion.api.storage;

import net.consensys.cava.concurrent.AsyncResult;

import java.util.Optional;

public interface Storage<T> {

/**
* Stores data in the store.
*
* @param data The data to store.
* @return the base64 encoded key, as an UTF-8 String
*/
String put(T data);
AsyncResult<String> put(T data);

/**
* @param key should be base64 encoded UTF-8 string
* @return The retrieved data.
* Generates digest for data without storing it.
*
* @param data the data to generate a digest for
* @return the digest of the data
*/
Optional<T> get(String key);
String generateDigest(T data);

/**
* Remove stored data.
* Gets data from the store.
*
* @param key The base64 encoded key.
* @param key should be base64 encoded UTF-8 string
* @return The retrieved data.
*/
void remove(String key);
AsyncResult<Optional<T>> get(String key);
}
15 changes: 8 additions & 7 deletions src/main/java/net/consensys/orion/api/storage/StorageEngine.java
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
package net.consensys.orion.api.storage;

import java.util.Optional;

public interface StorageEngine<T> {
import net.consensys.cava.concurrent.AsyncCompletion;
import net.consensys.cava.concurrent.AsyncResult;

void put(String key, T data);
import java.io.Closeable;
import java.util.Optional;

Optional<T> get(String key);
public interface StorageEngine<T> extends Closeable {

void remove(String key);
AsyncCompletion put(String key, T data);

boolean isOpen();
AsyncResult<Optional<T>> get(String key);

@Override
void close();
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,10 @@ public void handle(RoutingContext routingContext) {
EncryptedPayload pushRequest =
Serializer.deserialize(HttpContentType.CBOR, SodiumEncryptedPayload.class, routingContext.getBody().getBytes());

// we receive a EncryptedPayload and
String digest = storage.put(pushRequest);
log.debug("stored payload. resulting digest: {}", digest);
storage.put(pushRequest).thenAccept((digest) -> {
log.debug("stored payload. resulting digest: {}", digest);
routingContext.response().end(digest);
}).exceptionally(e -> routingContext.fail(e));

// return the digest (key)
routingContext.response().end(digest);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@

import java.security.PublicKey;
import java.util.Collections;
import java.util.Optional;

import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
Expand Down Expand Up @@ -53,33 +52,35 @@ public void handle(RoutingContext routingContext) {
if (to == null) {
to = enclave.nodeKeys()[0];
}
PublicKey recipient = to;

Optional<EncryptedPayload> encryptedPayload = storage.get(key);
if (!encryptedPayload.isPresent()) {
log.info("unable to find payload with key {}", key);
routingContext.fail(404);
return;
}
storage.get(key).thenAccept(encryptedPayload -> {
if (!encryptedPayload.isPresent()) {
log.info("unable to find payload with key {}", key);
routingContext.fail(404);
return;
}

byte[] decryptedPayload;
try {
decryptedPayload = enclave.decrypt(encryptedPayload.get(), to);
} catch (EnclaveException e) {
byte[] decryptedPayload;
try {
decryptedPayload = enclave.decrypt(encryptedPayload.get(), recipient);
} catch (EnclaveException e) {

log.info("unable to decrypt payload with key {}", key);
routingContext.fail(404);
return;
}
log.info("unable to decrypt payload with key {}", key);
routingContext.fail(404);
return;
}

// configureRoutes a ReceiveResponse
Buffer toReturn;
if (contentType == JSON) {
toReturn = Buffer
.buffer(Serializer.serialize(JSON, Collections.singletonMap("payload", Base64.encode(decryptedPayload))));
} else {
toReturn = Buffer.buffer(decryptedPayload);
}
// configureRoutes a ReceiveResponse
Buffer toReturn;
if (contentType == JSON) {
toReturn = Buffer
.buffer(Serializer.serialize(JSON, Collections.singletonMap("payload", Base64.encode(decryptedPayload))));
} else {
toReturn = Buffer.buffer(decryptedPayload);
}

routingContext.response().end(toReturn);
routingContext.response().end(toReturn);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ public void handle(RoutingContext routingContext) {
}

// storing payload
log.debug("storing payload");
final String digest = storage.put(encryptedPayload);
log.debug("Generate payload digest");
final String digest = storage.generateDigest(encryptedPayload);

// propagate payload
log.debug("propagating payload");
Expand Down Expand Up @@ -136,28 +136,33 @@ public void handle(RoutingContext routingContext) {

CompletableFuture.allOf(cfs).whenComplete((all, ex) -> {
if (ex != null) {
log.warn("propagating the payload failed, removing stored encrypted payload");
storage.remove(digest);

Throwable cause = ex.getCause();
if (cause instanceof OrionException) {
routingContext.fail(cause);
} else {
routingContext.fail(new OrionException(OrionErrorCode.NODE_PROPAGATING_TO_ALL_PEERS, ex));
}
handleFailure(routingContext, ex);
return;
}
storage.put(encryptedPayload).thenAccept((result) -> {

final Buffer responseData;
if (contentType == JSON) {
responseData = Buffer.buffer(Serializer.serialize(JSON, Collections.singletonMap("key", digest)));
} else {
responseData = Buffer.buffer(digest);
}
routingContext.response().end(responseData);
final Buffer responseData;
if (contentType == JSON) {
responseData = Buffer.buffer(Serializer.serialize(JSON, Collections.singletonMap("key", digest)));
} else {
responseData = Buffer.buffer(digest);
}
routingContext.response().end(responseData);
}).exceptionally(e -> handleFailure(routingContext, e));
});
}

private void handleFailure(RoutingContext routingContext, Throwable ex) {
log.warn("propagating the payload failed");

Throwable cause = ex.getCause();
if (cause instanceof OrionException) {
routingContext.fail(cause);
} else {
routingContext.fail(new OrionException(OrionErrorCode.NODE_PROPAGATING_TO_ALL_PEERS, ex));
}
}

private SendRequest binaryRequest(RoutingContext routingContext) {
String from = routingContext.request().getHeader("c11n-from");
String[] to = routingContext.request().getHeader("c11n-to").split(",");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package net.consensys.orion.impl.storage;

import net.consensys.cava.concurrent.AsyncResult;
import net.consensys.orion.api.enclave.EncryptedPayload;
import net.consensys.orion.api.storage.Storage;
import net.consensys.orion.api.storage.StorageEngine;
Expand All @@ -19,20 +20,19 @@ public EncryptedPayloadStorage(StorageEngine<EncryptedPayload> storageEngine, St
}

@Override
public String put(EncryptedPayload data) {
// configureRoutes key
String key = Base64.encode(keyBuilder.build(data.cipherText()));
storageEngine.put(key, data);
return key;
public AsyncResult<String> put(EncryptedPayload data) {
String key = generateDigest(data);
return storageEngine.put(key, data).thenSupply(() -> key);
}

@Override
public Optional<EncryptedPayload> get(String key) {
return storageEngine.get(key);
public String generateDigest(EncryptedPayload data) {
return Base64.encode(keyBuilder.build(data.cipherText()));
}


@Override
public void remove(String key) {
storageEngine.remove(key);
public AsyncResult<Optional<EncryptedPayload>> get(String key) {
return storageEngine.get(key);
}
}
Original file line number Diff line number Diff line change
@@ -1,63 +1,47 @@
package net.consensys.orion.impl.storage.file;

import static java.nio.charset.StandardCharsets.UTF_8;
import static net.consensys.orion.impl.http.server.HttpContentType.CBOR;
import static org.mapdb.Serializer.BYTE_ARRAY;

import net.consensys.cava.bytes.Bytes;
import net.consensys.cava.concurrent.AsyncCompletion;
import net.consensys.cava.concurrent.AsyncResult;
import net.consensys.cava.kv.MapDBKeyValueStore;
import net.consensys.orion.api.storage.StorageEngine;
import net.consensys.orion.impl.http.server.HttpContentType;
import net.consensys.orion.impl.utils.Serializer;

import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.Optional;

import org.mapdb.DB;
import org.mapdb.DBMaker;
import org.mapdb.HTreeMap;

public class MapDbStorage<T> implements StorageEngine<T> {

private final Class<? extends T> typeParameterClass;
private final DB db;
private final HTreeMap<byte[], byte[]> storageData;
private MapDBKeyValueStore store;

public MapDbStorage(Class<? extends T> typeParameterClass, Path dbDir) {
this.typeParameterClass = typeParameterClass;
Path dbFile = dbDir.resolve("mapdb");
db = DBMaker.fileDB(dbFile.toFile()).transactionEnable().make();
storageData = db.hashMap("storageData", BYTE_ARRAY, BYTE_ARRAY).createOrOpen();
store = new MapDBKeyValueStore(dbDir.resolve("mapdb"));
}

@Override
public void put(String key, T data) {
// store data
storageData.put(key.getBytes(StandardCharsets.UTF_8), Serializer.serialize(CBOR, data));
db.commit();
}

@Override
public Optional<T> get(String key) {
byte[] bytes = storageData.get(key.getBytes(StandardCharsets.UTF_8));
if (bytes == null) {
return Optional.empty();
}
public AsyncCompletion put(String key, T data) {
return store
.putAsync(Bytes.wrap(key.getBytes(StandardCharsets.UTF_8)), Bytes.wrap(Serializer.serialize(CBOR, data)));

return Optional.of(Serializer.deserialize(CBOR, typeParameterClass, bytes));
}

@Override
public void remove(String key) {
if (storageData.remove(key.getBytes(StandardCharsets.UTF_8)) != null) {
db.commit();
}
}

@Override
public boolean isOpen() {
return !db.isClosed();
public AsyncResult<Optional<T>> get(String key) {
return store.getAsync(Bytes.wrap(key.getBytes(UTF_8))).thenApply(
optionalBytes -> optionalBytes
.map(bytes -> Serializer.deserialize(HttpContentType.CBOR, typeParameterClass, bytes.toArrayUnsafe())));
}

@Override
public void close() {
db.close();
store.close();
}
}
Loading

0 comments on commit a408859

Please sign in to comment.