-
Notifications
You must be signed in to change notification settings - Fork 387
iproto: introduce IPROTO_INSERT_ARROW
request
#10518
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
locker
merged 3 commits into
tarantool:master
from
Gumix:iverbin/gh-10508-iproto_insert_arrow
Oct 1, 2024
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,75 @@ | ||
# A macro to build the bundled nanoarrow library | ||
macro(nanoarrow_build) | ||
set(NANOARROW_SOURCE_DIR ${PROJECT_SOURCE_DIR}/third_party/arrow/nanoarrow/) | ||
set(NANOARROW_INSTALL_DIR ${PROJECT_BINARY_DIR}/third_party/nanoarrow/) | ||
set(NANOARROW_INCLUDE_DIR ${NANOARROW_INSTALL_DIR}/include/) | ||
|
||
set(NANOARROW_CORE_LIBRARY ${NANOARROW_INSTALL_DIR}/lib/libnanoarrow.a) | ||
set(NANOARROW_IPC_LIBRARY ${NANOARROW_INSTALL_DIR}/lib/libnanoarrow_ipc.a) | ||
set(FLATCCRT_LIBRARY ${NANOARROW_INSTALL_DIR}/lib/libflatccrt.a) | ||
set(NANOARROW_LIBRARIES | ||
${NANOARROW_CORE_LIBRARY} ${NANOARROW_IPC_LIBRARY} ${FLATCCRT_LIBRARY}) | ||
|
||
set(NANOARROW_CFLAGS ${DEPENDENCY_CFLAGS}) | ||
|
||
# Silence the "src/nanoarrow/ipc/decoder.c:1115:7: error: conversion from | ||
# `long unsigned int' to `int32_t' {aka `int'} may change value" warning | ||
# on ancient compilers, in particular GCC 4.8.5 from CentOS 7. | ||
set(NANOARROW_CFLAGS "${NANOARROW_CFLAGS} -Wno-error=conversion") | ||
|
||
list(APPEND NANOARROW_CMAKE_FLAGS "-DCMAKE_C_FLAGS=${NANOARROW_CFLAGS}") | ||
|
||
# Build nanoarrow IPC extension. | ||
list(APPEND NANOARROW_CMAKE_FLAGS "-DNANOARROW_IPC=ON") | ||
|
||
# Switch on the static build. | ||
list(APPEND NANOARROW_CMAKE_FLAGS "-DBUILD_STATIC_LIBS=ON") | ||
|
||
# Switch off the shared build. | ||
list(APPEND NANOARROW_CMAKE_FLAGS "-DBUILD_SHARED_LIBS=OFF") | ||
|
||
# Even though we set the external project's install dir | ||
# below, we still need to pass the corresponding install | ||
# prefix via cmake arguments. | ||
list(APPEND NANOARROW_CMAKE_FLAGS | ||
"-DCMAKE_INSTALL_PREFIX=${NANOARROW_INSTALL_DIR}") | ||
|
||
# Pass the same toolchain as is used to build tarantool itself, | ||
# because they can be incompatible. | ||
list(APPEND NANOARROW_CMAKE_FLAGS "-DCMAKE_C_COMPILER=${CMAKE_C_COMPILER}") | ||
list(APPEND NANOARROW_CMAKE_FLAGS "-DCMAKE_LINKER=${CMAKE_LINKER}") | ||
list(APPEND NANOARROW_CMAKE_FLAGS "-DCMAKE_AR=${CMAKE_AR}") | ||
list(APPEND NANOARROW_CMAKE_FLAGS "-DCMAKE_RANLIB=${CMAKE_RANLIB}") | ||
list(APPEND NANOARROW_CMAKE_FLAGS "-DCMAKE_NM=${CMAKE_NM}") | ||
list(APPEND NANOARROW_CMAKE_FLAGS "-DCMAKE_STRIP=${CMAKE_STRIP}") | ||
list(APPEND NANOARROW_CMAKE_FLAGS "-DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE}") | ||
|
||
include(ExternalProject) | ||
ExternalProject_Add(bundled-nanoarrow-project | ||
PREFIX ${NANOARROW_INSTALL_DIR} | ||
SOURCE_DIR ${NANOARROW_SOURCE_DIR} | ||
CMAKE_ARGS ${NANOARROW_CMAKE_FLAGS} | ||
BUILD_BYPRODUCTS ${NANOARROW_LIBRARIES} | ||
) | ||
|
||
add_library(bundled-nanoarrow-core STATIC IMPORTED GLOBAL) | ||
set_target_properties(bundled-nanoarrow-core PROPERTIES IMPORTED_LOCATION | ||
${NANOARROW_CORE_LIBRARY}) | ||
add_dependencies(bundled-nanoarrow-core bundled-nanoarrow-project) | ||
|
||
add_library(bundled-nanoarrow-ipc STATIC IMPORTED GLOBAL) | ||
set_target_properties(bundled-nanoarrow-ipc PROPERTIES IMPORTED_LOCATION | ||
${NANOARROW_IPC_LIBRARY}) | ||
add_dependencies(bundled-nanoarrow-ipc bundled-nanoarrow-project) | ||
|
||
add_library(bundled-flatccrt STATIC IMPORTED GLOBAL) | ||
set_target_properties(bundled-flatccrt PROPERTIES IMPORTED_LOCATION | ||
${FLATCCRT_LIBRARY}) | ||
add_dependencies(bundled-flatccrt bundled-nanoarrow-project) | ||
|
||
add_custom_target(bundled-nanoarrow | ||
DEPENDS bundled-nanoarrow-core bundled-nanoarrow-ipc bundled-flatccrt) | ||
|
||
# Setup NANOARROW_INCLUDE_DIRS for global use. | ||
set(NANOARROW_INCLUDE_DIRS ${NANOARROW_INCLUDE_DIR}) | ||
endmacro() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,166 @@ | ||
/* | ||
* SPDX-License-Identifier: BSD-2-Clause | ||
* | ||
* Copyright 2010-2024, Tarantool AUTHORS, please see AUTHORS file. | ||
*/ | ||
#include "arrow_ipc.h" | ||
|
||
#include "diag.h" | ||
#include "error.h" | ||
#include "small/region.h" | ||
#include "nanoarrow/nanoarrow_ipc.h" | ||
|
||
int | ||
arrow_ipc_encode(struct ArrowArray *array, struct ArrowSchema *schema, | ||
struct region *region, const char **ret_data, | ||
const char **ret_data_end) | ||
{ | ||
ArrowErrorCode rc; | ||
struct ArrowError error; | ||
struct ArrowBuffer buffer; | ||
ArrowBufferInit(&buffer); | ||
|
||
struct ArrowArrayView array_view; | ||
rc = ArrowArrayViewInitFromSchema(&array_view, schema, &error); | ||
if (rc != NANOARROW_OK) { | ||
diag_set(ClientError, ER_ARROW_IPC_ENCODE, | ||
"ArrowArrayViewInitFromSchema", error.message); | ||
return -1; | ||
} | ||
|
||
/* Set buffer sizes and data pointers from an array. */ | ||
rc = ArrowArrayViewSetArray(&array_view, array, &error); | ||
if (rc != NANOARROW_OK) { | ||
diag_set(ClientError, ER_ARROW_IPC_ENCODE, | ||
"ArrowArrayViewSetArray", error.message); | ||
goto error1; | ||
} | ||
|
||
/* All bytes written to the stream will be appended to the buffer. */ | ||
struct ArrowIpcOutputStream stream; | ||
rc = ArrowIpcOutputStreamInitBuffer(&stream, &buffer); | ||
if (rc != NANOARROW_OK) { | ||
diag_set(ClientError, ER_ARROW_IPC_ENCODE, | ||
"ArrowIpcOutputStreamInitBuffer", NULL); | ||
goto error1; | ||
} | ||
|
||
/* | ||
* A stream writer which encodes schema and array into an IPC byte | ||
* stream. The writer takes ownership of the output byte stream. | ||
*/ | ||
struct ArrowIpcWriter writer; | ||
rc = ArrowIpcWriterInit(&writer, &stream); | ||
if (rc != NANOARROW_OK) { | ||
diag_set(ClientError, ER_ARROW_IPC_ENCODE, "ArrowIpcWriterInit", | ||
NULL); | ||
stream.release(&stream); | ||
goto error1; | ||
} | ||
|
||
rc = ArrowIpcWriterWriteSchema(&writer, schema, &error); | ||
if (rc != NANOARROW_OK) { | ||
diag_set(ClientError, ER_ARROW_IPC_ENCODE, | ||
"ArrowIpcWriterWriteSchema", error.message); | ||
goto error2; | ||
} | ||
|
||
rc = ArrowIpcWriterWriteArrayView(&writer, &array_view, &error); | ||
if (rc != NANOARROW_OK) { | ||
diag_set(ClientError, ER_ARROW_IPC_ENCODE, | ||
"ArrowIpcWriterWriteArrayView", error.message); | ||
goto error2; | ||
} | ||
|
||
/* | ||
* TODO: It is possible to avoid extra `memcpy()' by switching | ||
* `ArrowBuffer' to `region_realloc()'. | ||
*/ | ||
char *data = xregion_alloc(region, buffer.size_bytes); | ||
memcpy(data, buffer.data, buffer.size_bytes); | ||
*ret_data = data; | ||
*ret_data_end = data + buffer.size_bytes; | ||
|
||
ArrowIpcWriterReset(&writer); | ||
ArrowArrayViewReset(&array_view); | ||
ArrowBufferReset(&buffer); | ||
return 0; | ||
error2: | ||
ArrowIpcWriterReset(&writer); | ||
error1: | ||
ArrowArrayViewReset(&array_view); | ||
ArrowBufferReset(&buffer); | ||
return -1; | ||
} | ||
|
||
int | ||
arrow_ipc_decode(struct ArrowArray *array, struct ArrowSchema *schema, | ||
const char *data, const char *data_end) | ||
{ | ||
ssize_t size = data_end - data; | ||
if (size <= 0) { | ||
diag_set(ClientError, ER_ARROW_IPC_DECODE, NULL, | ||
"Unexpected data size"); | ||
return -1; | ||
} | ||
|
||
ArrowErrorCode rc; | ||
struct ArrowError error; | ||
struct ArrowBuffer buffer; | ||
ArrowBufferInit(&buffer); | ||
|
||
rc = ArrowBufferAppend(&buffer, data, size); | ||
if (rc != NANOARROW_OK) { | ||
diag_set(ClientError, ER_ARROW_IPC_DECODE, "ArrowBufferAppend", | ||
NULL); | ||
ArrowBufferReset(&buffer); | ||
return -1; | ||
} | ||
|
||
/* | ||
* Create an input stream from a buffer. | ||
* The stream takes ownership of the buffer and reads bytes from it. | ||
*/ | ||
struct ArrowIpcInputStream input_stream; | ||
rc = ArrowIpcInputStreamInitBuffer(&input_stream, &buffer); | ||
if (rc != NANOARROW_OK) { | ||
diag_set(ClientError, ER_ARROW_IPC_DECODE, | ||
"ArrowIpcInputStreamInitBuffer", NULL); | ||
ArrowBufferReset(&buffer); | ||
return -1; | ||
} | ||
|
||
/* | ||
* Initialize an array stream from an input stream of bytes. | ||
* The array_stream takes ownership of input_stream. | ||
*/ | ||
struct ArrowArrayStream array_stream; | ||
rc = ArrowIpcArrayStreamReaderInit(&array_stream, &input_stream, NULL); | ||
if (rc != NANOARROW_OK) { | ||
diag_set(ClientError, ER_ARROW_IPC_DECODE, | ||
"ArrowIpcArrayStreamReaderInit", NULL); | ||
input_stream.release(&input_stream); | ||
return -1; | ||
} | ||
|
||
rc = ArrowArrayStreamGetSchema(&array_stream, schema, &error); | ||
if (rc != NANOARROW_OK) { | ||
diag_set(ClientError, ER_ARROW_IPC_DECODE, | ||
"ArrowArrayStreamGetSchema", error.message); | ||
goto error; | ||
} | ||
|
||
rc = ArrowArrayStreamGetNext(&array_stream, array, &error); | ||
if (rc != NANOARROW_OK) { | ||
diag_set(ClientError, ER_ARROW_IPC_DECODE, | ||
"ArrowArrayStreamGetNext", error.message); | ||
schema->release(schema); | ||
goto error; | ||
} | ||
|
||
ArrowArrayStreamRelease(&array_stream); | ||
return 0; | ||
error: | ||
ArrowArrayStreamRelease(&array_stream); | ||
return -1; | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
/* | ||
* SPDX-License-Identifier: BSD-2-Clause | ||
* | ||
* Copyright 2010-2024, Tarantool AUTHORS, please see AUTHORS file. | ||
*/ | ||
#pragma once | ||
|
||
#include "arrow/abi.h" | ||
|
||
#if defined(__cplusplus) | ||
extern "C" { | ||
#endif /* defined(__cplusplus) */ | ||
|
||
struct region; | ||
|
||
/** | ||
* Encodes `array' and `schema' into Arrow IPC format. The memory is allocated | ||
* on `region', and the address is returned via `ret_data' and `ret_data_end'. | ||
* Returns 0 on success, -1 on failure (diag is set). | ||
*/ | ||
int | ||
arrow_ipc_encode(struct ArrowArray *array, struct ArrowSchema *schema, | ||
struct region *region, const char **ret_data, | ||
const char **ret_data_end); | ||
|
||
/** | ||
* Decodes `array' and `schema' from the `data' in Arrow IPC format. | ||
* Returns 0 on success, -1 on failure (diag is set). | ||
*/ | ||
int | ||
arrow_ipc_decode(struct ArrowArray *array, struct ArrowSchema *schema, | ||
const char *data, const char *data_end); | ||
|
||
#if defined(__cplusplus) | ||
} /* extern "C" */ | ||
#endif /* defined(__cplusplus) */ |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.