Skip to content

Commit

Permalink
Added event listener to remove unsubscribe websocket if user removed … (
Browse files Browse the repository at this point in the history
#1352)

* Added event listener to unsubscribe websocket if user removed from privacy group.

Signed-off-by: Mark Terry <mark.terry@consensys.net>
  • Loading branch information
mark-terry authored Sep 10, 2020
1 parent 30ccddc commit 8118330
Show file tree
Hide file tree
Showing 14 changed files with 180 additions and 34 deletions.
20 changes: 13 additions & 7 deletions besu/src/main/java/org/hyperledger/besu/RunnerBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
import org.hyperledger.besu.ethereum.permissioning.node.InsufficientPeersPermissioningProvider;
import org.hyperledger.besu.ethereum.permissioning.node.NodePermissioningController;
import org.hyperledger.besu.ethereum.permissioning.node.PeerPermissionsAdapter;
import org.hyperledger.besu.ethereum.privacy.PrivateTransactionObserver;
import org.hyperledger.besu.ethereum.stratum.StratumServer;
import org.hyperledger.besu.ethereum.transaction.TransactionSimulator;
import org.hyperledger.besu.ethereum.worldstate.WorldStateArchive;
Expand Down Expand Up @@ -562,7 +563,7 @@ public Runner build() {
besuPluginContext.getNamedPlugins());

final SubscriptionManager subscriptionManager =
createSubscriptionManager(vertx, transactionPool);
createSubscriptionManager(vertx, transactionPool, blockchainQueries);

createLogsSubscriptionService(
context.getBlockchain(),
Expand All @@ -586,6 +587,8 @@ public Runner build() {
protocolSchedule,
blockchainQueries,
transactionPool));

createPrivateTransactionObserver(subscriptionManager, privacyParameters);
}

Optional<MetricsService> metricsService = Optional.empty();
Expand Down Expand Up @@ -753,8 +756,11 @@ private Map<String, JsonRpcMethod> jsonRpcMethods(
}

