Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add linl33 v2 #678

Merged
merged 1 commit into from
Feb 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions .github/workflows/maven.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,13 @@ jobs:
id: sdkman

- name: 'Build project'
shell: bash
run: |
source "$HOME/.sdkman/bin/sdkman-init.sh"
if [ -f ${{ format('src/main/java-22/dev/morling/onebrc/CalculateAverage_{0}.java', github.event.pull_request.user.login || '') }} ]; then
sdk install java 22.ea.32-open || true
sdk use java 22.ea.32-open
fi
Comment on lines +59 to +62
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should not be needed, the evaluate.sh script takes care of installing all required JDKs.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

test_ci.sh installs the JDKs but it runs after maven so without that line, my class under java-22/ wouldn't be compiled.

Here's the original run without the change
https://github.com/gunnarmorling/1brc/actions/runs/7724135717/job/21055662835

./mvnw --version
./mvnw -B clean verify -Pci

Expand All @@ -63,5 +68,3 @@ jobs:
run: |
./test_ci.sh ${{ github.event.pull_request.user.login }}
if: github.event_name == 'pull_request'


3 changes: 1 addition & 2 deletions prepare_linl33.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@


source "$HOME/.sdkman/bin/sdkman-init.sh"
# TODO: bump to ea 32 when available
sdk use java 22.ea.31-open 1>&2
sdk use java 22.ea.32-open 1>&2

CLASS_NAME="CalculateAverage_linl33"

