Skip to content

Commit 25fd97b

Browse files
martinradev authored and wesm committed
PARQUET-1716: [C++] Add BYTE_STREAM_SPLIT encoder and decoder
The patch implements an encoder and decoder for Parquet's BYTE_STREAM_SPLIT encoding. The patch also adds tests for the new encoding. Closes #6005 from martinradev/byte_stream_split_submit and squashes the following commits: 5a78f8b <Martin Radev> ARROW-5913: Add BYTE_STREAM_SPLIT encoder and decoder Authored-by: Martin Radev <martin.b.radev@gmail.com> Signed-off-by: Wes McKinney <wesm+git@apache.org>
1 parent 184f828 commit 25fd97b

File tree

5 files changed

+384
-0
lines changed

5 files changed

+384
-0
lines changed

cpp/src/parquet/column_reader.cc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -577,6 +577,12 @@ class ColumnReaderImplBase {
577577
decoders_[static_cast<int>(encoding)] = std::move(decoder);
578578
break;
579579
}
580+
case Encoding::BYTE_STREAM_SPLIT: {
581+
auto decoder = MakeTypedDecoder<DType>(Encoding::BYTE_STREAM_SPLIT, descr_);
582+
current_decoder_ = decoder.get();
583+
decoders_[static_cast<int>(encoding)] = std::move(decoder);
584+
break;
585+
}
580586
case Encoding::RLE_DICTIONARY:
581587
throw ParquetException("Dictionary page must be before data page.");
582588

cpp/src/parquet/encoding.cc

Lines changed: 188 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -815,6 +815,104 @@ void DictEncoderImpl<ByteArrayType>::PutDictionary(const arrow::Array& values) {
815815
}
816816
}
817817

