From aee71b961d8e9b799020d819e370edfc51c82576 Mon Sep 17 00:00:00 2001 From: Matteo Vaccari Date: Wed, 17 Jan 2024 21:26:19 +0100 Subject: [PATCH] My own solution -- memory mapping the files, running in parallel threads, using a state machine to parse the file (#466) * Golang implementation * Speed up by avoiding copying the lines * Memory mapping * Add script for testing * Now passing most of the tests * Refactor to composed method * Now using integer math throughout * Now using a state machine for parsing! * Refactoring state names * Enabling profiling * Running in parallel! * Fully parallel! * Refactor * Improve type safety of methods * The rounding problem is due to difference between Javas and Gos printf implementation * Converting my solution to Java * Merging results * Splitting the file in several buffers * Made it parallel! * Removed test file * Removed go implementation * Removed unused files * Add header to .sh file --------- Co-authored-by: Matteo Vaccari --- calculate_average_xpmatteo.sh | 20 ++ .../onebrc/CalculateAverage_xpmatteo.java | 261 ++++++++++++++++++ 2 files changed, 281 insertions(+) create mode 100755 calculate_average_xpmatteo.sh create mode 100644 src/main/java/dev/morling/onebrc/CalculateAverage_xpmatteo.java diff --git a/calculate_average_xpmatteo.sh b/calculate_average_xpmatteo.sh new file mode 100755 index 000000000..d1cd87039 --- /dev/null +++ b/calculate_average_xpmatteo.sh @@ -0,0 +1,20 @@ +#!/bin/sh +# +# Copyright 2023 The original authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +JAVA_OPTS="--enable-preview" +java $JAVA_OPTS --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_xpmatteo + diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_xpmatteo.java b/src/main/java/dev/morling/onebrc/CalculateAverage_xpmatteo.java new file mode 100644 index 000000000..94904ffb4 --- /dev/null +++ b/src/main/java/dev/morling/onebrc/CalculateAverage_xpmatteo.java @@ -0,0 +1,261 @@ +/* + * Copyright 2023 The original authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dev.morling.onebrc; + +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.TreeMap; +import java.util.stream.Collectors; + +@SuppressWarnings({ "ReassignedVariable", "StatementWithEmptyBody" }) +public class CalculateAverage_xpmatteo { + + private static final String FILE = "./measurements.txt"; + + public static void main(String[] args) throws IOException, InterruptedException { + var fileName = dataFileName(args); + + try ( + var file = new RandomAccessFile(new File(fileName), "r"); + var channel = file.getChannel()) { + var numCpus = Runtime.getRuntime().availableProcessors(); + var threads = split(channel, numCpus).stream() + .map(Worker::new) + .toList(); + threads.forEach(Thread::start); + for (Worker thread : threads) { + thread.join(); + } + var results = threads.stream().map(Worker::getResults) + .reduce(CalculateAverage_xpmatteo::merge) + .orElseThrow(); + printCities(results); + } + } + + public static class Worker extends Thread { + private final ByteBuffer buffer; + private Results results; + + public Worker(ByteBuffer buffer) { + this.buffer = buffer; + } + + @Override + public void run() { + this.results = parseData(this.buffer); + } + + public Results getResults() { + return results; + } + } + + protected static List split(FileChannel channel, int numCpus) throws IOException { + if (channel.size() < 10_000) { + return List.of(channel.map(FileChannel.MapMode.READ_ONLY, 0, channel.size())); + } + + long[] increments = new long[numCpus + 1]; + for (int i = 0; i < numCpus; i++) { + increments[i] = i * channel.size() / numCpus; + // adjust the increments so that they start on the beginning of a city + while (increments[i] > 0 && byteAt(channel, increments[i] - 1) != '\n') { + increments[i]--; + } + } + increments[numCpus] = channel.size(); + + List result = new ArrayList<>(numCpus); + for (int i = 0; i < numCpus; i++) { + long from = increments[i]; + long to = increments[i + 1]; + result.add(channel.map(FileChannel.MapMode.READ_ONLY, from, to - from)); + } + return result; + } + + private static byte byteAt(FileChannel channel, long offset) throws IOException { + ByteBuffer buf = ByteBuffer.allocate(1); + channel.position(offset); + channel.read(buf); + buf.flip(); + var bytes = new byte[1]; + buf.get(bytes); + return bytes[0]; + } + + public static String dataFileName(String[] args) { + if (args.length == 1) { + return args[0]; + } + return FILE; + } + + protected static byte[] readAllData(String fileName) throws IOException { + return Files.readAllBytes(Path.of(fileName)); + } + + protected static ByteBuffer memoryMap(String fileName) throws IOException { + try (RandomAccessFile file = new RandomAccessFile(new File(fileName), "r")) { + // Get file channel in read-only mode + FileChannel fileChannel = file.getChannel(); + + return fileChannel.map(FileChannel.MapMode.READ_ONLY, 0, fileChannel.size()); + } + } + + protected enum State { + PARSING_CITY_NAME, + SKIPPING_SEMICOLON, + PARSING_TEMPERATURE + } + + protected static Results parseData(ByteBuffer data) { + var results = new Results(); + var state = State.PARSING_CITY_NAME; + int cityStartOffset = 0, cityEndOffset = 0; + int temp = 0, sign = 0; + + for (int i = 0; i < data.limit(); i++) { + byte currentChar = data.get(); + if (state == State.PARSING_CITY_NAME && currentChar == ';') { + state = State.SKIPPING_SEMICOLON; + cityEndOffset = i; + } + else if (state == State.PARSING_CITY_NAME) { + // do nothing + } + else if (state == State.SKIPPING_SEMICOLON && currentChar == '-') { + state = State.PARSING_TEMPERATURE; + temp = 0; + sign = -1; + } + else if (state == State.SKIPPING_SEMICOLON && currentChar >= '0' && currentChar <= '9') { + state = State.PARSING_TEMPERATURE; + temp = currentChar - '0'; + sign = 1; + } + else if (state == State.PARSING_TEMPERATURE && currentChar >= '0' && currentChar <= '9') { + temp = temp * 10 + currentChar - '0'; + } + else if (state == State.PARSING_TEMPERATURE && currentChar == '.') { + // do nothing + } + else if (state == State.PARSING_TEMPERATURE && currentChar == '\n') { + byte[] bytes = new byte[cityEndOffset - cityStartOffset]; + data.get(cityStartOffset, bytes); + var cityName = new String(bytes); + accumulate(results, cityName, temp * sign); + state = State.PARSING_CITY_NAME; + cityStartOffset = i + 1; + } + } + + return results; + } + + private static void accumulate(Results results, String cityName, int tempTimesTen) { + var existing = results.get(cityName); + if (existing == null) { + results.put(cityName, new CityData(tempTimesTen, tempTimesTen, tempTimesTen, 1)); + } + else { + existing.min = Math.min(existing.min, tempTimesTen); + existing.sum = existing.sum + tempTimesTen; + existing.max = Math.max(existing.max, tempTimesTen); + existing.count++; + } + } + + protected static Results merge(Results a, Results b) { + for (var entry : b.entrySet()) { + CityData valueInA = a.get(entry.getKey()); + if (null == valueInA) { + a.put(entry.getKey(), entry.getValue()); + } + else { + var valueInB = entry.getValue(); + valueInA.min = Math.min(valueInA.min, valueInB.min); + valueInA.sum += valueInB.sum; + valueInA.max = Math.max(valueInA.max, valueInB.max); + valueInA.count += valueInB.count; + } + } + + return a; + } + + protected static class Results extends TreeMap { + + } + + protected static class CityData { + int min, sum, max, count; + + public CityData(int min, int sum, int max, int count) { + this.min = min; + this.sum = sum; + this.max = max; + this.count = count; + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + CityData cityData = (CityData) o; + return min == cityData.min && sum == cityData.sum && max == cityData.max && count == cityData.count; + } + + @Override + public int hashCode() { + return Objects.hash(min, sum, max, count); + } + + @Override + public String toString() { + return STR."CityData{min=\{min}, sum=\{sum}, max=\{max}, count=\{count}\{'}'}"; + } + } + + protected static void printCities(Results cities) { + System.out.print("{"); + for (String city : cities.keySet()) { + CityData data = cities.get(city); + var min = data.min / 10.0; + var mean = (data.sum * 10.0 / data.count) / 100.0; + var max = data.max / 10.0; + System.out.printf( + "%s=%.1f/%.1f/%.1f, ", + city, + min, + mean, + max); + } + System.out.print("}"); + } +}