Skip to content

Commit 7554e7d

Browse files
JkSelfmarin-ma
andauthored
[Arrow 3.0 Rebase] Rebase Shuffle component c++ and Java API (#109)
* [Java] compression workaround * [oap-native-sql] add FastPFOR codec Conflicts: cpp/cmake_modules/ThirdpartyToolchain.cmake cpp/src/arrow/ipc/writer.cc cpp/src/arrow/util/compression.cc cpp/src/arrow/util/compression.h * fix the compile errors after cherry pick 8f2b612ab72b36eeac22420c929043800dc61ac0 * [oap-native-sql] fastpfor add test Conflicts: cpp/src/arrow/util/compression_test.cc * [oap-native-sql] FastPFOR decompression Conflicts: cpp/src/arrow/ipc/reader.cc cpp/src/arrow/ipc/writer.cc * [oap-native-sql] filter float/double Conflicts: cpp/src/arrow/ipc/reader.cc * [oap-native-sql] fix * FastPForLib::CODECFactory::getFromName(fastpfor256) is not thread safe Conflicts: cpp/src/arrow/ipc/writer.cc * [oap-native-sql] force FastPFOR build with source Conflicts: cpp/cmake_modules/ThirdpartyToolchain.cmake cpp/thirdparty/versions.txt * typo Conflicts: cpp/src/arrow/util/compression.h * fix the failed unit test Co-authored-by: rongma1997 <rong.ma@intel.com>
1 parent d613aa6 commit 7554e7d

File tree

18 files changed

+469
-15
lines changed

18 files changed

+469
-15
lines changed

cpp/CMakeLists.txt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -705,6 +705,11 @@ if(ARROW_WITH_ZSTD)
705705
endif()
706706
endif()
707707

708+
if(ARROW_WITH_FASTPFOR)
709+
list(APPEND ARROW_STATIC_LINK_LIBS FastPFOR::FastPFOR)
710+
list(APPEND ARROW_STATIC_INSTALL_INTERFACE_LIBS FastPFOR::FastPFOR)
711+
endif()
712+
708713
if(ARROW_ORC)
709714
list(APPEND ARROW_LINK_LIBS orc::liborc ${ARROW_PROTOBUF_LIBPROTOBUF})
710715
list(APPEND ARROW_STATIC_LINK_LIBS orc::liborc ${ARROW_PROTOBUF_LIBPROTOBUF})

cpp/cmake_modules/DefineOptions.cmake

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -360,6 +360,7 @@ if("${CMAKE_SOURCE_DIR}" STREQUAL "${CMAKE_CURRENT_SOURCE_DIR}")
360360
define_option(ARROW_WITH_SNAPPY "Build with Snappy compression" OFF)
361361
define_option(ARROW_WITH_ZLIB "Build with zlib compression" OFF)
362362
define_option(ARROW_WITH_ZSTD "Build with zstd compression" OFF)
363+
define_option(ARROW_WITH_FASTPFOR "Build with FastPFOR compression" OFF)
363364

364365
define_option(
365366
ARROW_WITH_UTF8PROC

cpp/cmake_modules/ThirdpartyToolchain.cmake

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,8 @@ set(ARROW_THIRDPARTY_DEPENDENCIES
7070
Thrift
7171
utf8proc
7272
ZLIB
73-
zstd)
73+
zstd
74+
FastPFOR)
7475

7576
# TODO(wesm): External GTest shared libraries are not currently
7677
# supported when building with MSVC because of the way that
@@ -607,6 +608,15 @@ else()
607608
)
608609
endif()
609610

611+
if(DEFINED ENV{FASTPFOR_SOURCE_URL})
612+
set(FASTPFOR_SOURCE_URL "$ENV{FASTPFOR_SOURCE_URL}")
613+
else()
614+
set_urls(
615+
FASTPFOR_SOURCE_URL
616+
"https://github.com/lemire/FastPFor/archive/${ARROW_FASTPFOR_BUILD_VERSION}.tar.gz"
617+
)
618+
endif()
619+
610620
# ----------------------------------------------------------------------
611621
# ExternalProject options
612622

