Skip to content

Commit e04df5c

Browse files
JkSelfmarin-ma
authored andcommitted
[Arrow 3.0 Rebase] Rebase Shuffle component c++ and Java API (#109)
* [Java] compression workaround * [oap-native-sql] add FastPFOR codec Conflicts: cpp/cmake_modules/ThirdpartyToolchain.cmake cpp/src/arrow/ipc/writer.cc cpp/src/arrow/util/compression.cc cpp/src/arrow/util/compression.h * fix the compile errors after cherry pick 8f2b612ab72b36eeac22420c929043800dc61ac0 * [oap-native-sql] fastpfor add test Conflicts: cpp/src/arrow/util/compression_test.cc * [oap-native-sql] FastPFOR decompression Conflicts: cpp/src/arrow/ipc/reader.cc cpp/src/arrow/ipc/writer.cc * [oap-native-sql] filter float/double Conflicts: cpp/src/arrow/ipc/reader.cc * [oap-native-sql] fix * FastPForLib::CODECFactory::getFromName(fastpfor256) is not thread safe Conflicts: cpp/src/arrow/ipc/writer.cc * [oap-native-sql] force FastPFOR build with source Conflicts: cpp/cmake_modules/ThirdpartyToolchain.cmake cpp/thirdparty/versions.txt * typo Conflicts: cpp/src/arrow/util/compression.h * fix the failed unit test Co-authored-by: rongma1997 <rong.ma@intel.com>
1 parent 1b0c91b commit e04df5c

File tree

18 files changed

+469
-17
lines changed

18 files changed

+469
-17
lines changed

cpp/CMakeLists.txt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -742,6 +742,11 @@ if(ARROW_WITH_ZSTD)
742742
endif()
743743
endif()
744744

745+
if(ARROW_WITH_FASTPFOR)
746+
list(APPEND ARROW_STATIC_LINK_LIBS FastPFOR::FastPFOR)
747+
list(APPEND ARROW_STATIC_INSTALL_INTERFACE_LIBS FastPFOR::FastPFOR)
748+
endif()
749+
745750
if(ARROW_ORC)
746751
list(APPEND ARROW_LINK_LIBS orc::liborc ${ARROW_PROTOBUF_LIBPROTOBUF})
747752
list(APPEND ARROW_STATIC_LINK_LIBS orc::liborc ${ARROW_PROTOBUF_LIBPROTOBUF})

cpp/cmake_modules/DefineOptions.cmake

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -390,6 +390,7 @@ if("${CMAKE_SOURCE_DIR}" STREQUAL "${CMAKE_CURRENT_SOURCE_DIR}")
390390
define_option(ARROW_WITH_SNAPPY "Build with Snappy compression" OFF)
391391
define_option(ARROW_WITH_ZLIB "Build with zlib compression" OFF)
392392
define_option(ARROW_WITH_ZSTD "Build with zstd compression" OFF)
393+
define_option(ARROW_WITH_FASTPFOR "Build with FastPFOR compression" OFF)
393394

394395
define_option(ARROW_WITH_UTF8PROC
395396
"Build with support for Unicode properties using the utf8proc library;(only used if ARROW_COMPUTE is ON or ARROW_GANDIVA is ON)"

cpp/cmake_modules/ThirdpartyToolchain.cmake

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,8 @@ set(ARROW_THIRDPARTY_DEPENDENCIES
7373
utf8proc
7474
xsimd
7575
ZLIB
76-
zstd)
76+
zstd
77+
FastPFOR)
7778

7879
# TODO(wesm): External GTest shared libraries are not currently
7980
# supported when building with MSVC because of the way that
@@ -660,6 +661,15 @@ else()
660661
"https://github.com/facebook/zstd/archive/${ARROW_ZSTD_BUILD_VERSION}.tar.gz")
661662
endif()
662663

