Skip to content

Commit

Permalink
Add support for private log subscriptions (Pub-Sub API) (#858)
Browse files Browse the repository at this point in the history
* Created priv_subscribe and priv_unsubscribe methods

Signed-off-by: Lucas Saldanha <lucas.saldanha@consensys.net>
  • Loading branch information
lucassaldanha authored May 12, 2020
1 parent f2244ee commit 06ca344
Show file tree
Hide file tree
Showing 29 changed files with 1,159 additions and 53 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@ What this means for you:
permissions on the directory allow other users and groups to r/w. Ideally this should be set to
`besu:besu` and `orion:orion` as the owners.

## 1.4.6

### Additions and Improvements

- Implemented WebSocket logs subscription for private contracts (`priv_subscribe`/`priv_unsubscribe`) [#762]

## 1.4.5

### Additions and Improvements
Expand Down
62 changes: 57 additions & 5 deletions besu/src/main/java/org/hyperledger/besu/RunnerBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.WebSocketConfiguration;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.WebSocketRequestHandler;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.WebSocketService;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.methods.PrivateWebSocketMethodsFactory;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.methods.WebSocketMethodsFactory;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.subscription.SubscriptionManager;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.subscription.blockheaders.NewBlockHeadersSubscriptionService;
Expand All @@ -50,6 +51,7 @@
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.subscription.pending.PendingTransactionSubscriptionService;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.subscription.syncing.SyncingSubscriptionService;
import org.hyperledger.besu.ethereum.api.query.BlockchainQueries;
import org.hyperledger.besu.ethereum.api.query.PrivacyQueries;
import org.hyperledger.besu.ethereum.blockcreation.MiningCoordinator;
import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.core.MiningParameters;
Expand Down Expand Up @@ -84,6 +86,7 @@
import org.hyperledger.besu.ethereum.permissioning.node.PeerPermissionsAdapter;
import org.hyperledger.besu.ethereum.stratum.StratumServer;
import org.hyperledger.besu.ethereum.transaction.TransactionSimulator;
import org.hyperledger.besu.ethereum.worldstate.WorldStateArchive;
import org.hyperledger.besu.metrics.ObservableMetricsSystem;
import org.hyperledger.besu.metrics.prometheus.MetricsConfiguration;
import org.hyperledger.besu.metrics.prometheus.MetricsService;
Expand Down Expand Up @@ -523,7 +526,11 @@ public Runner build() {
final SubscriptionManager subscriptionManager =
createSubscriptionManager(vertx, transactionPool);

createLogsSubscriptionService(context.getBlockchain(), subscriptionManager);
createLogsSubscriptionService(
context.getBlockchain(),
context.getWorldStateArchive(),
subscriptionManager,
privacyParameters);

createNewBlockHeadersSubscriptionService(
context.getBlockchain(), blockchainQueries, subscriptionManager);
Expand All @@ -533,7 +540,14 @@ public Runner build() {
webSocketService =
Optional.of(
createWebsocketService(
vertx, webSocketConfiguration, subscriptionManager, webSocketsJsonRpcMethods));
vertx,
webSocketConfiguration,
subscriptionManager,
webSocketsJsonRpcMethods,
privacyParameters,
protocolSchedule,
blockchainQueries,
transactionPool));
}

Optional<MetricsService> metricsService = Optional.empty();
Expand Down Expand Up @@ -698,11 +712,31 @@ private SubscriptionManager createSubscriptionManager(
}

private void createLogsSubscriptionService(
final Blockchain blockchain, final SubscriptionManager subscriptionManager) {
final Blockchain blockchain,
final WorldStateArchive worldStateArchive,
final SubscriptionManager subscriptionManager,
final PrivacyParameters privacyParameters) {

Optional<PrivacyQueries> privacyQueries = Optional.empty();
if (privacyParameters.isEnabled()) {
final BlockchainQueries blockchainQueries =
new BlockchainQueries(blockchain, worldStateArchive);
privacyQueries =
Optional.of(
new PrivacyQueries(
blockchainQueries, privacyParameters.getPrivateWorldStateReader()));
}

final LogsSubscriptionService logsSubscriptionService =
new LogsSubscriptionService(subscriptionManager);
new LogsSubscriptionService(subscriptionManager, privacyQueries);

// monitoring public logs
blockchain.observeLogs(logsSubscriptionService);

// monitoring private logs
if (privacyParameters.isEnabled()) {
blockchain.observeBlockAdded(logsSubscriptionService::checkPrivateLogs);
}
}

private void createSyncingSubscriptionService(
Expand All @@ -724,9 +758,27 @@ private WebSocketService createWebsocketService(
final Vertx vertx,
final WebSocketConfiguration configuration,
final SubscriptionManager subscriptionManager,
final Map<String, JsonRpcMethod> jsonRpcMethods) {
final Map<String, JsonRpcMethod> jsonRpcMethods,
final PrivacyParameters privacyParameters,
final ProtocolSchedule<?> protocolSchedule,
final BlockchainQueries blockchainQueries,
final TransactionPool transactionPool) {

final WebSocketMethodsFactory websocketMethodsFactory =
new WebSocketMethodsFactory(subscriptionManager, jsonRpcMethods);

if (privacyParameters.isEnabled()) {
final PrivateWebSocketMethodsFactory privateWebSocketMethodsFactory =
new PrivateWebSocketMethodsFactory(
privacyParameters,
subscriptionManager,
protocolSchedule,
blockchainQueries,
transactionPool);

privateWebSocketMethodsFactory.methods().forEach(websocketMethodsFactory::addMethods);
}

final WebSocketRequestHandler websocketRequestHandler =
new WebSocketRequestHandler(vertx, websocketMethodsFactory.methods());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ public enum RpcMethod {
PRIV_UNINSTALL_FILTER("priv_uninstallFilter"),
PRIV_GET_FILTER_CHANGES("priv_getFilterChanges"),
PRIV_GET_FILTER_LOGS("priv_getFilterLogs"),
PRIV_SUBSCRIBE("priv_subscribe"),
PRIV_UNSUBSCRIBE("priv_unsubscribe"),
PRIVX_FIND_PRIVACY_GROUP("privx_findOnChainPrivacyGroup"),
EEA_SEND_RAW_TRANSACTION("eea_sendRawTransaction"),
ETH_ACCOUNTS("eth_accounts"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,34 @@
*/
package org.hyperledger.besu.ethereum.api.jsonrpc.internal.privacy.methods;

import static org.hyperledger.besu.ethereum.api.jsonrpc.internal.privacy.methods.MultiTenancyUserUtil.enclavePublicKey;

import org.hyperledger.besu.ethereum.core.PrivacyParameters;

import java.util.Optional;

import io.vertx.ext.auth.User;

@FunctionalInterface
public interface EnclavePublicKeyProvider {

String getEnclaveKey(Optional<User> user);

static EnclavePublicKeyProvider build(final PrivacyParameters privacyParameters) {
return privacyParameters.isMultiTenancyEnabled()
? multiTenancyEnclavePublicKeyProvider()
: defaultEnclavePublicKeyProvider(privacyParameters);
}

private static EnclavePublicKeyProvider multiTenancyEnclavePublicKeyProvider() {
return user ->
enclavePublicKey(user)
.orElseThrow(
() -> new IllegalStateException("Request does not contain an authorization token"));
}

private static EnclavePublicKeyProvider defaultEnclavePublicKeyProvider(
final PrivacyParameters privacyParameters) {
return user -> privacyParameters.getEnclavePublicKey();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@
*/
package org.hyperledger.besu.ethereum.api.jsonrpc.methods;

import static org.hyperledger.besu.ethereum.api.jsonrpc.internal.privacy.methods.MultiTenancyUserUtil.enclavePublicKey;

import org.hyperledger.besu.ethereum.api.jsonrpc.LatestNonceProvider;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.methods.JsonRpcMethod;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.privacy.methods.DisabledPrivacyRpcMethod;
Expand Down Expand Up @@ -94,7 +92,8 @@ protected Map<String, JsonRpcMethod> create() {
final PrivateMarkerTransactionFactory markerTransactionFactory =
createPrivateMarkerTransactionFactory(
privacyParameters, blockchainQueries, transactionPool.getPendingTransactions());
final EnclavePublicKeyProvider enclavePublicProvider = createEnclavePublicKeyProvider();
final EnclavePublicKeyProvider enclavePublicProvider =
EnclavePublicKeyProvider.build(privacyParameters);
final PrivacyController privacyController = createPrivacyController(markerTransactionFactory);
return create(privacyController, enclavePublicProvider).entrySet().stream()
.collect(
Expand Down Expand Up @@ -123,23 +122,6 @@ private PrivateMarkerTransactionFactory createPrivateMarkerTransactionFactory(
return new RandomSigningPrivateMarkerTransactionFactory(privateContractAddress);
}

private EnclavePublicKeyProvider createEnclavePublicKeyProvider() {
return privacyParameters.isMultiTenancyEnabled()
? multiTenancyEnclavePublicKeyProvider()
: defaultEnclavePublicKeyProvider();
}

private EnclavePublicKeyProvider multiTenancyEnclavePublicKeyProvider() {
return user ->
enclavePublicKey(user)
.orElseThrow(
() -> new IllegalStateException("Request does not contain an authorization token"));
}

private EnclavePublicKeyProvider defaultEnclavePublicKeyProvider() {
return user -> privacyParameters.getEnclavePublicKey();
}

private PrivacyController createPrivacyController(
final PrivateMarkerTransactionFactory markerTransactionFactory) {
final Optional<BigInteger> chainId = protocolSchedule.getChainId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.hyperledger.besu.ethereum.api.jsonrpc.authentication.AuthenticationService;
import org.hyperledger.besu.ethereum.api.jsonrpc.authentication.AuthenticationUtils;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.JsonRpcRequestContext;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.exception.InvalidJsonRpcParameters;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.methods.JsonRpcMethod;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcError;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcErrorResponse;
Expand Down Expand Up @@ -83,6 +84,9 @@ public void handle(
future.complete(
new JsonRpcUnauthorizedResponse(request.getId(), JsonRpcError.UNAUTHORIZED));
}
} catch (final InvalidJsonRpcParameters e) {
LOG.debug("Invalid Params", e);
future.complete(new JsonRpcErrorResponse(request.getId(), JsonRpcError.INVALID_PARAMS));
} catch (final Exception e) {
LOG.error(JsonRpcError.INTERNAL_ERROR.getMessage(), e);
future.complete(new JsonRpcErrorResponse(request.getId(), JsonRpcError.INTERNAL_ERROR));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.api.jsonrpc.websocket.methods;

import org.hyperledger.besu.ethereum.api.jsonrpc.internal.JsonRpcRequestContext;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.privacy.methods.EnclavePublicKeyProvider;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.subscription.SubscriptionManager;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.subscription.request.SubscriptionRequestMapper;
import org.hyperledger.besu.ethereum.privacy.PrivacyController;

abstract class AbstractPrivateSubscriptionMethod extends AbstractSubscriptionMethod {

private final PrivacyController privacyController;
private final EnclavePublicKeyProvider enclavePublicKeyProvider;

AbstractPrivateSubscriptionMethod(
final SubscriptionManager subscriptionManager,
final SubscriptionRequestMapper mapper,
final PrivacyController privacyController,
final EnclavePublicKeyProvider enclavePublicKeyProvider) {
super(subscriptionManager, mapper);
this.privacyController = privacyController;
this.enclavePublicKeyProvider = enclavePublicKeyProvider;
}

void checkIfPrivacyGroupMatchesAuthenticatedEnclaveKey(
final JsonRpcRequestContext request, final String privacyGroupId) {
final String enclavePublicKey = enclavePublicKeyProvider.getEnclaveKey(request.getUser());
privacyController.verifyPrivacyGroupContainsEnclavePublicKey(privacyGroupId, enclavePublicKey);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.api.jsonrpc.websocket.methods;

import org.hyperledger.besu.ethereum.api.jsonrpc.RpcMethod;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.JsonRpcRequestContext;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.privacy.methods.EnclavePublicKeyProvider;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcError;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcErrorResponse;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcResponse;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcSuccessResponse;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.results.Quantity;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.subscription.SubscriptionManager;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.subscription.request.InvalidSubscriptionRequestException;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.subscription.request.PrivateSubscribeRequest;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.subscription.request.SubscriptionRequestMapper;
import org.hyperledger.besu.ethereum.privacy.PrivacyController;

public class PrivSubscribe extends AbstractPrivateSubscriptionMethod {

public PrivSubscribe(
final SubscriptionManager subscriptionManager,
final SubscriptionRequestMapper mapper,
final PrivacyController privacyController,
final EnclavePublicKeyProvider enclavePublicKeyProvider) {
super(subscriptionManager, mapper, privacyController, enclavePublicKeyProvider);
}

@Override
public String getName() {
return RpcMethod.PRIV_SUBSCRIBE.getMethodName();
}

@Override
public JsonRpcResponse response(final JsonRpcRequestContext requestContext) {
try {
final PrivateSubscribeRequest subscribeRequest =
getMapper().mapPrivateSubscribeRequest(requestContext);

checkIfPrivacyGroupMatchesAuthenticatedEnclaveKey(
requestContext, subscribeRequest.getPrivacyGroupId());

final Long subscriptionId = subscriptionManager().subscribe(subscribeRequest);

return new JsonRpcSuccessResponse(
requestContext.getRequest().getId(), Quantity.create(subscriptionId));
} catch (final InvalidSubscriptionRequestException isEx) {
return new JsonRpcErrorResponse(
requestContext.getRequest().getId(), JsonRpcError.INVALID_REQUEST);
}
}
}
Loading

0 comments on commit 06ca344

Please sign in to comment.