Skip to content

Commit

Permalink
[NC-2207] Add a basic queue on top of RocksDB (hyperledger#698)
Browse files Browse the repository at this point in the history
Signed-off-by: Adrian Sutton <adrian.sutton@consensys.net>
  • Loading branch information
mbaxter authored Jan 31, 2019
1 parent 75f1475 commit 8c98388
Show file tree
Hide file tree
Showing 11 changed files with 383 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,15 @@
package tech.pegasys.pantheon.metrics;

public enum MetricCategory {
PEERS("peers"),
RPC("rpc"),
JVM("jvm", false),
PROCESS("process", false),
BIG_QUEUE("big_queue"),
BLOCKCHAIN("blockchain"),
SYNCHRONIZER("synchronizer"),
JVM("jvm", false),
NETWORK("network"),
ROCKSDB("rocksdb");
PEERS("peers"),
PROCESS("process", false),
ROCKSDB("rocksdb"),
RPC("rpc"),
SYNCHRONIZER("synchronizer");

private final String name;
private final boolean pantheonSpecific;
Expand Down
1 change: 1 addition & 0 deletions services/kvstore/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ jar {
dependencies {
api project(':util')
implementation project(':metrics')
implementation project(':services:util')

implementation 'org.apache.logging.log4j:log4j-api'
implementation 'com.google.guava:guava'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import tech.pegasys.pantheon.metrics.MetricCategory;
import tech.pegasys.pantheon.metrics.MetricsSystem;
import tech.pegasys.pantheon.metrics.OperationTimer;
import tech.pegasys.pantheon.util.InvalidConfigurationException;
import tech.pegasys.pantheon.services.util.RocksDbUtil;
import tech.pegasys.pantheon.util.bytes.BytesValue;

import java.io.Closeable;
Expand All @@ -33,7 +33,6 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.TransactionDB;
Expand All @@ -57,25 +56,11 @@ public class RocksDbKeyValueStorage implements KeyValueStorage, Closeable {

public static KeyValueStorage create(
final Path storageDirectory, final MetricsSystem metricsSystem) throws StorageException {
loadNativeLibrary();
return new RocksDbKeyValueStorage(storageDirectory, metricsSystem);
}

private static void loadNativeLibrary() {
try {
RocksDB.loadLibrary();
} catch (final ExceptionInInitializerError e) {
if (e.getCause() instanceof UnsupportedOperationException) {
LOG.info("Unable to load RocksDB library", e);
throw new InvalidConfigurationException(
"Unsupported platform detected. On Windows, ensure you have 64bit Java installed.");
} else {
throw e;
}
}
}

private RocksDbKeyValueStorage(final Path storageDirectory, final MetricsSystem metricsSystem) {
RocksDbUtil.loadNativeLibrary();
try {
options = new Options().setCreateIfMissing(true);
txOptions = new TransactionDBOptions();
Expand Down
3 changes: 3 additions & 0 deletions services/queue/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,14 @@ jar {
dependencies {
api project(':util')
implementation project(':metrics')
implementation project(':services:util')

implementation 'org.apache.logging.log4j:log4j-api'
implementation 'com.google.guava:guava'
implementation 'org.rocksdb:rocksdbjni'

runtime 'org.apache.logging.log4j:log4j-core'

testImplementation 'junit:junit'
testImplementation 'org.assertj:assertj-core'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.services.queue;

import tech.pegasys.pantheon.metrics.MetricCategory;
import tech.pegasys.pantheon.metrics.MetricsSystem;
import tech.pegasys.pantheon.metrics.OperationTimer;
import tech.pegasys.pantheon.services.util.RocksDbUtil;
import tech.pegasys.pantheon.util.bytes.BytesValue;

import java.nio.file.Path;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import com.google.common.primitives.Longs;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

public class RocksDbQueue implements BigQueue<BytesValue> {

private static final Logger LOG = LogManager.getLogger();

private final Options options;
private final RocksDB db;

private final AtomicLong lastEnqueuedKey = new AtomicLong(0);
private final AtomicLong lastDequeuedKey = new AtomicLong(0);
private final AtomicBoolean closed = new AtomicBoolean(false);

private final OperationTimer enqueueLatency;
private final OperationTimer dequeueLatency;

private RocksDbQueue(final Path storageDirectory, final MetricsSystem metricsSystem) {
try {
RocksDbUtil.loadNativeLibrary();
options =
new Options()
.setCreateIfMissing(true)
// TODO: Support restoration from a previously persisted queue
.setErrorIfExists(true);
db = RocksDB.open(options, storageDirectory.toString());

enqueueLatency =
metricsSystem.createTimer(
MetricCategory.BIG_QUEUE,
"enqueue_latency_seconds",
"Latency for enqueuing an item.");
dequeueLatency =
metricsSystem.createTimer(
MetricCategory.BIG_QUEUE,
"dequeue_latency_seconds",
"Latency for dequeuing an item.");
} catch (final RocksDBException e) {
throw new StorageException(e);
}
}

public static RocksDbQueue create(
final Path storageDirectory, final MetricsSystem metricsSystem) {
return new RocksDbQueue(storageDirectory, metricsSystem);
}

@Override
public synchronized void enqueue(final BytesValue value) {
assertNotClosed();
try (final OperationTimer.TimingContext ignored = enqueueLatency.startTimer()) {
byte[] key = Longs.toByteArray(lastEnqueuedKey.incrementAndGet());
db.put(key, value.getArrayUnsafe());
} catch (RocksDBException e) {
throw new StorageException(e);
}
}

@Override
public synchronized BytesValue dequeue() {
assertNotClosed();
if (size() == 0) {
return null;
}
try (final OperationTimer.TimingContext ignored = dequeueLatency.startTimer()) {
byte[] key = Longs.toByteArray(lastDequeuedKey.incrementAndGet());
byte[] value = db.get(key);
if (value == null) {
throw new IllegalStateException("Next expected value is missing");
}
db.delete(key);

return BytesValue.of(value);
} catch (RocksDBException e) {
throw new StorageException(e);
}
}

@Override
public synchronized long size() {
assertNotClosed();
return lastEnqueuedKey.get() - lastDequeuedKey.get();
}

@Override
public void close() {
if (closed.compareAndSet(false, true)) {
options.close();
db.close();
}
}

private void assertNotClosed() {
if (closed.get()) {
throw new IllegalStateException(
"Attempt to access closed " + RocksDbQueue.class.getSimpleName());
}
}

public static class StorageException extends RuntimeException {
StorageException(final Throwable t) {
super(t);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.services.queue;

import static org.assertj.core.api.Assertions.assertThat;

import tech.pegasys.pantheon.util.bytes.BytesValue;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.function.Function;

import org.junit.Test;

abstract class AbstractBigQueueTest<T extends BigQueue<BytesValue>> {

protected abstract T createQueue() throws Exception;

@Test
public void enqueueAndDequeue() throws Exception {
try (T queue = createQueue()) {
BytesValue one = BytesValue.of(1);
BytesValue two = BytesValue.of(2);
BytesValue three = BytesValue.of(3);

assertThat(queue.dequeue()).isNull();

queue.enqueue(one);
queue.enqueue(two);
assertThat(queue.dequeue()).isEqualTo(one);

queue.enqueue(three);
assertThat(queue.dequeue()).isEqualTo(two);
assertThat(queue.dequeue()).isEqualTo(three);
assertThat(queue.dequeue()).isNull();
assertThat(queue.dequeue()).isNull();

queue.enqueue(three);
assertThat(queue.dequeue()).isEqualTo(three);
}
}

@Test
public void handlesConcurrentQueuing() throws Exception {
final int threadCount = 5;
final int itemsPerThread = 1000;
final T queue = createQueue();

final CountDownLatch dequeueingFinished = new CountDownLatch(1);
final CountDownLatch queuingFinished = new CountDownLatch(threadCount);

// Start thread for reading values
List<BytesValue> dequeued = new ArrayList<>();
Thread reader =
new Thread(
() -> {
while (queuingFinished.getCount() > 0 || !queue.isEmpty()) {
if (!queue.isEmpty()) {
BytesValue value = queue.dequeue();
dequeued.add(value);
}
}
dequeueingFinished.countDown();
});
reader.start();

final Function<BytesValue, Thread> queueingThreadFactory =
(value) ->
new Thread(
() -> {
try {
for (int i = 0; i < itemsPerThread; i++) {
queue.enqueue(value);
}
} finally {
queuingFinished.countDown();
}
});

// Start threads to queue values
for (int i = 0; i < threadCount; i++) {
queueingThreadFactory.apply(BytesValue.of(i)).start();
}

queuingFinished.await();
dequeueingFinished.await();

assertThat(dequeued.size()).isEqualTo(threadCount * itemsPerThread);
assertThat(dequeued.stream().filter(Objects::isNull).count()).isEqualTo(0);
assertThat(queue.size()).isEqualTo(0);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.services.queue;

import tech.pegasys.pantheon.util.bytes.BytesValue;

public class InMemoryBigQueueTest extends AbstractBigQueueTest<InMemoryBigQueue<BytesValue>> {

@Override
protected InMemoryBigQueue<BytesValue> createQueue() throws Exception {
return new InMemoryBigQueue<>();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package tech.pegasys.pantheon.services.queue;

import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;

import java.io.IOException;

import org.junit.Rule;
import org.junit.rules.TemporaryFolder;

public class RocksDbQueueTest extends AbstractBigQueueTest<RocksDbQueue> {

@Rule public final TemporaryFolder folder = new TemporaryFolder();

@Override
protected RocksDbQueue createQueue() throws IOException {
return RocksDbQueue.create(folder.newFolder().toPath(), new NoOpMetricsSystem());
}
}
Loading

0 comments on commit 8c98388

Please sign in to comment.