@@ -474,6 +474,90 @@ Result<std::shared_ptr<Buffer>> DecompressBuffer(const std::shared_ptr<Buffer>&
474474 return std::move (uncompressed);
475475}
476476
477+ Status DecompressBufferByType (const Buffer& buffer, util::Codec* codec,
478+ std::shared_ptr<Buffer>* out, MemoryPool* pool) {
479+ const uint8_t * data = buffer.data ();
480+ int64_t compressed_size = buffer.size () - sizeof (int64_t );
481+ int64_t uncompressed_size = bit_util::FromLittleEndian (util::SafeLoadAs<int64_t >(data));
482+
483+ ARROW_ASSIGN_OR_RAISE (auto uncompressed, AllocateBuffer (uncompressed_size, pool));
484+
485+ int64_t actual_decompressed;
486+ ARROW_ASSIGN_OR_RAISE (
487+ actual_decompressed,
488+ codec->Decompress (compressed_size, data + sizeof (int64_t ), uncompressed_size,
489+ uncompressed->mutable_data ()));
490+ if (actual_decompressed != uncompressed_size) {
491+ return Status::Invalid (" Failed to fully decompress buffer, expected " ,
492+ uncompressed_size, " bytes but decompressed " ,
493+ actual_decompressed);
494+ }
495+ *out = std::move (uncompressed);
496+ return Status::OK ();
497+ }
498+
499+ Status DecompressBuffersByType (Compression::type compression,
500+ const IpcReadOptions& options,
501+ std::vector<std::shared_ptr<ArrayData>>* arrs,
502+ const std::vector<std::shared_ptr<Field>>& schema_fields) {
503+ ARROW_CHECK_EQ (arrs->size (), schema_fields.size ());
504+
505+ std::unique_ptr<util::Codec> codec;
506+ std::unique_ptr<util::Codec> fastpfor32_codec;
507+ std::unique_ptr<util::Codec> fastpfor64_codec;
508+ ARROW_ASSIGN_OR_RAISE (codec, util::Codec::Create (Compression::LZ4_FRAME));
509+ ARROW_ASSIGN_OR_RAISE (fastpfor32_codec, util::Codec::CreateInt32 (compression));
510+ ARROW_ASSIGN_OR_RAISE (fastpfor64_codec, util::Codec::CreateInt64 (compression));
511+
512+ for (size_t field_idx = 0 ; field_idx < schema_fields.size (); ++field_idx) {
513+ const auto & field = schema_fields[field_idx];
514+ auto & arr = (*arrs)[field_idx];
515+ if (field->type ()->id () == Type::NA) continue ;
516+
517+ const auto & layout_buffers = field->type ()->layout ().buffers ;
518+ for (size_t i = 0 ; i < layout_buffers.size (); ++i) {
519+ const auto & layout = layout_buffers[i];
520+ if (arr->buffers [i] == nullptr ) {
521+ continue ;
522+ }
523+ if (arr->buffers [i]->size () == 0 ) {
524+ continue ;
525+ }
526+ if (arr->buffers [i]->size () < 8 ) {
527+ return Status::Invalid (
528+ " Likely corrupted message, compressed buffers "
529+ " are larger than 8 bytes by construction" );
530+ }
531+ auto & buffer = arr->buffers [i];
532+ switch (layout.kind ) {
533+ case DataTypeLayout::BufferKind::FIXED_WIDTH:
534+ if (layout.byte_width == 4 && field->type ()->id () != Type::FLOAT) {
535+ RETURN_NOT_OK (DecompressBufferByType (*buffer, fastpfor32_codec.get (), &buffer,
536+ options.memory_pool ));
537+ } else if (layout.byte_width == 8 && field->type ()->id () != Type::DOUBLE) {
538+ RETURN_NOT_OK (DecompressBufferByType (*buffer, fastpfor64_codec.get (), &buffer,
539+ options.memory_pool ));
540+ } else {
541+ RETURN_NOT_OK (
542+ DecompressBufferByType (*buffer, codec.get (), &buffer, options.memory_pool ));
543+ }
544+ break ;
545+ case DataTypeLayout::BufferKind::BITMAP:
546+ case DataTypeLayout::BufferKind::VARIABLE_WIDTH: {
547+ RETURN_NOT_OK (
548+ DecompressBufferByType (*buffer, codec.get (), &buffer, options.memory_pool ));
549+ break ;
550+ }
551+ case DataTypeLayout::BufferKind::ALWAYS_NULL:
552+ break ;
553+ default :
554+ return Status::Invalid (" Wrong buffer layout." );
555+ }
556+ }
557+ }
558+ return arrow::Status::OK ();
559+ }
560+
477561Status DecompressBuffers (Compression::type compression, const IpcReadOptions& options,
478562 ArrayDataVector* fields) {
479563 struct BufferAccumulator {
@@ -555,8 +639,13 @@ Result<std::shared_ptr<RecordBatch>> LoadRecordBatchSubset(
555639 filtered_columns = std::move (columns);
556640 }
557641 if (context.compression != Compression::UNCOMPRESSED) {
558- RETURN_NOT_OK (
559- DecompressBuffers (context.compression , context.options , &filtered_columns));
642+
643+ if (context.compression == Compression::FASTPFOR) {
644+ RETURN_NOT_OK (
645+ DecompressBuffersByType (context.compression , context.options , &filtered_columns, filtered_fields));
646+ } else {
647+ RETURN_NOT_OK (DecompressBuffers (context.compression , context.options , &filtered_columns));
648+ }
560649 }
561650
562651 // swap endian in a set of ArrayData if necessary (swap_endian == true)
0 commit comments