Skip to content

Commit beee8c7

Browse files
JkSelfmarin-ma
authored andcommitted
Rebase Shuffle component c++ and Java API (#109)
* [Java] compression workaround * [oap-native-sql] add FastPFOR codec Conflicts: cpp/cmake_modules/ThirdpartyToolchain.cmake cpp/src/arrow/ipc/writer.cc cpp/src/arrow/util/compression.cc cpp/src/arrow/util/compression.h * fix the compile errors after cherry pick 8f2b612ab72b36eeac22420c929043800dc61ac0 * [oap-native-sql] fastpfor add test Conflicts: cpp/src/arrow/util/compression_test.cc * [oap-native-sql] FastPFOR decompression Conflicts: cpp/src/arrow/ipc/reader.cc cpp/src/arrow/ipc/writer.cc * [oap-native-sql] filter float/double Conflicts: cpp/src/arrow/ipc/reader.cc * [oap-native-sql] fix * FastPForLib::CODECFactory::getFromName(fastpfor256) is not thread safe Conflicts: cpp/src/arrow/ipc/writer.cc * [oap-native-sql] force FastPFOR build with source Conflicts: cpp/cmake_modules/ThirdpartyToolchain.cmake cpp/thirdparty/versions.txt * typo Conflicts: cpp/src/arrow/util/compression.h * fix the failed unit test Co-authored-by: rongma1997 <rong.ma@intel.com>
1 parent 7cd194c commit beee8c7

File tree

18 files changed

+469
-17
lines changed

18 files changed

+469
-17
lines changed

cpp/CMakeLists.txt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -748,6 +748,11 @@ if(ARROW_WITH_ZSTD)
748748
endif()
749749
endif()
750750

751+
if(ARROW_WITH_FASTPFOR)
752+
list(APPEND ARROW_STATIC_LINK_LIBS FastPFOR::FastPFOR)
753+
list(APPEND ARROW_STATIC_INSTALL_INTERFACE_LIBS FastPFOR::FastPFOR)
754+
endif()
755+
751756
if(ARROW_ORC)
752757
list(APPEND ARROW_LINK_LIBS orc::liborc ${ARROW_PROTOBUF_LIBPROTOBUF})
753758
list(APPEND ARROW_STATIC_LINK_LIBS orc::liborc ${ARROW_PROTOBUF_LIBPROTOBUF})

cpp/cmake_modules/DefineOptions.cmake

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -392,6 +392,7 @@ if("${CMAKE_SOURCE_DIR}" STREQUAL "${CMAKE_CURRENT_SOURCE_DIR}")
392392
define_option(ARROW_WITH_SNAPPY "Build with Snappy compression" OFF)
393393
define_option(ARROW_WITH_ZLIB "Build with zlib compression" OFF)
394394
define_option(ARROW_WITH_ZSTD "Build with zstd compression" OFF)
395+
define_option(ARROW_WITH_FASTPFOR "Build with FastPFOR compression" OFF)
395396

396397
define_option(ARROW_WITH_UCX
397398
"Build with UCX transport for Arrow Flight;(only used if ARROW_FLIGHT is ON)"

cpp/cmake_modules/ThirdpartyToolchain.cmake

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,8 @@ set(ARROW_THIRDPARTY_DEPENDENCIES
7575
utf8proc
7676
xsimd
7777
ZLIB
78-
zstd)
78+
zstd
79+
FastPFOR)
7980

8081
# TODO(wesm): External GTest shared libraries are not currently
8182
# supported when building with MSVC because of the way that
@@ -686,6 +687,15 @@ else()
686687
"https://github.com/facebook/zstd/archive/${ARROW_ZSTD_BUILD_VERSION}.tar.gz")
687688
endif()
688689

