Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for private log subscriptions (Pub-Sub API) #858

Merged
merged 9 commits into from
May 12, 2020
Prev Previous commit
Next Next commit
Observing blocks and checking for private logs
Signed-off-by: Lucas Saldanha <lucas.saldanha@consensys.net>
  • Loading branch information
lucassaldanha committed May 12, 2020
commit 73d31867e5cebe53af45e39de8f026972007f4d5
32 changes: 29 additions & 3 deletions besu/src/main/java/org/hyperledger/besu/RunnerBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.subscription.pending.PendingTransactionSubscriptionService;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.subscription.syncing.SyncingSubscriptionService;
import org.hyperledger.besu.ethereum.api.query.BlockchainQueries;
import org.hyperledger.besu.ethereum.api.query.PrivacyQueries;
import org.hyperledger.besu.ethereum.blockcreation.MiningCoordinator;
import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.core.MiningParameters;
Expand Down Expand Up @@ -85,6 +86,7 @@
import org.hyperledger.besu.ethereum.permissioning.node.PeerPermissionsAdapter;
import org.hyperledger.besu.ethereum.stratum.StratumServer;
import org.hyperledger.besu.ethereum.transaction.TransactionSimulator;
import org.hyperledger.besu.ethereum.worldstate.WorldStateArchive;
import org.hyperledger.besu.metrics.ObservableMetricsSystem;
import org.hyperledger.besu.metrics.prometheus.MetricsConfiguration;
import org.hyperledger.besu.metrics.prometheus.MetricsService;
Expand Down Expand Up @@ -524,7 +526,11 @@ public Runner build() {
final SubscriptionManager subscriptionManager =
createSubscriptionManager(vertx, transactionPool);

createLogsSubscriptionService(context.getBlockchain(), subscriptionManager);
createLogsSubscriptionService(
context.getBlockchain(),
context.getWorldStateArchive(),
subscriptionManager,
privacyParameters);

createNewBlockHeadersSubscriptionService(
context.getBlockchain(), blockchainQueries, subscriptionManager);
Expand Down Expand Up @@ -706,11 +712,31 @@ private SubscriptionManager createSubscriptionManager(
}

private void createLogsSubscriptionService(
final Blockchain blockchain, final SubscriptionManager subscriptionManager) {
final Blockchain blockchain,
final WorldStateArchive worldStateArchive,
final SubscriptionManager subscriptionManager,
final PrivacyParameters privacyParameters) {

Optional<PrivacyQueries> privacyQueries = Optional.empty();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could do an if/else or even

privacyQueries = privacyParameters.isEnabled() ? x : y;

Copy link
Member Author

@lucassaldanha lucassaldanha May 10, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the way that it is implemented makes it more readable. PrivacyQueries start empty and, if privacy is enabled, we set it to the proper object.

I think in this case the ternary operator will make it less readable.

if (privacyParameters.isEnabled()) {
final BlockchainQueries blockchainQueries =
new BlockchainQueries(blockchain, worldStateArchive);
privacyQueries =
Optional.of(
new PrivacyQueries(
blockchainQueries, privacyParameters.getPrivateWorldStateReader()));
}

final LogsSubscriptionService logsSubscriptionService =
new LogsSubscriptionService(subscriptionManager);
new LogsSubscriptionService(subscriptionManager, privacyQueries);

// monitoring public logs
blockchain.observeLogs(logsSubscriptionService);

// monitoring private logs
if (privacyParameters.isEnabled()) {
blockchain.observeBlockAdded(logsSubscriptionService::checkPrivateLogs);
}
}

private void createSyncingSubscriptionService(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,9 @@ public <T> List<T> subscriptionsOfType(final SubscriptionType type, final Class<
}

public void sendMessage(final Long subscriptionId, final JsonRpcResult msg) {
final SubscriptionResponse response = new SubscriptionResponse(subscriptionId, msg);

final Subscription subscription = subscriptions.get(subscriptionId);
final SubscriptionResponse response = new SubscriptionResponse(subscription, msg);

if (subscription != null) {
vertx.eventBus().send(subscription.getConnectionId(), Json.encode(response));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,25 @@
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.results.LogResult;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.subscription.SubscriptionManager;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.subscription.request.SubscriptionType;
import org.hyperledger.besu.ethereum.api.query.LogsQuery;
import org.hyperledger.besu.ethereum.api.query.PrivacyQueries;
import org.hyperledger.besu.ethereum.chain.BlockAddedEvent;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.LogWithMetadata;

import java.util.Optional;
import java.util.function.Consumer;

public class LogsSubscriptionService implements Consumer<LogWithMetadata> {

private final SubscriptionManager subscriptionManager;
private final Optional<PrivacyQueries> privacyQueries;

public LogsSubscriptionService(final SubscriptionManager subscriptionManager) {
public LogsSubscriptionService(
final SubscriptionManager subscriptionManager,
final Optional<PrivacyQueries> privacyQueries) {
this.subscriptionManager = subscriptionManager;
this.privacyQueries = privacyQueries;
}

@Override
Expand All @@ -46,9 +54,34 @@ public void accept(final LogWithMetadata logWithMetadata) {
&& filterParameter.getToBlock().getNumber().orElse(Long.MAX_VALUE) >= blockNumber
&& filterParameter.getLogsQuery().matches(logWithMetadata);
})
.forEach(
logsSubscription ->
subscriptionManager.sendMessage(
logsSubscription.getSubscriptionId(), new LogResult(logWithMetadata)));
.forEach(logsSubscription -> sendLogToSubscription(logWithMetadata, logsSubscription));
}

public void checkPrivateLogs(final BlockAddedEvent event) {
privacyQueries.ifPresent(
pq ->
subscriptionManager.subscriptionsOfType(SubscriptionType.LOGS, LogsSubscription.class)
.stream()
.filter(PrivateLogsSubscription.class::isInstance)
.map(PrivateLogsSubscription.class::cast)
.forEach(queryPrivateEventForSubscription(pq, event)));
}

private Consumer<PrivateLogsSubscription> queryPrivateEventForSubscription(
final PrivacyQueries privacyQueries, final BlockAddedEvent event) {
return subscription -> {
final String privacyGroupId = subscription.getPrivacyGroupId();
final LogsQuery logsQuery = subscription.getFilterParameter().getLogsQuery();

privacyQueries
.matchingLogs(privacyGroupId, event.getBlock().getHash(), logsQuery)
.forEach(logWithMetadata -> sendLogToSubscription(logWithMetadata, subscription));
};
}

private void sendLogToSubscription(
final LogWithMetadata logWithMetadata, final LogsSubscription subscription) {
subscriptionManager.sendMessage(
subscription.getSubscriptionId(), new LogResult(logWithMetadata));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

import org.hyperledger.besu.ethereum.api.jsonrpc.internal.results.JsonRpcResult;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.results.Quantity;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.subscription.Subscription;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.subscription.logs.PrivateLogsSubscription;

import com.fasterxml.jackson.annotation.JsonGetter;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
Expand All @@ -24,12 +26,24 @@
public class SubscriptionResponse {

private static final String JSON_RPC_VERSION = "2.0";
private static final String METHOD_NAME = "eth_subscription";
private static final String ETH_SUBSCRIPTION_METHOD = "eth_subscription";
private static final String PRIV_SUBSCRIPTION_METHOD = "priv_subscription";

private final String methodName;
private final SubscriptionResponseResult params;

public SubscriptionResponse(final long subscriptionId, final JsonRpcResult result) {
this.params = new SubscriptionResponseResult(Quantity.create(subscriptionId), result);
public SubscriptionResponse(final Subscription subscription, final JsonRpcResult result) {
if (subscription instanceof PrivateLogsSubscription) {
final String privacyGroupId = ((PrivateLogsSubscription) subscription).getPrivacyGroupId();
this.methodName = PRIV_SUBSCRIPTION_METHOD;
this.params =
new SubscriptionResponseResult(
Quantity.create(subscription.getSubscriptionId()), result, privacyGroupId);
} else {
this.methodName = ETH_SUBSCRIPTION_METHOD;
this.params =
new SubscriptionResponseResult(Quantity.create(subscription.getSubscriptionId()), result);
}
}

@JsonGetter("jsonrpc")
Expand All @@ -39,7 +53,7 @@ public String getJsonrpc() {

@JsonGetter("method")
public String getMethod() {
return METHOD_NAME;
return methodName;
}

@JsonGetter("params")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,28 @@
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.results.JsonRpcResult;

import com.fasterxml.jackson.annotation.JsonGetter;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;

@JsonPropertyOrder({"subscription", "result"})
@JsonPropertyOrder({"subscription", "privacyGroupId", "result"})
public class SubscriptionResponseResult {

private final String subscription;
private final JsonRpcResult result;

@JsonInclude(Include.NON_NULL)
private final String privacyGroupId;

SubscriptionResponseResult(final String subscription, final JsonRpcResult result) {
this(subscription, result, null);
}

SubscriptionResponseResult(
final String subscription, final JsonRpcResult result, final String privacyGroupId) {
this.subscription = subscription;
this.result = result;
this.privacyGroupId = privacyGroupId;
}

@JsonGetter("subscription")
Expand All @@ -39,4 +50,9 @@ public String getSubscription() {
public JsonRpcResult getResult() {
return result;
}

@JsonGetter("privacyGroupId")
public String getPrivacyGroupId() {
return privacyGroupId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,10 @@ public void shouldSendMessageOnTheConnectionIdEventBusAddressForExistingSubscrip
new SubscribeRequest(SubscriptionType.SYNCING, null, null, connectionId);

final JsonRpcResult expectedResult = mock(JsonRpcResult.class);
final SubscriptionResponse expectedResponse = new SubscriptionResponse(1L, expectedResult);
final Subscription subscription =
new Subscription(1L, connectionId, SubscriptionType.SYNCING, false);
final SubscriptionResponse expectedResponse =
new SubscriptionResponse(subscription, expectedResult);

final Long subscriptionId = subscriptionManager.subscribe(subscribeRequest);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,23 +26,27 @@
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.results.LogResult;
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.results.Quantity;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.subscription.SubscriptionManager;
import org.hyperledger.besu.ethereum.api.query.PrivacyQueries;
import org.hyperledger.besu.ethereum.chain.MutableBlockchain;
import org.hyperledger.besu.ethereum.core.Address;
import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.core.BlockDataGenerator;
import org.hyperledger.besu.ethereum.core.BlockDataGenerator.BlockOptions;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.BlockWithReceipts;
import org.hyperledger.besu.ethereum.core.Hash;
import org.hyperledger.besu.ethereum.core.InMemoryStorageProvider;
import org.hyperledger.besu.ethereum.core.Log;
import org.hyperledger.besu.ethereum.core.LogTopic;
import org.hyperledger.besu.ethereum.core.LogWithMetadata;
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.core.TransactionReceipt;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.stream.Collectors;
Expand All @@ -69,10 +73,14 @@ public class LogsSubscriptionServiceTest {

@Mock private SubscriptionManager subscriptionManager;

@Mock private PrivacyQueries privacyQueries;

@Before
public void before() {
logsSubscriptionService = new LogsSubscriptionService(subscriptionManager);
logsSubscriptionService =
new LogsSubscriptionService(subscriptionManager, Optional.of(privacyQueries));
blockchain.observeLogs(logsSubscriptionService);
blockchain.observeBlockAdded(logsSubscriptionService::checkPrivateLogs);
}

@Test
Expand Down Expand Up @@ -283,6 +291,38 @@ public void noMatchingLogsEmitted() {
.sendMessage(eq(subscription.getSubscriptionId()), captor.capture());
}

@Test
public void whenExistsPrivateLogsSubscriptionPrivacyQueriesIsCalled() {
final String privacyGroupId = "privacy_group_id";
final Address address = Address.fromHexString("0x0");
final PrivateLogsSubscription subscription = createPrivateSubscription(privacyGroupId, address);
registerSubscriptions(subscription);

final BlockWithReceipts blockWithReceipts = generateBlock(2, 2, 2);
blockchain.appendBlock(blockWithReceipts.getBlock(), blockWithReceipts.getReceipts());

verify(privacyQueries)
.matchingLogs(
eq(subscription.getPrivacyGroupId()),
eq(blockWithReceipts.getHash()),
eq(subscription.getFilterParameter().getLogsQuery()));
}

@Test
public void whenPrivateLogsSubscriptionMatchesLogNotificationIsSent() {
final String privacyGroupId = "privacy_group_id";
final Address address = Address.fromHexString("0x0");
final PrivateLogsSubscription subscription = createPrivateSubscription(privacyGroupId, address);
registerSubscriptions(subscription);

when(privacyQueries.matchingLogs(any(), any(), any())).thenReturn(List.of(logWithMetadata()));

final BlockWithReceipts blockWithReceipts = generateBlock(2, 2, 2);
blockchain.appendBlock(blockWithReceipts.getBlock(), blockWithReceipts.getReceipts());

verify(subscriptionManager, times(1)).sendMessage(eq(subscription.getSubscriptionId()), any());
}

private void assertLogResultMatches(
final LogResult result,
final Block block,
Expand Down Expand Up @@ -347,6 +387,20 @@ private BlockWithReceipts generateBlock(
return new BlockWithReceipts(block, receipts);
}

private PrivateLogsSubscription createPrivateSubscription(
final String privacyGroupId, final Address address) {
return new PrivateLogsSubscription(
nextSubscriptionId.incrementAndGet(),
"conn",
new FilterParameter(
BlockParameter.LATEST,
BlockParameter.LATEST,
Arrays.asList(address),
Collections.emptyList(),
null),
privacyGroupId);
}

private LogsSubscription createSubscription(final Address address) {
return createSubscription(Arrays.asList(address), Collections.emptyList());
}
Expand All @@ -369,4 +423,17 @@ private void registerSubscriptions(final List<LogsSubscription> subscriptions) {
when(subscriptionManager.subscriptionsOfType(any(), any()))
.thenReturn(Lists.newArrayList(subscriptions));
}

private LogWithMetadata logWithMetadata() {
return new LogWithMetadata(
0,
100L,
Hash.ZERO,
Hash.ZERO,
0,
Address.fromHexString("0x0"),
Bytes.EMPTY,
Lists.newArrayList(),
false);
}
}