Expand Down
103 changes: 51 additions & 52 deletions src/main/java-22/dev/morling/onebrc/CalculateAverage_linl33.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public static void main() throws InterruptedException, IOException {
final var inputMapped = channel.map(FileChannel.MapMode.READ_ONLY, 0, channel.size(), Arena.global());

final var chunkBounds = calcChunkBounds(inputMapped.address(), inputMapped.byteSize());
final var maps = new SparseMap[N_THREADS];
final var maps = new HashTable[N_THREADS];

try (final var threadPool = Executors.newFixedThreadPool(N_THREADS, THREAD_BUILDER.factory());
final var singleThreadExecutor = Executors.newSingleThreadExecutor(Thread.ofVirtual().factory())) {
Expand Down Expand Up @@ -104,11 +104,12 @@ private static long[] calcChunkBounds(final long mappedAddr, final long fileSize
return chunkBounds;
}

private static void printSorted(final SparseMap temperatureMeasurements) {
private static void printSorted(final HashTable temperatureMeasurements) {
final var weatherStations = new AggregatedMeasurement[(int) temperatureMeasurements.size];
final var nameBuffer = new byte[WEATHER_STATION_LENGTH_MAX];
var offset = temperatureMeasurements.denseAddress;
for (int i = 0; i < weatherStations.length; i++, offset += SparseMap.DATA_SCALE * Long.BYTES) {

for (int i = 0; i < weatherStations.length; i++) {
final var offset = temperatureMeasurements.getOffset(i);
final var nameAddr = UNSAFE.getLong(offset);
final var nameLength = UNSAFE.getInt(offset + Integer.BYTES * 7);
MemorySegment.copy(ALL, ValueLayout.JAVA_BYTE, nameAddr, nameBuffer, 0, nameLength);
Expand All @@ -129,8 +130,8 @@ private static void printSorted(final SparseMap temperatureMeasurements) {
}

private static void printAggMeasurement(final AggregatedMeasurement aggMeasurement,
final SparseMap temperatureMeasurements) {
final var offset = temperatureMeasurements.denseAddress + SparseMap.DATA_SCALE * Long.BYTES * aggMeasurement.id();
final HashTable temperatureMeasurements) {
final var offset = temperatureMeasurements.getOffset(aggMeasurement.id());

// name
System.out.print(aggMeasurement.name());
Expand Down Expand Up @@ -162,15 +163,15 @@ private static double round(final double d) {
private static class CalculateAverageTask implements Runnable {
public static final int BATCH_SIZE_BYTES = BYTE_SPECIES.vectorByteSize();

private final SparseMap[] maps;
private final HashTable[] maps;
private final long[] chunkBounds;
private final long chunkStart;
private final long chunkEnd;
private final int t;

private SparseMap map;
private HashTable map;

public CalculateAverageTask(SparseMap[] maps, long[] chunkBounds, int t) {
public CalculateAverageTask(HashTable[] maps, long[] chunkBounds, int t) {
this.maps = maps;
this.chunkBounds = chunkBounds;
this.chunkStart = chunkBounds[t];
Expand All @@ -180,7 +181,7 @@ public CalculateAverageTask(SparseMap[] maps, long[] chunkBounds, int t) {

@Override
public void run() {
this.maps[this.t] = new SparseMap();
this.maps[this.t] = new HashTable();
this.map = this.maps[this.t];

var lineStart = this.chunkBounds[0];
Expand All @@ -192,8 +193,8 @@ public void run() {
}
}

final var vectorLimit = this.chunkStart + ((this.chunkEnd - this.chunkStart) & -BYTE_SPECIES.vectorByteSize());
for (long i = this.chunkStart; i < vectorLimit; i += BYTE_SPECIES.vectorByteSize()) {
final var vectorLimit = this.chunkStart + ((this.chunkEnd - this.chunkStart) & -BATCH_SIZE_BYTES);
for (long i = this.chunkStart; i < vectorLimit; i += BATCH_SIZE_BYTES) {
var lfMask = ByteVector.fromMemorySegment(BYTE_SPECIES, ALL, i, ByteOrder.nativeOrder())
.eq((byte) '\n')
.toLong();
Expand Down Expand Up @@ -272,31 +273,34 @@ private void processLine(final long lineStart, final long lfAddress) {
/**
* Open addressing, linear probing hash map backed by off-heap memory
*/
private static class SparseMap {
private static class HashTable {
private static final int TRUNCATED_HASH_BITS = 26;
// max # of unique keys
private static final long DENSE_SIZE = WEATHER_STATION_DISTINCT_MAX;
// max hash code (exclusive)
private static final long SPARSE_SIZE = 1L << (TRUNCATED_HASH_BITS + 1);
private static final long DATA_SCALE = 4;
public static final long SPARSE_SCALE = 32;
public static final long DENSE_SCALE = 8;

public final long sparseAddress;
public final long denseAddress;
public long size;

public SparseMap() {
public HashTable() {
var arena = new MallocArena(Arena.global());
var callocArena = new CallocArena(Arena.global());

this.size = 0L;

final var sparse = callocArena.allocate(ValueLayout.JAVA_LONG, SPARSE_SIZE);
final var sparse = callocArena.allocate(ValueLayout.JAVA_BYTE, SPARSE_SIZE * SPARSE_SCALE);
this.sparseAddress = (sparse.address() + MallocArena.MAX_ALIGN) & -MallocArena.MAX_ALIGN;

final var dense = arena.allocate(ValueLayout.JAVA_LONG, DENSE_SIZE * DATA_SCALE);
final var dense = arena.allocate(ValueLayout.JAVA_BYTE, DENSE_SIZE * DENSE_SCALE);
this.denseAddress = (dense.address() + MallocArena.MAX_ALIGN) & -MallocArena.MAX_ALIGN;
}

public long getOffset(final long index) {
return UNSAFE.getLong(this.denseAddress + index * DENSE_SCALE);
}

public void putEntry(final long keyAddress, final int keyLength, final int value) {
final var hash = hash(keyAddress, keyLength);
this.putEntryInternal(hash, keyAddress, keyLength, value, 1, value, value);
Expand All @@ -309,43 +313,46 @@ private void putEntryInternal(final long hash,
final int count,
final int temperatureMin,
final int temperatureMax) {
final var sparseOffset = this.sparseAddress + truncateHash(hash) * Long.BYTES;
final var sparseOffset = this.sparseAddress + truncateHash(hash) * SPARSE_SCALE;

for (long n = 0, sparseLinearOffset = sparseOffset; n < WEATHER_STATION_DISTINCT_MAX; n++, sparseLinearOffset += SPARSE_SCALE) {
final var entryKeyAddress = UNSAFE.getLong(sparseLinearOffset);

for (long n = 0, sparseLinearOffset = sparseOffset; n < WEATHER_STATION_DISTINCT_MAX; n++, sparseLinearOffset += Long.BYTES) {
final var denseOffset = UNSAFE.getLong(sparseLinearOffset);
if (denseOffset == 0L) {
if (entryKeyAddress == 0L) {
this.add(sparseLinearOffset, keyAddress, keyLength, temperature, count, temperatureMin, temperatureMax);
this.size++;
return;
}

if (isCollision(keyAddress, keyLength, denseOffset)) {
if (mismatch(keyAddress, entryKeyAddress, keyLength)) {
continue;
}

final var currTotal = UNSAFE.getLong(denseOffset + Integer.BYTES * 2);
UNSAFE.putLong(denseOffset + Integer.BYTES * 2, currTotal + temperature); // total
final var currMin = UNSAFE.getInt(sparseLinearOffset + Integer.BYTES * 5);
final var currMax = UNSAFE.getInt(sparseLinearOffset + Integer.BYTES * 6);
final var currTotal = UNSAFE.getLong(sparseLinearOffset + Integer.BYTES * 2);
final var currCount = UNSAFE.getInt(sparseLinearOffset + Integer.BYTES * 4);

final var currCount = UNSAFE.getInt(denseOffset + Integer.BYTES * 4);
UNSAFE.putInt(denseOffset + Integer.BYTES * 4, currCount + count); // count
UNSAFE.putLong(sparseLinearOffset + Integer.BYTES * 2, currTotal + temperature);
UNSAFE.putInt(sparseLinearOffset + Integer.BYTES * 4, currCount + count);

final var currMin = UNSAFE.getInt(denseOffset + Integer.BYTES * 5);
if (temperatureMin < currMin) {
UNSAFE.putInt(denseOffset + Integer.BYTES * 5, temperatureMin); // min
UNSAFE.putInt(sparseLinearOffset + Integer.BYTES * 5, temperatureMin);
}

final var currMax = UNSAFE.getInt(denseOffset + Integer.BYTES * 6);
if (temperatureMax > currMax) {
UNSAFE.putInt(denseOffset + Integer.BYTES * 6, temperatureMax); // max
UNSAFE.putInt(sparseLinearOffset + Integer.BYTES * 6, temperatureMax);
}

return;
}
}

public void merge(final SparseMap other) {
public void merge(final HashTable other) {
final var otherSize = other.size;
for (long i = 0, offset = other.denseAddress; i < otherSize; i++, offset += DATA_SCALE * Long.BYTES) {
for (long i = 0; i < otherSize; i++) {
final var offset = other.getOffset(i);

final var keyAddress = UNSAFE.getLong(offset);
final var keyLength = UNSAFE.getInt(offset + Integer.BYTES * 7);
final var hash = hash(keyAddress, keyLength);
Expand All @@ -369,22 +376,15 @@ private void add(final long sparseOffset,
final int temperatureMin,
final int temperatureMax) {
// new entry, initialize sparse and dense
final var denseOffset = this.denseAddress + this.size * DATA_SCALE * Long.BYTES;
UNSAFE.putLong(sparseOffset, denseOffset);

UNSAFE.putLong(denseOffset, keyAddress);
UNSAFE.putLong(denseOffset + Integer.BYTES * 2, temperature);
UNSAFE.putInt(denseOffset + Integer.BYTES * 4, count);
UNSAFE.putInt(denseOffset + Integer.BYTES * 5, temperatureMin);
UNSAFE.putInt(denseOffset + Integer.BYTES * 6, temperatureMax);
UNSAFE.putInt(denseOffset + Integer.BYTES * 7, keyLength);
}

private static boolean isCollision(final long keyAddress, final int keyLength, final long denseOffset) {
// key length compare is unnecessary

final var entryKeyAddress = UNSAFE.getLong(denseOffset);
return mismatch(keyAddress, entryKeyAddress, keyLength);
final var denseOffset = this.denseAddress + this.size * DENSE_SCALE;
UNSAFE.putLong(denseOffset, sparseOffset);

UNSAFE.putLong(sparseOffset, keyAddress);
UNSAFE.putLong(sparseOffset + Integer.BYTES * 2, temperature);
UNSAFE.putInt(sparseOffset + Integer.BYTES * 4, count);
UNSAFE.putInt(sparseOffset + Integer.BYTES * 5, temperatureMin);
UNSAFE.putInt(sparseOffset + Integer.BYTES * 6, temperatureMax);
UNSAFE.putInt(sparseOffset + Integer.BYTES * 7, keyLength);
}

private static boolean mismatch(final long leftAddr, final long rightAddr, final int length) {
Expand All @@ -404,8 +404,7 @@ private static boolean mismatch(final long leftAddr, final long rightAddr, final
final var r = ByteVector.fromMemorySegment(BYTE_SPECIES, ALL, rightAddr + loopBound, ByteOrder.nativeOrder());
final var eqMask = l.eq(r).toLong();

// LE compare to add 1 to length
return Long.numberOfTrailingZeros(~eqMask) <= (length - loopBound);
return Long.numberOfTrailingZeros(~eqMask) < ((length + 1) & (BYTE_SPECIES.vectorByteSize() - 1));
// to support platforms without TZCNT, the check can be replaced with
// a comparison to lowestZero = ~eqMask & (eqMask + 1)
}
Expand Down
Loading