690+
if(DEFINED ENV{FASTPFOR_SOURCE_URL})
691+
set(FASTPFOR_SOURCE_URL "$ENV{FASTPFOR_SOURCE_URL}")
692+
else()
693+
set_urls(
694+
FASTPFOR_SOURCE_URL
695+
"https://github.com/lemire/FastPFor/archive/${ARROW_FASTPFOR_BUILD_VERSION}.tar.gz"
696+
)
697+
endif()
698+
689699
# ----------------------------------------------------------------------
690700
# ExternalProject options
691701

@@ -2544,6 +2554,42 @@ if(ARROW_WITH_UTF8PROC)
25442554
include_directories(SYSTEM ${UTF8PROC_INCLUDE_DIR})
25452555
endif()
25462556

2557+
if(ARROW_WITH_FASTPFOR)
2558+
message(STATUS "Building (vendored) FastPFOR from source")
2559+
2560+
if(MSVC)
2561+
message(FATAL_ERROR "Cannot build FastPFOR on MSVC")
2562+
else()
2563+
set(FASTPFOR_STATIC_LIB_NAME libFastPFOR.a)
2564+
endif()
2565+
2566+
set(FASTPFOR_PREFIX "${CMAKE_CURRENT_BINARY_DIR}/fastpfor_ep-install")
2567+
set(FASTPFOR_STATIC_LIB "${FASTPFOR_PREFIX}/lib/${FASTPFOR_STATIC_LIB_NAME}")
2568+
set(FASTPFOR_INCLUDE_DIR "${FASTPFOR_PREFIX}/include")
2569+
set(FASTPFOR_CMAKE_ARGS
2570+
${EP_COMMON_CMAKE_ARGS}
2571+
"-DCMAKE_INSTALL_PREFIX=${FASTPFOR_PREFIX}"
2572+
-DCMAKE_INSTALL_LIBDIR=lib
2573+
-DCMAKE_BUILD_TYPE=Release)
2574+
2575+
externalproject_add(fastpfor_ep
2576+
INSTALL_DIR ${FASTPFOR_PREFIX}
2577+
URL ${FASTPFOR_SOURCE_URL} ${EP_LOG_OPTIONS}
2578+
BUILD_BYPRODUCTS "${FASTPFOR_STATIC_LIB}"
2579+
CMAKE_ARGS ${FASTPFOR_CMAKE_ARGS})
2580+
2581+
include_directories(SYSTEM ${FASTPFOR_INCLUDE_DIR})
2582+
file(MAKE_DIRECTORY ${FASTPFOR_INCLUDE_DIR})
2583+
2584+
add_library(FastPFOR::FastPFOR STATIC IMPORTED)
2585+
set_target_properties(FastPFOR::FastPFOR
2586+
PROPERTIES IMPORTED_LOCATION "${FASTPFOR_STATIC_LIB}"
2587+
INTERFACE_INCLUDE_DIRECTORIES "${FASTPFOR_INCLUDE_DIR}")
2588+
2589+
add_dependencies(toolchain fastpfor_ep)
2590+
add_dependencies(FastPFOR::FastPFOR fastpfor_ep)
2591+
endif()
2592+
25472593
macro(build_cares)
25482594
message(STATUS "Building c-ares from source")
25492595
set(CARES_PREFIX "${CMAKE_CURRENT_BINARY_DIR}/cares_ep-install")

cpp/src/arrow/CMakeLists.txt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -309,6 +309,11 @@ if(ARROW_WITH_ZSTD)
309309
list(APPEND ARROW_SRCS util/compression_zstd.cc)
310310
endif()
311311

