[SPARK-55683][SQL] Optimize VectorizedPlainValuesReader.readUnsignedLongs #54479

Closed
LuciferYang wants to merge 1 commit into apache:master from LuciferYang:SPARK-55683

Conversation

@LuciferYang
Contributor

@LuciferYang LuciferYang commented Feb 25, 2026

What changes were proposed in this pull request?

This PR optimizes VectorizedPlainValuesReader.readUnsignedLongs by replacing the per-element BigInteger heap allocation chain with direct byte manipulation.

The original implementation allocates multiple objects per element:

// Old: String + BigInteger + internal int[] + byte[] allocations per element
c.putByteArray(rowId + i,
    new BigInteger(Long.toUnsignedString(buffer.getLong())).toByteArray());

The new implementation reads raw little-endian bytes directly from the ByteBuffer backing array (when available) and converts them to BigInteger-compatible big-endian encoding in a single pass:

// New: hasArray() fast path - operates directly on backing array, one byte[] per element
if (buffer.hasArray()) {
    byte[] src = buffer.array();
    int offset = buffer.arrayOffset() + buffer.position();
    for (int i = 0; i < total; i++, rowId++, offset += 8) {
        putLittleEndianBytesAsBigInteger(c, rowId, src, offset);
    }
} else {
    byte[] data = new byte[8];  // reused across all values in this batch
    for (int i = 0; i < total; i++, rowId++) {
        buffer.get(data);
        putLittleEndianBytesAsBigInteger(c, rowId, data, 0);
    }
}

The private helper putLittleEndianBytesAsBigInteger handles the conversion with output matching BigInteger.toByteArray() semantics:

  • Zero value: writes [0x00] (1 byte) rather than an empty array, since new BigInteger(new byte[0]) throws NumberFormatException
  • Sign byte: prepends 0x00 when the most significant byte has bit 7 set, to ensure the value is interpreted as positive by BigInteger
  • Byte order: reverses little-endian Parquet physical encoding to big-endian in a single loop
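The three rules above can be sketched as a standalone method and cross-checked against the BigInteger-based encoding it replaces. The class and method names below are illustrative, not the exact PR code:

```java
import java.math.BigInteger;
import java.util.Arrays;

public class UnsignedLongBytes {
    // Convert 8 little-endian bytes (Parquet UINT_64 physical encoding) into a
    // big-endian byte array matching BigInteger.toByteArray() for the unsigned value.
    static byte[] toBigIntegerBytes(byte[] src, int offset) {
        // Find the most significant non-zero byte (little-endian: highest index).
        int msbIndex = offset + 7;
        while (msbIndex > offset && src[msbIndex] == 0) {
            msbIndex--;
        }
        // Zero value: BigInteger.ZERO.toByteArray() is [0x00], never empty.
        if (msbIndex == offset && src[offset] == 0) {
            return new byte[]{0};
        }
        // Prepend a 0x00 sign byte when bit 7 of the MSB is set, so the value
        // stays positive under two's-complement interpretation.
        boolean needSignByte = (src[msbIndex] & 0x80) != 0;
        int valueLen = msbIndex - offset + 1;
        byte[] dest = new byte[needSignByte ? valueLen + 1 : valueLen];
        int d = needSignByte ? 1 : 0;
        // Reverse byte order: little-endian src -> big-endian dest.
        for (int i = msbIndex; i >= offset; i--) {
            dest[d++] = src[i];
        }
        return dest;
    }

    public static void main(String[] args) {
        long[] samples = {0L, 1L, Long.MAX_VALUE, Long.MIN_VALUE, -2L, -1L};
        for (long v : samples) {
            byte[] le = new byte[8];
            for (int i = 0; i < 8; i++) {
                le[i] = (byte) (v >>> (8 * i)); // little-endian encoding of v
            }
            byte[] expected = new BigInteger(Long.toUnsignedString(v)).toByteArray();
            byte[] actual = toBigIntegerBytes(le, 0);
            if (!Arrays.equals(expected, actual)) {
                throw new AssertionError("mismatch for " + Long.toUnsignedString(v));
            }
        }
        System.out.println("all boundary values match");
    }
}
```

The main method replays the same boundary values used in the extended ParquetIOSuite test (0, 1, Long.MaxValue, Long.MinValue, -2, -1) and asserts byte-for-byte agreement with the old BigInteger path.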

Why are the changes needed?

The original implementation constructs a BigInteger via Long.toUnsignedString + new BigInteger(String), which involves per-element allocations of a String, a BigInteger, its internal int[] magnitude array, and the final byte[]. For a typical batch of 4096 values this means ~16K object allocations, creating significant GC pressure in workloads reading large UINT_64 columns.

The new implementation reduces this to one byte[] allocation per element by operating directly on the raw bytes from the ByteBuffer, avoiding all intermediate object creation. Additionally, the direct buffer fallback path reuses a single byte[8] scratch buffer across the entire batch.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

  • The existing test SPARK-34817: Read UINT_64 as Decimal from parquet in ParquetIOSuite was extended with boundary values covering the critical edge cases of the new byte manipulation logic
  • Renamed the original implementation to OldVectorizedPlainValuesReader and compared the latency of the old and new readUnsignedLongs methods using JMH:
Benchmark code:
package org.apache.spark.sql.execution.datasources.parquet;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Random;
import java.util.concurrent.TimeUnit;

import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

import org.apache.parquet.bytes.ByteBufferInputStream;

import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
import org.apache.spark.sql.types.DataTypes;

@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@State(Scope.Thread)
@Fork(value = 1, jvmArgs = {"-Xms4G", "-Xmx4G"})
@Warmup(iterations = 5, time = 1)
@Measurement(iterations = 10, time = 1)
public class VectorizedPlainValuesReaderJMHBenchmark {

    // ==================== Parameters ====================

    @Param({"10000000"})
    private int numValues;

    // ==================== Test Data ====================

    private byte[] longData;
    private static final int BATCH_SIZE = 4096;

    private OldVectorizedPlainValuesReader oldSingleBufferOnHeapReader;
    private OldVectorizedPlainValuesReader oldSingleBufferOffHeapReader;
    private VectorizedPlainValuesReader newSingleBufferOnHeapReader;
    private VectorizedPlainValuesReader newSingleBufferOffHeapReader;

    // ==================== State Classes ====================

    /**
     * Column vector state using DecimalType(20, 0), which is the correct type for UINT64.
     * Parquet UINT_64 logical type is mapped to DecimalType(20, 0) in Spark.
     * Using LongType would cause NullPointerException because readUnsignedLongs
     * calls arrayData() which requires childColumns, only initialized for DecimalType.
     */
    @State(Scope.Thread)
    public static class DecimalColumnVectorState {
        public WritableColumnVector decimalColumn;

        @Setup(Level.Iteration)
        public void setup() {
            // UINT64 -> DecimalType(20, 0): precision=20, scale=0
            decimalColumn = new OnHeapColumnVector(BATCH_SIZE, DataTypes.createDecimalType(20, 0));
        }

        @TearDown(Level.Iteration)
        public void tearDown() {
            decimalColumn.close();
        }

        @Setup(Level.Invocation)
        public void reset() {
            decimalColumn.reset();
        }
    }

    // ==================== Setup ====================

    @Setup(Level.Trial)
    public void setupTrial() {
        Random random = new Random(42);
        longData = generateLongData(numValues, random);
    }

    @Setup(Level.Invocation)
    public void setupInvocation() throws IOException {
        oldSingleBufferOnHeapReader = new OldVectorizedPlainValuesReader();
        oldSingleBufferOnHeapReader.initFromPage(numValues, createSingleBufferInputStream(longData));
        oldSingleBufferOffHeapReader = new OldVectorizedPlainValuesReader();
        oldSingleBufferOffHeapReader.initFromPage(numValues, createDirectSingleBufferInputStream(longData));
        newSingleBufferOnHeapReader = new VectorizedPlainValuesReader();
        newSingleBufferOnHeapReader.initFromPage(numValues, createSingleBufferInputStream(longData));
        newSingleBufferOffHeapReader = new VectorizedPlainValuesReader();
        newSingleBufferOffHeapReader.initFromPage(numValues, createDirectSingleBufferInputStream(longData));
    }

    // ==================== Data Generation ====================

    private byte[] generateLongData(int count, Random random) {
        ByteBuffer buffer = ByteBuffer.allocate(count * 8).order(ByteOrder.LITTLE_ENDIAN);
        for (int i = 0; i < count; i++) {
            buffer.putLong(random.nextLong()); // full unsigned long range
        }
        return buffer.array();
    }

    // ==================== ByteBufferInputStream Creation ====================

    private ByteBufferInputStream createSingleBufferInputStream(byte[] data) {
        ByteBuffer buffer = ByteBuffer.wrap(data).order(ByteOrder.LITTLE_ENDIAN);
        return ByteBufferInputStream.wrap(buffer);
    }

    private ByteBuffer createDirectBuffer(byte[] data) {
        ByteBuffer buffer = ByteBuffer.allocateDirect(data.length).order(ByteOrder.LITTLE_ENDIAN);
        buffer.put(data);
        buffer.flip();
        return buffer;
    }

    private ByteBufferInputStream createDirectSingleBufferInputStream(byte[] data) {
        ByteBuffer buffer = createDirectBuffer(data);
        return ByteBufferInputStream.wrap(buffer);
    }

    // ====================================================================================
    // readUnsignedLongs onHeap
    // ====================================================================================

    @Benchmark
    public void readUnsignedLongs_onHeap_Old(DecimalColumnVectorState state) throws IOException {
        for (int i = 0; i < numValues; i += BATCH_SIZE) {
            oldSingleBufferOnHeapReader.readUnsignedLongs(
                    Math.min(BATCH_SIZE, numValues - i), state.decimalColumn, 0);
        }
    }

    @Benchmark
    public void readUnsignedLongs_onHeap_New(DecimalColumnVectorState state) throws IOException {
        for (int i = 0; i < numValues; i += BATCH_SIZE) {
            newSingleBufferOnHeapReader.readUnsignedLongs(
                    Math.min(BATCH_SIZE, numValues - i), state.decimalColumn, 0);
        }
    }

    // ====================================================================================
    // readUnsignedLongs offHeap
    // ====================================================================================

    @Benchmark
    public void readUnsignedLongs_offHeap_Old(DecimalColumnVectorState state) throws IOException {
        for (int i = 0; i < numValues; i += BATCH_SIZE) {
            oldSingleBufferOffHeapReader.readUnsignedLongs(
                    Math.min(BATCH_SIZE, numValues - i), state.decimalColumn, 0);
        }
    }

    @Benchmark
    public void readUnsignedLongs_offHeap_New(DecimalColumnVectorState state) throws IOException {
        for (int i = 0; i < numValues; i += BATCH_SIZE) {
            newSingleBufferOffHeapReader.readUnsignedLongs(
                    Math.min(BATCH_SIZE, numValues - i), state.decimalColumn, 0);
        }
    }

    // ==================== Main Method ====================

    public static void main(String[] args) throws RunnerException {
        String filter = args.length > 0 ?
                args[0] : VectorizedPlainValuesReaderJMHBenchmark.class.getSimpleName();
        Options opt = new OptionsBuilder()
                .include(filter)
                .build();

        new Runner(opt).run();
    }
}

Run build/sbt "sql/Test/runMain org.apache.spark.sql.execution.datasources.parquet.VectorizedPlainValuesReaderJMHBenchmark" to execute the benchmark.

Benchmark results:

  • Java 17.0.18+8-LTS
Benchmark                                                              (numValues)  Mode  Cnt        Score       Error  Units
VectorizedPlainValuesReaderJMHBenchmark.readUnsignedLongs_offHeap_New     10000000  avgt   10   249413.824 ± 12242.331  us/op
VectorizedPlainValuesReaderJMHBenchmark.readUnsignedLongs_offHeap_Old     10000000  avgt   10  2301279.127 ± 14970.249  us/op
VectorizedPlainValuesReaderJMHBenchmark.readUnsignedLongs_onHeap_New      10000000  avgt   10   282651.747 ±  5031.717  us/op
VectorizedPlainValuesReaderJMHBenchmark.readUnsignedLongs_onHeap_Old      10000000  avgt   10  2382690.093 ± 10364.228  us/op
  • Java 21.0.10+7-LTS
Benchmark                                                              (numValues)  Mode  Cnt        Score       Error  Units
VectorizedPlainValuesReaderJMHBenchmark.readUnsignedLongs_offHeap_New     10000000  avgt   10   256621.630 ± 24087.509  us/op
VectorizedPlainValuesReaderJMHBenchmark.readUnsignedLongs_offHeap_Old     10000000  avgt   10  2120170.591 ±  4862.317  us/op
VectorizedPlainValuesReaderJMHBenchmark.readUnsignedLongs_onHeap_New      10000000  avgt   10   284058.229 ± 19966.179  us/op
VectorizedPlainValuesReaderJMHBenchmark.readUnsignedLongs_onHeap_Old      10000000  avgt   10  2190838.305 ±  7979.740  us/op

Both onHeap and offHeap paths show an approximately 8x improvement.

Was this patch authored or co-authored using generative AI tooling?

Yes, Claude Sonnet 4.6 was used to assist with writing the code.

@LuciferYang LuciferYang marked this pull request as draft February 25, 2026 09:49
@LuciferYang
Contributor Author

Test first; the PR description and performance comparison will be updated later.

@LuciferYang LuciferYang marked this pull request as ready for review February 25, 2026 13:33
@pan3793 pan3793 requested a review from Copilot February 25, 2026 15:42

Copilot AI left a comment

Pull request overview

Optimizes Spark SQL’s Parquet vectorized PLAIN decoding for UINT_64 by replacing the per-value BigInteger(String) construction path with direct byte-level conversion, and extends the existing Parquet IO test to cover boundary cases for unsigned 64-bit decoding.

Changes:

  • Reworked VectorizedPlainValuesReader.readUnsignedLongs to convert little-endian UINT_64 bytes into BigInteger-compatible big-endian two’s-complement byte arrays without String/BigInteger intermediates.
  • Added a helper to produce BigInteger.toByteArray()-compatible encodings (zero handling + sign-byte handling).
  • Extended ParquetIOSuite’s UINT_64 test data with boundary values (0/1/max/min/-1/-2).

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated no comments.

Files changed:

  • sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java: implements the new byte-manipulation decoding path and the helper for BigInteger-compatible output.
  • sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala: extends the existing UINT_64 Parquet read test with boundary values to validate the new encoding logic.
Comments suppressed due to low confidence (2)

sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java:278

  • putLittleEndianBytesAsBigInteger still allocates a new byte[] per value (new byte[totalLen], plus new byte[]{0} for zeros). Since WritableColumnVector.putByteArray(rowId, value, offset, count) copies the bytes into arrayData(), you can reuse a single scratch buffer (e.g., byte[9]) across the loop and pass the appropriate offset/length to avoid per-element heap allocations entirely.
    // Zero value: must write [0x00] rather than an empty array.
    // BigInteger.ZERO.toByteArray() returns [0x00], and new BigInteger(new byte[0])
    // throws NumberFormatException("Zero length BigInteger").
    if (msbIndex == offset && src[offset] == 0) {
      c.putByteArray(rowId, new byte[]{0});
      return;
    }

    // Prepend a 0x00 sign byte if the most significant byte has bit 7 set.
    // This matches BigInteger.toByteArray() behavior: positive values whose highest
    // magnitude byte has the MSB set are prefixed with 0x00 to distinguish them
    // from negative values in two's-complement encoding.
    boolean needSignByte = (src[msbIndex] & 0x80) != 0;
    int valueLen = msbIndex - offset + 1;
    int totalLen = needSignByte ? valueLen + 1 : valueLen;

    byte[] dest = new byte[totalLen];
    int destOffset = 0;
    if (needSignByte) {
      dest[destOffset++] = 0x00;
    }
    // Reverse byte order: little-endian src → big-endian dest
    for (int i = msbIndex; i >= offset; i--) {
      dest[destOffset++] = src[i];
    }
    c.putByteArray(rowId, dest, 0, totalLen);
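The scratch-buffer idea can be sketched as follows. The fillScratch helper below is a hypothetical illustration of the suggestion, not the PR's actual code: it reverses the little-endian long into a reusable byte[9] and returns the start index of the encoding, relying on the fact that the leading zeros of the reversed full long (or the untouched scratch[0]) already supply the 0x00 sign byte where one is needed:

```java
import java.math.BigInteger;
import java.util.Arrays;

public class ScratchBufferDemo {
    // Encode the unsigned long stored little-endian at src[offset..offset+7]
    // into a reusable 9-byte scratch buffer (big-endian, compatible with
    // BigInteger.toByteArray()) and return the start index. In the real reader,
    // the caller would invoke c.putByteArray(rowId, scratch, start, 9 - start),
    // which copies the bytes, so reusing scratch across the batch is safe.
    static int fillScratch(byte[] src, int offset, byte[] scratch) {
        // scratch[0] is never written and stays 0x00 (new byte[9] is zero-filled);
        // it serves as the sign byte for full 9-byte encodings.
        for (int i = 0; i < 8; i++) {
            scratch[8 - i] = src[offset + i]; // reverse into scratch[1..8]
        }
        int start = 1;
        while (start < 8 && scratch[start] == 0) {
            start++; // skip leading zeros, keeping at least one byte for zero
        }
        if ((scratch[start] & 0x80) != 0) {
            start--; // include the preceding 0x00 as the sign byte
        }
        return start;
    }

    public static void main(String[] args) {
        byte[] scratch = new byte[9]; // allocated once, reused for every value
        long[] samples = {0L, 1L, Long.MAX_VALUE, Long.MIN_VALUE, -2L, -1L};
        for (long v : samples) {
            byte[] le = new byte[8];
            for (int i = 0; i < 8; i++) {
                le[i] = (byte) (v >>> (8 * i)); // little-endian encoding of v
            }
            int start = fillScratch(le, 0, scratch);
            byte[] actual = Arrays.copyOfRange(scratch, start, 9);
            byte[] expected = new BigInteger(Long.toUnsignedString(v)).toByteArray();
            if (!Arrays.equals(expected, actual)) {
                throw new AssertionError("mismatch for " + Long.toUnsignedString(v));
            }
        }
        System.out.println("scratch-buffer encoding matches BigInteger for all samples");
    }
}
```

Every call overwrites scratch[1..8] completely, so no stale bytes from the previous value can leak into the next encoding.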

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala:1275

  • The boundary values sequence is duplicated here and again when constructing boundaryExpected below. Consider defining a single val boundaryValues = Seq(...) and reusing it for both writing and expected rows to avoid accidental drift if the list changes in the future.
        // Boundary values: zero, one, signed extremes interpreted as unsigned
        Seq(0L, 1L, Long.MaxValue, Long.MinValue, -2L, -1L).foreach { v =>
          val group = factory.newGroup().append("a", v)
          writer.write(group)
        }


@dongjoon-hyun
Member

Merged to master for Apache Spark 4.2.0. Thank you, @LuciferYang and @pan3793 .

@LuciferYang
Contributor Author

LuciferYang commented Feb 26, 2026

sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java:278

  • putLittleEndianBytesAsBigInteger still allocates a new byte[] per value (new byte[totalLen], plus new byte[]{0} for zeros). Since WritableColumnVector.putByteArray(rowId, value, offset, count) copies the bytes into arrayData(), you can reuse a single scratch buffer (e.g., byte[9]) across the loop and pass the appropriate offset/length to avoid per-element heap allocations entirely.
    // Zero value: must write [0x00] rather than an empty array.
    // BigInteger.ZERO.toByteArray() returns [0x00], and new BigInteger(new byte[0])
    // throws NumberFormatException("Zero length BigInteger").
    if (msbIndex == offset && src[offset] == 0) {
      c.putByteArray(rowId, new byte[]{0});
      return;
    }

    // Prepend a 0x00 sign byte if the most significant byte has bit 7 set.
    // This matches BigInteger.toByteArray() behavior: positive values whose highest
    // magnitude byte has the MSB set are prefixed with 0x00 to distinguish them
    // from negative values in two's-complement encoding.
    boolean needSignByte = (src[msbIndex] & 0x80) != 0;
    int valueLen = msbIndex - offset + 1;
    int totalLen = needSignByte ? valueLen + 1 : valueLen;

    byte[] dest = new byte[totalLen];
    int destOffset = 0;
    if (needSignByte) {
      dest[destOffset++] = 0x00;
    }
    // Reverse byte order: little-endian src → big-endian dest
    for (int i = msbIndex; i >= offset; i--) {
      dest[destOffset++] = src[i];
    }
    c.putByteArray(rowId, dest, 0, totalLen);

This suggestion from Copilot sounds good. Let me try it out to see the effect. If there are benefits, I'll submit a FOLLOWUP.

@LuciferYang
Contributor Author


It has shown stable optimization effects for OnHeap, and I've opened a follow-up:

