Skip to content

iproto: introduce IPROTO_INSERT_ARROW request #10518

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,6 @@
[submodule "third_party/metrics"]
path = third_party/metrics
url = https://github.com/tarantool/metrics.git
[submodule "third_party/arrow/nanoarrow"]
path = third_party/arrow/nanoarrow
url = https://github.com/apache/arrow-nanoarrow.git
7 changes: 7 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -740,6 +740,13 @@ include(BuildCDT)
libccdt_build()
add_dependencies(build_bundled_libs cdt)

#
# Nanoarrow
#
# Load the BuildNanoarrow module, run its nanoarrow_build() macro and make
# the build_bundled_libs target depend on the bundled-nanoarrow target.
include(BuildNanoarrow)
nanoarrow_build()
add_dependencies(build_bundled_libs bundled-nanoarrow)

#
# Third-Party misc
#
Expand Down
75 changes: 75 additions & 0 deletions cmake/BuildNanoarrow.cmake
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
# Build the bundled nanoarrow library as an external project.
#
# This is deliberately a macro: every variable set below (NANOARROW_LIBRARIES,
# NANOARROW_INCLUDE_DIRS, ...) ends up in the caller's scope.
#
# Targets defined:
#   bundled-nanoarrow-project  the ExternalProject that configures/builds/installs
#   bundled-nanoarrow-core     imported static libnanoarrow.a
#   bundled-nanoarrow-ipc      imported static libnanoarrow_ipc.a
#   bundled-flatccrt           imported static libflatccrt.a
#   bundled-nanoarrow          umbrella target depending on the three above
macro(nanoarrow_build)
    include(ExternalProject)

    # Where the sources live and where the external project installs to.
    set(NANOARROW_SOURCE_DIR ${PROJECT_SOURCE_DIR}/third_party/arrow/nanoarrow/)
    set(NANOARROW_INSTALL_DIR ${PROJECT_BINARY_DIR}/third_party/nanoarrow/)
    set(NANOARROW_INCLUDE_DIR ${NANOARROW_INSTALL_DIR}/include/)

    # Static archives expected in the install tree after the build.
    set(NANOARROW_CORE_LIBRARY ${NANOARROW_INSTALL_DIR}/lib/libnanoarrow.a)
    set(NANOARROW_IPC_LIBRARY ${NANOARROW_INSTALL_DIR}/lib/libnanoarrow_ipc.a)
    set(FLATCCRT_LIBRARY ${NANOARROW_INSTALL_DIR}/lib/libflatccrt.a)
    set(NANOARROW_LIBRARIES
        ${NANOARROW_CORE_LIBRARY}
        ${NANOARROW_IPC_LIBRARY}
        ${FLATCCRT_LIBRARY})

    # Start from the common flags used for bundled dependencies.
    set(NANOARROW_CFLAGS ${DEPENDENCY_CFLAGS})

    # Do not let the conversion diagnostic
    #   src/nanoarrow/ipc/decoder.c:1115:7: error: conversion from
    #   `long unsigned int' to `int32_t' {aka `int'} may change value
    # break the build on ancient compilers, in particular GCC 4.8.5
    # from CentOS 7.
    set(NANOARROW_CFLAGS "${NANOARROW_CFLAGS} -Wno-error=conversion")

    list(APPEND NANOARROW_CMAKE_FLAGS
        "-DCMAKE_C_FLAGS=${NANOARROW_CFLAGS}"
        # Build the nanoarrow IPC extension.
        "-DNANOARROW_IPC=ON"
        # Static build on, shared build off.
        "-DBUILD_STATIC_LIBS=ON"
        "-DBUILD_SHARED_LIBS=OFF"
        # The external project's install dir is also set below via PREFIX,
        # but the install prefix still has to be passed as a cmake argument.
        "-DCMAKE_INSTALL_PREFIX=${NANOARROW_INSTALL_DIR}"
        # Use the same toolchain that builds tarantool itself: a different
        # one may produce incompatible artifacts.
        "-DCMAKE_C_COMPILER=${CMAKE_C_COMPILER}"
        "-DCMAKE_LINKER=${CMAKE_LINKER}"
        "-DCMAKE_AR=${CMAKE_AR}"
        "-DCMAKE_RANLIB=${CMAKE_RANLIB}"
        "-DCMAKE_NM=${CMAKE_NM}"
        "-DCMAKE_STRIP=${CMAKE_STRIP}"
        "-DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE}")

    # BUILD_BYPRODUCTS declares the archives produced by the external build.
    ExternalProject_Add(bundled-nanoarrow-project
        PREFIX ${NANOARROW_INSTALL_DIR}
        SOURCE_DIR ${NANOARROW_SOURCE_DIR}
        CMAKE_ARGS ${NANOARROW_CMAKE_FLAGS}
        BUILD_BYPRODUCTS ${NANOARROW_LIBRARIES}
    )

    # Wrap each produced archive into an imported target that depends on
    # the external project.
    add_library(bundled-nanoarrow-core STATIC IMPORTED GLOBAL)
    set_property(TARGET bundled-nanoarrow-core
        PROPERTY IMPORTED_LOCATION ${NANOARROW_CORE_LIBRARY})
    add_dependencies(bundled-nanoarrow-core bundled-nanoarrow-project)

    add_library(bundled-nanoarrow-ipc STATIC IMPORTED GLOBAL)
    set_property(TARGET bundled-nanoarrow-ipc
        PROPERTY IMPORTED_LOCATION ${NANOARROW_IPC_LIBRARY})
    add_dependencies(bundled-nanoarrow-ipc bundled-nanoarrow-project)

    add_library(bundled-flatccrt STATIC IMPORTED GLOBAL)
    set_property(TARGET bundled-flatccrt
        PROPERTY IMPORTED_LOCATION ${FLATCCRT_LIBRARY})
    add_dependencies(bundled-flatccrt bundled-nanoarrow-project)

    # Single name other targets can depend on.
    add_custom_target(bundled-nanoarrow
        DEPENDS bundled-nanoarrow-core bundled-nanoarrow-ipc bundled-flatccrt)

    # Setup NANOARROW_INCLUDE_DIRS for global use.
    set(NANOARROW_INCLUDE_DIRS ${NANOARROW_INCLUDE_DIR})