664+
if(DEFINED ENV{FASTPFOR_SOURCE_URL})
665+
set(FASTPFOR_SOURCE_URL "$ENV{FASTPFOR_SOURCE_URL}")
666+
else()
667+
set_urls(
668+
FASTPFOR_SOURCE_URL
669+
"https://github.com/lemire/FastPFor/archive/${ARROW_FASTPFOR_BUILD_VERSION}.tar.gz"
670+
)
671+
endif()
672+
663673
# ----------------------------------------------------------------------
664674
# ExternalProject options
665675

@@ -2411,6 +2421,42 @@ if(ARROW_WITH_UTF8PROC)
24112421
include_directories(SYSTEM ${UTF8PROC_INCLUDE_DIR})
24122422
endif()
24132423

2424+
if(ARROW_WITH_FASTPFOR)
2425+
message(STATUS "Building (vendored) FastPFOR from source")
2426+
2427+
if(MSVC)
2428+
message(FATAL_ERROR "Cannot build FastPFOR on MSVC")
2429+
else()
2430+
set(FASTPFOR_STATIC_LIB_NAME libFastPFOR.a)
2431+
endif()
2432+
2433+
set(FASTPFOR_PREFIX "${CMAKE_CURRENT_BINARY_DIR}/fastpfor_ep-install")
2434+
set(FASTPFOR_STATIC_LIB "${FASTPFOR_PREFIX}/lib/${FASTPFOR_STATIC_LIB_NAME}")
2435+
set(FASTPFOR_INCLUDE_DIR "${FASTPFOR_PREFIX}/include")
2436+
set(FASTPFOR_CMAKE_ARGS
2437+
${EP_COMMON_CMAKE_ARGS}
2438+
"-DCMAKE_INSTALL_PREFIX=${FASTPFOR_PREFIX}"
2439+
-DCMAKE_INSTALL_LIBDIR=lib
2440+
-DCMAKE_BUILD_TYPE=Release)
2441+
2442+
externalproject_add(fastpfor_ep
2443+
INSTALL_DIR ${FASTPFOR_PREFIX}
2444+
URL ${FASTPFOR_SOURCE_URL} ${EP_LOG_OPTIONS}
2445+
BUILD_BYPRODUCTS "${FASTPFOR_STATIC_LIB}"
2446+
CMAKE_ARGS ${FASTPFOR_CMAKE_ARGS})
2447+
2448+
include_directories(SYSTEM ${FASTPFOR_INCLUDE_DIR})
2449+
file(MAKE_DIRECTORY ${FASTPFOR_INCLUDE_DIR})
2450+
2451+
add_library(FastPFOR::FastPFOR STATIC IMPORTED)
2452+
set_target_properties(FastPFOR::FastPFOR
2453+
PROPERTIES IMPORTED_LOCATION "${FASTPFOR_STATIC_LIB}"
2454+
INTERFACE_INCLUDE_DIRECTORIES "${FASTPFOR_INCLUDE_DIR}")
2455+
2456+
add_dependencies(toolchain fastpfor_ep)
2457+
add_dependencies(FastPFOR::FastPFOR fastpfor_ep)
2458+
endif()
2459+
24142460
macro(build_cares)
24152461
message(STATUS "Building c-ares from source")
24162462
set(CARES_PREFIX "${CMAKE_CURRENT_BINARY_DIR}/cares_ep-install")

cpp/src/arrow/CMakeLists.txt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -306,6 +306,11 @@ if(ARROW_WITH_ZSTD)
306306
list(APPEND ARROW_SRCS util/compression_zstd.cc)
307307
endif()
308308

