From 8c98388e6cfedf4b862e7821b67bcb01bfcd6cee Mon Sep 17 00:00:00 2001 From: mbaxter Date: Wed, 30 Jan 2019 19:42:47 -0500 Subject: [PATCH] [NC-2207] Add a basic queue on top of RocksDB (#698) Signed-off-by: Adrian Sutton --- .../pantheon/metrics/MetricCategory.java | 13 +- services/kvstore/build.gradle | 1 + .../kvstore/RocksDbKeyValueStorage.java | 19 +-- services/queue/build.gradle | 3 + .../pantheon/services/queue/RocksDbQueue.java | 133 ++++++++++++++++++ .../services/queue/AbstractBigQueueTest.java | 104 ++++++++++++++ .../services/queue/InMemoryBigQueueTest.java | 23 +++ .../services/queue/RocksDbQueueTest.java | 30 ++++ services/util/build.gradle | 40 ++++++ .../pantheon/services/util/RocksDbUtil.java | 39 +++++ settings.gradle | 1 + 11 files changed, 383 insertions(+), 23 deletions(-) create mode 100644 services/queue/src/main/java/tech/pegasys/pantheon/services/queue/RocksDbQueue.java create mode 100644 services/queue/src/test/tech/pegasys/pantheon/services/queue/AbstractBigQueueTest.java create mode 100644 services/queue/src/test/tech/pegasys/pantheon/services/queue/InMemoryBigQueueTest.java create mode 100644 services/queue/src/test/tech/pegasys/pantheon/services/queue/RocksDbQueueTest.java create mode 100644 services/util/build.gradle create mode 100644 services/util/src/main/java/tech/pegasys/pantheon/services/util/RocksDbUtil.java diff --git a/metrics/src/main/java/tech/pegasys/pantheon/metrics/MetricCategory.java b/metrics/src/main/java/tech/pegasys/pantheon/metrics/MetricCategory.java index b7e0db0eb82..110f25a88d0 100644 --- a/metrics/src/main/java/tech/pegasys/pantheon/metrics/MetricCategory.java +++ b/metrics/src/main/java/tech/pegasys/pantheon/metrics/MetricCategory.java @@ -13,14 +13,15 @@ package tech.pegasys.pantheon.metrics; public enum MetricCategory { - PEERS("peers"), - RPC("rpc"), - JVM("jvm", false), - PROCESS("process", false), + BIG_QUEUE("big_queue"), BLOCKCHAIN("blockchain"), - SYNCHRONIZER("synchronizer"), + JVM("jvm", false), NETWORK("network"), - ROCKSDB("rocksdb"); + PEERS("peers"), + PROCESS("process", false), + ROCKSDB("rocksdb"), + RPC("rpc"), + SYNCHRONIZER("synchronizer"); private final String name; private final boolean pantheonSpecific; diff --git a/services/kvstore/build.gradle b/services/kvstore/build.gradle index 6ed1fbb02dd..49f1695c60c 100644 --- a/services/kvstore/build.gradle +++ b/services/kvstore/build.gradle @@ -28,6 +28,7 @@ jar { dependencies { api project(':util') implementation project(':metrics') + implementation project(':services:util') implementation 'org.apache.logging.log4j:log4j-api' implementation 'com.google.guava:guava' diff --git a/services/kvstore/src/main/java/tech/pegasys/pantheon/services/kvstore/RocksDbKeyValueStorage.java b/services/kvstore/src/main/java/tech/pegasys/pantheon/services/kvstore/RocksDbKeyValueStorage.java index 7fd757fc1e8..14bdc1b02ff 100644 --- a/services/kvstore/src/main/java/tech/pegasys/pantheon/services/kvstore/RocksDbKeyValueStorage.java +++ b/services/kvstore/src/main/java/tech/pegasys/pantheon/services/kvstore/RocksDbKeyValueStorage.java @@ -16,7 +16,7 @@ import tech.pegasys.pantheon.metrics.MetricCategory; import tech.pegasys.pantheon.metrics.MetricsSystem; import tech.pegasys.pantheon.metrics.OperationTimer; -import tech.pegasys.pantheon.util.InvalidConfigurationException; +import tech.pegasys.pantheon.services.util.RocksDbUtil; import tech.pegasys.pantheon.util.bytes.BytesValue; import java.io.Closeable; @@ -33,7 +33,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.rocksdb.Options; -import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; import org.rocksdb.RocksIterator; import org.rocksdb.TransactionDB; @@ -57,25 +56,11 @@ public class RocksDbKeyValueStorage implements KeyValueStorage, Closeable { public static KeyValueStorage create( final Path storageDirectory, final MetricsSystem metricsSystem) throws StorageException { - loadNativeLibrary(); return new RocksDbKeyValueStorage(storageDirectory, metricsSystem); } - private static void loadNativeLibrary() { - try { - RocksDB.loadLibrary(); - } catch (final ExceptionInInitializerError e) { - if (e.getCause() instanceof UnsupportedOperationException) { - LOG.info("Unable to load RocksDB library", e); - throw new InvalidConfigurationException( - "Unsupported platform detected. On Windows, ensure you have 64bit Java installed."); - } else { - throw e; - } - } - } - private RocksDbKeyValueStorage(final Path storageDirectory, final MetricsSystem metricsSystem) { + RocksDbUtil.loadNativeLibrary(); try { options = new Options().setCreateIfMissing(true); txOptions = new TransactionDBOptions(); diff --git a/services/queue/build.gradle b/services/queue/build.gradle index 616f648e2f3..df05ae86983 100644 --- a/services/queue/build.gradle +++ b/services/queue/build.gradle @@ -28,11 +28,14 @@ jar { dependencies { api project(':util') implementation project(':metrics') + implementation project(':services:util') implementation 'org.apache.logging.log4j:log4j-api' implementation 'com.google.guava:guava' + implementation 'org.rocksdb:rocksdbjni' runtime 'org.apache.logging.log4j:log4j-core' testImplementation 'junit:junit' + testImplementation 'org.assertj:assertj-core' } diff --git a/services/queue/src/main/java/tech/pegasys/pantheon/services/queue/RocksDbQueue.java b/services/queue/src/main/java/tech/pegasys/pantheon/services/queue/RocksDbQueue.java new file mode 100644 index 00000000000..e2cb8e3623a --- /dev/null +++ b/services/queue/src/main/java/tech/pegasys/pantheon/services/queue/RocksDbQueue.java @@ -0,0 +1,133 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.services.queue; + +import tech.pegasys.pantheon.metrics.MetricCategory; +import tech.pegasys.pantheon.metrics.MetricsSystem; +import tech.pegasys.pantheon.metrics.OperationTimer; +import tech.pegasys.pantheon.services.util.RocksDbUtil; +import tech.pegasys.pantheon.util.bytes.BytesValue; + +import java.nio.file.Path; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +import com.google.common.primitives.Longs; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.rocksdb.Options; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; + +public class RocksDbQueue implements BigQueue { + + private static final Logger LOG = LogManager.getLogger(); + + private final Options options; + private final RocksDB db; + + private final AtomicLong lastEnqueuedKey = new AtomicLong(0); + private final AtomicLong lastDequeuedKey = new AtomicLong(0); + private final AtomicBoolean closed = new AtomicBoolean(false); + + private final OperationTimer enqueueLatency; + private final OperationTimer dequeueLatency; + + private RocksDbQueue(final Path storageDirectory, final MetricsSystem metricsSystem) { + try { + RocksDbUtil.loadNativeLibrary(); + options = + new Options() + .setCreateIfMissing(true) + // TODO: Support restoration from a previously persisted queue + .setErrorIfExists(true); + db = RocksDB.open(options, storageDirectory.toString()); + + enqueueLatency = + metricsSystem.createTimer( + MetricCategory.BIG_QUEUE, + "enqueue_latency_seconds", + "Latency for enqueuing an item."); + dequeueLatency = + metricsSystem.createTimer( + MetricCategory.BIG_QUEUE, + "dequeue_latency_seconds", + "Latency for dequeuing an item."); + } catch (final RocksDBException e) { + throw new StorageException(e); + } + } + + public static RocksDbQueue create( + final Path storageDirectory, final MetricsSystem metricsSystem) { + return new RocksDbQueue(storageDirectory, metricsSystem); + } + + @Override + public synchronized void enqueue(final BytesValue value) { + assertNotClosed(); + try (final OperationTimer.TimingContext ignored = enqueueLatency.startTimer()) { + byte[] key = Longs.toByteArray(lastEnqueuedKey.incrementAndGet()); + db.put(key, value.getArrayUnsafe()); + } catch (RocksDBException e) { + throw new StorageException(e); + } + } + + @Override + public synchronized BytesValue dequeue() { + assertNotClosed(); + if (size() == 0) { + return null; + } + try (final OperationTimer.TimingContext ignored = dequeueLatency.startTimer()) { + byte[] key = Longs.toByteArray(lastDequeuedKey.incrementAndGet()); + byte[] value = db.get(key); + if (value == null) { + throw new IllegalStateException("Next expected value is missing"); + } + db.delete(key); + + return BytesValue.of(value); + } catch (RocksDBException e) { + throw new StorageException(e); + } + } + + @Override + public synchronized long size() { + assertNotClosed(); + return lastEnqueuedKey.get() - lastDequeuedKey.get(); + } + + @Override + public void close() { + if (closed.compareAndSet(false, true)) { + options.close(); + db.close(); + } + } + + private void assertNotClosed() { + if (closed.get()) { + throw new IllegalStateException( + "Attempt to access closed " + RocksDbQueue.class.getSimpleName()); + } + } + + public static class StorageException extends RuntimeException { + StorageException(final Throwable t) { + super(t); + } + } +} diff --git a/services/queue/src/test/tech/pegasys/pantheon/services/queue/AbstractBigQueueTest.java b/services/queue/src/test/tech/pegasys/pantheon/services/queue/AbstractBigQueueTest.java new file mode 100644 index 00000000000..0d8ba296fd7 --- /dev/null +++ b/services/queue/src/test/tech/pegasys/pantheon/services/queue/AbstractBigQueueTest.java @@ -0,0 +1,104 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.services.queue; + +import static org.assertj.core.api.Assertions.assertThat; + +import tech.pegasys.pantheon.util.bytes.BytesValue; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.CountDownLatch; +import java.util.function.Function; + +import org.junit.Test; + +abstract class AbstractBigQueueTest> { + + protected abstract T createQueue() throws Exception; + + @Test + public void enqueueAndDequeue() throws Exception { + try (T queue = createQueue()) { + BytesValue one = BytesValue.of(1); + BytesValue two = BytesValue.of(2); + BytesValue three = BytesValue.of(3); + + assertThat(queue.dequeue()).isNull(); + + queue.enqueue(one); + queue.enqueue(two); + assertThat(queue.dequeue()).isEqualTo(one); + + queue.enqueue(three); + assertThat(queue.dequeue()).isEqualTo(two); + assertThat(queue.dequeue()).isEqualTo(three); + assertThat(queue.dequeue()).isNull(); + assertThat(queue.dequeue()).isNull(); + + queue.enqueue(three); + assertThat(queue.dequeue()).isEqualTo(three); + } + } + + @Test + public void handlesConcurrentQueuing() throws Exception { + final int threadCount = 5; + final int itemsPerThread = 1000; + final T queue = createQueue(); + + final CountDownLatch dequeueingFinished = new CountDownLatch(1); + final CountDownLatch queuingFinished = new CountDownLatch(threadCount); + + // Start thread for reading values + List dequeued = new ArrayList<>(); + Thread reader = + new Thread( + () -> { + while (queuingFinished.getCount() > 0 || !queue.isEmpty()) { + if (!queue.isEmpty()) { + BytesValue value = queue.dequeue(); + dequeued.add(value); + } + } + dequeueingFinished.countDown(); + }); + reader.start(); + + final Function queueingThreadFactory = + (value) -> + new Thread( + () -> { + try { + for (int i = 0; i < itemsPerThread; i++) { + queue.enqueue(value); + } + } finally { + queuingFinished.countDown(); + } + }); + + // Start threads to queue values + for (int i = 0; i < threadCount; i++) { + queueingThreadFactory.apply(BytesValue.of(i)).start(); + } + + queuingFinished.await(); + dequeueingFinished.await(); + + assertThat(dequeued.size()).isEqualTo(threadCount * itemsPerThread); + assertThat(dequeued.stream().filter(Objects::isNull).count()).isEqualTo(0); + assertThat(queue.size()).isEqualTo(0); + } +} diff --git a/services/queue/src/test/tech/pegasys/pantheon/services/queue/InMemoryBigQueueTest.java b/services/queue/src/test/tech/pegasys/pantheon/services/queue/InMemoryBigQueueTest.java new file mode 100644 index 00000000000..3d4c2fb6266 --- /dev/null +++ b/services/queue/src/test/tech/pegasys/pantheon/services/queue/InMemoryBigQueueTest.java @@ -0,0 +1,23 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.services.queue; + +import tech.pegasys.pantheon.util.bytes.BytesValue; + +public class InMemoryBigQueueTest extends AbstractBigQueueTest> { + + @Override + protected InMemoryBigQueue createQueue() throws Exception { + return new InMemoryBigQueue<>(); + } +} diff --git a/services/queue/src/test/tech/pegasys/pantheon/services/queue/RocksDbQueueTest.java b/services/queue/src/test/tech/pegasys/pantheon/services/queue/RocksDbQueueTest.java new file mode 100644 index 00000000000..1fd25ae239c --- /dev/null +++ b/services/queue/src/test/tech/pegasys/pantheon/services/queue/RocksDbQueueTest.java @@ -0,0 +1,30 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.services.queue; + +import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem; + +import java.io.IOException; + +import org.junit.Rule; +import org.junit.rules.TemporaryFolder; + +public class RocksDbQueueTest extends AbstractBigQueueTest { + + @Rule public final TemporaryFolder folder = new TemporaryFolder(); + + @Override + protected RocksDbQueue createQueue() throws IOException { + return RocksDbQueue.create(folder.newFolder().toPath(), new NoOpMetricsSystem()); + } +} diff --git a/services/util/build.gradle b/services/util/build.gradle new file mode 100644 index 00000000000..a0ac91b6bdb --- /dev/null +++ b/services/util/build.gradle @@ -0,0 +1,40 @@ +/* + * Copyright 2018 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +apply plugin: 'java-library' + +jar { + baseName 'services-util' + manifest { + attributes( + 'Specification-Title': baseName, + 'Specification-Version': project.version, + 'Implementation-Title': baseName, + 'Implementation-Version': calculateVersion() + ) + } +} + +dependencies { + api project(':util') + implementation project(':metrics') + + implementation 'org.apache.logging.log4j:log4j-api' + implementation 'com.google.guava:guava' + implementation 'org.rocksdb:rocksdbjni' + + runtime 'org.apache.logging.log4j:log4j-core' + + testImplementation 'junit:junit' + testImplementation 'org.assertj:assertj-core' +} diff --git a/services/util/src/main/java/tech/pegasys/pantheon/services/util/RocksDbUtil.java b/services/util/src/main/java/tech/pegasys/pantheon/services/util/RocksDbUtil.java new file mode 100644 index 00000000000..0a1db8ba41c --- /dev/null +++ b/services/util/src/main/java/tech/pegasys/pantheon/services/util/RocksDbUtil.java @@ -0,0 +1,39 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.services.util; + +import tech.pegasys.pantheon.util.InvalidConfigurationException; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.rocksdb.RocksDB; + +public class RocksDbUtil { + private static final Logger LOG = LogManager.getLogger(); + + private RocksDbUtil() {} + + public static void loadNativeLibrary() { + try { + RocksDB.loadLibrary(); + } catch (final ExceptionInInitializerError e) { + if (e.getCause() instanceof UnsupportedOperationException) { + LOG.info("Unable to load RocksDB library", e); + throw new InvalidConfigurationException( + "Unsupported platform detected. On Windows, ensure you have 64bit Java installed."); + } else { + throw e; + } + } + } +} diff --git a/settings.gradle b/settings.gradle index 42fc2c1ec57..1fb2bff6688 100644 --- a/settings.gradle +++ b/settings.gradle @@ -34,6 +34,7 @@ include 'metrics' include 'pantheon' include 'services:kvstore' include 'services:queue' +include 'services:util' include 'testutil' include 'util' include 'errorprone-checks'