endmacro()
4 changes: 4 additions & 0 deletions debian/copyright
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,10 @@ Files: third_party/arrow/abi.h
Copyright: 2016-2024 The Apache Software Foundation
License: Apache-2.0

Files: third_party/arrow/nanoarrow/*
Copyright: 2023-2024 The Apache Software Foundation
License: Apache-2.0

License: BSD-2-Clause
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
Expand Down
1 change: 1 addition & 0 deletions extra/exports
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ box_index_tuple_position
box_info_lsn
box_init_latest_dd_version_id
box_insert
box_insert_arrow
box_iproto_override
box_iproto_send
box_is_ro
Expand Down
2 changes: 1 addition & 1 deletion src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ endif()
set (reexport_libraries server core misc bitset csv swim swim_udp swim_ev
shutdown tzcode ${LUAJIT_LIBRARIES} ${MSGPUCK_LIBRARIES} ${ICU_LIBRARIES}
${CURL_LIBRARIES} ${XXHASH_LIBRARIES} ${LIBCDT_LIBRARIES}
${EXTRA_REEXPORT_LIBRARIES})
${NANOARROW_LIBRARIES} ${EXTRA_REEXPORT_LIBRARIES})

set (common_libraries
${reexport_libraries}
Expand Down
2 changes: 2 additions & 0 deletions src/box/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ add_custom_target(box_generate_lua_sources
set_property(DIRECTORY PROPERTY ADDITIONAL_MAKE_CLEAN_FILES ${lua_sources})

include_directories(${ZSTD_INCLUDE_DIRS})
include_directories(${NANOARROW_INCLUDE_DIRS})
include_directories(${PROJECT_BINARY_DIR}/src/box/sql)
include_directories(${PROJECT_BINARY_DIR}/src/box)
include_directories(${EXTRA_CORE_INCLUDE_DIRS})
Expand Down Expand Up @@ -281,6 +282,7 @@ set(box_sources
decimal.c
read_view.c
mp_box_ctx.c
arrow_ipc.c
${sql_sources}
${lua_sources}
lua/init.c
Expand Down
166 changes: 166 additions & 0 deletions src/box/arrow_ipc.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
/*
* SPDX-License-Identifier: BSD-2-Clause
*
* Copyright 2010-2024, Tarantool AUTHORS, please see AUTHORS file.
*/
#include "arrow_ipc.h"

#include "diag.h"
#include "error.h"
#include "small/region.h"
#include "nanoarrow/nanoarrow_ipc.h"

/**
 * Encode `array' (described by `schema') into an Arrow IPC byte stream: the
 * schema is written first, then the array. The stream is accumulated in a
 * temporary ArrowBuffer and then copied to memory allocated on `region'.
 *
 * On success `*ret_data' and `*ret_data_end' delimit the encoded bytes and
 * 0 is returned. On failure -1 is returned, a ClientError with code
 * ER_ARROW_IPC_ENCODE is set in diag, and the output pointers are left
 * untouched. All temporary nanoarrow objects are released on every path.
 */