312+
if(ARROW_WITH_FASTPFOR)
313+
add_definitions(-DARROW_WITH_FASTPFOR)
314+
list(APPEND ARROW_SRCS util/compression_fastpfor.cc)
315+
endif()
316+
312317
set(ARROW_TESTING_SRCS
313318
io/test_common.cc
314319
ipc/test_common.cc

cpp/src/arrow/ipc/options.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ IpcReadOptions IpcReadOptions::Defaults() { return IpcReadOptions(); }
2929
namespace internal {
3030

3131
Status CheckCompressionSupported(Compression::type codec) {
32-
if (!(codec == Compression::LZ4_FRAME || codec == Compression::ZSTD)) {
32+
if (!(codec == Compression::LZ4_FRAME || codec == Compression::ZSTD || codec == Compression::FASTPFOR)) {
3333
return Status::Invalid("Only LZ4_FRAME and ZSTD compression allowed");
3434
}
3535
return Status::OK();

cpp/src/arrow/ipc/read_write_test.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -706,7 +706,8 @@ TEST_F(TestWriteRecordBatch, WriteWithCompression) {
706706
auto batch =
707707
RecordBatch::Make(schema, length, {rg.String(500, 0, 10, 0.1), dict_array});
708708

709-
std::vector<Compression::type> codecs = {Compression::LZ4_FRAME, Compression::ZSTD};
709+
std::vector<Compression::type> codecs = {Compression::LZ4_FRAME, Compression::ZSTD,
710+
Compression::FASTPFOR};
710711
for (auto codec : codecs) {
711712
if (!util::Codec::IsAvailable(codec)) {
712713
continue;

cpp/src/arrow/ipc/reader.cc

Lines changed: 91 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -474,6 +474,90 @@ Result<std::shared_ptr<Buffer>> DecompressBuffer(const std::shared_ptr<Buffer>&
474474
return std::move(uncompressed);
475475
}
476476

477+
Status DecompressBufferByType(const Buffer& buffer, util::Codec* codec,
478+
std::shared_ptr<Buffer>* out, MemoryPool* pool) {
479+
const uint8_t* data = buffer.data();
480+
int64_t compressed_size = buffer.size() - sizeof(int64_t);
481+
int64_t uncompressed_size = bit_util::FromLittleEndian(util::SafeLoadAs<int64_t>(data));
482+
483+
ARROW_ASSIGN_OR_RAISE(auto uncompressed, AllocateBuffer(uncompressed_size, pool));
484+
485+
int64_t actual_decompressed;
486+
ARROW_ASSIGN_OR_RAISE(
487+
actual_decompressed,
488+
codec->Decompress(compressed_size, data + sizeof(int64_t), uncompressed_size,
489+
uncompressed->mutable_data()));
490+
if (actual_decompressed != uncompressed_size) {
491+
return Status::Invalid("Failed to fully decompress buffer, expected ",
492+
uncompressed_size, " bytes but decompressed ",
493+
actual_decompressed);
494+
}
495+
*out = std::move(uncompressed);
496+
return Status::OK();
497+
}
498+
499+
Status DecompressBuffersByType(Compression::type compression,
500+
const IpcReadOptions& options,
501+
std::vector<std::shared_ptr<ArrayData>>* arrs,
502+
const std::vector<std::shared_ptr<Field>>& schema_fields) {
503+
ARROW_CHECK_EQ(arrs->size(), schema_fields.size());
504+
505+
std::unique_ptr<util::Codec> codec;
506+
std::unique_ptr<util::Codec> fastpfor32_codec;
507+
std::unique_ptr<util::Codec> fastpfor64_codec;
508+
ARROW_ASSIGN_OR_RAISE(codec, util::Codec::Create(Compression::LZ4_FRAME));
509+
ARROW_ASSIGN_OR_RAISE(fastpfor32_codec, util::Codec::CreateInt32(compression));
510+
ARROW_ASSIGN_OR_RAISE(fastpfor64_codec, util::Codec::CreateInt64(compression));
511+
512+
for (size_t field_idx = 0; field_idx < schema_fields.size(); ++field_idx) {
513+
const auto& field = schema_fields[field_idx];
514+
auto& arr = (*arrs)[field_idx];
515+
if (field->type()->id() == Type::NA) continue;
516+
517+
const auto& layout_buffers = field->type()->layout().buffers;
518+
for (size_t i = 0; i < layout_buffers.size(); ++i) {
519+
const auto& layout = layout_buffers[i];
520+
if (arr->buffers[i] == nullptr) {
521+
continue;
522+
}
523+
if (arr->buffers[i]->size() == 0) {
524+
continue;
525+
}
526+
if (arr->buffers[i]->size() < 8) {
527+
return Status::Invalid(
528+
"Likely corrupted message, compressed buffers "
529+
"are larger than 8 bytes by construction");
530+
}
531+
auto& buffer = arr->buffers[i];
532+
switch (layout.kind) {
533+
case DataTypeLayout::BufferKind::FIXED_WIDTH:
534+
if (layout.byte_width == 4 && field->type()->id() != Type::FLOAT) {
535+
RETURN_NOT_OK(DecompressBufferByType(*buffer, fastpfor32_codec.get(), &buffer,
536+
options.memory_pool));
537+
} else if (layout.byte_width == 8 && field->type()->id() != Type::DOUBLE) {
538+
RETURN_NOT_OK(DecompressBufferByType(*buffer, fastpfor64_codec.get(), &buffer,
539+
options.memory_pool));
540+
} else {
541+
RETURN_NOT_OK(
542+
DecompressBufferByType(*buffer, codec.get(), &buffer, options.memory_pool));
543+
}
544+
break;
545+
case DataTypeLayout::BufferKind::BITMAP:
546+
case DataTypeLayout::BufferKind::VARIABLE_WIDTH: {
547+
RETURN_NOT_OK(
548+
DecompressBufferByType(*buffer, codec.get(), &buffer, options.memory_pool));
549+
break;
550+
}
551+
case DataTypeLayout::BufferKind::ALWAYS_NULL:
552+
break;
553+
default:
554+
return Status::Invalid("Wrong buffer layout.");
555+
}
556+
}
557+
}
558+
return arrow::Status::OK();
559+
}
560+
477561
Status DecompressBuffers(Compression::type compression, const IpcReadOptions& options,
478562
ArrayDataVector* fields) {
479563
struct BufferAccumulator {
@@ -555,8 +639,13 @@ Result<std::shared_ptr<RecordBatch>> LoadRecordBatchSubset(
555639
filtered_columns = std::move(columns);
556640
}
557641
if (context.compression != Compression::UNCOMPRESSED) {
558-
RETURN_NOT_OK(
559-
DecompressBuffers(context.compression, context.options, &filtered_columns));
642+
643+
if (context.compression == Compression::FASTPFOR) {
644+
RETURN_NOT_OK(
645+
DecompressBuffersByType(context.compression, context.options, &filtered_columns, filtered_fields));
646+
} else {
647+
RETURN_NOT_OK(DecompressBuffers(context.compression, context.options, &filtered_columns));
648+
}
560649
}
561650

562651
// swap endian in a set of ArrayData if necessary (swap_endian == true)

cpp/src/arrow/ipc/writer.cc

Lines changed: 66 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -183,10 +183,11 @@ class RecordBatchSerializer {
183183
}
184184

185185
Status CompressBuffer(const Buffer& buffer, util::Codec* codec,
186-
std::shared_ptr<Buffer>* out) {
186+
std::shared_ptr<Buffer>* out, MemoryPool* pool) {
187187
// Convert buffer to uncompressed-length-prefixed compressed buffer
188188
int64_t maximum_length = codec->MaxCompressedLen(buffer.size(), buffer.data());
189-
ARROW_ASSIGN_OR_RAISE(auto result, AllocateBuffer(maximum_length + sizeof(int64_t)));
189+
ARROW_ASSIGN_OR_RAISE(auto result,
190+
AllocateBuffer(maximum_length + sizeof(int64_t), pool));
190191

191192
int64_t actual_length;
192193
ARROW_ASSIGN_OR_RAISE(actual_length,
@@ -205,7 +206,7 @@ class RecordBatchSerializer {
205206
auto CompressOne = [&](size_t i) {
206207
if (out_->body_buffers[i]->size() > 0) {
207208
RETURN_NOT_OK(CompressBuffer(*out_->body_buffers[i], options_.codec.get(),
208-
&out_->body_buffers[i]));
209+
&out_->body_buffers[i], options_.memory_pool));
209210
}
210211
return Status::OK();
211212
};
@@ -214,6 +215,62 @@ class RecordBatchSerializer {
214215
options_.use_threads, static_cast<int>(out_->body_buffers.size()), CompressOne);
215216
}
216217

218+
Status CompressBodyBuffersByType(const std::vector<std::shared_ptr<Field>>& fields) {
219+
std::unique_ptr<util::Codec> codec;
220+
std::unique_ptr<util::Codec> int32_codec;
221+
std::unique_ptr<util::Codec> int64_codec;
222+
ARROW_ASSIGN_OR_RAISE(
223+
codec, util::Codec::Create(Compression::LZ4_FRAME, arrow::util::kUseDefaultCompressionLevel));
224+
ARROW_ASSIGN_OR_RAISE(
225+
int32_codec,
226+
util::Codec::CreateInt32(options_.codec->compression_type(), arrow::util::kUseDefaultCompressionLevel));
227+
ARROW_ASSIGN_OR_RAISE(
228+
int64_codec,
229+
util::Codec::CreateInt64(options_.codec->compression_type(), arrow::util::kUseDefaultCompressionLevel));
230+
231+
AppendCustomMetadata("ARROW:experimental_compression",
232+
util::Codec::GetCodecAsString(options_.codec->compression_type()));
233+
234+
int32_t buffer_idx = 0;
235+
for (const auto& field : fields) {
236+
if (field->type()->id() == Type::NA) continue;
237+
238+
const auto& layout_buffers = field->type()->layout().buffers;
239+
for (size_t i = 0; i < layout_buffers.size(); ++i) {
240+
const auto& layout = layout_buffers[i];
241+
auto& buffer = out_->body_buffers[buffer_idx + i];
242+
if (buffer->size() > 0) {
243+
switch (layout.kind) {
244+
case DataTypeLayout::BufferKind::FIXED_WIDTH:
245+
if (layout.byte_width == 4 && field->type()->id() != Type::FLOAT) {
246+
RETURN_NOT_OK(CompressBuffer(*buffer, int32_codec.get(), &buffer,
247+
options_.memory_pool));
248+
} else if (layout.byte_width == 8 && field->type()->id() != Type::DOUBLE) {
249+
RETURN_NOT_OK(CompressBuffer(*buffer, int64_codec.get(), &buffer,
250+
options_.memory_pool));
251+
} else {
252+
RETURN_NOT_OK(
253+
CompressBuffer(*buffer, codec.get(), &buffer, options_.memory_pool));
254+
}
255+
break;
256+
case DataTypeLayout::BufferKind::BITMAP:
257+
case DataTypeLayout::BufferKind::VARIABLE_WIDTH:
258+
RETURN_NOT_OK(
259+
CompressBuffer(*buffer, codec.get(), &buffer, options_.memory_pool));
260+
break;
261+
case DataTypeLayout::BufferKind::ALWAYS_NULL:
262+
break;
263+
default:
264+
return Status::Invalid("Wrong buffer layout.");
265+
}
266+
}
267+
}
268+
buffer_idx += layout_buffers.size();
269+
}
270+
// TODO: ParallelFor?
271+
return Status::OK();
272+
}
273+
217274
Status Assemble(const RecordBatch& batch) {
218275
if (field_nodes_.size() > 0) {
219276
field_nodes_.clear();
@@ -236,7 +293,12 @@ class RecordBatchSerializer {
236293
out_->raw_body_length = raw_size;
237294

238295
if (options_.codec != nullptr) {
239-
RETURN_NOT_OK(CompressBodyBuffers());
296+
297+
if (options_.codec->compression_type() == Compression::FASTPFOR) {
298+
RETURN_NOT_OK(CompressBodyBuffersByType(batch.schema()->fields()));
299+
} else {
300+
RETURN_NOT_OK(CompressBodyBuffers());
301+
}
240302
}
241303

242304
// The position for the start of a buffer relative to the passed frame of

0 commit comments

Comments
 (0)