private SubscriptionManager createSubscriptionManager(
final Vertx vertx, final TransactionPool transactionPool) {
final SubscriptionManager subscriptionManager = new SubscriptionManager(metricsSystem);
final Vertx vertx,
final TransactionPool transactionPool,
final BlockchainQueries blockchainQueries) {
final SubscriptionManager subscriptionManager =
new SubscriptionManager(metricsSystem, blockchainQueries.getBlockchain());
final PendingTransactionSubscriptionService pendingTransactions =
new PendingTransactionSubscriptionService(subscriptionManager);
final PendingTransactionDroppedSubscriptionService pendingTransactionsRemoved =
Expand Down Expand Up @@ -795,9 +801,9 @@ private void createLogsSubscriptionService(
}

private void createPrivateTransactionObserver(
final FilterManager filterManager, final PrivacyParameters privacyParameters) {
// register filterManager as observer of events fired by the onchain precompile.
// filterManager needs to remove filters when the creator is removed from onchain group
final PrivateTransactionObserver privateTransactionObserver,
final PrivacyParameters privacyParameters) {
// register privateTransactionObserver as observer of events fired by the onchain precompile.
if (privacyParameters.isOnchainPrivacyGroupsEnabled()
&& privacyParameters.isMultiTenancyEnabled()) {
final OnChainPrivacyPrecompiledContract onchainPrivacyPrecompiledContract =
Expand All @@ -807,7 +813,7 @@ private void createPrivateTransactionObserver(
.getByBlockNumber(1)
.getPrecompileContractRegistry()
.get(Address.ONCHAIN_PRIVACY, Account.DEFAULT_VERSION);
onchainPrivacyPrecompiledContract.addPrivateTransactionObserver(filterManager);
onchainPrivacyPrecompiledContract.addPrivateTransactionObserver(privateTransactionObserver);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
abstract class AbstractPrivateSubscriptionMethod extends AbstractSubscriptionMethod {

private final PrivacyController privacyController;
private final EnclavePublicKeyProvider enclavePublicKeyProvider;
protected final EnclavePublicKeyProvider enclavePublicKeyProvider;

AbstractPrivateSubscriptionMethod(
final SubscriptionManager subscriptionManager,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,10 @@ public String getName() {
@Override
public JsonRpcResponse response(final JsonRpcRequestContext requestContext) {
try {
final String enclavePublicKey =
enclavePublicKeyProvider.getEnclaveKey(requestContext.getUser());
final PrivateSubscribeRequest subscribeRequest =
getMapper().mapPrivateSubscribeRequest(requestContext);
getMapper().mapPrivateSubscribeRequest(requestContext, enclavePublicKey);

checkIfPrivacyGroupMatchesAuthenticatedEnclaveKey(
requestContext, subscribeRequest.getPrivacyGroupId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,13 @@ public Subscription build(
private Subscription logsSubscription(
final long subscriptionId, final String connectionId, final SubscribeRequest request) {
if (request instanceof PrivateSubscribeRequest) {
final PrivateSubscribeRequest privateSubscribeRequest = (PrivateSubscribeRequest) request;
return new PrivateLogsSubscription(
subscriptionId,
connectionId,
request.getFilterParameter(),
((PrivateSubscribeRequest) request).getPrivacyGroupId());
privateSubscribeRequest.getFilterParameter(),
privateSubscribeRequest.getPrivacyGroupId(),
privateSubscribeRequest.getEnclavePublicKey());
} else {
return new LogsSubscription(subscriptionId, connectionId, request.getFilterParameter());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,20 @@
package org.hyperledger.besu.ethereum.api.jsonrpc.websocket.subscription;

import org.hyperledger.besu.ethereum.api.jsonrpc.internal.results.JsonRpcResult;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.subscription.logs.PrivateLogsSubscription;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.subscription.request.SubscribeRequest;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.subscription.request.SubscriptionType;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.subscription.request.UnsubscribeRequest;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.subscription.response.SubscriptionResponse;
import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.privacy.PrivateTransactionEvent;
import org.hyperledger.besu.ethereum.privacy.PrivateTransactionObserver;
import org.hyperledger.besu.metrics.BesuMetricCategory;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.plugin.services.metrics.Counter;
import org.hyperledger.besu.plugin.services.metrics.LabelledMetric;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -41,7 +46,7 @@
* The SubscriptionManager is responsible for managing subscriptions and sending messages to the
* clients that have an active subscription.
*/
public class SubscriptionManager extends AbstractVerticle {
public class SubscriptionManager extends AbstractVerticle implements PrivateTransactionObserver {

private static final Logger LOG = LogManager.getLogger();

Expand All @@ -53,6 +58,13 @@ public class SubscriptionManager extends AbstractVerticle {
private final SubscriptionBuilder subscriptionBuilder = new SubscriptionBuilder();
private final LabelledMetric<Counter> subscribeCounter;
private final LabelledMetric<Counter> unsubscribeCounter;
private final List<PrivateTransactionEvent> privateTransactionEvents = new ArrayList<>();

public SubscriptionManager(
final MetricsSystem metricsSystem, final Blockchain blockchainQueries) {
this(metricsSystem);
blockchainQueries.observeBlockAdded(event -> onBlockAdded());
}

public SubscriptionManager(final MetricsSystem metricsSystem) {
subscribeCounter =
Expand Down Expand Up @@ -158,4 +170,28 @@ public <T> void notifySubscribersOnWorkerThread(
}
});
}

@Override
public void onPrivateTransactionProcessed(final PrivateTransactionEvent event) {
privateTransactionEvents.add(event);
}

void onBlockAdded() {
privateTransactionEvents.forEach(this::processPrivateTransactionEvents);
}

private void processPrivateTransactionEvents(final PrivateTransactionEvent event) {
// When a user is removed from a privacy group, remove all subscriptions from that user to that
// group
subscriptionsOfType(SubscriptionType.LOGS, PrivateLogsSubscription.class).stream()
.filter(
subscription ->
subscription.getEnclavePublicKey().equals(event.getEnclavePublicKey())
&& subscription.getPrivacyGroupId().equals(event.getPrivacyGroupId()))
.forEach(
subscription ->
this.unsubscribe(
new UnsubscribeRequest(
subscription.getSubscriptionId(), subscription.getConnectionId())));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,24 @@
public class PrivateLogsSubscription extends LogsSubscription {

private final String privacyGroupId;
private final String enclavePublicKey;

public PrivateLogsSubscription(
final Long subscriptionId,
final String connectionId,
final FilterParameter filterParameter,
final String privacyGroupId) {
final String privacyGroupId,
final String enclavePublicKey) {
super(subscriptionId, connectionId, filterParameter);
this.privacyGroupId = privacyGroupId;
this.enclavePublicKey = enclavePublicKey;
}

public String getPrivacyGroupId() {
return privacyGroupId;
}

public String getEnclavePublicKey() {
return enclavePublicKey;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,28 @@
public class PrivateSubscribeRequest extends SubscribeRequest {

private final String privacyGroupId;
private final String enclavePublicKey;

public PrivateSubscribeRequest(
final SubscriptionType subscriptionType,
final FilterParameter filterParameter,
final Boolean includeTransaction,
final String connectionId,
final String privacyGroupId) {
final String privacyGroupId,
final String enclavePublicKey) {
super(subscriptionType, filterParameter, includeTransaction, connectionId);
this.privacyGroupId = privacyGroupId;
this.enclavePublicKey = enclavePublicKey;
}

public String getPrivacyGroupId() {
return privacyGroupId;
}

public String getEnclavePublicKey() {
return enclavePublicKey;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
Expand All @@ -47,8 +54,9 @@ public boolean equals(final Object o) {
if (!super.equals(o)) {
return false;
}
PrivateSubscribeRequest that = (PrivateSubscribeRequest) o;
return privacyGroupId.equals(that.privacyGroupId);
final PrivateSubscribeRequest that = (PrivateSubscribeRequest) o;
return privacyGroupId.equals(that.privacyGroupId)
&& enclavePublicKey.equals(that.enclavePublicKey);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public UnsubscribeRequest mapUnsubscribeRequest(final JsonRpcRequestContext json
}

public PrivateSubscribeRequest mapPrivateSubscribeRequest(
final JsonRpcRequestContext jsonRpcRequestContext)
final JsonRpcRequestContext jsonRpcRequestContext, final String enclavePublicKey)
throws InvalidSubscriptionRequestException {
try {
final WebSocketRpcRequest webSocketRpcRequestBody = validateRequest(jsonRpcRequestContext);
Expand All @@ -108,7 +108,8 @@ public PrivateSubscribeRequest mapPrivateSubscribeRequest(
filterParameter,
null,
webSocketRpcRequestBody.getConnectionId(),
privacyGroupId);
privacyGroupId,
enclavePublicKey);
}
default:
throw new InvalidSubscriptionRequestException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package org.hyperledger.besu.ethereum.api.jsonrpc.websocket;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verifyNoInteractions;
Expand Down Expand Up @@ -47,7 +48,6 @@
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;

@RunWith(VertxUnitRunner.class)
public class WebSocketServiceTest {
Expand Down Expand Up @@ -77,7 +77,7 @@ public void before() {
new WebSocketRequestHandler(
vertx,
websocketMethods,
Mockito.mock(EthScheduler.class),
mock(EthScheduler.class),
TimeoutOptions.defaultOptions().getTimeoutSeconds()));

websocketService =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.privacy.methods.EnclavePublicKeyProvider;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcError;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcErrorResponse;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcResponse;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.response.JsonRpcSuccessResponse;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.results.Quantity;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.subscription.SubscriptionManager;
Expand Down Expand Up @@ -80,9 +81,10 @@ public void responseContainsSubscriptionId() {
null,
null,
webSocketRequest.getConnectionId(),
PRIVACY_GROUP_ID);
PRIVACY_GROUP_ID,
"public_key");

when(mapperMock.mapPrivateSubscribeRequest(eq(jsonRpcrequestContext)))
when(mapperMock.mapPrivateSubscribeRequest(eq(jsonRpcrequestContext), any()))
.thenReturn(subscribeRequest);
when(subscriptionManagerMock.subscribe(eq(subscribeRequest))).thenReturn(1L);

Expand All @@ -98,7 +100,7 @@ public void invalidSubscribeRequestRespondsInvalidRequestResponse() {
final WebSocketRpcRequest webSocketRequest = createWebSocketRpcRequest();
final JsonRpcRequestContext jsonRpcrequestContext = new JsonRpcRequestContext(webSocketRequest);

when(mapperMock.mapPrivateSubscribeRequest(any()))
when(mapperMock.mapPrivateSubscribeRequest(any(), any()))
.thenThrow(new InvalidSubscriptionRequestException());

final JsonRpcErrorResponse expectedResponse =
Expand All @@ -121,9 +123,10 @@ public void multiTenancyCheckFailure() {
null,
null,
webSocketRequest.getConnectionId(),
PRIVACY_GROUP_ID);
PRIVACY_GROUP_ID,
"public_key");

when(mapperMock.mapPrivateSubscribeRequest(any())).thenReturn(subscribeRequest);
when(mapperMock.mapPrivateSubscribeRequest(any(), any())).thenReturn(subscribeRequest);
when(enclavePublicKeyProvider.getEnclaveKey(any())).thenReturn(ENCLAVE_KEY);
doThrow(new MultiTenancyValidationException("msg"))
.when(privacyController)
Expand All @@ -134,6 +137,31 @@ public void multiTenancyCheckFailure() {
.hasMessageContaining("msg");
}

@Test
public void multiTenancyCheckSuccess() {
final User user = mock(User.class);
final WebSocketRpcRequest webSocketRequest = createWebSocketRpcRequest();
final JsonRpcRequestContext jsonRpcrequestContext =
new JsonRpcRequestContext(webSocketRequest, user);

final PrivateSubscribeRequest subscribeRequest =
new PrivateSubscribeRequest(
SubscriptionType.LOGS,
null,
null,
webSocketRequest.getConnectionId(),
PRIVACY_GROUP_ID,
ENCLAVE_KEY);

when(mapperMock.mapPrivateSubscribeRequest(any(), any())).thenReturn(subscribeRequest);
when(enclavePublicKeyProvider.getEnclaveKey(any())).thenReturn(ENCLAVE_KEY);

// This should pass if a MultiTenancyMultiTenancyValidationException isn't thrown

final JsonRpcResponse response = privSubscribe.response(jsonRpcrequestContext);
assertThat(response).isInstanceOf(JsonRpcSuccessResponse.class);
}

private WebSocketRpcRequest createWebSocketRpcRequest() {
return Json.decodeValue(
"{\"id\": 1, \"method\": \"priv_subscribe\", \"params\": [\""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,19 @@ public void shouldBuildLogsSubscriptionWhenSubscribeRequestTypeIsLogs() {
@Test
public void shouldBuildPrivateLogsSubscriptionWhenSubscribeRequestTypeIsPrivateLogs() {
final String privacyGroupId = "ZDmkMK7CyxA1F1rktItzKFTfRwApg7aWzsTtm2IOZ5Y=";
final String enclavePublicKey = "C1bVtMxLCUHmBVHXoZzzBgPbW/wj5axDpW9X8l91SGo=";
final FilterParameter filterParameter = filterParameter();
final PrivateSubscribeRequest subscribeRequest =
new PrivateSubscribeRequest(
SubscriptionType.LOGS, filterParameter, null, CONNECTION_ID, privacyGroupId);
SubscriptionType.LOGS,
filterParameter,
null,
CONNECTION_ID,
privacyGroupId,
enclavePublicKey);
final PrivateLogsSubscription expectedSubscription =
new PrivateLogsSubscription(1L, CONNECTION_ID, filterParameter, privacyGroupId);
new PrivateLogsSubscription(
1L, CONNECTION_ID, filterParameter, privacyGroupId, enclavePublicKey);

final Subscription builtSubscription =
subscriptionBuilder.build(1L, CONNECTION_ID, subscribeRequest);
Expand Down
Loading

0 comments on commit 8118330

Please sign in to comment.