Skip to content

Commit

Permalink
Spark 3.3: Support bucket in FunctionCatalog (apache#5513)
Browse files Browse the repository at this point in the history
  • Loading branch information
kbendick authored Aug 18, 2022
1 parent 1741e4d commit 69bcf05
Show file tree
Hide file tree
Showing 6 changed files with 788 additions and 31 deletions.
40 changes: 9 additions & 31 deletions api/src/main/java/org/apache/iceberg/transforms/Bucket.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Set;
import java.util.UUID;
import org.apache.iceberg.expressions.BoundPredicate;
Expand All @@ -38,6 +37,7 @@
import org.apache.iceberg.relocated.com.google.common.hash.Hashing;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.BucketUtil;

abstract class Bucket<T> implements Transform<T, Integer> {
private static final HashFunction MURMUR3 = Hashing.murmur3_32_fixed();
Expand Down Expand Up @@ -166,7 +166,7 @@ private BucketInteger(int numBuckets) {

@Override
public int hash(Integer value) {
return MURMUR3.hashLong(value.longValue()).asInt();
return BucketUtil.hash(value);
}

@Override
Expand All @@ -182,7 +182,7 @@ private BucketLong(int numBuckets) {

@Override
public int hash(Long value) {
return MURMUR3.hashLong(value).asInt();
return BucketUtil.hash(value);
}

@Override
Expand All @@ -202,7 +202,7 @@ static class BucketFloat extends Bucket<Float> {

@Override
public int hash(Float value) {
return MURMUR3.hashLong(Double.doubleToLongBits((double) value)).asInt();
return BucketUtil.hash(value);
}

@Override
Expand All @@ -220,7 +220,7 @@ static class BucketDouble extends Bucket<Double> {

@Override
public int hash(Double value) {
return MURMUR3.hashLong(Double.doubleToLongBits(value)).asInt();
return BucketUtil.hash(value);
}

@Override
Expand All @@ -236,7 +236,7 @@ private BucketString(int numBuckets) {

@Override
public int hash(CharSequence value) {
return MURMUR3.hashString(value, StandardCharsets.UTF_8).asInt();
return BucketUtil.hash(value);
}

@Override
Expand All @@ -254,24 +254,7 @@ private BucketByteBuffer(int numBuckets) {

@Override
public int hash(ByteBuffer value) {
if (value.hasArray()) {
return MURMUR3
.hashBytes(
value.array(),
value.arrayOffset() + value.position(),
value.arrayOffset() + value.remaining())
.asInt();
} else {
int position = value.position();
byte[] copy = new byte[value.remaining()];
try {
value.get(copy);
} finally {
// make sure the buffer position is unchanged
value.position(position);
}
return MURMUR3.hashBytes(copy).asInt();
}
return BucketUtil.hash(value);
}

@Override
Expand All @@ -287,12 +270,7 @@ private BucketUUID(int numBuckets) {

@Override
public int hash(UUID value) {
return MURMUR3
.newHasher(16)
.putLong(Long.reverseBytes(value.getMostSignificantBits()))
.putLong(Long.reverseBytes(value.getLeastSignificantBits()))
.hash()
.asInt();
return BucketUtil.hash(value);
}

@Override
Expand All @@ -308,7 +286,7 @@ private BucketDecimal(int numBuckets) {

@Override
public int hash(BigDecimal value) {
return MURMUR3.hashBytes(value.unscaledValue().toByteArray()).asInt();
return BucketUtil.hash(value);
}

@Override
Expand Down
88 changes: 88 additions & 0 deletions api/src/main/java/org/apache/iceberg/util/BucketUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.util;

import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import org.apache.iceberg.relocated.com.google.common.hash.HashFunction;
import org.apache.iceberg.relocated.com.google.common.hash.Hashing;

/**
 * Hash functions used by the {@code bucket} partition transformations.
 *
 * <p>Every overload feeds its argument to the same Murmur3 function (obtained from {@code
 * Hashing.murmur3_32_fixed()}) and returns the result as an {@code int}. The class only holds
 * static helpers and cannot be instantiated.
 */
public class BucketUtil {

  private static final HashFunction MURMUR3 = Hashing.murmur3_32_fixed();

  private BucketUtil() {}

  /**
   * Hashes an {@code int} by widening it to a {@code long} first.
   *
   * @param value the value to hash
   * @return the same hash that {@link #hash(long)} produces for the widened value
   */
  public static int hash(int value) {
    return hash((long) value);
  }

  /**
   * Hashes a {@code long}.
   *
   * @param value the value to hash
   * @return the Murmur3 hash of the value
   */
  public static int hash(long value) {
    return MURMUR3.hashLong(value).asInt();
  }

  /**
   * Hashes a {@code float} by widening it to a {@code double} first.
   *
   * @param value the value to hash
   * @return the same hash that {@link #hash(double)} produces for the widened value
   */
  public static int hash(float value) {
    return hash((double) value);
  }

  /**
   * Hashes a {@code double} through its {@link Double#doubleToLongBits(double)} representation.
   *
   * @param value the value to hash
   * @return the Murmur3 hash of the value's long bits
   */
  public static int hash(double value) {
    long bits = Double.doubleToLongBits(value);
    return MURMUR3.hashLong(bits).asInt();
  }

  /**
   * Hashes a character sequence using its UTF-8 encoding.
   *
   * @param value the characters to hash
   * @return the Murmur3 hash of the UTF-8 encoded characters
   */
  public static int hash(CharSequence value) {
    return MURMUR3.hashString(value, StandardCharsets.UTF_8).asInt();
  }

  /**
   * Hashes the remaining bytes of a buffer, i.e. those between its position and its limit.
   *
   * <p>The buffer's position is the same after the call as it was before.
   *
   * @param value the buffer whose remaining bytes are hashed
   * @return the Murmur3 hash of the remaining bytes
   */
  public static int hash(ByteBuffer value) {
    if (!value.hasArray()) {
      // no accessible backing array: hash a copy of the remaining bytes instead
      return MURMUR3.hashBytes(copyRemaining(value)).asInt();
    }

    // the readable region starts at arrayOffset + position within the backing array
    int start = value.arrayOffset() + value.position();
    int length = value.remaining();
    return MURMUR3.hashBytes(value.array(), start, length).asInt();
  }

  /** Copies the remaining bytes of {@code buffer} into a new array and restores its position. */
  private static byte[] copyRemaining(ByteBuffer buffer) {
    int savedPosition = buffer.position();
    byte[] contents = new byte[buffer.remaining()];
    try {
      buffer.get(contents);
    } finally {
      // the relative get advanced the position; put it back where the caller left it
      buffer.position(savedPosition);
    }
    return contents;
  }

  /**
   * Hashes a UUID as 16 bytes: the most significant long followed by the least significant long.
   *
   * @param value the UUID to hash
   * @return the Murmur3 hash of the UUID
   */
  public static int hash(UUID value) {
    // NOTE(review): each half is byte-reversed before being written; presumably this offsets the
    // hasher's byte order for putLong so the UUID is hashed in big-endian order -- confirm
    long high = Long.reverseBytes(value.getMostSignificantBits());
    long low = Long.reverseBytes(value.getLeastSignificantBits());
    return MURMUR3.newHasher(16).putLong(high).putLong(low).hash().asInt();
  }

  /**
   * Hashes a decimal through the byte array of its unscaled value; the scale is not hashed.
   *
   * @param value the decimal to hash
   * @return the Murmur3 hash of {@code value.unscaledValue().toByteArray()}
   */
  public static int hash(BigDecimal value) {
    byte[] unscaledBytes = value.unscaledValue().toByteArray();
    return MURMUR3.hashBytes(unscaledBytes).asInt();
  }
}
19 changes: 19 additions & 0 deletions api/src/test/java/org/apache/iceberg/transforms/TestBucketing.java
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,25 @@ public void testByteBufferOnHeap() {
Assert.assertEquals("Buffer limit should not change", 105, buffer.limit());
}

@Test
public void testByteBufferOnHeapArrayOffset() {
  int offset = 5;
  int length = 100;
  byte[] data = randomBytes(128);

  // slicing a wrapped sub-range gives a heap buffer that starts at position 0 but carries a
  // non-zero arrayOffset into the shared backing array
  ByteBuffer sliced = ByteBuffer.wrap(data, offset, length).slice();
  Assert.assertEquals("Buffer arrayOffset should be 5", offset, sliced.arrayOffset());

  Bucket<ByteBuffer> binaryBucket = Bucket.get(Types.BinaryType.get(), 100);

  // the hash must cover exactly the sliced region of the backing array
  Assert.assertEquals(
      "HeapByteBuffer hash should match hash for correct slice",
      hashBytes(data, offset, length),
      binaryBucket.hash(sliced));

  // hashing must leave the buffer's position and limit untouched
  Assert.assertEquals("Buffer position should be 0", 0, sliced.position());
  Assert.assertEquals("Buffer limit should not change", length, sliced.limit());
}

@Test
public void testByteBufferOffHeap() {
byte[] bytes = randomBytes(128);
Expand Down
Loading

0 comments on commit 69bcf05

Please sign in to comment.