@@ -2258,6 +2268,42 @@ if(ARROW_WITH_UTF8PROC)
22582268
include_directories(SYSTEM ${UTF8PROC_INCLUDE_DIR})
22592269
endif()
22602270

2271+
if(ARROW_WITH_FASTPFOR)
2272+
message(STATUS "Building (vendored) FastPFOR from source")
2273+
2274+
if(MSVC)
2275+
message(FATAL_ERROR "Cannot build FastPFOR on MSVC")
2276+
else()
2277+
set(FASTPFOR_STATIC_LIB_NAME libFastPFOR.a)
2278+
endif()
2279+
2280+
set(FASTPFOR_PREFIX "${CMAKE_CURRENT_BINARY_DIR}/fastpfor_ep-install")
2281+
set(FASTPFOR_STATIC_LIB "${FASTPFOR_PREFIX}/lib/${FASTPFOR_STATIC_LIB_NAME}")
2282+
set(FASTPFOR_INCLUDE_DIR "${FASTPFOR_PREFIX}/include")
2283+
set(FASTPFOR_CMAKE_ARGS
2284+
${EP_COMMON_CMAKE_ARGS}
2285+
"-DCMAKE_INSTALL_PREFIX=${FASTPFOR_PREFIX}"
2286+
-DCMAKE_INSTALL_LIBDIR=lib
2287+
-DCMAKE_BUILD_TYPE=Release)
2288+
2289+
externalproject_add(fastpfor_ep
2290+
INSTALL_DIR ${FASTPFOR_PREFIX}
2291+
URL ${FASTPFOR_SOURCE_URL} ${EP_LOG_OPTIONS}
2292+
BUILD_BYPRODUCTS "${FASTPFOR_STATIC_LIB}"
2293+
CMAKE_ARGS ${FASTPFOR_CMAKE_ARGS})
2294+
2295+
include_directories(SYSTEM ${FASTPFOR_INCLUDE_DIR})
2296+
file(MAKE_DIRECTORY ${FASTPFOR_INCLUDE_DIR})
2297+
2298+
add_library(FastPFOR::FastPFOR STATIC IMPORTED)
2299+
set_target_properties(FastPFOR::FastPFOR
2300+
PROPERTIES IMPORTED_LOCATION "${FASTPFOR_STATIC_LIB}"
2301+
INTERFACE_INCLUDE_DIRECTORIES "${FASTPFOR_INCLUDE_DIR}")
2302+
2303+
add_dependencies(toolchain fastpfor_ep)
2304+
add_dependencies(FastPFOR::FastPFOR fastpfor_ep)
2305+
endif()
2306+
22612307
macro(build_cares)
22622308
message(STATUS "Building c-ares from source")
22632309
set(CARES_PREFIX "${CMAKE_CURRENT_BINARY_DIR}/cares_ep-install")

cpp/src/arrow/CMakeLists.txt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,11 @@ if(ARROW_WITH_ZSTD)
290290
list(APPEND ARROW_SRCS util/compression_zstd.cc)
291291
endif()
292292