int
arrow_ipc_encode(struct ArrowArray *array, struct ArrowSchema *schema,
		 struct region *region, const char **ret_data,
		 const char **ret_data_end)
{
	int ret = -1;
	ArrowErrorCode rc;
	struct ArrowError error;
	/*
	 * The message is passed to diag_set() on failure. nanoarrow is not
	 * guaranteed to fill it in on every failing path, so make sure it is
	 * always a valid (empty) string rather than stack garbage.
	 */
	error.message[0] = '\0';
	struct ArrowBuffer buffer;
	ArrowBufferInit(&buffer);

	struct ArrowArrayView array_view;
	rc = ArrowArrayViewInitFromSchema(&array_view, schema, &error);
	if (rc != NANOARROW_OK) {
		diag_set(ClientError, ER_ARROW_IPC_ENCODE,
			 "ArrowArrayViewInitFromSchema", error.message);
		return -1;
	}

	/* Set buffer sizes and data pointers from an array. */
	rc = ArrowArrayViewSetArray(&array_view, array, &error);
	if (rc != NANOARROW_OK) {
		diag_set(ClientError, ER_ARROW_IPC_ENCODE,
			 "ArrowArrayViewSetArray", error.message);
		goto out_view;
	}

	/* All bytes written to the stream will be appended to the buffer. */
	struct ArrowIpcOutputStream stream;
	rc = ArrowIpcOutputStreamInitBuffer(&stream, &buffer);
	if (rc != NANOARROW_OK) {
		diag_set(ClientError, ER_ARROW_IPC_ENCODE,
			 "ArrowIpcOutputStreamInitBuffer", NULL);
		goto out_view;
	}

	/*
	 * A stream writer which encodes schema and array into an IPC byte
	 * stream. The writer takes ownership of the output byte stream, so
	 * the stream has to be released by hand only if the writer could
	 * not be initialized.
	 */
	struct ArrowIpcWriter writer;
	rc = ArrowIpcWriterInit(&writer, &stream);
	if (rc != NANOARROW_OK) {
		diag_set(ClientError, ER_ARROW_IPC_ENCODE, "ArrowIpcWriterInit",
			 NULL);
		stream.release(&stream);
		goto out_view;
	}

	rc = ArrowIpcWriterWriteSchema(&writer, schema, &error);
	if (rc != NANOARROW_OK) {
		diag_set(ClientError, ER_ARROW_IPC_ENCODE,
			 "ArrowIpcWriterWriteSchema", error.message);
		goto out_writer;
	}

	rc = ArrowIpcWriterWriteArrayView(&writer, &array_view, &error);
	if (rc != NANOARROW_OK) {
		diag_set(ClientError, ER_ARROW_IPC_ENCODE,
			 "ArrowIpcWriterWriteArrayView", error.message);
		goto out_writer;
	}

	/*
	 * TODO: It is possible to avoid extra `memcpy()' by switching
	 * `ArrowBuffer' to `region_realloc()'.
	 */
	char *data = xregion_alloc(region, buffer.size_bytes);
	memcpy(data, buffer.data, buffer.size_bytes);
	*ret_data = data;
	*ret_data_end = data + buffer.size_bytes;
	ret = 0;

	/* Success falls through the same cleanup as the failure paths. */
out_writer:
	ArrowIpcWriterReset(&writer);
out_view:
	ArrowArrayViewReset(&array_view);
	ArrowBufferReset(&buffer);
	return ret;
}