818+
// ----------------------------------------------------------------------
819+
// ByteStreamSplitEncoder<T> implementations
820+
821+
template <typename DType>
822+
class ByteStreamSplitEncoder : public EncoderImpl, virtual public TypedEncoder<DType> {
823+
public:
824+
using T = typename DType::c_type;
825+
using TypedEncoder<DType>::Put;
826+
827+
explicit ByteStreamSplitEncoder(
828+
const ColumnDescriptor* descr,
829+
::arrow::MemoryPool* pool = ::arrow::default_memory_pool());
830+
831+
int64_t EstimatedDataEncodedSize() override;
832+
std::shared_ptr<Buffer> FlushValues() override;
833+
834+
void Put(const T* buffer, int num_values) override;
835+
void Put(const arrow::Array& values) override;
836+
void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
837+
int64_t valid_bits_offset) override;
838+
839+
protected:
840+
arrow::TypedBufferBuilder<T> values_;
841+
842+
private:
843+
void PutArrowArray(const arrow::Array& values);
844+
};
845+
846+
template <typename DType>
847+
ByteStreamSplitEncoder<DType>::ByteStreamSplitEncoder(const ColumnDescriptor* descr,
848+
::arrow::MemoryPool* pool)
849+
: EncoderImpl(descr, Encoding::BYTE_STREAM_SPLIT, pool), values_{pool} {}
850+
851+
template <typename DType>
852+
int64_t ByteStreamSplitEncoder<DType>::EstimatedDataEncodedSize() {
853+
return values_.length() * sizeof(T);
854+
}
855+
856+
template <typename DType>
857+
std::shared_ptr<Buffer> ByteStreamSplitEncoder<DType>::FlushValues() {
858+
constexpr size_t num_streams = sizeof(T);
859+
std::shared_ptr<ResizableBuffer> output_buffer =
860+
AllocateBuffer(this->memory_pool(), EstimatedDataEncodedSize());
861+
uint8_t* output_buffer_raw = output_buffer->mutable_data();
862+
const size_t num_values = values_.length();
863+
const uint8_t* raw_values = reinterpret_cast<const uint8_t*>(values_.data());
864+
for (size_t i = 0; i < num_values; ++i) {
865+
for (size_t j = 0U; j < num_streams; ++j) {
866+
const uint8_t byte_in_value = raw_values[i * num_streams + j];
867+
output_buffer_raw[j * num_values + i] = byte_in_value;
868+
}
869+
}
870+
values_.Reset();
871+
return std::move(output_buffer);
872+
}
873+
874+
template <typename DType>
875+
void ByteStreamSplitEncoder<DType>::Put(const T* buffer, int num_values) {
876+
PARQUET_THROW_NOT_OK(values_.Append(buffer, num_values));
877+
}
878+
879+
template <typename DType>
880+
void ByteStreamSplitEncoder<DType>::Put(const ::arrow::Array& values) {
881+
PutArrowArray(values);
882+
}
883+
884+
template <>
885+
void ByteStreamSplitEncoder<FloatType>::PutArrowArray(const ::arrow::Array& values) {
886+
DirectPutImpl<arrow::FloatArray>(values,
887+
reinterpret_cast<arrow::BufferBuilder*>(&values_));
888+
}
889+
890+
template <>
891+
void ByteStreamSplitEncoder<DoubleType>::PutArrowArray(const ::arrow::Array& values) {
892+
DirectPutImpl<arrow::DoubleArray>(values,
893+
reinterpret_cast<arrow::BufferBuilder*>(&values_));
894+
}
895+
896+
template <typename DType>
897+
void ByteStreamSplitEncoder<DType>::PutSpaced(const T* src, int num_values,
898+
const uint8_t* valid_bits,
899+
int64_t valid_bits_offset) {
900+
std::shared_ptr<ResizableBuffer> buffer;
901+
PARQUET_THROW_NOT_OK(arrow::AllocateResizableBuffer(this->memory_pool(),
902+
num_values * sizeof(T), &buffer));
903+
int32_t num_valid_values = 0;
904+
arrow::internal::BitmapReader valid_bits_reader(valid_bits, valid_bits_offset,
905+
num_values);
906+
T* data = reinterpret_cast<T*>(buffer->mutable_data());
907+
for (int32_t i = 0; i < num_values; i++) {
908+
if (valid_bits_reader.IsSet()) {
909+
data[num_valid_values++] = src[i];
910+
}
911+
valid_bits_reader.Next();
912+
}
913+
Put(data, num_valid_values);
914+
}
915+
818916
// ----------------------------------------------------------------------
819917
// Encoder and decoder factory functions
820918

@@ -863,6 +961,18 @@ std::unique_ptr<Encoder> MakeEncoder(Type::type type_num, Encoding::type encodin
863961
DCHECK(false) << "Encoder not implemented";
864962
break;
865963
}
964+
} else if (encoding == Encoding::BYTE_STREAM_SPLIT) {
965+
switch (type_num) {
966+
case Type::FLOAT:
967+
return std::unique_ptr<Encoder>(
968+
new ByteStreamSplitEncoder<FloatType>(descr, pool));
969+
case Type::DOUBLE:
970+
return std::unique_ptr<Encoder>(
971+
new ByteStreamSplitEncoder<DoubleType>(descr, pool));
972+
default:
973+
throw ParquetException("BYTE_STREAM_SPLIT only supports FLOAT and DOUBLE");
974+
break;
975+
}
866976
} else {
867977
ParquetException::NYI("Selected encoding is not supported");
868978
}
@@ -2236,6 +2346,74 @@ class DeltaByteArrayDecoder : public DecoderImpl,
22362346
ByteArray last_value_;
22372347
};
22382348