309+
if(ARROW_WITH_FASTPFOR)
310+
add_definitions(-DARROW_WITH_FASTPFOR)
311+
list(APPEND ARROW_SRCS util/compression_fastpfor.cc)
312+
endif()
313+
309314
set(ARROW_TESTING_SRCS
310315
io/test_common.cc
311316
ipc/test_common.cc

cpp/src/arrow/ipc/options.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ IpcReadOptions IpcReadOptions::Defaults() { return IpcReadOptions(); }
2929
namespace internal {
3030

3131
Status CheckCompressionSupported(Compression::type codec) {
32-
if (!(codec == Compression::LZ4_FRAME || codec == Compression::ZSTD)) {
32+
if (!(codec == Compression::LZ4_FRAME || codec == Compression::ZSTD || codec == Compression::FASTPFOR)) {
3333
return Status::Invalid("Only LZ4_FRAME and ZSTD compression allowed");
3434
}
3535
return Status::OK();

cpp/src/arrow/ipc/read_write_test.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -706,7 +706,8 @@ TEST_F(TestWriteRecordBatch, WriteWithCompression) {
706706
auto batch =
707707
RecordBatch::Make(schema, length, {rg.String(500, 0, 10, 0.1), dict_array});
708708

709-
std::vector<Compression::type> codecs = {Compression::LZ4_FRAME, Compression::ZSTD};
709+
std::vector<Compression::type> codecs = {Compression::LZ4_FRAME, Compression::ZSTD,
710+
Compression::FASTPFOR};
710711
for (auto codec : codecs) {
711712
if (!util::Codec::IsAvailable(codec)) {
712713
continue;

cpp/src/arrow/ipc/reader.cc

Lines changed: 91 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -474,6 +474,90 @@ Result<std::shared_ptr<Buffer>> DecompressBuffer(const std::shared_ptr<Buffer>&
474474
return std::move(uncompressed);
475475
}
476476

477+
Status DecompressBufferByType(const Buffer& buffer, util::Codec* codec,
478+
std::shared_ptr<Buffer>* out, MemoryPool* pool) {
479+
const uint8_t* data = buffer.data();
480+
int64_t compressed_size = buffer.size() - sizeof(int64_t);
481+
int64_t uncompressed_size = bit_util::FromLittleEndian(util::SafeLoadAs<int64_t>(data));
482+
483+
ARROW_ASSIGN_OR_RAISE(auto uncompressed, AllocateBuffer(uncompressed_size, pool));
484+
485+
int64_t actual_decompressed;
486+
ARROW_ASSIGN_OR_RAISE(
487+
actual_decompressed,
488+
codec->Decompress(compressed_size, data + sizeof(int64_t), uncompressed_size,
489+
uncompressed->mutable_data()));
490+
if (actual_decompressed != uncompressed_size) {
491+
return Status::Invalid("Failed to fully decompress buffer, expected ",
492+
uncompressed_size, " bytes but decompressed ",
493+
actual_decompressed);
494+
}
495+
*out = std::move(uncompressed);
496+
return Status::OK();
497+
}
498+
499+
Status DecompressBuffersByType(Compression::type compression,
500+
const IpcReadOptions& options,
501+
std::vector<std::shared_ptr<ArrayData>>* arrs,
502+
const std::vector<std::shared_ptr<Field>>& schema_fields) {
503+
ARROW_CHECK_EQ(arrs->size(), schema_fields.size());
504+
505+
std::unique_ptr<util::Codec> codec;
506+
std::unique_ptr<util::Codec> fastpfor32_codec;
507+
std::unique_ptr<util::Codec> fastpfor64_codec;
508+
ARROW_ASSIGN_OR_RAISE(codec, util::Codec::Create(Compression::LZ4_FRAME));
509+
ARROW_ASSIGN_OR_RAISE(fastpfor32_codec, util::Codec::CreateInt32(compression));
510+
ARROW_ASSIGN_OR_RAISE(fastpfor64_codec, util::Codec::CreateInt64(compression));
511+
512+
for (size_t field_idx = 0; field_idx < schema_fields.size(); ++field_idx) {
513+
const auto& field = schema_fields[field_idx];
514+
auto& arr = (*arrs)[field_idx];
515+
if (field->type()->id() == Type::NA) continue;
516+
517+
const auto& layout_buffers = field->type()->layout().buffers;
518+
for (size_t i = 0; i < layout_buffers.size(); ++i) {
519+
const auto& layout = layout_buffers[i];
520+
if (arr->buffers[i] == nullptr) {
521+
continue;
522+
}
523+
if (arr->buffers[i]->size() == 0) {
524+
continue;
525+
}
526+
if (arr->buffers[i]->size() < 8) {
527+
return Status::Invalid(
528+
"Likely corrupted message, compressed buffers "
529+
"are larger than 8 bytes by construction");
530+
}
531+
auto& buffer = arr->buffers[i];
532+
switch (layout.kind) {
533+
case DataTypeLayout::BufferKind::FIXED_WIDTH:
534+
if (layout.byte_width == 4 && field->type()->id() != Type::FLOAT) {
535+
RETURN_NOT_OK(DecompressBufferByType(*buffer, fastpfor32_codec.get(), &buffer,
536+
options.memory_pool));
537+
} else if (layout.byte_width == 8 && field->type()->id() != Type::DOUBLE) {
538+
RETURN_NOT_OK(DecompressBufferByType(*buffer, fastpfor64_codec.get(), &buffer,
539+
options.memory_pool));
540+
} else {
541+
RETURN_NOT_OK(
542+
DecompressBufferByType(*buffer, codec.get(), &buffer, options.memory_pool));
543+
}
544+
break;
545+
case DataTypeLayout::BufferKind::BITMAP:
546+
case DataTypeLayout::BufferKind::VARIABLE_WIDTH: {
547+
RETURN_NOT_OK(
548+
DecompressBufferByType(*buffer, codec.get(), &buffer, options.memory_pool));
549+
break;
550+
}
551+
case DataTypeLayout::BufferKind::ALWAYS_NULL:
552+
break;
553+
default:
554+
return Status::Invalid("Wrong buffer layout.");
555+
}
556+
}
557+
}
558+
return arrow::Status::OK();
559+
}
560+
477561
Status DecompressBuffers(Compression::type compression, const IpcReadOptions& options,
478562
ArrayDataVector* fields) {
479563
struct BufferAccumulator {
@@ -555,8 +639,13 @@ Result<std::shared_ptr<RecordBatch>> LoadRecordBatchSubset(
555639
filtered_columns = std::move(columns);
556640
}
557641
if (context.compression != Compression::UNCOMPRESSED) {
558-
RETURN_NOT_OK(
559-
DecompressBuffers(context.compression, context.options, &filtered_columns));
642+
643+
if (context.compression == Compression::FASTPFOR) {
644+
RETURN_NOT_OK(
645+
DecompressBuffersByType(context.compression, context.options, &filtered_columns, filtered_fields));
646+
} else {
647+
RETURN_NOT_OK(DecompressBuffers(context.compression, context.options, &filtered_columns));
648+
}
560649
}
561650

562651
// swap endian in a set of ArrayData if necessary (swap_endian == true)

cpp/src/arrow/ipc/writer.cc

Lines changed: 66 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -183,10 +183,11 @@ class RecordBatchSerializer {
183183
}
184184

185185
Status CompressBuffer(const Buffer& buffer, util::Codec* codec,
186-
std::shared_ptr<Buffer>* out) {
186+
std::shared_ptr<Buffer>* out, MemoryPool* pool) {
187187
// Convert buffer to uncompressed-length-prefixed compressed buffer
188188
int64_t maximum_length = codec->MaxCompressedLen(buffer.size(), buffer.data());
189-
ARROW_ASSIGN_OR_RAISE(auto result, AllocateBuffer(maximum_length + sizeof(int64_t)));
189+
ARROW_ASSIGN_OR_RAISE(auto result,
190+
AllocateBuffer(maximum_length + sizeof(int64_t), pool));
190191

191192
int64_t actual_length;
192193
ARROW_ASSIGN_OR_RAISE(actual_length,
@@ -205,7 +206,7 @@ class RecordBatchSerializer {
205206
auto CompressOne = [&](size_t i) {
206207
if (out_->body_buffers[i]->size() > 0) {
207208
RETURN_NOT_OK(CompressBuffer(*out_->body_buffers[i], options_.codec.get(),
208-
&out_->body_buffers[i]));
209+
&out_->body_buffers[i], options_.memory_pool));
209210
}
210211
return Status::OK();
211212
};
@@ -214,6 +215,62 @@ class RecordBatchSerializer {
214215
options_.use_threads, static_cast<int>(out_->body_buffers.size()), CompressOne);
215216
}
216217

218+
Status CompressBodyBuffersByType(const std::vector<std::shared_ptr<Field>>& fields) {
219+
std::unique_ptr<util::Codec> codec;
220+
std::unique_ptr<util::Codec> int32_codec;
221+
std::unique_ptr<util::Codec> int64_codec;
222+
ARROW_ASSIGN_OR_RAISE(
223+
codec, util::Codec::Create(Compression::LZ4_FRAME, arrow::util::kUseDefaultCompressionLevel));
224+
ARROW_ASSIGN_OR_RAISE(
225+
int32_codec,
226+
util::Codec::CreateInt32(options_.codec->compression_type(), arrow::util::kUseDefaultCompressionLevel));
227+
ARROW_ASSIGN_OR_RAISE(
228+
int64_codec,
229+
util::Codec::CreateInt64(options_.codec->compression_type(), arrow::util::kUseDefaultCompressionLevel));
230+
231+
AppendCustomMetadata("ARROW:experimental_compression",
232+
util::Codec::GetCodecAsString(options_.codec->compression_type()));
233+
234+
int32_t buffer_idx = 0;
235+
for (const auto& field : fields) {
236+
if (field->type()->id() == Type::NA) continue;
237+
238+
const auto& layout_buffers = field->type()->layout().buffers;
239+
for (size_t i = 0; i < layout_buffers.size(); ++i) {
240+
const auto& layout = layout_buffers[i];
241+
auto& buffer = out_->body_buffers[buffer_idx + i];
242+
if (buffer->size() > 0) {
243+
switch (layout.kind) {
244+
case DataTypeLayout::BufferKind::FIXED_WIDTH:
245+
if (layout.byte_width == 4 && field->type()->id() != Type::FLOAT) {
246+
RETURN_NOT_OK(CompressBuffer(*buffer, int32_codec.get(), &buffer,
247+
options_.memory_pool));
248+
} else if (layout.byte_width == 8 && field->type()->id() != Type::DOUBLE) {
249+
RETURN_NOT_OK(CompressBuffer(*buffer, int64_codec.get(), &buffer,
250+
options_.memory_pool));
251+
} else {
252+
RETURN_NOT_OK(
253+
CompressBuffer(*buffer, codec.get(), &buffer, options_.memory_pool));
254+
}
255+
break;
256+
case DataTypeLayout::BufferKind::BITMAP:
257+
case DataTypeLayout::BufferKind::VARIABLE_WIDTH:
258+
RETURN_NOT_OK(
259+
CompressBuffer(*buffer, codec.get(), &buffer, options_.memory_pool));
260+
break;
261+
case DataTypeLayout::BufferKind::ALWAYS_NULL:
262+
break;
263+
default:
264+
return Status::Invalid("Wrong buffer layout.");
265+
}
266+
}
267+
}
268+
buffer_idx += layout_buffers.size();
269+
}
270+
// TODO: ParallelFor?
271+
return Status::OK();
272+
}
273+
217274
Status Assemble(const RecordBatch& batch) {
218275
if (field_nodes_.size() > 0) {
219276
field_nodes_.clear();
@@ -236,7 +293,12 @@ class RecordBatchSerializer {
236293
out_->raw_body_length = raw_size;
237294

238295
if (options_.codec != nullptr) {
239-
RETURN_NOT_OK(CompressBodyBuffers());
296+
297+
if (options_.codec->compression_type() == Compression::FASTPFOR) {
298+
RETURN_NOT_OK(CompressBodyBuffersByType(batch.schema()->fields()));
299+
} else {
300+
RETURN_NOT_OK(CompressBodyBuffers());
301+
}
240302
}
241303

242304
// The position for the start of a buffer relative to the passed frame of

0 commit comments

Comments
 (0)