Skip to content

Commit 611dcd2

Browse files
committed
[ARROW-6113][Java] Support vector deduplicate function
1 parent a40d6b6 commit 611dcd2

File tree

4 files changed

+472
-0
lines changed

4 files changed

+472
-0
lines changed
Lines changed: 95 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,95 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.arrow.algorithm.deduplicate;
19+
20+
import org.apache.arrow.util.DataSizeRoundingUtil;
21+
import org.apache.arrow.util.Preconditions;
22+
import org.apache.arrow.vector.BitVectorHelper;
23+
import org.apache.arrow.vector.IntVector;
24+
import org.apache.arrow.vector.ValueVector;
25+
import org.apache.arrow.vector.compare.RangeEqualsVisitor;
26+
27+
import io.netty.buffer.ArrowBuf;
28+
29+
/**
30+
* Utilities for vector deduplication.
31+
*/
32+
class DeduplicationUtils {
33+
34+
/**
35+
* Gets the start positions of the first distinct values in a vector.
36+
* @param vector the target vector.
37+
* @param runStarts the bit set to hold the start positions.
38+
* @param <V> vector type.
39+
*/
40+
public static <V extends ValueVector> void populateRunStartIndicators(V vector, ArrowBuf runStarts) {
41+
int bufSize = DataSizeRoundingUtil.divideBy8Ceil(vector.getValueCount());
42+
Preconditions.checkArgument(runStarts.capacity() >= bufSize);
43+
runStarts.setZero(0, bufSize);
44+
45+
BitVectorHelper.setValidityBitToOne(runStarts, 0);
46+
47+
for (int i = 1; i < vector.getValueCount(); i++) {
48+
RangeEqualsVisitor visitor = new RangeEqualsVisitor(vector, i - 1, i, 1, false);
49+
if (!visitor.equals(vector)) {
50+
BitVectorHelper.setValidityBitToOne(runStarts, i);
51+
}
52+
}
53+
}
54+
55+
/**
56+
* Gets the run lengths, given the start positions.
57+
* @param runStarts the bit set for start positions.
58+
* @param runLengths the run length vector to populate.
59+
* @param valueCount the number of values in the bit set.
60+
*/
61+
public static void populateRunLengths(ArrowBuf runStarts, IntVector runLengths, int valueCount) {
62+
int curStart = 0;
63+
int lengthIndex = 0;
64+
for (int i = 1; i < valueCount; i++) {
65+
if (BitVectorHelper.get(runStarts, i) != 0) {
66+
// we get a new distinct value
67+
runLengths.setSafe(lengthIndex++, i - curStart);
68+
curStart = i;
69+
}
70+
}
71+
72+
// process the last value
73+
runLengths.setSafe(lengthIndex++, valueCount - curStart);
74+
runLengths.setValueCount(lengthIndex);
75+
}
76+
77+
/**
78+
* Gets distinct values from the input vector by removing adjacent
79+
* duplicated values.
80+
* @param indicators the bit set containing the start positions of disctinct values.
81+
* @param inputVector the input vector.
82+
* @param outputVector the output vector.
83+
* @param <V> vector type.
84+
*/
85+
public static <V extends ValueVector> void populateDeduplicatedValues(
86+
ArrowBuf indicators, V inputVector, V outputVector) {
87+
int dstIdx = 0;
88+
for (int srcIdx = 0; srcIdx < inputVector.getValueCount(); srcIdx++) {
89+
if (BitVectorHelper.get(indicators, srcIdx) != 0) {
90+
outputVector.copyFromSafe(srcIdx, dstIdx++, inputVector);
91+
}
92+
}
93+
outputVector.setValueCount(dstIdx);
94+
}
95+
}
Lines changed: 109 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,109 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.arrow.algorithm.deduplicate;
19+
20+
import org.apache.arrow.memory.BufferAllocator;
21+
import org.apache.arrow.util.DataSizeRoundingUtil;
22+
import org.apache.arrow.util.Preconditions;
23+
import org.apache.arrow.vector.BitVectorHelper;
24+
import org.apache.arrow.vector.IntVector;
25+
import org.apache.arrow.vector.ValueVector;
26+
27+
import io.netty.buffer.ArrowBuf;
28+
29+
/**
30+
* Remove adjacent equal elements from a vector.
31+
* If the vector is sorted, it removes all duplicated values in the vector.
32+
* @param <V> vector type.
33+
*/
34+
public class VectorRunDeduplicator<V extends ValueVector> implements AutoCloseable {
35+
36+
/**
37+
* Bit set for distinct values.
38+
* If the value at some index is not equal to the previous value,
39+
* its bit is set to 1, otherwise its bit is set to 0.
40+
*/
41+
private ArrowBuf distinctValueBuffer;
42+
43+
/**
44+
* The vector to deduplicate.
45+
*/
46+
private final V vector;
47+
48+
private final BufferAllocator allocator;
49+
50+
/**
51+
* Constructs a vector run deduplicator for a given vector.
52+
* @param vector the given vector.
53+
* @param allocator the allocator used for allocating buffers for start indices.
54+
*/
55+
public VectorRunDeduplicator(V vector, BufferAllocator allocator) {
56+
this.vector = vector;
57+
this.allocator = allocator;
58+
}
59+
60+
private void createDistinctValueBuffer() {
61+
Preconditions.checkArgument(distinctValueBuffer == null);
62+
int bufSize = DataSizeRoundingUtil.divideBy8Ceil(vector.getValueCount());
63+
distinctValueBuffer = allocator.buffer(bufSize);
64+
DeduplicationUtils.populateRunStartIndicators(vector, distinctValueBuffer);
65+
}
66+
67+
/**
68+
* Gets the number of values which are different from their predecessor.
69+
* @return the run count.
70+
*/
71+
public int getRunCount() {
72+
if (distinctValueBuffer == null) {
73+
createDistinctValueBuffer();
74+
}
75+
return vector.getValueCount() - BitVectorHelper.getNullCount(distinctValueBuffer, vector.getValueCount());
76+
}
77+
78+
/**
79+
* Gets the vector with deduplicated adjacent values removed.
80+
* @param outVector the output vector.
81+
*/
82+
public void populateDeduplicatedValues(V outVector) {
83+
if (distinctValueBuffer == null) {
84+
createDistinctValueBuffer();
85+
}
86+
87+
DeduplicationUtils.populateDeduplicatedValues(distinctValueBuffer, vector, outVector);
88+
}
89+
90+
/**
91+
* Gets the length of each distinct value.
92+
* @param lengthVector the vector for holding length values.
93+
*/
94+
public void populateRunLengths(IntVector lengthVector) {
95+
if (distinctValueBuffer == null) {
96+
createDistinctValueBuffer();
97+
}
98+
99+
DeduplicationUtils.populateRunLengths(distinctValueBuffer, lengthVector, vector.getValueCount());
100+
}
101+
102+
@Override
103+
public void close() {
104+
if (distinctValueBuffer != null) {
105+
distinctValueBuffer.close();
106+
distinctValueBuffer = null;
107+
}
108+
}
109+
}
Lines changed: 136 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,136 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.arrow.algorithm.deduplicate;
19+
20+
import static org.junit.Assert.assertEquals;
21+
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
22+
23+
import org.apache.arrow.memory.BufferAllocator;
24+
import org.apache.arrow.memory.RootAllocator;
25+
import org.apache.arrow.util.DataSizeRoundingUtil;
26+
import org.apache.arrow.vector.BitVectorHelper;
27+
import org.apache.arrow.vector.IntVector;
28+
import org.apache.arrow.vector.VarCharVector;
29+
import org.junit.After;
30+
import org.junit.Before;
31+
import org.junit.Test;
32+
33+
import io.netty.buffer.ArrowBuf;
34+
35+
/**
36+
* Test cases for {@link DeduplicationUtils}.
37+
*/
38+
public class TestDeduplicationUtils {
39+
40+
private static final int VECTOR_LENGTH = 100;
41+
42+
private static final int REPETITION_COUNT = 3;
43+
44+
private BufferAllocator allocator;
45+
46+
@Before
47+
public void prepare() {
48+
allocator = new RootAllocator(1024 * 1024);
49+
}
50+
51+
@After
52+
public void shutdown() {
53+
allocator.close();
54+
}
55+
56+
@Test
57+
public void testDeduplicateFixedWidth() {
58+
try (IntVector origVec = new IntVector("original vec", allocator);
59+
IntVector dedupVec = new IntVector("deduplicated vec", allocator);
60+
IntVector lengthVec = new IntVector("length vec", allocator);
61+
ArrowBuf distinctBuf = allocator.buffer(
62+
DataSizeRoundingUtil.divideBy8Ceil(VECTOR_LENGTH * REPETITION_COUNT))) {
63+
origVec.allocateNew(VECTOR_LENGTH * REPETITION_COUNT);
64+
origVec.setValueCount(VECTOR_LENGTH * REPETITION_COUNT);
65+
lengthVec.allocateNew();
66+
67+
// prepare data
68+
for (int i = 0; i < VECTOR_LENGTH; i++) {
69+
for (int j = 0; j < REPETITION_COUNT; j++) {
70+
origVec.set(i * REPETITION_COUNT + j, i);
71+
}
72+
}
73+
74+
DeduplicationUtils.populateRunStartIndicators(origVec, distinctBuf);
75+
assertEquals( VECTOR_LENGTH,
76+
VECTOR_LENGTH * REPETITION_COUNT -
77+
BitVectorHelper.getNullCount(distinctBuf, VECTOR_LENGTH * REPETITION_COUNT));
78+
79+
DeduplicationUtils.populateDeduplicatedValues(distinctBuf, origVec, dedupVec);
80+
assertEquals(VECTOR_LENGTH, dedupVec.getValueCount());
81+
82+
for (int i = 0; i < VECTOR_LENGTH; i++) {
83+
assertEquals(i, dedupVec.get(i));
84+
}
85+
86+
DeduplicationUtils.populateRunLengths(distinctBuf, lengthVec, VECTOR_LENGTH * REPETITION_COUNT);
87+
assertEquals(VECTOR_LENGTH, lengthVec.getValueCount());
88+
89+
for (int i = 0; i < VECTOR_LENGTH; i++) {
90+
assertEquals(REPETITION_COUNT, lengthVec.get(i));
91+
}
92+
}
93+
}
94+
95+
@Test
96+
public void testDeduplicateVariableWidth() {
97+
try (VarCharVector origVec = new VarCharVector("original vec", allocator);
98+
VarCharVector dedupVec = new VarCharVector("deduplicated vec", allocator);
99+
IntVector lengthVec = new IntVector("length vec", allocator);
100+
ArrowBuf distinctBuf = allocator.buffer(
101+
DataSizeRoundingUtil.divideBy8Ceil(VECTOR_LENGTH * REPETITION_COUNT))) {
102+
origVec.allocateNew(
103+
VECTOR_LENGTH * REPETITION_COUNT * 10, VECTOR_LENGTH * REPETITION_COUNT);
104+
origVec.setValueCount(VECTOR_LENGTH * REPETITION_COUNT);
105+
lengthVec.allocateNew();
106+
107+
// prepare data
108+
for (int i = 0; i < VECTOR_LENGTH; i++) {
109+
String str = String.valueOf(i * i);
110+
for (int j = 0; j < REPETITION_COUNT; j++) {
111+
origVec.set(i * REPETITION_COUNT + j, str.getBytes());
112+
}
113+
}
114+
115+
DeduplicationUtils.populateRunStartIndicators(origVec, distinctBuf);
116+
assertEquals(VECTOR_LENGTH,
117+
VECTOR_LENGTH * REPETITION_COUNT -
118+
BitVectorHelper.getNullCount(distinctBuf, VECTOR_LENGTH * REPETITION_COUNT));
119+
120+
DeduplicationUtils.populateDeduplicatedValues(distinctBuf, origVec, dedupVec);
121+
assertEquals(VECTOR_LENGTH, dedupVec.getValueCount());
122+
123+
for (int i = 0; i < VECTOR_LENGTH; i++) {
124+
assertArrayEquals(String.valueOf(i * i).getBytes(), dedupVec.get(i));
125+
}
126+
127+
DeduplicationUtils.populateRunLengths(
128+
distinctBuf, lengthVec, VECTOR_LENGTH * REPETITION_COUNT);
129+
assertEquals(VECTOR_LENGTH, lengthVec.getValueCount());
130+
131+
for (int i = 0; i < VECTOR_LENGTH; i++) {
132+
assertEquals(REPETITION_COUNT, lengthVec.get(i));
133+
}
134+
}
135+
}
136+
}

0 commit comments

Comments
 (0)