From bb614fe91739ae6d632f2ae81167d824768961f2 Mon Sep 17 00:00:00 2001 From: Mark Terry Date: Thu, 6 Jun 2019 14:20:02 +1000 Subject: [PATCH] [PAN-2547] Refactored vertx thread handling to super + refactored tests. --- .../subscription/SubscriptionManager.java | 19 ++++++++ .../NewBlockHeadersSubscriptionService.java | 35 ++++++--------- .../syncing/SyncingSubscriptionService.java | 32 +++++--------- ...ewBlockHeadersSubscriptionServiceTest.java | 44 +++++++++++++++---- .../SyncingSubscriptionServiceTest.java | 39 +++++++++++----- 5 files changed, 106 insertions(+), 63 deletions(-) diff --git a/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/SubscriptionManager.java b/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/SubscriptionManager.java index 7cdb383fab..c581b197de 100644 --- a/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/SubscriptionManager.java +++ b/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/SubscriptionManager.java @@ -23,6 +23,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; import java.util.stream.Collectors; import com.google.common.annotations.VisibleForTesting; @@ -160,4 +161,22 @@ public void sendMessage(final Long subscriptionId, final JsonRpcResult msg) { .findFirst() .ifPresent(connectionId -> vertx.eventBus().send(connectionId, Json.encode(response))); } + + public Void notifySubscribersOnWorkerThread( + final SubscriptionType subscriptionType, + final Class clazz, + final Consumer> runnable) { + vertx.executeBlocking( + future -> { + final List syncingSubscriptions = subscriptionsOfType(subscriptionType, clazz); + runnable.accept(syncingSubscriptions); + future.complete(); + }, + result -> { + if (result.failed()) { + LOG.error("Failed to notify subscribers. Cause: {}", result.cause().getMessage()); + } + }); + return null; + } } diff --git a/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/blockheaders/NewBlockHeadersSubscriptionService.java b/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/blockheaders/NewBlockHeadersSubscriptionService.java index 44e6a91191..90b2cc905b 100644 --- a/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/blockheaders/NewBlockHeadersSubscriptionService.java +++ b/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/blockheaders/NewBlockHeadersSubscriptionService.java @@ -22,8 +22,6 @@ import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.SubscriptionManager; import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.request.SubscriptionType; -import java.util.List; - public class NewBlockHeadersSubscriptionService implements BlockAddedObserver { private final SubscriptionManager subscriptionManager; @@ -38,28 +36,21 @@ public NewBlockHeadersSubscriptionService( @Override public void onBlockAdded(final BlockAddedEvent event, final Blockchain blockchain) { - subscriptionManager - .getVertx() - .executeBlocking( - future -> { - final List subscribers = - subscriptionManager.subscriptionsOfType( - SubscriptionType.NEW_BLOCK_HEADERS, NewBlockHeadersSubscription.class); - - final Hash newBlockHash = event.getBlock().getHash(); - - for (final NewBlockHeadersSubscription subscription : subscribers) { - final BlockResult newBlock = - subscription.getIncludeTransactions() - ? blockWithCompleteTransaction(newBlockHash) - : blockWithTransactionHash(newBlockHash); + subscriptionManager.notifySubscribersOnWorkerThread( + SubscriptionType.NEW_BLOCK_HEADERS, + NewBlockHeadersSubscription.class, + subscribers -> { + final Hash newBlockHash = event.getBlock().getHash(); - subscriptionManager.sendMessage(subscription.getId(), newBlock); - } + for (final NewBlockHeadersSubscription subscription : subscribers) { + final BlockResult newBlock = + subscription.getIncludeTransactions() + ? blockWithCompleteTransaction(newBlockHash) + : blockWithTransactionHash(newBlockHash); - future.complete(); - }, - result -> {}); + subscriptionManager.sendMessage(subscription.getId(), newBlock); + } + }); } private BlockResult blockWithCompleteTransaction(final Hash hash) { diff --git a/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/syncing/SyncingSubscriptionService.java b/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/syncing/SyncingSubscriptionService.java index 17467ce564..fe86d61612 100644 --- a/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/syncing/SyncingSubscriptionService.java +++ b/ethereum/jsonrpc/src/main/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/syncing/SyncingSubscriptionService.java @@ -19,8 +19,6 @@ import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.SubscriptionManager; import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.request.SubscriptionType; -import java.util.List; - public class SyncingSubscriptionService { private final SubscriptionManager subscriptionManager; @@ -32,23 +30,17 @@ public SyncingSubscriptionService( } private void sendSyncingToMatchingSubscriptions(final SyncStatus syncStatus) { - subscriptionManager - .getVertx() - .executeBlocking( - future -> { - final List syncingSubscriptions = - subscriptionManager.subscriptionsOfType( - SubscriptionType.SYNCING, Subscription.class); - - if (syncStatus.inSync()) { - syncingSubscriptions.forEach( - s -> subscriptionManager.sendMessage(s.getId(), new NotSynchronisingResult())); - } else { - syncingSubscriptions.forEach( - s -> subscriptionManager.sendMessage(s.getId(), new SyncingResult(syncStatus))); - } - future.complete(); - }, - result -> {}); + subscriptionManager.notifySubscribersOnWorkerThread( + SubscriptionType.SYNCING, + Subscription.class, + syncingSubscriptions -> { + if (syncStatus.inSync()) { + syncingSubscriptions.forEach( + s -> subscriptionManager.sendMessage(s.getId(), new NotSynchronisingResult())); + } else { + syncingSubscriptions.forEach( + s -> subscriptionManager.sendMessage(s.getId(), new SyncingResult(syncStatus))); + } + }); } } diff --git a/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/blockheaders/NewBlockHeadersSubscriptionServiceTest.java b/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/blockheaders/NewBlockHeadersSubscriptionServiceTest.java index 7279a4476f..fb2f0271e3 100644 --- a/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/blockheaders/NewBlockHeadersSubscriptionServiceTest.java +++ b/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/blockheaders/NewBlockHeadersSubscriptionServiceTest.java @@ -14,7 +14,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -40,9 +40,9 @@ import java.util.Collections; import java.util.List; import java.util.Optional; +import java.util.function.Consumer; import com.google.common.collect.Lists; -import io.vertx.core.Vertx; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -66,18 +66,17 @@ public class NewBlockHeadersSubscriptionServiceTest { private final TransactionTestFixture txTestFixture = new TransactionTestFixture(); private final BlockHeader blockHeader = blockHeaderTestFixture.buildHeader(); private final BlockResultFactory blockResultFactory = new BlockResultFactory(); - private final Vertx vertx = Vertx.vertx(); @Before public void before() { newBlockHeadersSubscriptionService = new NewBlockHeadersSubscriptionService(subscriptionManager, blockchainQueries); - when(subscriptionManager.getVertx()).thenReturn(vertx); } @Test public void shouldSendMessageWhenBlockAdded() { final NewBlockHeadersSubscription subscription = createSubscription(false); + final List subscriptions = Collections.singletonList(subscription); final BlockWithMetadata testBlockWithMetadata = new BlockWithMetadata<>( blockHeader, Collections.emptyList(), Collections.emptyList(), UInt256.ONE, 1); @@ -86,9 +85,18 @@ public void shouldSendMessageWhenBlockAdded() { when(blockchainQueries.blockByHashWithTxHashes(testBlockWithMetadata.getHeader().getHash())) .thenReturn(Optional.of(testBlockWithMetadata)); + doAnswer( + invocation -> { + Consumer> consumer = invocation.getArgument(2); + consumer.accept(subscriptions); + return null; + }) + .when(subscriptionManager) + .notifySubscribersOnWorkerThread(any(), any(), any()); + simulateAddingBlock(); - verify(subscriptionManager, timeout(10000)) + verify(subscriptionManager) .sendMessage(subscriptionIdCaptor.capture(), responseCaptor.capture()); final Long actualSubscriptionId = subscriptionIdCaptor.getValue(); final Object actualBlock = responseCaptor.getValue(); @@ -102,6 +110,7 @@ public void shouldSendMessageWhenBlockAdded() { @Test public void shouldReturnTxHashesWhenIncludeTransactionsFalse() { final NewBlockHeadersSubscription subscription = createSubscription(false); + final List subscriptions = Collections.singletonList(subscription); final List txHashList = transactionsWithHashOnly(); final BlockWithMetadata testBlockWithMetadata = new BlockWithMetadata<>(blockHeader, txHashList, Collections.emptyList(), UInt256.ONE, 1); @@ -110,9 +119,18 @@ public void shouldReturnTxHashesWhenIncludeTransactionsFalse() { when(blockchainQueries.blockByHashWithTxHashes(testBlockWithMetadata.getHeader().getHash())) .thenReturn(Optional.of(testBlockWithMetadata)); + doAnswer( + invocation -> { + Consumer> consumer = invocation.getArgument(2); + consumer.accept(subscriptions); + return null; + }) + .when(subscriptionManager) + .notifySubscribersOnWorkerThread(any(), any(), any()); + simulateAddingBlock(); - verify(subscriptionManager, timeout(10000)) + verify(subscriptionManager) .sendMessage(subscriptionIdCaptor.capture(), responseCaptor.capture()); final Long actualSubscriptionId = subscriptionIdCaptor.getValue(); final Object actualBlock = responseCaptor.getValue(); @@ -131,6 +149,7 @@ public void shouldReturnTxHashesWhenIncludeTransactionsFalse() { @Test public void shouldReturnCompleteTxWhenParameterTrue() { final NewBlockHeadersSubscription subscription = createSubscription(true); + final List subscriptions = Collections.singletonList(subscription); final List txHashList = transactionsWithMetadata(); final BlockWithMetadata testBlockWithMetadata = new BlockWithMetadata<>( @@ -141,9 +160,18 @@ public void shouldReturnCompleteTxWhenParameterTrue() { when(blockchainQueries.blockByHash(testBlockWithMetadata.getHeader().getHash())) .thenReturn(Optional.of(testBlockWithMetadata)); + doAnswer( + invocation -> { + Consumer> consumer = invocation.getArgument(2); + consumer.accept(subscriptions); + return null; + }) + .when(subscriptionManager) + .notifySubscribersOnWorkerThread(any(), any(), any()); + simulateAddingBlock(); - verify(subscriptionManager, timeout(10000)) + verify(subscriptionManager) .sendMessage(subscriptionIdCaptor.capture(), responseCaptor.capture()); final Long actualSubscriptionId = subscriptionIdCaptor.getValue(); final Object actualBlock = responseCaptor.getValue(); @@ -188,8 +216,6 @@ private List transactionsWithHashOnly() { private NewBlockHeadersSubscription createSubscription(final boolean includeTransactions) { final NewBlockHeadersSubscription headerSub = new NewBlockHeadersSubscription(1L, includeTransactions); - when(subscriptionManager.subscriptionsOfType(any(), any())) - .thenReturn(Lists.newArrayList(headerSub)); return headerSub; } } diff --git a/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/syncing/SyncingSubscriptionServiceTest.java b/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/syncing/SyncingSubscriptionServiceTest.java index 84c618ed73..4ea8d23881 100644 --- a/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/syncing/SyncingSubscriptionServiceTest.java +++ b/ethereum/jsonrpc/src/test/java/tech/pegasys/pantheon/ethereum/jsonrpc/websocket/subscription/syncing/SyncingSubscriptionServiceTest.java @@ -14,7 +14,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -25,8 +25,10 @@ import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.SubscriptionManager; import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.request.SubscriptionType; -import com.google.common.collect.Lists; -import io.vertx.core.Vertx; +import java.util.Collections; +import java.util.List; +import java.util.function.Consumer; + import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -40,7 +42,6 @@ public class SyncingSubscriptionServiceTest { @Mock private SubscriptionManager subscriptionManager; @Mock private Synchronizer synchronizer; private SyncStatusListener syncStatusListener; - private final Vertx vertx = Vertx.vertx(); @Before public void before() { @@ -49,33 +50,47 @@ public void before() { when(synchronizer.observeSyncStatus(captor.capture())).thenReturn(1L); new SyncingSubscriptionService(subscriptionManager, synchronizer); syncStatusListener = captor.getValue(); - when(subscriptionManager.getVertx()).thenReturn(vertx); } @Test public void shouldSendSyncStatusWhenReceiveSyncStatus() { final SyncingSubscription subscription = new SyncingSubscription(9L, SubscriptionType.SYNCING); - when(subscriptionManager.subscriptionsOfType(any(), any())) - .thenReturn(Lists.newArrayList(subscription)); + final List subscriptions = Collections.singletonList(subscription); final SyncStatus syncStatus = new SyncStatus(0L, 1L, 3L); final SyncingResult expectedSyncingResult = new SyncingResult(syncStatus); + doAnswer( + invocation -> { + Consumer> consumer = invocation.getArgument(2); + consumer.accept(subscriptions); + return null; + }) + .when(subscriptionManager) + .notifySubscribersOnWorkerThread(any(), any(), any()); + syncStatusListener.onSyncStatus(syncStatus); - verify(subscriptionManager, timeout(10000)) - .sendMessage(eq(subscription.getId()), eq(expectedSyncingResult)); + verify(subscriptionManager).sendMessage(eq(subscription.getId()), eq(expectedSyncingResult)); } @Test public void shouldSendNotSyncingStatusWhenReceiveSyncStatusAtHead() { final SyncingSubscription subscription = new SyncingSubscription(9L, SubscriptionType.SYNCING); - when(subscriptionManager.subscriptionsOfType(any(), any())) - .thenReturn(Lists.newArrayList(subscription)); + final List subscriptions = Collections.singletonList(subscription); final SyncStatus syncStatus = new SyncStatus(0L, 1L, 1L); + doAnswer( + invocation -> { + Consumer> consumer = invocation.getArgument(2); + consumer.accept(subscriptions); + return null; + }) + .when(subscriptionManager) + .notifySubscribersOnWorkerThread(any(), any(), any()); + syncStatusListener.onSyncStatus(syncStatus); - verify(subscriptionManager, timeout(10000)) + verify(subscriptionManager) .sendMessage(eq(subscription.getId()), any(NotSynchronisingResult.class)); } }