diff --git a/ethereum/eth/build.gradle b/ethereum/eth/build.gradle index 455ed8d1448..89817a2853a 100644 --- a/ethereum/eth/build.gradle +++ b/ethereum/eth/build.gradle @@ -33,7 +33,7 @@ dependencies { implementation project(':ethereum:permissioning') implementation project(':metrics') implementation project(':services:kvstore') - implementation project(':services:queue') + implementation project(':services:tasks') implementation 'io.vertx:vertx-core' implementation 'com.google.guava:guava' diff --git a/ethereum/eth/src/jmh/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderBenchmark.java b/ethereum/eth/src/jmh/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderBenchmark.java index c796ab6b9b4..ee328bfb9a4 100644 --- a/ethereum/eth/src/jmh/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderBenchmark.java +++ b/ethereum/eth/src/jmh/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderBenchmark.java @@ -32,8 +32,8 @@ import tech.pegasys.pantheon.metrics.MetricsSystem; import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem; import tech.pegasys.pantheon.services.kvstore.InMemoryKeyValueStorage; -import tech.pegasys.pantheon.services.queue.RocksDbTaskQueue; -import tech.pegasys.pantheon.services.queue.TaskQueue; +import tech.pegasys.pantheon.services.tasks.CachingTaskCollection; +import tech.pegasys.pantheon.services.tasks.RocksDbTaskQueue; import tech.pegasys.pantheon.util.bytes.BytesValue; import java.nio.file.Path; @@ -61,7 +61,7 @@ public class WorldStateDownloaderBenchmark { private WorldStateStorage worldStateStorage; private RespondingEthPeer peer; private Responder responder; - private TaskQueue pendingRequests; + private CachingTaskCollection pendingRequests; private StorageProvider storageProvider; private EthProtocolManager ethProtocolManager; private InMemoryKeyValueStorage remoteKeyValueStorage; @@ -89,11 +89,13 @@ public void setUpUnchangedState() throws Exception { worldStateStorage = storageProvider.createWorldStateStorage(); pendingRequests = - RocksDbTaskQueue.create( - tempDir.resolve("fastsync"), - NodeDataRequest::serialize, - NodeDataRequest::deserialize, - metricsSystem); + new CachingTaskCollection<>( + RocksDbTaskQueue.create( + tempDir.resolve("fastsync"), + NodeDataRequest::serialize, + NodeDataRequest::deserialize, + metricsSystem), + 0); worldStateDownloader = new WorldStateDownloader( ethContext, diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/FastSynchronizer.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/FastSynchronizer.java index 1af576dec04..2f603d5bd6f 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/FastSynchronizer.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/FastSynchronizer.java @@ -25,9 +25,10 @@ import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule; import tech.pegasys.pantheon.ethereum.mainnet.ScheduleBasedBlockHashFunction; import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage; +import tech.pegasys.pantheon.metrics.MetricCategory; import tech.pegasys.pantheon.metrics.MetricsSystem; -import tech.pegasys.pantheon.services.queue.RocksDbTaskQueue; -import tech.pegasys.pantheon.services.queue.TaskQueue; +import tech.pegasys.pantheon.services.tasks.CachingTaskCollection; +import tech.pegasys.pantheon.services.tasks.RocksDbTaskQueue; import java.io.File; import java.io.IOException; @@ -45,19 +46,19 @@ class FastSynchronizer { private final FastSyncDownloader fastSyncDownloader; private final Path fastSyncDataDirectory; - private final TaskQueue stateQueue; + private final CachingTaskCollection taskCollection; private final WorldStateDownloader worldStateDownloader; private final FastSyncState initialSyncState; private FastSynchronizer( final FastSyncDownloader fastSyncDownloader, final Path fastSyncDataDirectory, - final TaskQueue stateQueue, + final CachingTaskCollection taskCollection, final WorldStateDownloader worldStateDownloader, final FastSyncState initialSyncState) { this.fastSyncDownloader = fastSyncDownloader; this.fastSyncDataDirectory = fastSyncDataDirectory; - this.stateQueue = stateQueue; + this.taskCollection = taskCollection; this.worldStateDownloader = worldStateDownloader; this.initialSyncState = initialSyncState; } @@ -88,13 +89,14 @@ public static Optional> create( return Optional.empty(); } - final TaskQueue stateQueue = - createWorldStateDownloaderQueue(getStateQueueDirectory(dataDirectory), metricsSystem); + final CachingTaskCollection taskCollection = + createWorldStateDownloaderTaskCollection( + getStateQueueDirectory(dataDirectory), metricsSystem); final WorldStateDownloader worldStateDownloader = new WorldStateDownloader( ethContext, worldStateStorage, - stateQueue, + taskCollection, syncConfig.getWorldStateHashCountPerRequest(), syncConfig.getWorldStateRequestParallelism(), syncConfig.getWorldStateMaxRequestsWithoutProgress(), @@ -114,7 +116,7 @@ public static Optional> create( new FastSynchronizer<>( fastSyncDownloader, fastSyncDataDirectory, - stateQueue, + taskCollection, worldStateDownloader, fastSyncState)); } @@ -128,7 +130,7 @@ public void deleteFastSyncState() { // Make sure downloader is stopped before we start cleaning up its dependencies worldStateDownloader.cancel(); try { - stateQueue.close(); + taskCollection.close(); if (fastSyncDataDirectory.toFile().exists()) { // Clean up this data for now (until fast sync resume functionality is in place) MoreFiles.deleteRecursively(fastSyncDataDirectory, RecursiveDeleteOption.ALLOW_INSECURE); @@ -156,9 +158,33 @@ private static void ensureDirectoryExists(final File dir) { } } - private static TaskQueue createWorldStateDownloaderQueue( + private static CachingTaskCollection createWorldStateDownloaderTaskCollection( final Path dataDirectory, final MetricsSystem metricsSystem) { - return RocksDbTaskQueue.create( - dataDirectory, NodeDataRequest::serialize, NodeDataRequest::deserialize, metricsSystem); + final CachingTaskCollection taskCollection = + new CachingTaskCollection<>( + RocksDbTaskQueue.create( + dataDirectory, + NodeDataRequest::serialize, + NodeDataRequest::deserialize, + metricsSystem)); + + metricsSystem.createLongGauge( + MetricCategory.SYNCHRONIZER, + "world_state_pending_requests_current", + "Number of pending requests for fast sync world state download", + taskCollection::size); + + metricsSystem.createIntegerGauge( + MetricCategory.SYNCHRONIZER, + "world_state_pending_requests_cache_size", + "Pending request cache size for fast sync world state download", + taskCollection::cacheSize); + + // We're using the CachingTaskCollection which isn't designed to reliably persist all + // added tasks. We therefore can't resume from previously added tasks. + // So for now, clear tasks when we start up. + taskCollection.clear(); + + return taskCollection; } } diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldDownloadState.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldDownloadState.java index 9a154cab13b..968c84df7b5 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldDownloadState.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldDownloadState.java @@ -16,8 +16,8 @@ import tech.pegasys.pantheon.ethereum.eth.manager.task.EthTask; import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage; import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage.Updater; -import tech.pegasys.pantheon.services.queue.TaskQueue; -import tech.pegasys.pantheon.services.queue.TaskQueue.Task; +import tech.pegasys.pantheon.services.tasks.CachingTaskCollection; +import tech.pegasys.pantheon.services.tasks.Task; import tech.pegasys.pantheon.util.ExceptionUtils; import tech.pegasys.pantheon.util.bytes.BytesValue; @@ -38,7 +38,7 @@ class WorldDownloadState { private static final Logger LOG = LogManager.getLogger(); private final boolean downloadWasResumed; - private final TaskQueue pendingRequests; + private final CachingTaskCollection pendingRequests; private final ArrayBlockingQueue> requestsToPersist; private final int maxOutstandingRequests; private final int maxRequestsWithoutProgress; @@ -54,7 +54,7 @@ class WorldDownloadState { private EthTask persistenceTask; public WorldDownloadState( - final TaskQueue pendingRequests, + final CachingTaskCollection pendingRequests, final ArrayBlockingQueue> requestsToPersist, final int maxOutstandingRequests, final int maxRequestsWithoutProgress) { @@ -151,13 +151,13 @@ public synchronized void setPersistenceTask(final EthTask persistenceTask) { public synchronized void enqueueRequest(final NodeDataRequest request) { if (!internalFuture.isDone()) { - pendingRequests.enqueue(request); + pendingRequests.add(request); } } public synchronized void enqueueRequests(final Collection requests) { if (!internalFuture.isDone()) { - requests.forEach(pendingRequests::enqueue); + requests.forEach(pendingRequests::add); } } @@ -165,7 +165,7 @@ public synchronized Task dequeueRequest() { if (internalFuture.isDone()) { return null; } - return pendingRequests.dequeue(); + return pendingRequests.remove(); } public synchronized void setRootNodeData(final BytesValue rootNodeData) { diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java index 46f768f83f1..391e24a44db 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java @@ -27,8 +27,8 @@ import tech.pegasys.pantheon.metrics.Counter; import tech.pegasys.pantheon.metrics.MetricCategory; import tech.pegasys.pantheon.metrics.MetricsSystem; -import tech.pegasys.pantheon.services.queue.TaskQueue; -import tech.pegasys.pantheon.services.queue.TaskQueue.Task; +import tech.pegasys.pantheon.services.tasks.CachingTaskCollection; +import tech.pegasys.pantheon.services.tasks.Task; import tech.pegasys.pantheon.util.ExceptionUtils; import tech.pegasys.pantheon.util.bytes.BytesValue; @@ -61,7 +61,7 @@ public class WorldStateDownloader { private final MetricsSystem metricsSystem; private final EthContext ethContext; - private final TaskQueue taskQueue; + private final CachingTaskCollection taskCollection; private final int hashCountPerRequest; private final int maxOutstandingRequests; private final int maxNodeRequestsWithoutProgress; @@ -72,25 +72,19 @@ public class WorldStateDownloader { public WorldStateDownloader( final EthContext ethContext, final WorldStateStorage worldStateStorage, - final TaskQueue taskQueue, + final CachingTaskCollection taskCollection, final int hashCountPerRequest, final int maxOutstandingRequests, final int maxNodeRequestsWithoutProgress, final MetricsSystem metricsSystem) { this.ethContext = ethContext; this.worldStateStorage = worldStateStorage; - this.taskQueue = taskQueue; + this.taskCollection = taskCollection; this.hashCountPerRequest = hashCountPerRequest; this.maxOutstandingRequests = maxOutstandingRequests; this.maxNodeRequestsWithoutProgress = maxNodeRequestsWithoutProgress; this.metricsSystem = metricsSystem; - metricsSystem.createLongGauge( - MetricCategory.SYNCHRONIZER, - "world_state_pending_requests_current", - "Number of pending requests for fast sync world state download", - taskQueue::size); - completedRequestsCounter = metricsSystem.createCounter( MetricCategory.SYNCHRONIZER, @@ -159,7 +153,7 @@ public CompletableFuture run(final BlockHeader header) { final int persistenceQueueCapacity = hashCountPerRequest * maxOutstandingRequests * 2; final WorldDownloadState newDownloadState = new WorldDownloadState( - taskQueue, + taskCollection, new ArrayBlockingQueue<>(persistenceQueueCapacity), maxOutstandingRequests, maxNodeRequestsWithoutProgress); diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldDownloadStateTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldDownloadStateTest.java index e5872d231e8..691f4b76ca4 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldDownloadStateTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldDownloadStateTest.java @@ -28,8 +28,9 @@ import tech.pegasys.pantheon.ethereum.storage.keyvalue.KeyValueStorageWorldStateStorage; import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage; import tech.pegasys.pantheon.services.kvstore.InMemoryKeyValueStorage; -import tech.pegasys.pantheon.services.queue.InMemoryTaskQueue; -import tech.pegasys.pantheon.services.queue.TaskQueue.Task; +import tech.pegasys.pantheon.services.tasks.CachingTaskCollection; +import tech.pegasys.pantheon.services.tasks.InMemoryTaskQueue; +import tech.pegasys.pantheon.services.tasks.Task; import tech.pegasys.pantheon.util.bytes.BytesValue; import java.util.Arrays; @@ -51,7 +52,8 @@ public class WorldDownloadStateTest { private final BlockHeader header = new BlockHeaderTestFixture().stateRoot(ROOT_NODE_HASH).buildHeader(); - private final InMemoryTaskQueue pendingRequests = new InMemoryTaskQueue<>(); + private final CachingTaskCollection pendingRequests = + new CachingTaskCollection<>(new InMemoryTaskQueue<>()); private final ArrayBlockingQueue> requestsToPersist = new ArrayBlockingQueue<>(100); @@ -94,7 +96,7 @@ public void shouldStoreRootNodeBeforeReturnedFutureCompletes() { @Test public void shouldNotCompleteWhenThereArePendingTasks() { - pendingRequests.enqueue(createAccountDataRequest(Hash.EMPTY_TRIE_HASH)); + pendingRequests.add(createAccountDataRequest(Hash.EMPTY_TRIE_HASH)); downloadState.checkCompletion(worldStateStorage, header); @@ -115,8 +117,8 @@ public void shouldCancelOutstandingTasksWhenFutureIsCancelled() { downloadState.addOutstandingTask(outstandingTask1); downloadState.addOutstandingTask(outstandingTask2); - pendingRequests.enqueue(createAccountDataRequest(Hash.EMPTY_TRIE_HASH)); - pendingRequests.enqueue(createAccountDataRequest(Hash.EMPTY)); + pendingRequests.add(createAccountDataRequest(Hash.EMPTY_TRIE_HASH)); + pendingRequests.add(createAccountDataRequest(Hash.EMPTY)); requestsToPersist.add(toPersist1); requestsToPersist.add(toPersist2); @@ -133,7 +135,7 @@ public void shouldCancelOutstandingTasksWhenFutureIsCancelled() { @Test public void shouldNotSendAdditionalRequestsWhenWaitingForANewPeer() { - pendingRequests.enqueue(createAccountDataRequest(Hash.EMPTY_TRIE_HASH)); + pendingRequests.add(createAccountDataRequest(Hash.EMPTY_TRIE_HASH)); downloadState.setWaitingForNewPeer(true); downloadState.whileAdditionalRequestsCanBeSent(mustNotBeCalled()); @@ -141,7 +143,7 @@ public void shouldNotSendAdditionalRequestsWhenWaitingForANewPeer() { @Test public void shouldResumeSendingAdditionalRequestsWhenNoLongerWaitingForPeer() { - pendingRequests.enqueue(createAccountDataRequest(Hash.EMPTY_TRIE_HASH)); + pendingRequests.add(createAccountDataRequest(Hash.EMPTY_TRIE_HASH)); final Runnable sendRequest = mockWithAction(() -> downloadState.addOutstandingTask(mock(EthTask.class))); @@ -155,10 +157,10 @@ public void shouldResumeSendingAdditionalRequestsWhenNoLongerWaitingForPeer() { @Test public void shouldStopSendingAdditionalRequestsWhenPendingRequestsIsEmpty() { - pendingRequests.enqueue(createAccountDataRequest(Hash.EMPTY_TRIE_HASH)); - pendingRequests.enqueue(createAccountDataRequest(Hash.EMPTY_TRIE_HASH)); + pendingRequests.add(createAccountDataRequest(Hash.EMPTY_TRIE_HASH)); + pendingRequests.add(createAccountDataRequest(Hash.EMPTY_TRIE_HASH)); - final Runnable sendRequest = mockWithAction(pendingRequests::dequeue); + final Runnable sendRequest = mockWithAction(pendingRequests::remove); downloadState.whileAdditionalRequestsCanBeSent(sendRequest); verify(sendRequest, times(2)).run(); @@ -166,7 +168,7 @@ public void shouldStopSendingAdditionalRequestsWhenPendingRequestsIsEmpty() { @Test public void shouldStopSendingAdditionalRequestsWhenMaximumOutstandingRequestCountReached() { - pendingRequests.enqueue(createAccountDataRequest(Hash.EMPTY_TRIE_HASH)); + pendingRequests.add(createAccountDataRequest(Hash.EMPTY_TRIE_HASH)); final Runnable sendRequest = mockWithAction(() -> downloadState.addOutstandingTask(mock(EthTask.class))); @@ -176,7 +178,7 @@ public void shouldStopSendingAdditionalRequestsWhenMaximumOutstandingRequestCoun @Test public void shouldStopSendingAdditionalRequestsWhenFutureIsCancelled() { - pendingRequests.enqueue(createAccountDataRequest(Hash.EMPTY_TRIE_HASH)); + pendingRequests.add(createAccountDataRequest(Hash.EMPTY_TRIE_HASH)); final Runnable sendRequest = mockWithAction(() -> future.cancel(true)); downloadState.whileAdditionalRequestsCanBeSent(sendRequest); @@ -185,7 +187,7 @@ public void shouldStopSendingAdditionalRequestsWhenFutureIsCancelled() { @Test public void shouldStopSendingAdditionalRequestsWhenDownloadIsMarkedAsStalled() { - pendingRequests.enqueue(createAccountDataRequest(Hash.EMPTY_TRIE_HASH)); + pendingRequests.add(createAccountDataRequest(Hash.EMPTY_TRIE_HASH)); final Runnable sendRequest = mockWithAction(() -> downloadState.requestComplete(false)); downloadState.whileAdditionalRequestsCanBeSent(sendRequest); @@ -210,7 +212,7 @@ public void shouldResetRequestsSinceProgressCountWhenProgressIsMade() { @Test public void shouldNotAllowMultipleCallsToSendAdditionalRequestsAtOnce() { - pendingRequests.enqueue(createAccountDataRequest(Hash.EMPTY_TRIE_HASH)); + pendingRequests.add(createAccountDataRequest(Hash.EMPTY_TRIE_HASH)); final Runnable sendRequest = mockWithAction( () -> { @@ -223,7 +225,7 @@ public void shouldNotAllowMultipleCallsToSendAdditionalRequestsAtOnce() { } @Test - public void shouldNotEnqueueRequestsAfterDownloadIsStalled() { + public void shouldNotAddRequestsAfterDownloadIsStalled() { downloadState.checkCompletion(worldStateStorage, header); downloadState.enqueueRequests(Arrays.asList(createAccountDataRequest(Hash.EMPTY_TRIE_HASH))); @@ -235,7 +237,7 @@ public void shouldNotEnqueueRequestsAfterDownloadIsStalled() { @Test // Sanity check for the test structure public void shouldFailWhenMustNotBeCalledIsCalled() { - pendingRequests.enqueue(createAccountDataRequest(Hash.EMPTY_TRIE_HASH)); + pendingRequests.add(createAccountDataRequest(Hash.EMPTY_TRIE_HASH)); assertThatThrownBy(() -> downloadState.whileAdditionalRequestsCanBeSent(mustNotBeCalled())) .hasMessage("Unexpected invocation"); } diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderTest.java index 68a08bdf78b..66606abd770 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderTest.java @@ -58,8 +58,8 @@ import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage.Updater; import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem; import tech.pegasys.pantheon.services.kvstore.InMemoryKeyValueStorage; -import tech.pegasys.pantheon.services.queue.InMemoryTaskQueue; -import tech.pegasys.pantheon.services.queue.TaskQueue; +import tech.pegasys.pantheon.services.tasks.CachingTaskCollection; +import tech.pegasys.pantheon.services.tasks.InMemoryTaskQueue; import tech.pegasys.pantheon.util.bytes.Bytes32; import tech.pegasys.pantheon.util.bytes.BytesValue; import tech.pegasys.pantheon.util.uint.UInt256; @@ -152,11 +152,12 @@ public void downloadEmptyWorldState() { .limit(5) .collect(Collectors.toList()); - final TaskQueue queue = new InMemoryTaskQueue<>(); + final CachingTaskCollection taskCollection = + new CachingTaskCollection<>(new InMemoryTaskQueue<>()); final WorldStateStorage localStorage = new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage()); final WorldStateDownloader downloader = - createDownloader(ethProtocolManager.ethContext(), localStorage, queue); + createDownloader(ethProtocolManager.ethContext(), localStorage, taskCollection); final CompletableFuture future = downloader.run(header); assertThat(future).isDone(); @@ -192,9 +193,10 @@ public void downloadAlreadyAvailableWorldState() { .limit(5) .collect(Collectors.toList()); - final TaskQueue queue = new InMemoryTaskQueue<>(); + final CachingTaskCollection taskCollection = + new CachingTaskCollection<>(new InMemoryTaskQueue<>()); final WorldStateDownloader downloader = - createDownloader(ethProtocolManager.ethContext(), storage, queue); + createDownloader(ethProtocolManager.ethContext(), storage, taskCollection); final CompletableFuture future = downloader.run(header); assertThat(future).isDone(); @@ -234,11 +236,12 @@ public void canRecoverFromTimeouts() { .limit(5) .collect(Collectors.toList()); - final TaskQueue queue = new InMemoryTaskQueue<>(); + final CachingTaskCollection taskCollection = + new CachingTaskCollection<>(new InMemoryTaskQueue<>()); final WorldStateStorage localStorage = new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage()); final WorldStateDownloader downloader = - createDownloader(ethProtocolManager.ethContext(), localStorage, queue); + createDownloader(ethProtocolManager.ethContext(), localStorage, taskCollection); final CompletableFuture result = downloader.run(header); @@ -292,7 +295,8 @@ public void doesNotRequestKnownCodeFromNetwork() { .limit(5) .collect(Collectors.toList()); - final TaskQueue queue = new InMemoryTaskQueue<>(); + final CachingTaskCollection taskCollection = + new CachingTaskCollection<>(new InMemoryTaskQueue<>()); final WorldStateStorage localStorage = new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage()); @@ -304,7 +308,7 @@ public void doesNotRequestKnownCodeFromNetwork() { localStorageUpdater.commit(); final WorldStateDownloader downloader = - createDownloader(ethProtocolManager.ethContext(), localStorage, queue); + createDownloader(ethProtocolManager.ethContext(), localStorage, taskCollection); final CompletableFuture result = downloader.run(header); @@ -377,12 +381,13 @@ private void testCancellation(final boolean shouldCancelFuture) { .limit(5) .collect(Collectors.toList()); - final TaskQueue queue = spy(new InMemoryTaskQueue<>()); + final CachingTaskCollection taskCollection = + spy(new CachingTaskCollection<>(new InMemoryTaskQueue<>())); final WorldStateStorage localStorage = new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage()); final WorldStateDownloader downloader = - createDownloader(ethProtocolManager.ethContext(), localStorage, queue); + createDownloader(ethProtocolManager.ethContext(), localStorage, taskCollection); final CompletableFuture result = downloader.run(header); @@ -398,8 +403,8 @@ private void testCancellation(final boolean shouldCancelFuture) { } assertThat(result.isDone()).isFalse(); // Sanity check - // Reset queue so we can track interactions after the cancellation - reset(queue); + // Reset taskCollection so we can track interactions after the cancellation + reset(taskCollection); if (shouldCancelFuture) { result.cancel(true); } else { @@ -418,9 +423,9 @@ private void testCancellation(final boolean shouldCancelFuture) { // Now allow the persistence service to run which should exit immediately serviceExecutor.runPendingFutures(); - verify(queue, times(1)).clear(); - verify(queue, never()).dequeue(); - verify(queue, never()).enqueue(any()); + verify(taskCollection, times(1)).clear(); + verify(taskCollection, never()).remove(); + verify(taskCollection, never()).add(any()); // Target world state should not be available assertThat(localStorage.isWorldStateAvailable(header.getStateRoot())).isFalse(); } @@ -450,7 +455,8 @@ public void doesNotRequestKnownAccountTrieNodesFromNetwork() { .limit(5) .collect(Collectors.toList()); - final TaskQueue queue = new InMemoryTaskQueue<>(); + final CachingTaskCollection taskCollection = + new CachingTaskCollection<>(new InMemoryTaskQueue<>()); final WorldStateStorage localStorage = new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage()); @@ -475,7 +481,7 @@ public void doesNotRequestKnownAccountTrieNodesFromNetwork() { localStorageUpdater.commit(); final WorldStateDownloader downloader = - createDownloader(ethProtocolManager.ethContext(), localStorage, queue); + createDownloader(ethProtocolManager.ethContext(), localStorage, taskCollection); final CompletableFuture result = downloader.run(header); @@ -536,7 +542,8 @@ public void doesNotRequestKnownStorageTrieNodesFromNetwork() { .limit(5) .collect(Collectors.toList()); - final TaskQueue queue = new InMemoryTaskQueue<>(); + final CachingTaskCollection taskCollection = + new CachingTaskCollection<>(new InMemoryTaskQueue<>()); final WorldStateStorage localStorage = new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage()); @@ -576,7 +583,7 @@ public void doesNotRequestKnownStorageTrieNodesFromNetwork() { localStorageUpdater.commit(); final WorldStateDownloader downloader = - createDownloader(ethProtocolManager.ethContext(), localStorage, queue); + createDownloader(ethProtocolManager.ethContext(), localStorage, taskCollection); final CompletableFuture result = downloader.run(header); @@ -632,13 +639,14 @@ public void stalledDownloader() { final BlockHeader header = dataGen.block(BlockOptions.create().setStateRoot(stateRoot).setBlockNumber(10)).getHeader(); - final TaskQueue queue = new InMemoryTaskQueue<>(); + final CachingTaskCollection taskCollection = + new CachingTaskCollection<>(new InMemoryTaskQueue<>()); final WorldStateStorage localStorage = new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage()); final SynchronizerConfiguration syncConfig = SynchronizerConfiguration.builder().worldStateMaxRequestsWithoutProgress(10).build(); final WorldStateDownloader downloader = - createDownloader(syncConfig, ethProtocolManager.ethContext(), localStorage, queue); + createDownloader(syncConfig, ethProtocolManager.ethContext(), localStorage, taskCollection); // Create a peer that can respond final RespondingEthPeer peer = @@ -687,17 +695,18 @@ public void resumesFromNonEmptyQueue() { final BlockHeader header = dataGen.block(BlockOptions.create().setStateRoot(stateRoot).setBlockNumber(10)).getHeader(); - // Add some nodes to the queue - final TaskQueue queue = spy(new InMemoryTaskQueue<>()); + // Add some nodes to the taskCollection + final CachingTaskCollection taskCollection = + spy(new CachingTaskCollection<>(new InMemoryTaskQueue<>())); List queuedHashes = getFirstSetOfChildNodeRequests(remoteStorage, stateRoot); assertThat(queuedHashes.size()).isGreaterThan(0); // Sanity check for (Bytes32 bytes32 : queuedHashes) { - queue.enqueue(new AccountTrieNodeDataRequest(Hash.wrap(bytes32))); + taskCollection.add(new AccountTrieNodeDataRequest(Hash.wrap(bytes32))); } // Sanity check for (Bytes32 bytes32 : queuedHashes) { final Hash hash = Hash.wrap(bytes32); - verify(queue, times(1)).enqueue(argThat((r) -> r.getHash().equals(hash))); + verify(taskCollection, times(1)).add(argThat((r) -> r.getHash().equals(hash))); } final WorldStateStorage localStorage = @@ -705,7 +714,7 @@ public void resumesFromNonEmptyQueue() { final SynchronizerConfiguration syncConfig = SynchronizerConfiguration.builder().worldStateMaxRequestsWithoutProgress(10).build(); final WorldStateDownloader downloader = - createDownloader(syncConfig, ethProtocolManager.ethContext(), localStorage, queue); + createDownloader(syncConfig, ethProtocolManager.ethContext(), localStorage, taskCollection); // Create a peer that can respond final RespondingEthPeer peer = @@ -738,7 +747,7 @@ public void resumesFromNonEmptyQueue() { // Check that already enqueued requests were not enqueued more than once for (Bytes32 bytes32 : queuedHashes) { final Hash hash = Hash.wrap(bytes32); - verify(queue, times(1)).enqueue(argThat((r) -> r.getHash().equals(hash))); + verify(taskCollection, times(1)).add(argThat((r) -> r.getHash().equals(hash))); } // Check that all expected account data was downloaded @@ -837,7 +846,8 @@ private void downloadAvailableWorldStateFromPeers( .getHeader(); assertThat(otherStateRoot).isNotEqualTo(stateRoot); // Sanity check - final TaskQueue queue = new InMemoryTaskQueue<>(); + final CachingTaskCollection taskCollection = + new CachingTaskCollection<>(new InMemoryTaskQueue<>()); final WorldStateStorage localStorage = new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage()); final WorldStateArchive localWorldStateArchive = new WorldStateArchive(localStorage); @@ -847,7 +857,7 @@ private void downloadAvailableWorldStateFromPeers( .worldStateRequestParallelism(maxOutstandingRequests) .build(); final WorldStateDownloader downloader = - createDownloader(syncConfig, ethProtocolManager.ethContext(), localStorage, queue); + createDownloader(syncConfig, ethProtocolManager.ethContext(), localStorage, taskCollection); // Create some peers that can respond final List usefulPeers = @@ -977,19 +987,20 @@ private void assertAccountsMatch( private WorldStateDownloader createDownloader( final EthContext context, final WorldStateStorage storage, - final TaskQueue queue) { - return createDownloader(SynchronizerConfiguration.builder().build(), context, storage, queue); + final CachingTaskCollection taskCollection) { + return createDownloader( + SynchronizerConfiguration.builder().build(), context, storage, taskCollection); } private WorldStateDownloader createDownloader( final SynchronizerConfiguration config, final EthContext context, final WorldStateStorage storage, - final TaskQueue queue) { + final CachingTaskCollection taskCollection) { return new WorldStateDownloader( context, storage, - queue, + taskCollection, config.getWorldStateHashCountPerRequest(), config.getWorldStateRequestParallelism(), config.getWorldStateMaxRequestsWithoutProgress(), diff --git a/services/queue/build.gradle b/services/tasks/build.gradle similarity index 97% rename from services/queue/build.gradle rename to services/tasks/build.gradle index cae665e5b86..679ac85eefc 100644 --- a/services/queue/build.gradle +++ b/services/tasks/build.gradle @@ -14,7 +14,7 @@ apply plugin: 'java-library' jar { - baseName 'pantheon-queue' + baseName 'pantheon-tasks' manifest { attributes( 'Specification-Title': baseName, diff --git a/services/queue/src/jmh/java/tech/pegasys/pantheon/services/queue/RocksDbTaskQueueBenchmark.java b/services/tasks/src/jmh/java/tech/pegasys/pantheon/services/tasks/RocksDbTaskQueueBenchmark.java similarity index 91% rename from services/queue/src/jmh/java/tech/pegasys/pantheon/services/queue/RocksDbTaskQueueBenchmark.java rename to services/tasks/src/jmh/java/tech/pegasys/pantheon/services/tasks/RocksDbTaskQueueBenchmark.java index 850b536a072..c28643159c0 100644 --- a/services/queue/src/jmh/java/tech/pegasys/pantheon/services/queue/RocksDbTaskQueueBenchmark.java +++ b/services/tasks/src/jmh/java/tech/pegasys/pantheon/services/tasks/RocksDbTaskQueueBenchmark.java @@ -10,10 +10,9 @@ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. */ -package tech.pegasys.pantheon.services.queue; +package tech.pegasys.pantheon.services.tasks; import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem; -import tech.pegasys.pantheon.services.queue.TaskQueue.Task; import tech.pegasys.pantheon.util.bytes.BytesValue; import tech.pegasys.pantheon.util.uint.UInt256; @@ -44,7 +43,7 @@ public void prepare() { RocksDbTaskQueue.create( tempDir.toPath(), Function.identity(), Function.identity(), new NoOpMetricsSystem()); for (int i = 0; i < 1_000_000; i++) { - queue.enqueue(UInt256.of(i).getBytes()); + queue.add(UInt256.of(i).getBytes()); } } @@ -56,6 +55,6 @@ public void tearDown() throws IOException { @Benchmark public Task dequeue() { - return queue.dequeue(); + return queue.remove(); } } diff --git a/services/tasks/src/main/java/tech/pegasys/pantheon/services/tasks/CachingTaskCollection.java b/services/tasks/src/main/java/tech/pegasys/pantheon/services/tasks/CachingTaskCollection.java new file mode 100644 index 00000000000..2d3c7aca265 --- /dev/null +++ b/services/tasks/src/main/java/tech/pegasys/pantheon/services/tasks/CachingTaskCollection.java @@ -0,0 +1,146 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.services.tasks; + +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.HashSet; +import java.util.Queue; +import java.util.Set; + +public class CachingTaskCollection implements TaskCollection { + private static final int DEFAULT_CACHE_SIZE = 1_000_000; + private final int maxCacheSize; + + // The underlying collection + private final TaskCollection wrappedCollection; + /** + * A cache of tasks to operate on before going to {@link CachingTaskCollection#wrappedCollection} + */ + private final Queue> cache = new ArrayDeque<>(); + // Tasks that have been removed, but not marked completed yet + private final Set> outstandingTasks = new HashSet<>(); + + private boolean closed = false; + + public CachingTaskCollection(final TaskCollection collection, final int maxCacheSize) { + this.wrappedCollection = collection; + this.maxCacheSize = maxCacheSize; + } + + public CachingTaskCollection(final TaskCollection collection) { + this(collection, DEFAULT_CACHE_SIZE); + } + + @Override + public synchronized void add(final T taskData) { + assertNotClosed(); + if (cacheSize() >= maxCacheSize) { + // Too many tasks in the cache, push this to the underlying collection + wrappedCollection.add(taskData); + return; + } + + Task newTask = new CachedTask<>(this, taskData); + cache.add(newTask); + } + + @Override + public synchronized Task remove() { + assertNotClosed(); + if (cache.size() == 0) { + return wrappedCollection.remove(); + } + + final Task pendingTask = cache.remove(); + outstandingTasks.add(pendingTask); + return pendingTask; + } + + @Override + public synchronized void clear() { + assertNotClosed(); + wrappedCollection.clear(); + outstandingTasks.clear(); + cache.clear(); + } + + @Override + public synchronized long size() { + return wrappedCollection.size() + cache.size(); + } + + public synchronized int cacheSize() { + return outstandingTasks.size() + cache.size(); + } + + @Override + public synchronized boolean isEmpty() { + return size() == 0; + } + + /** @return True if all tasks have been removed and processed. */ + @Override + public synchronized boolean allTasksCompleted() { + return cacheSize() == 0 && wrappedCollection.allTasksCompleted(); + } + + private synchronized boolean completePendingTask(final CachedTask cachedTask) { + return outstandingTasks.remove(cachedTask); + } + + private synchronized void failPendingTask(final CachedTask cachedTask) { + if (completePendingTask(cachedTask)) { + cache.add(cachedTask); + } + } + + @Override + public synchronized void close() throws IOException { + outstandingTasks.clear(); + cache.clear(); + wrappedCollection.close(); + closed = true; + } + + private void assertNotClosed() { + if (closed) { + throw new IllegalStateException("Attempt to access closed " + getClass().getSimpleName()); + } + } + + private static class CachedTask implements Task { + private final CachingTaskCollection cachingTaskCollection; + private final T data; + + private CachedTask(final CachingTaskCollection cachingTaskCollection, final T data) { + this.cachingTaskCollection = cachingTaskCollection; + this.data = data; + } + + @Override + public T getData() { + return data; + } + + @Override + public void markCompleted() { + cachingTaskCollection.completePendingTask(this); + } + + @Override + public void markFailed() { + cachingTaskCollection.failPendingTask(this); + } + } +} diff --git a/services/queue/src/main/java/tech/pegasys/pantheon/services/queue/InMemoryTaskQueue.java b/services/tasks/src/main/java/tech/pegasys/pantheon/services/tasks/InMemoryTaskQueue.java similarity index 92% rename from services/queue/src/main/java/tech/pegasys/pantheon/services/queue/InMemoryTaskQueue.java rename to services/tasks/src/main/java/tech/pegasys/pantheon/services/tasks/InMemoryTaskQueue.java index 351980b4cfe..00979560230 100644 --- a/services/queue/src/main/java/tech/pegasys/pantheon/services/queue/InMemoryTaskQueue.java +++ b/services/tasks/src/main/java/tech/pegasys/pantheon/services/tasks/InMemoryTaskQueue.java @@ -10,7 +10,7 @@ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. */ -package tech.pegasys.pantheon.services.queue; +package tech.pegasys.pantheon.services.tasks; import java.util.ArrayDeque; import java.util.HashSet; @@ -18,19 +18,19 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; -public class InMemoryTaskQueue implements TaskQueue { +public class InMemoryTaskQueue implements TaskCollection { private final Queue internalQueue = new ArrayDeque<>(); private final Set> unfinishedOutstandingTasks = new HashSet<>(); private final AtomicBoolean closed = new AtomicBoolean(false); @Override - public synchronized void enqueue(final T taskData) { + public synchronized void add(final T taskData) { assertNotClosed(); internalQueue.add(taskData); } @Override - public synchronized Task dequeue() { + public synchronized Task remove() { assertNotClosed(); T data = internalQueue.poll(); if (data == null) { @@ -81,7 +81,7 @@ private void assertNotClosed() { private synchronized void handleFailedTask(final InMemoryTask task) { if (markTaskCompleted(task)) { - enqueue(task.getData()); + add(task.getData()); } } diff --git a/services/queue/src/main/java/tech/pegasys/pantheon/services/queue/RocksDbTaskQueue.java b/services/tasks/src/main/java/tech/pegasys/pantheon/services/tasks/RocksDbTaskQueue.java similarity index 97% rename from services/queue/src/main/java/tech/pegasys/pantheon/services/queue/RocksDbTaskQueue.java rename to services/tasks/src/main/java/tech/pegasys/pantheon/services/tasks/RocksDbTaskQueue.java index 95401e3499c..9147a717c96 100644 --- a/services/queue/src/main/java/tech/pegasys/pantheon/services/queue/RocksDbTaskQueue.java +++ b/services/tasks/src/main/java/tech/pegasys/pantheon/services/tasks/RocksDbTaskQueue.java @@ -10,7 +10,7 @@ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. */ -package tech.pegasys.pantheon.services.queue; +package tech.pegasys.pantheon.services.tasks; import tech.pegasys.pantheon.metrics.MetricCategory; import tech.pegasys.pantheon.metrics.MetricsSystem; @@ -31,7 +31,7 @@ import org.rocksdb.RocksDBException; import org.rocksdb.RocksIterator; -public class RocksDbTaskQueue implements TaskQueue { +public class RocksDbTaskQueue implements TaskCollection { private final Options options; private final RocksDB db; @@ -106,7 +106,7 @@ public static RocksDbTaskQueue create( } @Override - public synchronized void enqueue(final T taskData) { + public synchronized void add(final T taskData) { assertNotClosed(); try (final OperationTimer.TimingContext ignored = enqueueLatency.startTimer()) { final long key = ++lastEnqueuedKey; @@ -117,7 +117,7 @@ public synchronized void enqueue(final T taskData) { } @Override - public synchronized Task dequeue() { + public synchronized Task remove() { assertNotClosed(); if (isEmpty()) { return null; @@ -237,7 +237,7 @@ private synchronized boolean markTaskCompleted(final RocksDbTask task) { private synchronized void handleFailedTask(final RocksDbTask task) { if (markTaskCompleted(task)) { - enqueue(task.getData()); + add(task.getData()); } } diff --git a/services/tasks/src/main/java/tech/pegasys/pantheon/services/tasks/Task.java b/services/tasks/src/main/java/tech/pegasys/pantheon/services/tasks/Task.java new file mode 100644 index 00000000000..2b3067a47fc --- /dev/null +++ b/services/tasks/src/main/java/tech/pegasys/pantheon/services/tasks/Task.java @@ -0,0 +1,23 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.services.tasks; + +public interface Task { + T getData(); + + /** Mark this task as completed. */ + void markCompleted(); + + /** Mark this task as failed and requeue. */ + void markFailed(); +} diff --git a/services/queue/src/main/java/tech/pegasys/pantheon/services/queue/TaskQueue.java b/services/tasks/src/main/java/tech/pegasys/pantheon/services/tasks/TaskCollection.java similarity index 62% rename from services/queue/src/main/java/tech/pegasys/pantheon/services/queue/TaskQueue.java rename to services/tasks/src/main/java/tech/pegasys/pantheon/services/tasks/TaskCollection.java index 20a5482a7b9..5468ffb69d7 100644 --- a/services/queue/src/main/java/tech/pegasys/pantheon/services/queue/TaskQueue.java +++ b/services/tasks/src/main/java/tech/pegasys/pantheon/services/tasks/TaskCollection.java @@ -1,5 +1,5 @@ /* - * Copyright 2018 ConsenSys AG. + * Copyright 2019 ConsenSys AG. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at @@ -10,31 +10,25 @@ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. */ -package tech.pegasys.pantheon.services.queue; +package tech.pegasys.pantheon.services.tasks; import java.io.Closeable; -/** - * Represents a very large thread-safe task queue that may exceed memory limits. - * - * @param the type of data held in the queue - */ -public interface TaskQueue extends Closeable { - +public interface TaskCollection extends Closeable { /** - * Enqueue some data for processing. + * Add some data that needs to be processed. * * @param taskData The data to be processed. */ - void enqueue(T taskData); + void add(T taskData); /** - * Dequeue a task for processing. This task will be tracked as a pending task until either {@code + * Get a task for processing. This task will be tracked as a pending task until either {@code * Task.markCompleted} or {@code Task.requeue} is called. * * @return The task to be processed. */ - Task dequeue(); + Task remove(); /** @return The number of tasks in the queue. */ long size(); @@ -47,14 +41,4 @@ public interface TaskQueue extends Closeable { /** @return True if all tasks have been dequeued and processed. */ boolean allTasksCompleted(); - - interface Task { - T getData(); - - /** Mark this task as completed. */ - void markCompleted(); - - /** Mark this task as failed and requeue. */ - void markFailed(); - } } diff --git a/services/queue/src/main/resources/log4j2.xml b/services/tasks/src/main/resources/log4j2.xml similarity index 100% rename from services/queue/src/main/resources/log4j2.xml rename to services/tasks/src/main/resources/log4j2.xml diff --git a/services/queue/src/test/java/tech/pegasys/pantheon/services/queue/AbstractTaskQueueTest.java b/services/tasks/src/test/java/tech/pegasys/pantheon/services/tasks/AbstractTaskQueueTest.java similarity index 81% rename from services/queue/src/test/java/tech/pegasys/pantheon/services/queue/AbstractTaskQueueTest.java rename to services/tasks/src/test/java/tech/pegasys/pantheon/services/tasks/AbstractTaskQueueTest.java index af4852433f5..a02a7b2ee2b 100644 --- a/services/queue/src/test/java/tech/pegasys/pantheon/services/queue/AbstractTaskQueueTest.java +++ b/services/tasks/src/test/java/tech/pegasys/pantheon/services/tasks/AbstractTaskQueueTest.java @@ -10,11 +10,10 @@ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. */ -package tech.pegasys.pantheon.services.queue; +package tech.pegasys.pantheon.services.tasks; import static org.assertj.core.api.Assertions.assertThat; -import tech.pegasys.pantheon.services.queue.TaskQueue.Task; import tech.pegasys.pantheon.util.bytes.BytesValue; import java.util.ArrayList; @@ -25,7 +24,7 @@ import org.junit.Test; -abstract class AbstractTaskQueueTest> { +abstract class AbstractTaskQueueTest> { protected abstract T createQueue() throws Exception; @@ -36,20 +35,20 @@ public void enqueueAndDequeue() throws Exception { BytesValue two = BytesValue.of(2); BytesValue three = BytesValue.of(3); - assertThat(queue.dequeue()).isNull(); + assertThat(queue.remove()).isNull(); - queue.enqueue(one); - queue.enqueue(two); - assertThat(queue.dequeue().getData()).isEqualTo(one); + queue.add(one); + queue.add(two); + assertThat(queue.remove().getData()).isEqualTo(one); - queue.enqueue(three); - assertThat(queue.dequeue().getData()).isEqualTo(two); - assertThat(queue.dequeue().getData()).isEqualTo(three); - assertThat(queue.dequeue()).isNull(); - assertThat(queue.dequeue()).isNull(); + queue.add(three); + assertThat(queue.remove().getData()).isEqualTo(two); + assertThat(queue.remove().getData()).isEqualTo(three); + assertThat(queue.remove()).isNull(); + assertThat(queue.remove()).isNull(); - queue.enqueue(three); - assertThat(queue.dequeue().getData()).isEqualTo(three); + queue.add(three); + assertThat(queue.remove().getData()).isEqualTo(three); } } @@ -61,12 +60,12 @@ public void markTaskFailed() throws Exception { assertThat(queue.isEmpty()).isTrue(); assertThat(queue.allTasksCompleted()).isTrue(); - queue.enqueue(value); + queue.add(value); assertThat(queue.isEmpty()).isFalse(); assertThat(queue.allTasksCompleted()).isFalse(); - Task task = queue.dequeue(); + Task task = queue.remove(); assertThat(task).isNotNull(); assertThat(task.getData()).isEqualTo(value); assertThat(queue.isEmpty()).isTrue(); @@ -91,12 +90,12 @@ public void markTaskCompleted() throws Exception { assertThat(queue.isEmpty()).isTrue(); assertThat(queue.allTasksCompleted()).isTrue(); - queue.enqueue(value); + queue.add(value); assertThat(queue.isEmpty()).isFalse(); assertThat(queue.allTasksCompleted()).isFalse(); - Task task = queue.dequeue(); + Task task = queue.remove(); assertThat(task).isNotNull(); assertThat(task.getData()).isEqualTo(value); assertThat(queue.isEmpty()).isTrue(); @@ -122,8 +121,8 @@ public void clear() throws Exception { BytesValue four = BytesValue.of(4); // Fill queue - queue.enqueue(one); - queue.enqueue(two); + queue.add(one); + queue.add(two); assertThat(queue.size()).isEqualTo(2); assertThat(queue.isEmpty()).isFalse(); assertThat(queue.allTasksCompleted()).isFalse(); @@ -133,14 +132,14 @@ public void clear() throws Exception { assertThat(queue.size()).isEqualTo(0); assertThat(queue.isEmpty()).isTrue(); assertThat(queue.allTasksCompleted()).isTrue(); - assertThat(queue.dequeue()).isNull(); + assertThat(queue.remove()).isNull(); // Subsequent operations should work as expected - queue.enqueue(three); + queue.add(three); assertThat(queue.size()).isEqualTo(1); - queue.enqueue(four); + queue.add(four); assertThat(queue.size()).isEqualTo(2); - assertThat(queue.dequeue().getData()).isEqualTo(three); + assertThat(queue.remove().getData()).isEqualTo(three); } } @@ -149,9 +148,9 @@ public void clear_emptyQueueWithOutstandingTasks() throws Exception { try (T queue = createQueue()) { BytesValue one = BytesValue.of(1); - // Add and then dequeue task - queue.enqueue(one); - Task task = queue.dequeue(); + // Add and then remove task + queue.add(one); + Task task = queue.remove(); assertThat(task.getData()).isEqualTo(one); assertThat(queue.isEmpty()).isTrue(); assertThat(queue.allTasksCompleted()).isFalse(); @@ -161,14 +160,14 @@ public void clear_emptyQueueWithOutstandingTasks() throws Exception { assertThat(queue.size()).isEqualTo(0); assertThat(queue.isEmpty()).isTrue(); assertThat(queue.allTasksCompleted()).isTrue(); - assertThat(queue.dequeue()).isNull(); + assertThat(queue.remove()).isNull(); // Marking old task as failed should not requeue task task.markFailed(); assertThat(queue.size()).isEqualTo(0); assertThat(queue.isEmpty()).isTrue(); assertThat(queue.allTasksCompleted()).isTrue(); - assertThat(queue.dequeue()).isNull(); + assertThat(queue.remove()).isNull(); } } @@ -188,7 +187,7 @@ public void handlesConcurrentQueuing() throws Exception { () -> { while (queuingFinished.getCount() > 0 || !queue.isEmpty()) { if (!queue.isEmpty()) { - Task value = queue.dequeue(); + Task value = queue.remove(); value.markCompleted(); dequeued.add(value); } @@ -203,7 +202,7 @@ public void handlesConcurrentQueuing() throws Exception { () -> { try { for (int i = 0; i < itemsPerThread; i++) { - queue.enqueue(value); + queue.add(value); } } finally { queuingFinished.countDown(); diff --git a/services/tasks/src/test/java/tech/pegasys/pantheon/services/tasks/CachingTaskCollectionTest.java b/services/tasks/src/test/java/tech/pegasys/pantheon/services/tasks/CachingTaskCollectionTest.java new file mode 100644 index 00000000000..7775586dcb9 --- /dev/null +++ b/services/tasks/src/test/java/tech/pegasys/pantheon/services/tasks/CachingTaskCollectionTest.java @@ -0,0 +1,210 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.services.tasks; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import tech.pegasys.pantheon.util.bytes.BytesValue; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +import org.junit.Before; +import org.junit.Test; + +public class CachingTaskCollectionTest { + private TaskCollection wrappedTaskCollection; + + @Before + public void setup() { + wrappedTaskCollection = new InMemoryTaskQueue<>(); + } + + @Test + public void failTasksFromCache() { + testFailTasks(10, 5); + } + + @Test + public void failTasksOverflowingCache() { + testFailTasks(10, 20); + } + + @Test + public void failTasksWithNoCache() { + testFailTasks(0, 5); + } + + private void testFailTasks(final int cacheSize, final int taskCount) { + final CachingTaskCollection taskCollection = createCachingCollection(cacheSize); + + final List taskData = generateTasks(taskCollection, taskCount); + assertThat(taskCollection.size()).isEqualTo(taskCount); + assertThat(taskCollection.allTasksCompleted()).isFalse(); + + List> tasks = getAllTasks(taskCollection); + assertThat(taskCollection.size()).isEqualTo(0); + assertThat(taskCollection.allTasksCompleted()).isFalse(); + + // Check tasks match what we added + assertThat(getTaskData(tasks)).containsExactlyInAnyOrder(taskData.toArray(new BytesValue[0])); + + // Fail all tasks + tasks.forEach(Task::markFailed); + assertThat(taskCollection.size()).isEqualTo(taskCount); + assertThat(taskCollection.allTasksCompleted()).isFalse(); + + // Collect tasks again - they should have all been re-added + tasks = getAllTasks(taskCollection); + // Check tasks match what we added + assertThat(getTaskData(tasks)).containsExactlyInAnyOrder(taskData.toArray(new BytesValue[0])); + + // Clear tasks and then fail all outstanding tasks + taskCollection.clear(); + assertThat(taskCollection.isEmpty()).isTrue(); + assertThat(taskCollection.allTasksCompleted()).isTrue(); + // Old failed tasks should not be re-added + tasks.forEach(Task::markFailed); + assertThat(taskCollection.isEmpty()).isTrue(); + assertThat(taskCollection.allTasksCompleted()).isTrue(); + assertThat(taskCollection.size()).isEqualTo(0); + } + + @Test + public void completeTasksFromCache() { + testCompleteTasks(10, 9); + } + + @Test + public void completeTasksThatOverflowCache() { + testCompleteTasks(10, 20); + } + + @Test + public void completeTasksWithNoCache() { + testCompleteTasks(0, 20); + } + + private void testCompleteTasks(final int cacheSize, final int taskCount) { + final CachingTaskCollection taskCollection = createCachingCollection(cacheSize); + + final List taskData = generateTasks(taskCollection, taskCount); + assertThat(taskCollection.size()).isEqualTo(taskCount); + assertThat(taskCollection.allTasksCompleted()).isFalse(); + + final List> tasks = getAllTasks(taskCollection); + assertThat(taskCollection.size()).isEqualTo(0); + assertThat(taskCollection.allTasksCompleted()).isFalse(); + + // Complete all but last task + tasks.subList(0, tasks.size() - 1).forEach(Task::markCompleted); + assertThat(taskCollection.allTasksCompleted()).isFalse(); + + // Process last task + tasks.get(tasks.size() - 1).markCompleted(); + assertThat(taskCollection.size()).isEqualTo(0); + assertThat(taskCollection.allTasksCompleted()).isTrue(); + + assertThat(getTaskData(tasks)).containsExactlyInAnyOrder(taskData.toArray(new BytesValue[0])); + } + + @Test + public void processTasksWithMixedSuccess_cachedTasks() { + testProcessTasksWithMixedSuccess(10, 5); + } + + @Test + public void processTasksWithMixedSuccess_tasksOverflowCache() { + testProcessTasksWithMixedSuccess(10, 20); + } + + @Test + public void processTasksWithMixedSuccess_noCache() { + testProcessTasksWithMixedSuccess(10, 20); + } + + private CachingTaskCollection createCachingCollection(final int cacheSize) { + return new CachingTaskCollection<>(wrappedTaskCollection, cacheSize); + } + + private void testProcessTasksWithMixedSuccess(final int cacheSize, final int taskCount) { + final CachingTaskCollection taskCollection = createCachingCollection(cacheSize); + + final List taskData = generateTasks(taskCollection, taskCount); + assertThat(taskCollection.size()).isEqualTo(taskCount); + assertThat(taskCollection.allTasksCompleted()).isFalse(); + + final List> tasks = getAllTasks(taskCollection); + + final List> failedTasks = new ArrayList<>(); + boolean shouldFail = false; + for (Task task : tasks) { + if (shouldFail) { + task.markFailed(); + failedTasks.add(task); + } else { + task.markCompleted(); + } + shouldFail = !shouldFail; + } + assertThat(taskCollection.allTasksCompleted()).isFalse(); + assertThat(taskCollection.size()).isEqualTo(failedTasks.size()); + + final List actualTaskData = + tasks.stream().map(Task::getData).collect(Collectors.toList()); + assertThat(actualTaskData).containsExactlyInAnyOrder(taskData.toArray(new BytesValue[0])); + + final List> remainingTasks = getAllTasks(taskCollection); + assertThat(remainingTasks.size()).isEqualTo(failedTasks.size()); + assertThat(getTaskData(remainingTasks)) + .containsExactlyInAnyOrder(getTaskData(failedTasks).toArray(new BytesValue[0])); + } + + @Test + public void close() throws IOException { + final CachingTaskCollection taskCollection = createCachingCollection(10); + taskCollection.close(); + assertThatThrownBy(() -> taskCollection.add(BytesValue.of(1))) + .isInstanceOf(IllegalStateException.class); + } + + private List generateTasks( + final TaskCollection taskCollection, final int taskCount) { + final List taskData = new ArrayList<>(); + for (int i = 0; i < taskCount; i++) { + final BytesValue value = BytesValue.of(i & 0xff); + taskData.add(value); + taskCollection.add(value); + } + return taskData; + } + + private List getTaskData(final List> tasks) { + return tasks.stream().map(Task::getData).collect(Collectors.toList()); + } + + private List> getAllTasks(final TaskCollection taskCollection) { + final List> tasks = new ArrayList<>(); + while (taskCollection.size() > 0) { + tasks.add(taskCollection.remove()); + } + return tasks; + } + + private interface TaskCollectionSupplier { + TaskCollection get() throws Exception; + } +} diff --git a/services/queue/src/test/java/tech/pegasys/pantheon/services/queue/InMemoryTaskQueueTest.java b/services/tasks/src/test/java/tech/pegasys/pantheon/services/tasks/InMemoryTaskQueueTest.java similarity index 94% rename from services/queue/src/test/java/tech/pegasys/pantheon/services/queue/InMemoryTaskQueueTest.java rename to services/tasks/src/test/java/tech/pegasys/pantheon/services/tasks/InMemoryTaskQueueTest.java index f7994ef1b38..b552ffdf36c 100644 --- a/services/queue/src/test/java/tech/pegasys/pantheon/services/queue/InMemoryTaskQueueTest.java +++ b/services/tasks/src/test/java/tech/pegasys/pantheon/services/tasks/InMemoryTaskQueueTest.java @@ -10,7 +10,7 @@ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. */ -package tech.pegasys.pantheon.services.queue; +package tech.pegasys.pantheon.services.tasks; import tech.pegasys.pantheon.util.bytes.BytesValue; diff --git a/services/queue/src/test/java/tech/pegasys/pantheon/services/queue/RocksDbTaskQueueTest.java b/services/tasks/src/test/java/tech/pegasys/pantheon/services/tasks/RocksDbTaskQueueTest.java similarity index 89% rename from services/queue/src/test/java/tech/pegasys/pantheon/services/queue/RocksDbTaskQueueTest.java rename to services/tasks/src/test/java/tech/pegasys/pantheon/services/tasks/RocksDbTaskQueueTest.java index 5f355b37433..732787894af 100644 --- a/services/queue/src/test/java/tech/pegasys/pantheon/services/queue/RocksDbTaskQueueTest.java +++ b/services/tasks/src/test/java/tech/pegasys/pantheon/services/tasks/RocksDbTaskQueueTest.java @@ -10,7 +10,7 @@ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. */ -package tech.pegasys.pantheon.services.queue; +package tech.pegasys.pantheon.services.tasks; import static org.assertj.core.api.Assertions.assertThat; @@ -59,21 +59,21 @@ private void testResumeFromExistingQueue(final int elementCount) throws Exceptio final Path dataDir = folder.newFolder().toPath(); try (final RocksDbTaskQueue queue = createQueue(dataDir)) { for (int i = 0; i < elementCount; i++) { - queue.enqueue(BytesValue.of(i)); + queue.add(BytesValue.of(i)); } } try (final RocksDbTaskQueue resumedQueue = createQueue(dataDir)) { assertThat(resumedQueue.size()).isEqualTo(elementCount); // Queue an additional element - resumedQueue.enqueue(BytesValue.of(99)); + resumedQueue.add(BytesValue.of(99)); assertThat(resumedQueue.size()).isEqualTo(elementCount + 1); // Check that everything dequeues in order as expected for (int i = 0; i < elementCount; i++) { - assertThat(resumedQueue.dequeue().getData()).isEqualTo(BytesValue.of(i)); + assertThat(resumedQueue.remove().getData()).isEqualTo(BytesValue.of(i)); } - assertThat(resumedQueue.dequeue().getData()).isEqualTo(BytesValue.of(99)); + assertThat(resumedQueue.remove().getData()).isEqualTo(BytesValue.of(99)); assertThat(resumedQueue.size()).isEqualTo(0); } diff --git a/settings.gradle b/settings.gradle index c690988d7a0..49c7dbc4ea3 100644 --- a/settings.gradle +++ b/settings.gradle @@ -36,6 +36,7 @@ include 'metrics' include 'pantheon' include 'services:kvstore' include 'services:pipeline' +include 'services:tasks' include 'services:queue' include 'services:util' include 'testutil'