Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Commit

Permalink
[PAN-2547] Refactored vertx thread handling to super + refactored tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
mark-terry committed Jun 6, 2019
1 parent c16a42b commit bb614fe
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -160,4 +161,22 @@ public void sendMessage(final Long subscriptionId, final JsonRpcResult msg) {
.findFirst()
.ifPresent(connectionId -> vertx.eventBus().send(connectionId, Json.encode(response)));
}

public <T> Void notifySubscribersOnWorkerThread(
final SubscriptionType subscriptionType,
final Class<T> clazz,
final Consumer<List<T>> runnable) {
vertx.executeBlocking(
future -> {
final List<T> syncingSubscriptions = subscriptionsOfType(subscriptionType, clazz);
runnable.accept(syncingSubscriptions);
future.complete();
},
result -> {
if (result.failed()) {
LOG.error("Failed to notify subscribers. Cause: {}", result.cause().getMessage());
}
});
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.SubscriptionManager;
import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.request.SubscriptionType;

import java.util.List;

public class NewBlockHeadersSubscriptionService implements BlockAddedObserver {

private final SubscriptionManager subscriptionManager;
Expand All @@ -38,28 +36,21 @@ public NewBlockHeadersSubscriptionService(

@Override
public void onBlockAdded(final BlockAddedEvent event, final Blockchain blockchain) {
subscriptionManager
.getVertx()
.<Void>executeBlocking(
future -> {
final List<NewBlockHeadersSubscription> subscribers =
subscriptionManager.subscriptionsOfType(
SubscriptionType.NEW_BLOCK_HEADERS, NewBlockHeadersSubscription.class);

final Hash newBlockHash = event.getBlock().getHash();

for (final NewBlockHeadersSubscription subscription : subscribers) {
final BlockResult newBlock =
subscription.getIncludeTransactions()
? blockWithCompleteTransaction(newBlockHash)
: blockWithTransactionHash(newBlockHash);
subscriptionManager.notifySubscribersOnWorkerThread(
SubscriptionType.NEW_BLOCK_HEADERS,
NewBlockHeadersSubscription.class,
subscribers -> {
final Hash newBlockHash = event.getBlock().getHash();

subscriptionManager.sendMessage(subscription.getId(), newBlock);
}
for (final NewBlockHeadersSubscription subscription : subscribers) {
final BlockResult newBlock =
subscription.getIncludeTransactions()
? blockWithCompleteTransaction(newBlockHash)
: blockWithTransactionHash(newBlockHash);

future.complete();
},
result -> {});
subscriptionManager.sendMessage(subscription.getId(), newBlock);
}
});
}

private BlockResult blockWithCompleteTransaction(final Hash hash) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.SubscriptionManager;
import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.request.SubscriptionType;

import java.util.List;

public class SyncingSubscriptionService {

private final SubscriptionManager subscriptionManager;
Expand All @@ -32,23 +30,17 @@ public SyncingSubscriptionService(
}

private void sendSyncingToMatchingSubscriptions(final SyncStatus syncStatus) {
subscriptionManager
.getVertx()
.<Void>executeBlocking(
future -> {
final List<Subscription> syncingSubscriptions =
subscriptionManager.subscriptionsOfType(
SubscriptionType.SYNCING, Subscription.class);

if (syncStatus.inSync()) {
syncingSubscriptions.forEach(
s -> subscriptionManager.sendMessage(s.getId(), new NotSynchronisingResult()));
} else {
syncingSubscriptions.forEach(
s -> subscriptionManager.sendMessage(s.getId(), new SyncingResult(syncStatus)));
}
future.complete();
},
result -> {});
subscriptionManager.notifySubscribersOnWorkerThread(
SubscriptionType.SYNCING,
Subscription.class,
syncingSubscriptions -> {
if (syncStatus.inSync()) {
syncingSubscriptions.forEach(
s -> subscriptionManager.sendMessage(s.getId(), new NotSynchronisingResult()));
} else {
syncingSubscriptions.forEach(
s -> subscriptionManager.sendMessage(s.getId(), new SyncingResult(syncStatus)));
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand All @@ -40,9 +40,9 @@
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;

import com.google.common.collect.Lists;
import io.vertx.core.Vertx;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand All @@ -66,18 +66,17 @@ public class NewBlockHeadersSubscriptionServiceTest {
private final TransactionTestFixture txTestFixture = new TransactionTestFixture();
private final BlockHeader blockHeader = blockHeaderTestFixture.buildHeader();
private final BlockResultFactory blockResultFactory = new BlockResultFactory();
private final Vertx vertx = Vertx.vertx();

@Before
public void before() {
newBlockHeadersSubscriptionService =
new NewBlockHeadersSubscriptionService(subscriptionManager, blockchainQueries);
when(subscriptionManager.getVertx()).thenReturn(vertx);
}

@Test
public void shouldSendMessageWhenBlockAdded() {
final NewBlockHeadersSubscription subscription = createSubscription(false);
final List<NewBlockHeadersSubscription> subscriptions = Collections.singletonList(subscription);
final BlockWithMetadata<Hash, Hash> testBlockWithMetadata =
new BlockWithMetadata<>(
blockHeader, Collections.emptyList(), Collections.emptyList(), UInt256.ONE, 1);
Expand All @@ -86,9 +85,18 @@ public void shouldSendMessageWhenBlockAdded() {
when(blockchainQueries.blockByHashWithTxHashes(testBlockWithMetadata.getHeader().getHash()))
.thenReturn(Optional.of(testBlockWithMetadata));

doAnswer(
invocation -> {
Consumer<List<NewBlockHeadersSubscription>> consumer = invocation.getArgument(2);
consumer.accept(subscriptions);
return null;
})
.when(subscriptionManager)
.notifySubscribersOnWorkerThread(any(), any(), any());

simulateAddingBlock();

verify(subscriptionManager, timeout(10000))
verify(subscriptionManager)
.sendMessage(subscriptionIdCaptor.capture(), responseCaptor.capture());
final Long actualSubscriptionId = subscriptionIdCaptor.getValue();
final Object actualBlock = responseCaptor.getValue();
Expand All @@ -102,6 +110,7 @@ public void shouldSendMessageWhenBlockAdded() {
@Test
public void shouldReturnTxHashesWhenIncludeTransactionsFalse() {
final NewBlockHeadersSubscription subscription = createSubscription(false);
final List<NewBlockHeadersSubscription> subscriptions = Collections.singletonList(subscription);
final List<Hash> txHashList = transactionsWithHashOnly();
final BlockWithMetadata<Hash, Hash> testBlockWithMetadata =
new BlockWithMetadata<>(blockHeader, txHashList, Collections.emptyList(), UInt256.ONE, 1);
Expand All @@ -110,9 +119,18 @@ public void shouldReturnTxHashesWhenIncludeTransactionsFalse() {
when(blockchainQueries.blockByHashWithTxHashes(testBlockWithMetadata.getHeader().getHash()))
.thenReturn(Optional.of(testBlockWithMetadata));

doAnswer(
invocation -> {
Consumer<List<NewBlockHeadersSubscription>> consumer = invocation.getArgument(2);
consumer.accept(subscriptions);
return null;
})
.when(subscriptionManager)
.notifySubscribersOnWorkerThread(any(), any(), any());

simulateAddingBlock();

verify(subscriptionManager, timeout(10000))
verify(subscriptionManager)
.sendMessage(subscriptionIdCaptor.capture(), responseCaptor.capture());
final Long actualSubscriptionId = subscriptionIdCaptor.getValue();
final Object actualBlock = responseCaptor.getValue();
Expand All @@ -131,6 +149,7 @@ public void shouldReturnTxHashesWhenIncludeTransactionsFalse() {
@Test
public void shouldReturnCompleteTxWhenParameterTrue() {
final NewBlockHeadersSubscription subscription = createSubscription(true);
final List<NewBlockHeadersSubscription> subscriptions = Collections.singletonList(subscription);
final List<TransactionWithMetadata> txHashList = transactionsWithMetadata();
final BlockWithMetadata<TransactionWithMetadata, Hash> testBlockWithMetadata =
new BlockWithMetadata<>(
Expand All @@ -141,9 +160,18 @@ public void shouldReturnCompleteTxWhenParameterTrue() {
when(blockchainQueries.blockByHash(testBlockWithMetadata.getHeader().getHash()))
.thenReturn(Optional.of(testBlockWithMetadata));

doAnswer(
invocation -> {
Consumer<List<NewBlockHeadersSubscription>> consumer = invocation.getArgument(2);
consumer.accept(subscriptions);
return null;
})
.when(subscriptionManager)
.notifySubscribersOnWorkerThread(any(), any(), any());

simulateAddingBlock();

verify(subscriptionManager, timeout(10000))
verify(subscriptionManager)
.sendMessage(subscriptionIdCaptor.capture(), responseCaptor.capture());
final Long actualSubscriptionId = subscriptionIdCaptor.getValue();
final Object actualBlock = responseCaptor.getValue();
Expand Down Expand Up @@ -188,8 +216,6 @@ private List<Hash> transactionsWithHashOnly() {
private NewBlockHeadersSubscription createSubscription(final boolean includeTransactions) {
final NewBlockHeadersSubscription headerSub =
new NewBlockHeadersSubscription(1L, includeTransactions);
when(subscriptionManager.subscriptionsOfType(any(), any()))
.thenReturn(Lists.newArrayList(headerSub));
return headerSub;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

Expand All @@ -25,8 +25,10 @@
import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.SubscriptionManager;
import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.request.SubscriptionType;

import com.google.common.collect.Lists;
import io.vertx.core.Vertx;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand All @@ -40,7 +42,6 @@ public class SyncingSubscriptionServiceTest {
@Mock private SubscriptionManager subscriptionManager;
@Mock private Synchronizer synchronizer;
private SyncStatusListener syncStatusListener;
private final Vertx vertx = Vertx.vertx();

@Before
public void before() {
Expand All @@ -49,33 +50,47 @@ public void before() {
when(synchronizer.observeSyncStatus(captor.capture())).thenReturn(1L);
new SyncingSubscriptionService(subscriptionManager, synchronizer);
syncStatusListener = captor.getValue();
when(subscriptionManager.getVertx()).thenReturn(vertx);
}

@Test
public void shouldSendSyncStatusWhenReceiveSyncStatus() {
final SyncingSubscription subscription = new SyncingSubscription(9L, SubscriptionType.SYNCING);
when(subscriptionManager.subscriptionsOfType(any(), any()))
.thenReturn(Lists.newArrayList(subscription));
final List<SyncingSubscription> subscriptions = Collections.singletonList(subscription);
final SyncStatus syncStatus = new SyncStatus(0L, 1L, 3L);
final SyncingResult expectedSyncingResult = new SyncingResult(syncStatus);

doAnswer(
invocation -> {
Consumer<List<SyncingSubscription>> consumer = invocation.getArgument(2);
consumer.accept(subscriptions);
return null;
})
.when(subscriptionManager)
.notifySubscribersOnWorkerThread(any(), any(), any());

syncStatusListener.onSyncStatus(syncStatus);

verify(subscriptionManager, timeout(10000))
.sendMessage(eq(subscription.getId()), eq(expectedSyncingResult));
verify(subscriptionManager).sendMessage(eq(subscription.getId()), eq(expectedSyncingResult));
}

@Test
public void shouldSendNotSyncingStatusWhenReceiveSyncStatusAtHead() {
final SyncingSubscription subscription = new SyncingSubscription(9L, SubscriptionType.SYNCING);
when(subscriptionManager.subscriptionsOfType(any(), any()))
.thenReturn(Lists.newArrayList(subscription));
final List<SyncingSubscription> subscriptions = Collections.singletonList(subscription);
final SyncStatus syncStatus = new SyncStatus(0L, 1L, 1L);

doAnswer(
invocation -> {
Consumer<List<SyncingSubscription>> consumer = invocation.getArgument(2);
consumer.accept(subscriptions);
return null;
})
.when(subscriptionManager)
.notifySubscribersOnWorkerThread(any(), any(), any());

syncStatusListener.onSyncStatus(syncStatus);

verify(subscriptionManager, timeout(10000))
verify(subscriptionManager)
.sendMessage(eq(subscription.getId()), any(NotSynchronisingResult.class));
}
}

0 comments on commit bb614fe

Please sign in to comment.