Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.arrow.algorithm.deduplicate;

import org.apache.arrow.util.DataSizeRoundingUtil;
import org.apache.arrow.util.Preconditions;
import org.apache.arrow.vector.BitVectorHelper;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.ValueVector;
import org.apache.arrow.vector.compare.RangeEqualsVisitor;

import io.netty.buffer.ArrowBuf;

/**
* Utilities for vector deduplication.
*/
class DeduplicationUtils {

/**
* Gets the start positions of the first distinct values in a vector.
* @param vector the target vector.
* @param runStarts the bit set to hold the start positions.
* @param <V> vector type.
*/
public static <V extends ValueVector> void populateRunStartIndicators(V vector, ArrowBuf runStarts) {
int bufSize = DataSizeRoundingUtil.divideBy8Ceil(vector.getValueCount());
Preconditions.checkArgument(runStarts.capacity() >= bufSize);
runStarts.setZero(0, bufSize);

BitVectorHelper.setValidityBitToOne(runStarts, 0);

for (int i = 1; i < vector.getValueCount(); i++) {
RangeEqualsVisitor visitor = new RangeEqualsVisitor(
vector, i - 1, i, /* length */1, /* need check type*/false);
if (!visitor.equals(vector)) {
BitVectorHelper.setValidityBitToOne(runStarts, i);
}
}
}

/**
* Gets the run lengths, given the start positions.
* @param runStarts the bit set for start positions.
* @param runLengths the run length vector to populate.
* @param valueCount the number of values in the bit set.
*/
public static void populateRunLengths(ArrowBuf runStarts, IntVector runLengths, int valueCount) {
int curStart = 0;
int lengthIndex = 0;
for (int i = 1; i < valueCount; i++) {
if (BitVectorHelper.get(runStarts, i) != 0) {
// we get a new distinct value
runLengths.setSafe(lengthIndex++, i - curStart);
curStart = i;
}
}

// process the last value
runLengths.setSafe(lengthIndex++, valueCount - curStart);
runLengths.setValueCount(lengthIndex);
}

/**
* Gets distinct values from the input vector by removing adjacent
* duplicated values.
* @param indicators the bit set containing the start positions of disctinct values.
* @param inputVector the input vector.
* @param outputVector the output vector.
* @param <V> vector type.
*/
public static <V extends ValueVector> void populateDeduplicatedValues(
ArrowBuf indicators, V inputVector, V outputVector) {
int dstIdx = 0;
for (int srcIdx = 0; srcIdx < inputVector.getValueCount(); srcIdx++) {
if (BitVectorHelper.get(indicators, srcIdx) != 0) {
outputVector.copyFromSafe(srcIdx, dstIdx++, inputVector);
}
}
outputVector.setValueCount(dstIdx);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.arrow.algorithm.deduplicate;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.util.DataSizeRoundingUtil;
import org.apache.arrow.util.Preconditions;
import org.apache.arrow.vector.BitVectorHelper;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.ValueVector;

import io.netty.buffer.ArrowBuf;

/**
* Remove adjacent equal elements from a vector.
* If the vector is sorted, it removes all duplicated values in the vector.
* @param <V> vector type.
*/
public class VectorRunDeduplicator<V extends ValueVector> implements AutoCloseable {

/**
* Bit set for distinct values.
* If the value at some index is not equal to the previous value,
* its bit is set to 1, otherwise its bit is set to 0.
*/
private ArrowBuf distinctValueBuffer;

/**
* The vector to deduplicate.
*/
private final V vector;

private final BufferAllocator allocator;

/**
* Constructs a vector run deduplicator for a given vector.
* @param vector the vector to dedulicate. Ownership is NOT taken.
* @param allocator the allocator used for allocating buffers for start indices.
*/
public VectorRunDeduplicator(V vector, BufferAllocator allocator) {
this.vector = vector;
this.allocator = allocator;
}

private void createDistinctValueBuffer() {
Preconditions.checkArgument(distinctValueBuffer == null);
int bufSize = DataSizeRoundingUtil.divideBy8Ceil(vector.getValueCount());
distinctValueBuffer = allocator.buffer(bufSize);
DeduplicationUtils.populateRunStartIndicators(vector, distinctValueBuffer);
}

/**
* Gets the number of values which are different from their predecessor.
* @return the run count.
*/
public int getRunCount() {
if (distinctValueBuffer == null) {
createDistinctValueBuffer();
}
return vector.getValueCount() - BitVectorHelper.getNullCount(distinctValueBuffer, vector.getValueCount());
}

/**
* Gets the vector with deduplicated adjacent values removed.
* @param outVector the output vector.
*/
public void populateDeduplicatedValues(V outVector) {
if (distinctValueBuffer == null) {
createDistinctValueBuffer();
}

DeduplicationUtils.populateDeduplicatedValues(distinctValueBuffer, vector, outVector);
}

/**
* Gets the length of each distinct value.
* @param lengthVector the vector for holding length values.
*/
public void populateRunLengths(IntVector lengthVector) {
if (distinctValueBuffer == null) {
createDistinctValueBuffer();
}

DeduplicationUtils.populateRunLengths(distinctValueBuffer, lengthVector, vector.getValueCount());
}

@Override
public void close() {
if (distinctValueBuffer != null) {
distinctValueBuffer.close();
distinctValueBuffer = null;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.arrow.algorithm.deduplicate;

import static org.junit.Assert.assertEquals;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.util.DataSizeRoundingUtil;
import org.apache.arrow.vector.BitVectorHelper;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VarCharVector;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import io.netty.buffer.ArrowBuf;

/**
* Test cases for {@link DeduplicationUtils}.
*/
public class TestDeduplicationUtils {

private static final int VECTOR_LENGTH = 100;

private static final int REPETITION_COUNT = 3;

private BufferAllocator allocator;

@Before
public void prepare() {
allocator = new RootAllocator(1024 * 1024);
}

@After
public void shutdown() {
allocator.close();
}

@Test
public void testDeduplicateFixedWidth() {
try (IntVector origVec = new IntVector("original vec", allocator);
IntVector dedupVec = new IntVector("deduplicated vec", allocator);
IntVector lengthVec = new IntVector("length vec", allocator);
ArrowBuf distinctBuf = allocator.buffer(
DataSizeRoundingUtil.divideBy8Ceil(VECTOR_LENGTH * REPETITION_COUNT))) {
origVec.allocateNew(VECTOR_LENGTH * REPETITION_COUNT);
origVec.setValueCount(VECTOR_LENGTH * REPETITION_COUNT);
lengthVec.allocateNew();

// prepare data
for (int i = 0; i < VECTOR_LENGTH; i++) {
for (int j = 0; j < REPETITION_COUNT; j++) {
origVec.set(i * REPETITION_COUNT + j, i);
}
}

DeduplicationUtils.populateRunStartIndicators(origVec, distinctBuf);
assertEquals( VECTOR_LENGTH,
VECTOR_LENGTH * REPETITION_COUNT -
BitVectorHelper.getNullCount(distinctBuf, VECTOR_LENGTH * REPETITION_COUNT));

DeduplicationUtils.populateDeduplicatedValues(distinctBuf, origVec, dedupVec);
assertEquals(VECTOR_LENGTH, dedupVec.getValueCount());

for (int i = 0; i < VECTOR_LENGTH; i++) {
assertEquals(i, dedupVec.get(i));
}

DeduplicationUtils.populateRunLengths(distinctBuf, lengthVec, VECTOR_LENGTH * REPETITION_COUNT);
assertEquals(VECTOR_LENGTH, lengthVec.getValueCount());

for (int i = 0; i < VECTOR_LENGTH; i++) {
assertEquals(REPETITION_COUNT, lengthVec.get(i));
}
}
}

@Test
public void testDeduplicateVariableWidth() {
try (VarCharVector origVec = new VarCharVector("original vec", allocator);
VarCharVector dedupVec = new VarCharVector("deduplicated vec", allocator);
IntVector lengthVec = new IntVector("length vec", allocator);
ArrowBuf distinctBuf = allocator.buffer(
DataSizeRoundingUtil.divideBy8Ceil(VECTOR_LENGTH * REPETITION_COUNT))) {
origVec.allocateNew(
VECTOR_LENGTH * REPETITION_COUNT * 10, VECTOR_LENGTH * REPETITION_COUNT);
origVec.setValueCount(VECTOR_LENGTH * REPETITION_COUNT);
lengthVec.allocateNew();

// prepare data
for (int i = 0; i < VECTOR_LENGTH; i++) {
String str = String.valueOf(i * i);
for (int j = 0; j < REPETITION_COUNT; j++) {
origVec.set(i * REPETITION_COUNT + j, str.getBytes());
}
}

DeduplicationUtils.populateRunStartIndicators(origVec, distinctBuf);
assertEquals(VECTOR_LENGTH,
VECTOR_LENGTH * REPETITION_COUNT -
BitVectorHelper.getNullCount(distinctBuf, VECTOR_LENGTH * REPETITION_COUNT));

DeduplicationUtils.populateDeduplicatedValues(distinctBuf, origVec, dedupVec);
assertEquals(VECTOR_LENGTH, dedupVec.getValueCount());

for (int i = 0; i < VECTOR_LENGTH; i++) {
assertArrayEquals(String.valueOf(i * i).getBytes(), dedupVec.get(i));
}

DeduplicationUtils.populateRunLengths(
distinctBuf, lengthVec, VECTOR_LENGTH * REPETITION_COUNT);
assertEquals(VECTOR_LENGTH, lengthVec.getValueCount());

for (int i = 0; i < VECTOR_LENGTH; i++) {
assertEquals(REPETITION_COUNT, lengthVec.get(i));
}
}
}
}
Loading