Skip to content

Commit b7f99f0

Browse files
JkSelfmarin-ma
authored andcommitted
[Arrow 3.0 Rebase] Rebase Shuffle component c++ and Java API (apache#109)
* [Java] compression workaround * [oap-native-sql] add FastPFOR codec Conflicts: cpp/cmake_modules/ThirdpartyToolchain.cmake cpp/src/arrow/ipc/writer.cc cpp/src/arrow/util/compression.cc cpp/src/arrow/util/compression.h * fix the compile errors after cherry pick 8f2b612ab72b36eeac22420c929043800dc61ac0 * [oap-native-sql] fastpfor add test Conflicts: cpp/src/arrow/util/compression_test.cc * [oap-native-sql] FastPFOR decompression Conflicts: cpp/src/arrow/ipc/reader.cc cpp/src/arrow/ipc/writer.cc * [oap-native-sql] filter float/double Conflicts: cpp/src/arrow/ipc/reader.cc * [oap-native-sql] fix * FastPForLib::CODECFactory::getFromName(fastpfor256) is not thread safe Conflicts: cpp/src/arrow/ipc/writer.cc * [oap-native-sql] force FastPFOR build with source Conflicts: cpp/cmake_modules/ThirdpartyToolchain.cmake cpp/thirdparty/versions.txt * typo Conflicts: cpp/src/arrow/util/compression.h * fix the failed unit test Co-authored-by: rongma1997 <rong.ma@intel.com>
1 parent 81ff679 commit b7f99f0

File tree

18 files changed

+469
-16
lines changed

18 files changed

+469
-16
lines changed

cpp/CMakeLists.txt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -712,6 +712,11 @@ if(ARROW_WITH_ZSTD)
712712
endif()
713713
endif()
714714

715+
if(ARROW_WITH_FASTPFOR)
716+
list(APPEND ARROW_STATIC_LINK_LIBS FastPFOR::FastPFOR)
717+
list(APPEND ARROW_STATIC_INSTALL_INTERFACE_LIBS FastPFOR::FastPFOR)
718+
endif()
719+
715720
if(ARROW_ORC)
716721
list(APPEND ARROW_LINK_LIBS orc::liborc ${ARROW_PROTOBUF_LIBPROTOBUF})
717722
list(APPEND ARROW_STATIC_LINK_LIBS orc::liborc ${ARROW_PROTOBUF_LIBPROTOBUF})

cpp/cmake_modules/DefineOptions.cmake

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -362,6 +362,7 @@ if("${CMAKE_SOURCE_DIR}" STREQUAL "${CMAKE_CURRENT_SOURCE_DIR}")
362362
define_option(ARROW_WITH_SNAPPY "Build with Snappy compression" OFF)
363363
define_option(ARROW_WITH_ZLIB "Build with zlib compression" OFF)
364364
define_option(ARROW_WITH_ZSTD "Build with zstd compression" OFF)
365+
define_option(ARROW_WITH_FASTPFOR "Build with FastPFOR compression" OFF)
365366

366367
define_option(
367368
ARROW_WITH_UTF8PROC

cpp/cmake_modules/ThirdpartyToolchain.cmake

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,8 @@ set(ARROW_THIRDPARTY_DEPENDENCIES
7171
utf8proc
7272
xsimd
7373
ZLIB
74-
zstd)
74+
zstd
75+
FastPFOR)
7576

7677
# TODO(wesm): External GTest shared libraries are not currently
7778
# supported when building with MSVC because of the way that
@@ -617,6 +618,15 @@ else()
617618
)
618619
endif()
619620

621+
if(DEFINED ENV{FASTPFOR_SOURCE_URL})
622+
set(FASTPFOR_SOURCE_URL "$ENV{FASTPFOR_SOURCE_URL}")
623+
else()
624+
set_urls(
625+
FASTPFOR_SOURCE_URL
626+
"https://github.com/lemire/FastPFor/archive/${ARROW_FASTPFOR_BUILD_VERSION}.tar.gz"
627+
)
628+
endif()
629+
620630
# ----------------------------------------------------------------------
621631
# ExternalProject options
622632