293+
if(ARROW_WITH_FASTPFOR)
294+
add_definitions(-DARROW_WITH_FASTPFOR)
295+
list(APPEND ARROW_SRCS util/compression_fastpfor.cc)
296+
endif()
297+
293298
set(ARROW_TESTING_SRCS
294299
io/test_common.cc
295300
ipc/test_common.cc

cpp/src/arrow/ipc/options.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ IpcReadOptions IpcReadOptions::Defaults() { return IpcReadOptions(); }
2929
namespace internal {
3030

3131
Status CheckCompressionSupported(Compression::type codec) {
32-
if (!(codec == Compression::LZ4_FRAME || codec == Compression::ZSTD)) {
32+
if (!(codec == Compression::LZ4_FRAME || codec == Compression::ZSTD || codec == Compression::FASTPFOR)) {
3333
return Status::Invalid("Only LZ4_FRAME and ZSTD compression allowed");
3434
}
3535
return Status::OK();

cpp/src/arrow/ipc/read_write_test.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -611,7 +611,8 @@ TEST_F(TestWriteRecordBatch, WriteWithCompression) {
611611
auto batch =
612612
RecordBatch::Make(schema, length, {rg.String(500, 0, 10, 0.1), dict_array});
613613

614-
std::vector<Compression::type> codecs = {Compression::LZ4_FRAME, Compression::ZSTD};
614+
std::vector<Compression::type> codecs = {Compression::LZ4_FRAME, Compression::ZSTD,
615+
Compression::FASTPFOR};
615616
for (auto codec : codecs) {
616617
if (!util::Codec::IsAvailable(codec)) {
617618
continue;

cpp/src/arrow/ipc/reader.cc

Lines changed: 91 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -401,6 +401,90 @@ Result<std::shared_ptr<Buffer>> DecompressBuffer(const std::shared_ptr<Buffer>&
401401
return std::move(uncompressed);
402402
}
403403

404+
Status DecompressBufferByType(const Buffer& buffer, util::Codec* codec,
405+
std::shared_ptr<Buffer>* out, MemoryPool* pool) {
406+
const uint8_t* data = buffer.data();
407+
int64_t compressed_size = buffer.size() - sizeof(int64_t);
408+
int64_t uncompressed_size = BitUtil::FromLittleEndian(util::SafeLoadAs<int64_t>(data));
409+
410+
ARROW_ASSIGN_OR_RAISE(auto uncompressed, AllocateBuffer(uncompressed_size, pool));
411+
412+
int64_t actual_decompressed;
413+
ARROW_ASSIGN_OR_RAISE(
414+
actual_decompressed,
415+
codec->Decompress(compressed_size, data + sizeof(int64_t), uncompressed_size,
416+
uncompressed->mutable_data()));
417+
if (actual_decompressed != uncompressed_size) {
418+
return Status::Invalid("Failed to fully decompress buffer, expected ",
419+
uncompressed_size, " bytes but decompressed ",
420+
actual_decompressed);
421+
}
422+
*out = std::move(uncompressed);
423+
return Status::OK();
424+
}
425+
426+
Status DecompressBuffersByType(Compression::type compression,
427+
const IpcReadOptions& options,
428+
std::vector<std::shared_ptr<ArrayData>>* arrs,
429+
const std::vector<std::shared_ptr<Field>>& schema_fields) {
430+
ARROW_CHECK_EQ(arrs->size(), schema_fields.size());
431+
432+
std::unique_ptr<util::Codec> codec;
433+
std::unique_ptr<util::Codec> fastpfor32_codec;
434+
std::unique_ptr<util::Codec> fastpfor64_codec;
435+
ARROW_ASSIGN_OR_RAISE(codec, util::Codec::Create(Compression::LZ4_FRAME));
436+
ARROW_ASSIGN_OR_RAISE(fastpfor32_codec, util::Codec::CreateInt32(compression));
437+
ARROW_ASSIGN_OR_RAISE(fastpfor64_codec, util::Codec::CreateInt64(compression));
438+
439+
for (size_t field_idx = 0; field_idx < schema_fields.size(); ++field_idx) {
440+
const auto& field = schema_fields[field_idx];
441+
auto& arr = (*arrs)[field_idx];
442+
if (field->type()->id() == Type::NA) continue;
443+
444+
const auto& layout_buffers = field->type()->layout().buffers;
445+
for (size_t i = 0; i < layout_buffers.size(); ++i) {
446+
const auto& layout = layout_buffers[i];
447+
if (arr->buffers[i] == nullptr) {
448+
continue;
449+
}
450+
if (arr->buffers[i]->size() == 0) {
451+
continue;
452+
}
453+
if (arr->buffers[i]->size() < 8) {
454+
return Status::Invalid(
455+
"Likely corrupted message, compressed buffers "
456+
"are larger than 8 bytes by construction");
457+
}
458+
auto& buffer = arr->buffers[i];
459+
switch (layout.kind) {
460+
case DataTypeLayout::BufferKind::FIXED_WIDTH:
461+
if (layout.byte_width == 4 && field->type()->id() != Type::FLOAT) {
462+
RETURN_NOT_OK(DecompressBufferByType(*buffer, fastpfor32_codec.get(), &buffer,
463+
options.memory_pool));
464+
} else if (layout.byte_width == 8 && field->type()->id() != Type::DOUBLE) {
465+
RETURN_NOT_OK(DecompressBufferByType(*buffer, fastpfor64_codec.get(), &buffer,
466+
options.memory_pool));
467+
} else {
468+
RETURN_NOT_OK(
469+
DecompressBufferByType(*buffer, codec.get(), &buffer, options.memory_pool));
470+
}
471+
break;
472+
case DataTypeLayout::BufferKind::BITMAP:
473+
case DataTypeLayout::BufferKind::VARIABLE_WIDTH: {
474+
RETURN_NOT_OK(
475+
DecompressBufferByType(*buffer, codec.get(), &buffer, options.memory_pool));
476+
break;
477+
}
478+
case DataTypeLayout::BufferKind::ALWAYS_NULL:
479+
break;
480+
default:
481+
return Status::Invalid("Wrong buffer layout.");
482+
}
483+
}
484+
}
485+
return arrow::Status::OK();
486+
}
487+
404488
Status DecompressBuffers(Compression::type compression, const IpcReadOptions& options,
405489
ArrayDataVector* fields) {
406490
struct BufferAccumulator {
@@ -482,7 +566,13 @@ Result<std::shared_ptr<RecordBatch>> LoadRecordBatchSubset(
482566
filtered_columns = std::move(columns);
483567
}
484568
if (compression != Compression::UNCOMPRESSED) {
485-
RETURN_NOT_OK(DecompressBuffers(compression, options, &filtered_columns));
569+
570+
if (compression == Compression::FASTPFOR) {
571+
RETURN_NOT_OK(
572+
DecompressBuffersByType(compression, options, &filtered_columns, filtered_fields));
573+
} else {
574+
RETURN_NOT_OK(DecompressBuffers(compression, options, &filtered_columns));
575+
}
486576
}
487577

488578
return RecordBatch::Make(filtered_schema, metadata->length(),

cpp/src/arrow/ipc/writer.cc

Lines changed: 66 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -181,10 +181,11 @@ class RecordBatchSerializer {
181181
}
182182

183183
Status CompressBuffer(const Buffer& buffer, util::Codec* codec,
184-
std::shared_ptr<Buffer>* out) {
184+
std::shared_ptr<Buffer>* out, MemoryPool* pool) {
185185
// Convert buffer to uncompressed-length-prefixed compressed buffer
186186
int64_t maximum_length = codec->MaxCompressedLen(buffer.size(), buffer.data());
187-
ARROW_ASSIGN_OR_RAISE(auto result, AllocateBuffer(maximum_length + sizeof(int64_t)));
187+
ARROW_ASSIGN_OR_RAISE(auto result,
188+
AllocateBuffer(maximum_length + sizeof(int64_t), pool));
188189

189190
int64_t actual_length;
190191
ARROW_ASSIGN_OR_RAISE(actual_length,
@@ -203,7 +204,7 @@ class RecordBatchSerializer {
203204
auto CompressOne = [&](size_t i) {
204205
if (out_->body_buffers[i]->size() > 0) {
205206
RETURN_NOT_OK(CompressBuffer(*out_->body_buffers[i], options_.codec.get(),
206-
&out_->body_buffers[i]));
207+
&out_->body_buffers[i], options_.memory_pool));
207208
}
208209
return Status::OK();
209210
};
@@ -212,6 +213,62 @@ class RecordBatchSerializer {
212213
options_.use_threads, static_cast<int>(out_->body_buffers.size()), CompressOne);
213214
}
214215

216+
Status CompressBodyBuffersByType(const std::vector<std::shared_ptr<Field>>& fields) {
217+
std::unique_ptr<util::Codec> codec;
218+
std::unique_ptr<util::Codec> int32_codec;
219+
std::unique_ptr<util::Codec> int64_codec;
220+
ARROW_ASSIGN_OR_RAISE(
221+
codec, util::Codec::Create(Compression::LZ4_FRAME, arrow::util::kUseDefaultCompressionLevel));
222+
ARROW_ASSIGN_OR_RAISE(
223+
int32_codec,
224+
util::Codec::CreateInt32(options_.codec->compression_type(), arrow::util::kUseDefaultCompressionLevel));
225+
ARROW_ASSIGN_OR_RAISE(
226+
int64_codec,
227+
util::Codec::CreateInt64(options_.codec->compression_type(), arrow::util::kUseDefaultCompressionLevel));
228+
229+
AppendCustomMetadata("ARROW:experimental_compression",
230+
util::Codec::GetCodecAsString(options_.codec->compression_type()));
231+
232+
int32_t buffer_idx = 0;
233+
for (const auto& field : fields) {
234+
if (field->type()->id() == Type::NA) continue;
235+
236+
const auto& layout_buffers = field->type()->layout().buffers;
237+
for (size_t i = 0; i < layout_buffers.size(); ++i) {
238+
const auto& layout = layout_buffers[i];
239+
auto& buffer = out_->body_buffers[buffer_idx + i];
240+
if (buffer->size() > 0) {
241+
switch (layout.kind) {
242+
case DataTypeLayout::BufferKind::FIXED_WIDTH:
243+
if (layout.byte_width == 4 && field->type()->id() != Type::FLOAT) {
244+
RETURN_NOT_OK(CompressBuffer(*buffer, int32_codec.get(), &buffer,
245+
options_.memory_pool));
246+
} else if (layout.byte_width == 8 && field->type()->id() != Type::DOUBLE) {
247+
RETURN_NOT_OK(CompressBuffer(*buffer, int64_codec.get(), &buffer,
248+
options_.memory_pool));
249+
} else {
250+
RETURN_NOT_OK(
251+
CompressBuffer(*buffer, codec.get(), &buffer, options_.memory_pool));
252+
}
253+
break;
254+
case DataTypeLayout::BufferKind::BITMAP:
255+
case DataTypeLayout::BufferKind::VARIABLE_WIDTH:
256+
RETURN_NOT_OK(
257+
CompressBuffer(*buffer, codec.get(), &buffer, options_.memory_pool));
258+
break;
259+
case DataTypeLayout::BufferKind::ALWAYS_NULL:
260+
break;
261+
default:
262+
return Status::Invalid("Wrong buffer layout.");
263+
}
264+
}
265+
}
266+
buffer_idx += layout_buffers.size();
267+
}
268+
// TODO: ParallelFor?
269+
return Status::OK();
270+
}
271+
215272
Status Assemble(const RecordBatch& batch) {
216273
if (field_nodes_.size() > 0) {
217274
field_nodes_.clear();
@@ -225,7 +282,12 @@ class RecordBatchSerializer {
225282
}
226283

227284
if (options_.codec != nullptr) {
228-
RETURN_NOT_OK(CompressBodyBuffers());
285+
286+
if (options_.codec->compression_type() == Compression::FASTPFOR) {
287+
RETURN_NOT_OK(CompressBodyBuffersByType(batch.schema()->fields()));
288+
} else {
289+
RETURN_NOT_OK(CompressBodyBuffers());
290+
}
229291
}
230292

231293
// The position for the start of a buffer relative to the passed frame of

0 commit comments

Comments
 (0)