int
arrow_ipc_decode(struct ArrowArray *array, struct ArrowSchema *schema,
const char *data, const char *data_end)
{
ssize_t size = data_end - data;
if (size <= 0) {
diag_set(ClientError, ER_ARROW_IPC_DECODE, NULL,
"Unexpected data size");
return -1;
}

ArrowErrorCode rc;
struct ArrowError error;
struct ArrowBuffer buffer;
ArrowBufferInit(&buffer);

rc = ArrowBufferAppend(&buffer, data, size);
if (rc != NANOARROW_OK) {
diag_set(ClientError, ER_ARROW_IPC_DECODE, "ArrowBufferAppend",
NULL);
ArrowBufferReset(&buffer);
return -1;
}

/*
* Create an input stream from a buffer.
* The stream takes ownership of the buffer and reads bytes from it.
*/
struct ArrowIpcInputStream input_stream;
rc = ArrowIpcInputStreamInitBuffer(&input_stream, &buffer);
if (rc != NANOARROW_OK) {
diag_set(ClientError, ER_ARROW_IPC_DECODE,
"ArrowIpcInputStreamInitBuffer", NULL);
ArrowBufferReset(&buffer);
return -1;
}

/*
* Initialize an array stream from an input stream of bytes.
* The array_stream takes ownership of input_stream.
*/
struct ArrowArrayStream array_stream;
rc = ArrowIpcArrayStreamReaderInit(&array_stream, &input_stream, NULL);
if (rc != NANOARROW_OK) {
diag_set(ClientError, ER_ARROW_IPC_DECODE,
"ArrowIpcArrayStreamReaderInit", NULL);
input_stream.release(&input_stream);
return -1;
}

rc = ArrowArrayStreamGetSchema(&array_stream, schema, &error);
if (rc != NANOARROW_OK) {
diag_set(ClientError, ER_ARROW_IPC_DECODE,
"ArrowArrayStreamGetSchema", error.message);
goto error;
}

rc = ArrowArrayStreamGetNext(&array_stream, array, &error);
if (rc != NANOARROW_OK) {
diag_set(ClientError, ER_ARROW_IPC_DECODE,
"ArrowArrayStreamGetNext", error.message);
schema->release(schema);
goto error;
}

ArrowArrayStreamRelease(&array_stream);
return 0;
error:
ArrowArrayStreamRelease(&array_stream);
return -1;
}
36 changes: 36 additions & 0 deletions src/box/arrow_ipc.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* SPDX-License-Identifier: BSD-2-Clause
*
* Copyright 2010-2024, Tarantool AUTHORS, please see AUTHORS file.
*/
#pragma once

#include "arrow/abi.h"

#if defined(__cplusplus)
extern "C" {
#endif /* defined(__cplusplus) */

struct region;

/**
 * Encodes `array' and `schema' into Arrow IPC format: the schema is written
 * first, followed by the array. The encoded bytes are copied to memory
 * allocated on `region', and the range is returned via `ret_data' (start)
 * and `ret_data_end' (one past the last byte). The output pointers are
 * written only on success.
 *
 * Returns 0 on success, -1 on failure (diag is set to a ClientError with
 * code ER_ARROW_IPC_ENCODE).
 */
int
arrow_ipc_encode(struct ArrowArray *array, struct ArrowSchema *schema,
struct region *region, const char **ret_data,
const char **ret_data_end);

/**
 * Decodes `array' and `schema' from the `data' in Arrow IPC format.
 * The bytes in [`data', `data_end') are copied into an internal buffer
 * before decoding; the range must be non-empty (`data_end' > `data'),
 * otherwise the call fails.
 *
 * Returns 0 on success, -1 on failure (diag is set to a ClientError with
 * code ER_ARROW_IPC_DECODE). If the failure happens after the schema has
 * been read, the schema is released before returning.
 */
int
arrow_ipc_decode(struct ArrowArray *array, struct ArrowSchema *schema,
const char *data, const char *data_end);

#if defined(__cplusplus)
} /* extern "C" */
#endif /* defined(__cplusplus) */
1 change: 1 addition & 0 deletions src/box/blackhole.c
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ static const struct space_vtab blackhole_space_vtab = {
/* .execute_delete = */ blackhole_space_execute_delete,
/* .execute_update = */ blackhole_space_execute_update,
/* .execute_upsert = */ blackhole_space_execute_upsert,
/* .execute_insert_arrow = */ generic_space_execute_insert_arrow,
/* .ephemeral_replace = */ generic_space_ephemeral_replace,
/* .ephemeral_delete = */ generic_space_ephemeral_delete,
/* .ephemeral_rowid_next = */ generic_space_ephemeral_rowid_next,
Expand Down
13 changes: 13 additions & 0 deletions src/box/box.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4011,6 +4011,19 @@ box_upsert(uint32_t space_id, uint32_t index_id, const char *tuple,
return box_process1(&request, result);
}

/**
 * Build an IPROTO_INSERT_ARROW request for space `space_id' carrying the
 * given Arrow `array' and `schema', and pass it to box_process1().
 * Returns whatever box_process1() returns.
 */
API_EXPORT int
box_insert_arrow(uint32_t space_id, struct ArrowArray *array,
		 struct ArrowSchema *schema)
{
	/* Zero the whole request so that fields not set below are empty. */
	struct request insert_req;
	memset(&insert_req, 0, sizeof(insert_req));

	insert_req.space_id = space_id;
	insert_req.type = IPROTO_INSERT_ARROW;
	insert_req.arrow_schema = schema;
	insert_req.arrow_array = array;

	/* NULL is passed as the result argument of box_process1(). */
	return box_process1(&insert_req, NULL);
}

/**
* Trigger space truncation by bumping a counter
* in _truncate space.
Expand Down
Loading
Loading