@@ -2292,6 +2302,42 @@ if(ARROW_WITH_UTF8PROC)
22922302
include_directories(SYSTEM ${UTF8PROC_INCLUDE_DIR})
22932303
endif()
22942304

2305+
if(ARROW_WITH_FASTPFOR)
2306+
message(STATUS "Building (vendored) FastPFOR from source")
2307+
2308+
if(MSVC)
2309+
message(FATAL_ERROR "Cannot build FastPFOR on MSVC")
2310+
else()
2311+
set(FASTPFOR_STATIC_LIB_NAME libFastPFOR.a)
2312+
endif()
2313+
2314+
set(FASTPFOR_PREFIX "${CMAKE_CURRENT_BINARY_DIR}/fastpfor_ep-install")
2315+
set(FASTPFOR_STATIC_LIB "${FASTPFOR_PREFIX}/lib/${FASTPFOR_STATIC_LIB_NAME}")
2316+
set(FASTPFOR_INCLUDE_DIR "${FASTPFOR_PREFIX}/include")
2317+
set(FASTPFOR_CMAKE_ARGS
2318+
${EP_COMMON_CMAKE_ARGS}
2319+
"-DCMAKE_INSTALL_PREFIX=${FASTPFOR_PREFIX}"
2320+
-DCMAKE_INSTALL_LIBDIR=lib
2321+
-DCMAKE_BUILD_TYPE=Release)
2322+
2323+
externalproject_add(fastpfor_ep
2324+
INSTALL_DIR ${FASTPFOR_PREFIX}
2325+
URL ${FASTPFOR_SOURCE_URL} ${EP_LOG_OPTIONS}
2326+
BUILD_BYPRODUCTS "${FASTPFOR_STATIC_LIB}"
2327+
CMAKE_ARGS ${FASTPFOR_CMAKE_ARGS})
2328+
2329+
include_directories(SYSTEM ${FASTPFOR_INCLUDE_DIR})
2330+
file(MAKE_DIRECTORY ${FASTPFOR_INCLUDE_DIR})
2331+
2332+
add_library(FastPFOR::FastPFOR STATIC IMPORTED)
2333+
set_target_properties(FastPFOR::FastPFOR
2334+
PROPERTIES IMPORTED_LOCATION "${FASTPFOR_STATIC_LIB}"
2335+
INTERFACE_INCLUDE_DIRECTORIES "${FASTPFOR_INCLUDE_DIR}")
2336+
2337+
add_dependencies(toolchain fastpfor_ep)
2338+
add_dependencies(FastPFOR::FastPFOR fastpfor_ep)
2339+
endif()
2340+
22952341
macro(build_cares)
22962342
message(STATUS "Building c-ares from source")
22972343
set(CARES_PREFIX "${CMAKE_CURRENT_BINARY_DIR}/cares_ep-install")

cpp/src/arrow/CMakeLists.txt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,11 @@ if(ARROW_WITH_ZSTD)
294294
list(APPEND ARROW_SRCS util/compression_zstd.cc)
295295
endif()
296296