2349+
// ----------------------------------------------------------------------
2350+
// BYTE_STREAM_SPLIT
2351+
2352+
template <typename DType>
2353+
class ByteStreamSplitDecoder : public DecoderImpl, virtual public TypedDecoder<DType> {
2354+
public:
2355+
using T = typename DType::c_type;
2356+
explicit ByteStreamSplitDecoder(const ColumnDescriptor* descr);
2357+
2358+
int Decode(T* buffer, int max_values) override;
2359+
2360+
int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
2361+
int64_t valid_bits_offset,
2362+
typename EncodingTraits<DType>::Accumulator* builder) override;
2363+
2364+
int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
2365+
int64_t valid_bits_offset,
2366+
typename EncodingTraits<DType>::DictAccumulator* builder) override;
2367+
2368+
void SetData(int num_values, const uint8_t* data, int len) override;
2369+
2370+
private:
2371+
int num_values_in_buffer{0U};
2372+
};
2373+
2374+
template <typename DType>
2375+
ByteStreamSplitDecoder<DType>::ByteStreamSplitDecoder(const ColumnDescriptor* descr)
2376+
: DecoderImpl(descr, Encoding::BYTE_STREAM_SPLIT) {}
2377+
2378+
template <typename DType>
2379+
void ByteStreamSplitDecoder<DType>::SetData(int num_values, const uint8_t* data,
2380+
int len) {
2381+
DecoderImpl::SetData(num_values, data, len);
2382+
num_values_in_buffer = num_values;
2383+
}
2384+
2385+
template <typename DType>
2386+
int ByteStreamSplitDecoder<DType>::Decode(T* buffer, int max_values) {
2387+
constexpr size_t num_streams = sizeof(T);
2388+
const int values_to_decode = std::min(num_values_, max_values);
2389+
const int num_decoded_previously = num_values_in_buffer - num_values_;
2390+
for (int i = 0; i < values_to_decode; ++i) {
2391+
uint8_t gathered_byte_data[num_streams];
2392+
for (size_t b = 0; b < num_streams; ++b) {
2393+
const size_t byte_index = b * num_values_in_buffer + num_decoded_previously + i;
2394+
gathered_byte_data[b] = data_[byte_index];
2395+
}
2396+
buffer[i] = arrow::util::SafeLoadAs<T>(&gathered_byte_data[0]);
2397+
}
2398+
num_values_ -= values_to_decode;
2399+
len_ -= sizeof(T) * values_to_decode;
2400+
return values_to_decode;
2401+
}
2402+
2403+
template <typename DType>
2404+
int ByteStreamSplitDecoder<DType>::DecodeArrow(
2405+
int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
2406+
typename EncodingTraits<DType>::Accumulator* builder) {
2407+
ParquetException::NYI("DecodeArrow for ByteStreamSplitDecoder");
2408+
}
2409+
2410+
template <typename DType>
2411+
int ByteStreamSplitDecoder<DType>::DecodeArrow(
2412+
int num_values, int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset,
2413+
typename EncodingTraits<DType>::DictAccumulator* builder) {
2414+
ParquetException::NYI("DecodeArrow for ByteStreamSplitDecoder");
2415+
}
2416+
22392417
// ----------------------------------------------------------------------
22402418

22412419
std::unique_ptr<Decoder> MakeDecoder(Type::type type_num, Encoding::type encoding,
@@ -2261,6 +2439,16 @@ std::unique_ptr<Decoder> MakeDecoder(Type::type type_num, Encoding::type encodin
22612439
default:
22622440
break;
22632441
}
2442+
} else if (encoding == Encoding::BYTE_STREAM_SPLIT) {
2443+
switch (type_num) {
2444+
case Type::FLOAT:
2445+
return std::unique_ptr<Decoder>(new ByteStreamSplitDecoder<FloatType>(descr));
2446+
case Type::DOUBLE:
2447+
return std::unique_ptr<Decoder>(new ByteStreamSplitDecoder<DoubleType>(descr));
2448+
default:
2449+
throw ParquetException("BYTE_STREAM_SPLIT only supports FLOAT and DOUBLE");
2450+
break;
2451+
}
22642452
} else {
22652453
ParquetException::NYI("Selected encoding is not supported");
22662454
}

0 commit comments

Comments
 (0)