Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Commit

Permalink
[PAN-2547] Changed executor to Vertx managed.
Browse files Browse the repository at this point in the history
  • Loading branch information
mark-terry committed Jun 6, 2019
1 parent d6cec72 commit c16a42b
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.request.SubscriptionType;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class NewBlockHeadersSubscriptionService implements BlockAddedObserver {

Expand All @@ -40,23 +38,28 @@ public NewBlockHeadersSubscriptionService(

@Override
public void onBlockAdded(final BlockAddedEvent event, final Blockchain blockchain) {
final ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(() ->{
final List<NewBlockHeadersSubscription> subscribers =
subscriptionManager.subscriptionsOfType(
subscriptionManager
.getVertx()
.<Void>executeBlocking(
future -> {
final List<NewBlockHeadersSubscription> subscribers =
subscriptionManager.subscriptionsOfType(
SubscriptionType.NEW_BLOCK_HEADERS, NewBlockHeadersSubscription.class);

final Hash newBlockHash = event.getBlock().getHash();
final Hash newBlockHash = event.getBlock().getHash();

for (final NewBlockHeadersSubscription subscription : subscribers) {
final BlockResult newBlock =
subscription.getIncludeTransactions()
for (final NewBlockHeadersSubscription subscription : subscribers) {
final BlockResult newBlock =
subscription.getIncludeTransactions()
? blockWithCompleteTransaction(newBlockHash)
: blockWithTransactionHash(newBlockHash);

subscriptionManager.sendMessage(subscription.getId(), newBlock);
}
});
subscriptionManager.sendMessage(subscription.getId(), newBlock);
}

future.complete();
},
result -> {});
}

private BlockResult blockWithCompleteTransaction(final Hash hash) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.request.SubscriptionType;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SyncingSubscriptionService {

Expand All @@ -34,18 +32,23 @@ public SyncingSubscriptionService(
}

private void sendSyncingToMatchingSubscriptions(final SyncStatus syncStatus) {
final ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(() -> {
final List<Subscription> syncingSubscriptions =
subscriptionManager.subscriptionsOfType(SubscriptionType.SYNCING, Subscription.class);
subscriptionManager
.getVertx()
.<Void>executeBlocking(
future -> {
final List<Subscription> syncingSubscriptions =
subscriptionManager.subscriptionsOfType(
SubscriptionType.SYNCING, Subscription.class);

if (syncStatus.inSync()) {
syncingSubscriptions.forEach(
s -> subscriptionManager.sendMessage(s.getId(), new NotSynchronisingResult()));
} else {
syncingSubscriptions.forEach(
s -> subscriptionManager.sendMessage(s.getId(), new SyncingResult(syncStatus)));
}
});
if (syncStatus.inSync()) {
syncingSubscriptions.forEach(
s -> subscriptionManager.sendMessage(s.getId(), new NotSynchronisingResult()));
} else {
syncingSubscriptions.forEach(
s -> subscriptionManager.sendMessage(s.getId(), new SyncingResult(syncStatus)));
}
future.complete();
},
result -> {});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.util.Optional;

import com.google.common.collect.Lists;
import io.vertx.core.Vertx;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand All @@ -65,11 +66,13 @@ public class NewBlockHeadersSubscriptionServiceTest {
private final TransactionTestFixture txTestFixture = new TransactionTestFixture();
private final BlockHeader blockHeader = blockHeaderTestFixture.buildHeader();
private final BlockResultFactory blockResultFactory = new BlockResultFactory();
private final Vertx vertx = Vertx.vertx();

@Before
public void before() {
newBlockHeadersSubscriptionService =
new NewBlockHeadersSubscriptionService(subscriptionManager, blockchainQueries);
when(subscriptionManager.getVertx()).thenReturn(vertx);
}

@Test
Expand All @@ -85,7 +88,7 @@ public void shouldSendMessageWhenBlockAdded() {

simulateAddingBlock();

verify(subscriptionManager, timeout(100))
verify(subscriptionManager, timeout(10000))
.sendMessage(subscriptionIdCaptor.capture(), responseCaptor.capture());
final Long actualSubscriptionId = subscriptionIdCaptor.getValue();
final Object actualBlock = responseCaptor.getValue();
Expand All @@ -109,7 +112,7 @@ public void shouldReturnTxHashesWhenIncludeTransactionsFalse() {

simulateAddingBlock();

verify(subscriptionManager, timeout(100))
verify(subscriptionManager, timeout(10000))
.sendMessage(subscriptionIdCaptor.capture(), responseCaptor.capture());
final Long actualSubscriptionId = subscriptionIdCaptor.getValue();
final Object actualBlock = responseCaptor.getValue();
Expand Down Expand Up @@ -140,7 +143,7 @@ public void shouldReturnCompleteTxWhenParameterTrue() {

simulateAddingBlock();

verify(subscriptionManager, timeout(100))
verify(subscriptionManager, timeout(10000))
.sendMessage(subscriptionIdCaptor.capture(), responseCaptor.capture());
final Long actualSubscriptionId = subscriptionIdCaptor.getValue();
final Object actualBlock = responseCaptor.getValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import tech.pegasys.pantheon.ethereum.jsonrpc.websocket.subscription.request.SubscriptionType;

import com.google.common.collect.Lists;
import io.vertx.core.Vertx;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand All @@ -39,6 +40,7 @@ public class SyncingSubscriptionServiceTest {
@Mock private SubscriptionManager subscriptionManager;
@Mock private Synchronizer synchronizer;
private SyncStatusListener syncStatusListener;
private final Vertx vertx = Vertx.vertx();

@Before
public void before() {
Expand All @@ -47,6 +49,7 @@ public void before() {
when(synchronizer.observeSyncStatus(captor.capture())).thenReturn(1L);
new SyncingSubscriptionService(subscriptionManager, synchronizer);
syncStatusListener = captor.getValue();
when(subscriptionManager.getVertx()).thenReturn(vertx);
}

@Test
Expand All @@ -59,7 +62,8 @@ public void shouldSendSyncStatusWhenReceiveSyncStatus() {

syncStatusListener.onSyncStatus(syncStatus);

verify(subscriptionManager, timeout(100)).sendMessage(eq(subscription.getId()), eq(expectedSyncingResult));
verify(subscriptionManager, timeout(10000))
.sendMessage(eq(subscription.getId()), eq(expectedSyncingResult));
}

@Test
Expand All @@ -71,7 +75,7 @@ public void shouldSendNotSyncingStatusWhenReceiveSyncStatusAtHead() {

syncStatusListener.onSyncStatus(syncStatus);

verify(subscriptionManager, timeout(100))
verify(subscriptionManager, timeout(10000))
.sendMessage(eq(subscription.getId()), any(NotSynchronisingResult.class));
}
}

0 comments on commit c16a42b

Please sign in to comment.