297+
if(ARROW_WITH_FASTPFOR)
298+
add_definitions(-DARROW_WITH_FASTPFOR)
299+
list(APPEND ARROW_SRCS util/compression_fastpfor.cc)
300+
endif()
301+
297302
set(ARROW_TESTING_SRCS
298303
io/test_common.cc
299304
ipc/test_common.cc

cpp/src/arrow/ipc/options.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ IpcReadOptions IpcReadOptions::Defaults() { return IpcReadOptions(); }
2929
namespace internal {
3030

3131
Status CheckCompressionSupported(Compression::type codec) {
32-
if (!(codec == Compression::LZ4_FRAME || codec == Compression::ZSTD)) {
32+
if (!(codec == Compression::LZ4_FRAME || codec == Compression::ZSTD || codec == Compression::FASTPFOR)) {
3333
return Status::Invalid("Only LZ4_FRAME and ZSTD compression allowed");
3434
}
3535
return Status::OK();

cpp/src/arrow/ipc/read_write_test.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -610,7 +610,8 @@ TEST_F(TestWriteRecordBatch, WriteWithCompression) {
610610
auto batch =
611611
RecordBatch::Make(schema, length, {rg.String(500, 0, 10, 0.1), dict_array});
612612

613-
std::vector<Compression::type> codecs = {Compression::LZ4_FRAME, Compression::ZSTD};
613+
std::vector<Compression::type> codecs = {Compression::LZ4_FRAME, Compression::ZSTD,
614+
Compression::FASTPFOR};
614615
for (auto codec : codecs) {
615616
if (!util::Codec::IsAvailable(codec)) {
616617
continue;

cpp/src/arrow/ipc/reader.cc

Lines changed: 91 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -426,6 +426,90 @@ Result<std::shared_ptr<Buffer>> DecompressBuffer(const std::shared_ptr<Buffer>&
426426
return std::move(uncompressed);
427427
}
428428

429+
Status DecompressBufferByType(const Buffer& buffer, util::Codec* codec,
430+
std::shared_ptr<Buffer>* out, MemoryPool* pool) {
431+
const uint8_t* data = buffer.data();
432+
int64_t compressed_size = buffer.size() - sizeof(int64_t);
433+
int64_t uncompressed_size = BitUtil::FromLittleEndian(util::SafeLoadAs<int64_t>(data));
434+
435+
ARROW_ASSIGN_OR_RAISE(auto uncompressed, AllocateBuffer(uncompressed_size, pool));
436+
437+
int64_t actual_decompressed;
438+
ARROW_ASSIGN_OR_RAISE(
439+
actual_decompressed,
440+
codec->Decompress(compressed_size, data + sizeof(int64_t), uncompressed_size,
441+
uncompressed->mutable_data()));
442+
if (actual_decompressed != uncompressed_size) {
443+
return Status::Invalid("Failed to fully decompress buffer, expected ",
444+
uncompressed_size, " bytes but decompressed ",
445+
actual_decompressed);
446+
}
447+
*out = std::move(uncompressed);
448+
return Status::OK();
449+
}
450+
451+
Status DecompressBuffersByType(Compression::type compression,
452+
const IpcReadOptions& options,
453+
std::vector<std::shared_ptr<ArrayData>>* arrs,
454+
const std::vector<std::shared_ptr<Field>>& schema_fields) {
455+
ARROW_CHECK_EQ(arrs->size(), schema_fields.size());
456+
457+
std::unique_ptr<util::Codec> codec;
458+
std::unique_ptr<util::Codec> fastpfor32_codec;
459+
std::unique_ptr<util::Codec> fastpfor64_codec;
460+
ARROW_ASSIGN_OR_RAISE(codec, util::Codec::Create(Compression::LZ4_FRAME));
461+
ARROW_ASSIGN_OR_RAISE(fastpfor32_codec, util::Codec::CreateInt32(compression));
462+
ARROW_ASSIGN_OR_RAISE(fastpfor64_codec, util::Codec::CreateInt64(compression));
463+
464+
for (size_t field_idx = 0; field_idx < schema_fields.size(); ++field_idx) {
465+
const auto& field = schema_fields[field_idx];
466+
auto& arr = (*arrs)[field_idx];
467+
if (field->type()->id() == Type::NA) continue;
468+
469+
const auto& layout_buffers = field->type()->layout().buffers;
470+
for (size_t i = 0; i < layout_buffers.size(); ++i) {
471+
const auto& layout = layout_buffers[i];
472+
if (arr->buffers[i] == nullptr) {
473+
continue;
474+
}
475+
if (arr->buffers[i]->size() == 0) {
476+
continue;
477+
}
478+
if (arr->buffers[i]->size() < 8) {
479+
return Status::Invalid(
480+
"Likely corrupted message, compressed buffers "
481+
"are larger than 8 bytes by construction");
482+
}
483+
auto& buffer = arr->buffers[i];
484+
switch (layout.kind) {
485+
case DataTypeLayout::BufferKind::FIXED_WIDTH:
486+
if (layout.byte_width == 4 && field->type()->id() != Type::FLOAT) {
487+
RETURN_NOT_OK(DecompressBufferByType(*buffer, fastpfor32_codec.get(), &buffer,
488+
options.memory_pool));
489+
} else if (layout.byte_width == 8 && field->type()->id() != Type::DOUBLE) {
490+
RETURN_NOT_OK(DecompressBufferByType(*buffer, fastpfor64_codec.get(), &buffer,
491+
options.memory_pool));
492+
} else {
493+
RETURN_NOT_OK(
494+
DecompressBufferByType(*buffer, codec.get(), &buffer, options.memory_pool));
495+
}
496+
break;
497+
case DataTypeLayout::BufferKind::BITMAP:
498+
case DataTypeLayout::BufferKind::VARIABLE_WIDTH: {
499+
RETURN_NOT_OK(
500+
DecompressBufferByType(*buffer, codec.get(), &buffer, options.memory_pool));
501+
break;
502+
}
503+
case DataTypeLayout::BufferKind::ALWAYS_NULL:
504+
break;
505+
default:
506+
return Status::Invalid("Wrong buffer layout.");
507+
}
508+
}
509+
}
510+
return arrow::Status::OK();
511+
}
512+
429513
Status DecompressBuffers(Compression::type compression, const IpcReadOptions& options,
430514
ArrayDataVector* fields) {
431515
struct BufferAccumulator {
@@ -507,8 +591,13 @@ Result<std::shared_ptr<RecordBatch>> LoadRecordBatchSubset(
507591
filtered_columns = std::move(columns);
508592
}
509593
if (context.compression != Compression::UNCOMPRESSED) {
510-
RETURN_NOT_OK(
511-
DecompressBuffers(context.compression, context.options, &filtered_columns));
594+
595+
if (context.compression == Compression::FASTPFOR) {
596+
RETURN_NOT_OK(
597+
DecompressBuffersByType(context.compression, context.options, &filtered_columns, filtered_fields));
598+
} else {
599+
RETURN_NOT_OK(DecompressBuffers(context.compression, context.options, &filtered_columns));
600+
}
512601
}
513602

514603
// swap endian in a set of ArrayData if necessary (swap_endian == true)

cpp/src/arrow/ipc/writer.cc

Lines changed: 66 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -182,10 +182,11 @@ class RecordBatchSerializer {
182182
}
183183

184184
Status CompressBuffer(const Buffer& buffer, util::Codec* codec,
185-
std::shared_ptr<Buffer>* out) {
185+
std::shared_ptr<Buffer>* out, MemoryPool* pool) {
186186
// Convert buffer to uncompressed-length-prefixed compressed buffer
187187
int64_t maximum_length = codec->MaxCompressedLen(buffer.size(), buffer.data());
188-
ARROW_ASSIGN_OR_RAISE(auto result, AllocateBuffer(maximum_length + sizeof(int64_t)));
188+
ARROW_ASSIGN_OR_RAISE(auto result,
189+
AllocateBuffer(maximum_length + sizeof(int64_t), pool));
189190

190191
int64_t actual_length;
191192
ARROW_ASSIGN_OR_RAISE(actual_length,
@@ -204,7 +205,7 @@ class RecordBatchSerializer {
204205
auto CompressOne = [&](size_t i) {
205206
if (out_->body_buffers[i]->size() > 0) {
206207
RETURN_NOT_OK(CompressBuffer(*out_->body_buffers[i], options_.codec.get(),
207-
&out_->body_buffers[i]));
208+
&out_->body_buffers[i], options_.memory_pool));
208209
}
209210
return Status::OK();
210211
};
@@ -213,6 +214,62 @@ class RecordBatchSerializer {
213214
options_.use_threads, static_cast<int>(out_->body_buffers.size()), CompressOne);
214215
}
215216

217+
Status CompressBodyBuffersByType(const std::vector<std::shared_ptr<Field>>& fields) {
218+
std::unique_ptr<util::Codec> codec;
219+
std::unique_ptr<util::Codec> int32_codec;
220+
std::unique_ptr<util::Codec> int64_codec;
221+
ARROW_ASSIGN_OR_RAISE(
222+
codec, util::Codec::Create(Compression::LZ4_FRAME, arrow::util::kUseDefaultCompressionLevel));
223+
ARROW_ASSIGN_OR_RAISE(
224+
int32_codec,
225+
util::Codec::CreateInt32(options_.codec->compression_type(), arrow::util::kUseDefaultCompressionLevel));
226+
ARROW_ASSIGN_OR_RAISE(
227+
int64_codec,
228+
util::Codec::CreateInt64(options_.codec->compression_type(), arrow::util::kUseDefaultCompressionLevel));
229+
230+
AppendCustomMetadata("ARROW:experimental_compression",
231+
util::Codec::GetCodecAsString(options_.codec->compression_type()));
232+
233+
int32_t buffer_idx = 0;
234+
for (const auto& field : fields) {
235+
if (field->type()->id() == Type::NA) continue;
236+
237+
const auto& layout_buffers = field->type()->layout().buffers;
238+
for (size_t i = 0; i < layout_buffers.size(); ++i) {
239+
const auto& layout = layout_buffers[i];
240+
auto& buffer = out_->body_buffers[buffer_idx + i];
241+
if (buffer->size() > 0) {
242+
switch (layout.kind) {
243+
case DataTypeLayout::BufferKind::FIXED_WIDTH:
244+
if (layout.byte_width == 4 && field->type()->id() != Type::FLOAT) {
245+
RETURN_NOT_OK(CompressBuffer(*buffer, int32_codec.get(), &buffer,
246+
options_.memory_pool));
247+
} else if (layout.byte_width == 8 && field->type()->id() != Type::DOUBLE) {
248+
RETURN_NOT_OK(CompressBuffer(*buffer, int64_codec.get(), &buffer,
249+
options_.memory_pool));
250+
} else {
251+
RETURN_NOT_OK(
252+
CompressBuffer(*buffer, codec.get(), &buffer, options_.memory_pool));
253+
}
254+
break;
255+
case DataTypeLayout::BufferKind::BITMAP:
256+
case DataTypeLayout::BufferKind::VARIABLE_WIDTH:
257+
RETURN_NOT_OK(
258+
CompressBuffer(*buffer, codec.get(), &buffer, options_.memory_pool));
259+
break;
260+
case DataTypeLayout::BufferKind::ALWAYS_NULL:
261+
break;
262+
default:
263+
return Status::Invalid("Wrong buffer layout.");
264+
}
265+
}
266+
}
267+
buffer_idx += layout_buffers.size();
268+
}
269+
// TODO: ParallelFor?
270+
return Status::OK();
271+
}
272+
216273
Status Assemble(const RecordBatch& batch) {
217274
if (field_nodes_.size() > 0) {
218275
field_nodes_.clear();
@@ -226,7 +283,12 @@ class RecordBatchSerializer {
226283
}
227284

228285
if (options_.codec != nullptr) {
229-
RETURN_NOT_OK(CompressBodyBuffers());
286+
287+
if (options_.codec->compression_type() == Compression::FASTPFOR) {
288+
RETURN_NOT_OK(CompressBodyBuffersByType(batch.schema()->fields()));
289+
} else {
290+
RETURN_NOT_OK(CompressBodyBuffers());
291+
}
230292
}
231293

232294
// The position for the start of a buffer relative to the passed frame of

0 commit comments

Comments
 (0)