Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Commit

Permalink
Don't delete completed tasks from RocksDbTaskQueue (#1099)
Browse files Browse the repository at this point in the history
  • Loading branch information
ajsutton authored Mar 14, 2019
1 parent 88521e2 commit 1202c21
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -180,11 +180,6 @@ private static CachingTaskCollection<NodeDataRequest> createWorldStateDownloader
"Pending request cache size for fast sync world state download",
taskCollection::cacheSize);

// We're using the CachingTaskCollection which isn't designed to reliably persist all
// added tasks. We therefore can't resume from previously added tasks.
// So for now, clear tasks when we start up.
taskCollection.clear();

return taskCollection;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import tech.pegasys.pantheon.util.bytes.BytesValue;

import java.nio.file.Path;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -38,7 +37,6 @@ public class RocksDbTaskQueue<T> implements TaskCollection<T> {

private long lastEnqueuedKey = 0;
private long lastDequeuedKey = 0;
private long oldestKey = 0;
private RocksIterator dequeueIterator;
private long lastValidKeyFromIterator;
private final Set<RocksDbTask<T>> outstandingTasks = new HashSet<>();
Expand All @@ -60,7 +58,9 @@ private RocksDbTaskQueue(
this.deserializer = deserializer;
try {
RocksDbUtil.loadNativeLibrary();
options = new Options().setCreateIfMissing(true);
// We don't support reloading data so ensure we're starting from a clean slate.
RocksDB.destroyDB(storageDirectory.toString(), new Options());
options = new Options().setCreateIfMissing(true).setErrorIfExists(true);
db = RocksDB.open(options, storageDirectory.toString());

enqueueLatency =
Expand All @@ -74,29 +74,11 @@ private RocksDbTaskQueue(
"dequeue_latency_seconds",
"Latency for dequeuing an item.");

// Initialize queue from existing db
initializeQueue();
} catch (final RocksDBException e) {
throw new StorageException(e);
}
}

private void initializeQueue() {
RocksIterator iter = db.newIterator();
iter.seekToFirst();
if (!iter.isValid()) {
// There is no data yet, nothing to do
return;
}
long firstKey = Longs.fromByteArray(iter.key());
iter.seekToLast();
long lastKey = Longs.fromByteArray(iter.key());

lastDequeuedKey = firstKey - 1;
oldestKey = firstKey;
lastEnqueuedKey = lastKey;
}

public static <T> RocksDbTaskQueue<T> create(
final Path storageDirectory,
final Function<T, BytesValue> serializer,
Expand Down Expand Up @@ -167,7 +149,7 @@ public synchronized boolean isEmpty() {
public synchronized void clear() {
assertNotClosed();
outstandingTasks.clear();
final byte[] from = Longs.toByteArray(oldestKey);
final byte[] from = Longs.toByteArray(0);
final byte[] to = Longs.toByteArray(lastEnqueuedKey + 1);
try {
db.deleteRange(from, to);
Expand All @@ -177,7 +159,6 @@ public synchronized void clear() {
}
lastDequeuedKey = 0;
lastEnqueuedKey = 0;
oldestKey = 0;
} catch (final RocksDBException e) {
throw new StorageException(e);
}
Expand All @@ -188,26 +169,6 @@ public synchronized boolean allTasksCompleted() {
return isEmpty() && outstandingTasks.isEmpty();
}

private synchronized void deleteCompletedTasks() {
final long oldestOutstandingKey =
outstandingTasks.stream()
.min(Comparator.comparingLong(RocksDbTask::getKey))
.map(RocksDbTask::getKey)
.orElse(lastDequeuedKey + 1);

if (oldestKey < oldestOutstandingKey) {
// Delete all contiguous completed tasks
final byte[] fromKey = Longs.toByteArray(oldestKey);
final byte[] toKey = Longs.toByteArray(oldestOutstandingKey);
try {
db.deleteRange(fromKey, toKey);
oldestKey = oldestOutstandingKey;
} catch (final RocksDBException e) {
throw new StorageException(e);
}
}
}

@Override
public synchronized void close() {
if (closed) {
Expand All @@ -228,11 +189,7 @@ private void assertNotClosed() {
}

private synchronized boolean markTaskCompleted(final RocksDbTask<T> task) {
if (outstandingTasks.remove(task)) {
deleteCompletedTasks();
return true;
}
return false;
return outstandingTasks.remove(task);
}

private synchronized void handleFailedTask(final RocksDbTask<T> task) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
*/
package tech.pegasys.pantheon.services.tasks;

import static org.assertj.core.api.Assertions.assertThat;

import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;
import tech.pegasys.pantheon.util.bytes.BytesValue;

Expand All @@ -22,7 +20,6 @@
import java.util.function.Function;

import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class RocksDbTaskQueueTest extends AbstractTaskQueueTest<RocksDbTaskQueue<BytesValue>> {
Expand All @@ -39,43 +36,4 @@ private RocksDbTaskQueue<BytesValue> createQueue(final Path dataDir) {
return RocksDbTaskQueue.create(
dataDir, Function.identity(), Function.identity(), new NoOpMetricsSystem());
}

@Test
public void shouldResumeFromExistingQueue() throws Exception {
testResumeFromExistingQueue(10);
}

@Test
public void shouldResumeFromExistingQueueWithOneElement() throws Exception {
testResumeFromExistingQueue(1);
}

@Test
public void shouldResumeFromExistingQueueWithNoElements() throws Exception {
testResumeFromExistingQueue(0);
}

private void testResumeFromExistingQueue(final int elementCount) throws Exception {
final Path dataDir = folder.newFolder().toPath();
try (final RocksDbTaskQueue<BytesValue> queue = createQueue(dataDir)) {
for (int i = 0; i < elementCount; i++) {
queue.add(BytesValue.of(i));
}
}

try (final RocksDbTaskQueue<BytesValue> resumedQueue = createQueue(dataDir)) {
assertThat(resumedQueue.size()).isEqualTo(elementCount);
// Queue an additional element
resumedQueue.add(BytesValue.of(99));
assertThat(resumedQueue.size()).isEqualTo(elementCount + 1);

// Check that everything dequeues in order as expected
for (int i = 0; i < elementCount; i++) {
assertThat(resumedQueue.remove().getData()).isEqualTo(BytesValue.of(i));
}
assertThat(resumedQueue.remove().getData()).isEqualTo(BytesValue.of(99));

assertThat(resumedQueue.size()).isEqualTo(0);
}
}
}

0 comments on commit 1202c21

Please sign in to comment.