Parallel temp feature writes using --write-threads argument #213

Merged · 5 commits · May 6, 2022
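This PR adds a --write-threads argument that lets multiple threads write temporary features in parallel. A hedged invocation sketch, reusing the command shape from the performance workflow below (memory size, area, thread count, and output path are placeholders; the flag's default value is not shown in this diff):

java -Xmx16g -jar run.jar --area=planet --write-threads=2 --mbtiles=data/out.mbtiles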
4 changes: 2 additions & 2 deletions .github/workflows/performance.yml
@@ -72,14 +72,14 @@ jobs:

       - name: 'Run branch'
         run: |
-          rm -f data/out.mbtiles
+          rm -rf data/out.mbtiles data/tmp
           cp branch/planetiler-dist/target/*with-deps.jar run.jar
           java -Xms${{ env.RAM }} -Xmx${{ env.RAM }} -jar run.jar --area="${{ env.AREA }}" "${{ env.BOUNDS_ARG }}" --mbtiles=data/out.mbtiles 2>&1 | tee log
           ls -alh run.jar | tee -a log
           cat log | strip-ansi > build-info/branchlogs.txt
       - name: 'Run base'
         run: |
-          rm -f data/out.mbtiles
+          rm -rf data/out.mbtiles data/tmp
           cp base/planetiler-dist/target/*with-deps.jar run.jar
           java -Xms${{ env.RAM }} -Xmx${{ env.RAM }} -jar run.jar --area="${{ env.AREA }}" "${{ env.BOUNDS_ARG }}" --mbtiles=data/out.mbtiles 2>&1 | tee log
           ls -alh run.jar | tee -a log
@@ -39,9 +39,10 @@ public static void main(String[] args) {
     var config = PlanetilerConfig.defaults();
     try {
       List<Results> results = new ArrayList<>();
-      for (int limit : List.of(500_000_000, 2_000_000_000)) {
-        results.add(run(path, number, limit, false, true, true, config));
-        results.add(run(path, number, limit, true, true, true, config));
+      int limit = 2_000_000_000;
+      for (int writers : List.of(1, 2, 4)) {
+        results.add(run(path, writers, number, limit, false, true, true, config));
+        results.add(run(path, writers, number, limit, true, true, true, config));
       }
       for (var result : results) {
         System.err.println(result);
@@ -51,17 +52,18 @@
     }
   }


   private record Results(
     String write, String read, String sort,
-    int chunks, long items, int chunkSizeLimit, boolean gzip, boolean mmap, boolean parallelSort,
+    int chunks,
+    int writeWorkers, int readWorkers,
+    long items, int chunkSizeLimit, boolean gzip, boolean mmap, boolean parallelSort,
     boolean madvise
   ) {}

-  private static Results run(Path tmpDir, long items, int chunkSizeLimit, boolean mmap, boolean parallelSort,
+  private static Results run(Path tmpDir, int writeWorkers, long items, int chunkSizeLimit, boolean mmap,
+    boolean parallelSort,
     boolean madvise, PlanetilerConfig config) {
     boolean gzip = false;
-    int writeWorkers = 1;
     int sortWorkers = Runtime.getRuntime().availableProcessors();
     int readWorkers = 1;
     FileUtils.delete(tmpDir);
@@ -86,6 +88,8 @@ private static Results run(Path tmpDir, long items, int chunkSizeLimit, boolean
       FORMAT.numeric(items * NANOSECONDS_PER_SECOND / readTimer.elapsed().wall().toNanos()) + "/s",
       FORMAT.duration(sortTimer.elapsed().wall()),
       sorter.chunks(),
+      writeWorkers,
+      readWorkers,
       items,
       chunkSizeLimit,
       gzip,
@@ -116,12 +120,14 @@ private static void doReads(int readWorkers, long items, ExternalMergeSort sorter) {
   private static void doWrites(int writeWorkers, long items, ExternalMergeSort sorter) {
     var counters = Counter.newMultiThreadCounter();
     var writer = new Worker("write", Stats.inMemory(), writeWorkers, () -> {
-      var counter = counters.counterForThread();
-      var random = ThreadLocalRandom.current();
-      long toWrite = items / writeWorkers;
-      for (long i = 0; i < toWrite; i++) {
-        sorter.add(new SortableFeature(random.nextLong(), TEST_DATA));
-        counter.inc();
+      try (var writerForThread = sorter.writerForThread()) {
+        var counter = counters.counterForThread();
+        var random = ThreadLocalRandom.current();
+        long toWrite = items / writeWorkers;
+        for (long i = 0; i < toWrite; i++) {
+          writerForThread.accept(new SortableFeature(random.nextLong(), TEST_DATA));
+          counter.inc();
+        }
       }
     });
     ProgressLoggers loggers = ProgressLoggers.create()
@@ -7,7 +7,9 @@
 import com.onthegomap.planetiler.stats.ProgressLoggers;
 import com.onthegomap.planetiler.stats.Stats;
 import com.onthegomap.planetiler.stats.Timer;
+import com.onthegomap.planetiler.util.BinPack;
 import com.onthegomap.planetiler.util.ByteBufferUtil;
+import com.onthegomap.planetiler.util.CloseableConusmer;
 import com.onthegomap.planetiler.util.FileUtils;
 import com.onthegomap.planetiler.worker.WorkerPipeline;
 import java.io.BufferedInputStream;
@@ -26,14 +28,17 @@
 import java.nio.file.Path;
 import java.nio.file.StandardOpenOption;
 import java.time.Duration;
-import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.PriorityQueue;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Supplier;
 import java.util.zip.Deflater;
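The import above references com.onthegomap.planetiler.util.CloseableConusmer (spelling as in the diff), whose definition is not shown here. Judging from how it is used below — accept() once per feature, and try-with-resources in the benchmark — a minimal sketch of such an interface might look like this; the real definition may differ:

import java.io.Closeable;
import java.util.function.Consumer;

/** Hypothetical sketch: a Consumer that must be closed once the caller is done writing. */
interface CloseableConusmer<T> extends Consumer<T>, Closeable {
  // inherits void accept(T item) from Consumer and void close() throws IOException from Closeable
}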
@@ -63,7 +68,8 @@ class ExternalMergeSort implements FeatureSort {
   private final int chunkSizeLimit;
   private final int workers;
   private final AtomicLong features = new AtomicLong(0);
-  private final List<Chunk> chunks = new ArrayList<>();
+  private final List<Chunk> chunks = new CopyOnWriteArrayList<>();
+  private final AtomicInteger chunkNum = new AtomicInteger(0);
   private final boolean gzip;
   private final PlanetilerConfig config;
   private final int readerLimit;
@@ -72,7 +78,6 @@
   private final boolean parallelSort;
   private final boolean madvise;
   private final AtomicBoolean madviseFailed = new AtomicBoolean(false);
-  private Chunk currentChunk;
   private volatile boolean sorted = false;

   ExternalMergeSort(Path tempDir, PlanetilerConfig config, Stats stats) {
@@ -118,7 +123,6 @@ class ExternalMergeSort implements FeatureSort {
     try {
       FileUtils.deleteDirectory(dir);
       Files.createDirectories(dir);
-      newChunk();
     } catch (IOException e) {
       throw new UncheckedIOException(e);
     }
@@ -134,17 +138,8 @@ private static <T> T time(AtomicLong total, Supplier<T> func) {
   }

   @Override
-  public void add(SortableFeature item) {
-    try {
-      assert !sorted;
-      features.incrementAndGet();
-      currentChunk.add(item);
-      if (currentChunk.bytesInMemory > chunkSizeLimit) {
-        newChunk();
-      }
-    } catch (IOException e) {
-      throw new UncheckedIOException(e);
-    }
+  public CloseableConusmer<SortableFeature> writerForThread() {
+    return new ThreadLocalWriter();
   }

   @Override
@@ -160,9 +155,9 @@ public long estimateMemoryUsageBytes() {
   @Override
   public void sort() {
     assert !sorted;
-    if (currentChunk != null) {
+    for (var chunk : chunks) {
       try {
-        currentChunk.close();
+        chunk.close();
       } catch (IOException e) {
         // ok
       }
@@ -175,12 +170,32 @@ public void sort() {
     AtomicLong sorting = new AtomicLong(0);
     AtomicLong doneCounter = new AtomicLong(0);

+    // we may end up with many small chunks because each thread-local writer starts a new one
+    // so group together smaller chunks that can be sorted together in-memory to minimize the
+    // number of chunks that the reader needs to deal with
+    List<List<ExternalMergeSort.Chunk>> groups = BinPack.pack(
+      chunks,
+      chunkSizeLimit,
+      chunk -> chunk.bytesInMemory
+    );
+
+    LOGGER.info("Grouped {} chunks into {}", chunks.size(), groups.size());
+
     var pipeline = WorkerPipeline.start("sort", stats)
-      .readFromTiny("item_queue", chunks)
-      .sinkToConsumer("worker", workers, chunk -> {
+      .readFromTiny("item_queue", groups)
+      .sinkToConsumer("worker", workers, group -> {
         try {
           readSemaphore.acquire();
-          var toSort = time(reading, chunk::readAll);
+          var chunk = group.get(0);
+          var others = group.stream().skip(1).toList();
+          var toSort = time(reading, () -> {
+            // merge all chunks into first one, and remove the others
+            var result = chunk.readAllAndMergeIn(others);
+            for (var other : others) {
+              other.remove();
+            }
+            return result;
+          });
           readSemaphore.release();

           time(sorting, toSort::sort);
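BinPack.pack is not shown in this diff. Based on the call site above — a list of chunks, a maximum combined size, and a size function, returning groups that stay under the limit — a minimal first-fit-decreasing sketch could look like the following; the real com.onthegomap.planetiler.util.BinPack may order items and handle edge cases differently:

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.ToLongFunction;

class BinPackSketch {
  /** Hypothetical packer consistent with the call site above: greedy first-fit, biggest items first. */
  static <T> List<List<T>> pack(List<T> items, long maxSize, ToLongFunction<T> size) {
    List<T> sorted = new ArrayList<>(items);
    sorted.sort(Comparator.comparingLong(size).reversed()); // place biggest items first
    List<List<T>> bins = new ArrayList<>();
    List<Long> used = new ArrayList<>();
    for (T item : sorted) {
      long itemSize = size.applyAsLong(item);
      int bin = 0;
      // first-fit: drop the item into the first bin that still has room
      while (bin < bins.size() && used.get(bin) + itemSize > maxSize) {
        bin++;
      }
      if (bin == bins.size()) { // nothing fits: open a new bin (oversized items get one to themselves)
        bins.add(new ArrayList<>());
        used.add(0L);
      }
      bins.get(bin).add(item);
      used.set(bin, used.get(bin) + itemSize);
    }
    return bins;
  }
}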
@@ -223,6 +238,10 @@ public long numFeaturesWritten() {
   public Iterator<SortableFeature> iterator() {
     assert sorted;

+    if (chunks.isEmpty()) {
+      return Collections.emptyIterator();
+    }
+
     // k-way merge to interleave all the sorted chunks
     PriorityQueue<Reader<?>> queue = new PriorityQueue<>(chunks.size());
     for (Chunk chunk : chunks) {
@@ -250,15 +269,6 @@ public SortableFeature next() {
     };
   }

-  private void newChunk() throws IOException {
-    Path chunkPath = dir.resolve("chunk" + (chunks.size() + 1));
-    chunkPath.toFile().deleteOnExit();
-    if (currentChunk != null) {
-      currentChunk.close();
-    }
-    chunks.add(currentChunk = new Chunk(chunkPath));
-  }
-
   public int chunks() {
     return chunks.size();
   }
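The "k-way merge" mentioned in iterator() keeps one reader per sorted chunk in a priority queue ordered by each reader's current head element; polling the queue always yields the global minimum. A self-contained sketch of the idea (illustrative names, not the Reader classes from this diff):

import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.PriorityQueue;

class KWayMergeSketch {
  /** Interleaves several already-sorted iterators into one sorted stream. */
  static <T extends Comparable<T>> Iterator<T> merge(List<Iterator<T>> sortedInputs) {
    // one queue entry per input: the input's current head element plus the rest of it
    record Head<E extends Comparable<E>>(E value, Iterator<E> rest) implements Comparable<Head<E>> {
      @Override
      public int compareTo(Head<E> other) {
        return value.compareTo(other.value);
      }
    }
    PriorityQueue<Head<T>> queue = new PriorityQueue<>();
    for (Iterator<T> input : sortedInputs) {
      if (input.hasNext()) {
        queue.add(new Head<>(input.next(), input));
      }
    }
    return new Iterator<>() {
      @Override
      public boolean hasNext() {
        return !queue.isEmpty();
      }

      @Override
      public T next() {
        Head<T> smallest = queue.poll(); // overall minimum is always at the head of the queue
        if (smallest == null) {
          throw new NoSuchElementException();
        }
        if (smallest.rest().hasNext()) { // put the source back with its next element
          queue.add(new Head<>(smallest.rest().next(), smallest.rest()));
        }
        return smallest.value();
      }
    };
  }
}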
@@ -400,6 +410,50 @@ public final int compareTo(T o) {
     abstract SortableFeature readNextFeature();
   }

+  /** Writer that a single thread can use to write features independent of writers used in other threads. */
+  @NotThreadSafe
+  private class ThreadLocalWriter implements CloseableConusmer<SortableFeature> {
+    private Chunk currentChunk;
+
+    private ThreadLocalWriter() {
+      try {
+        newChunk();
+      } catch (IOException e) {
+        throw new UncheckedIOException(e);
+      }
+    }
+
+    @Override
+    public void accept(SortableFeature item) {
+      assert !sorted;
+      try {
+        features.incrementAndGet();
+        currentChunk.add(item);
+        if (currentChunk.bytesInMemory > chunkSizeLimit) {
+          newChunk();
+        }
+      } catch (IOException e) {
+        throw new UncheckedIOException(e);
+      }
+    }
+
+    private void newChunk() throws IOException {
+      Path chunkPath = dir.resolve("chunk" + chunkNum.incrementAndGet());
+      FileUtils.deleteOnExit(chunkPath);
+      if (currentChunk != null) {
+        currentChunk.close();
+      }
+      chunks.add(currentChunk = new Chunk(chunkPath));
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (currentChunk != null) {
+        currentChunk.close();
+      }
+    }
+  }
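Two details earlier in the diff make this per-thread design safe: chunkNum is an AtomicInteger, so concurrently created chunk files get unique names, and chunks is a CopyOnWriteArrayList, so writers on different threads can append to it without additional locking. A hedged usage sketch mirroring the doWrites benchmark above (the SortableFeature import path is assumed, and the caller is assumed to be in the same package as ExternalMergeSort):

import com.onthegomap.planetiler.collection.SortableFeature; // assumed location
import java.io.IOException;

class WriterUsageSketch {
  /** Each worker thread gets its own writer, so the hot accept() path touches only thread-local state. */
  static void writeOnWorkerThread(ExternalMergeSort sorter, Iterable<SortableFeature> features)
    throws IOException {
    try (var writer = sorter.writerForThread()) { // close() flushes this thread's final chunk
      for (SortableFeature feature : features) {
        writer.accept(feature);
      }
    }
  }
}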

   /** Write features to the chunk file through a memory-mapped file. */
   private class WriterMmap implements Writer {
     private final FileChannel channel;
@@ -467,19 +521,34 @@ public void add(SortableFeature entry) throws IOException {
       itemCount++;
     }

-    private SortableChunk readAll() {
-      try (var iterator = newReader()) {
-        SortableFeature[] featuresToSort = new SortableFeature[itemCount];
-        int i = 0;
-        while (iterator.hasNext()) {
-          featuresToSort[i] = iterator.next();
-          i++;
-        }
-        if (i != itemCount) {
-          throw new IllegalStateException("Expected " + itemCount + " features in " + path + " got " + i);
-        }
-        return new SortableChunk(featuresToSort);
-      }
-    }
+    private SortableChunk readAllAndMergeIn(Collection<Chunk> others) {
+      // first, grow this chunk
+      int newItems = itemCount;
+      int newBytes = bytesInMemory;
+      for (var other : others) {
+        if (Integer.MAX_VALUE - newItems < other.itemCount) {
+          throw new IllegalStateException("Too many items in merged chunk: " + itemCount + "+" +
+            others.stream().map(c -> c.itemCount).toList());
+        }
+        if (Integer.MAX_VALUE - newBytes < other.bytesInMemory) {
+          throw new IllegalStateException("Too big merged chunk: " + bytesInMemory + "+" +
+            others.stream().map(c -> c.bytesInMemory).toList());
+        }
+        newItems += other.itemCount;
+        newBytes += other.bytesInMemory;
+      }
+      // then read items from all chunks into memory
+      SortableChunk result = new SortableChunk(newItems);
+      result.readAll(this);
+      itemCount = newItems;
+      bytesInMemory = newBytes;
+      for (var other : others) {
+        result.readAll(other);
+      }
+      if (result.i != itemCount) {
+        throw new IllegalStateException("Expected " + itemCount + " features in " + path + " got " + result.i);
+      }
+      return result;
+    }

     private Writer newWriter(Path path) {
@@ -495,15 +564,21 @@ public void close() throws IOException {
       writer.close();
     }

+    public void remove() {
+      chunks.remove(this);
+      FileUtils.delete(path);
+    }

     /**
      * A container for all features in a chunk read into memory for sorting.
      */
     private class SortableChunk {

       private SortableFeature[] featuresToSort;
+      private int i = 0;

-      private SortableChunk(SortableFeature[] featuresToSort) {
-        this.featuresToSort = featuresToSort;
+      private SortableChunk(int itemCount) {
+        this.featuresToSort = new SortableFeature[itemCount];
       }

       public SortableChunk sort() {
@@ -526,6 +601,14 @@ public SortableChunk flush() {
           throw new UncheckedIOException(e);
         }
       }
+
+      private void readAll(Chunk chunk) {
+        try (var iterator = chunk.newReader()) {
+          while (iterator.hasNext()) {
+            featuresToSort[i++] = iterator.next();
+          }
+        }
+      }
     }
   }
