Skip to content

Commit 333dfd7

Browse files
committed
box: introduce box_insert_arrow()
The new method inserts into a given space the data, provided in Arrow columnar format [1]. At the moment it is not supported by memtx and vinyl spaces. 1. https://arrow.apache.org/docs/format/Columnar.html NO_DOC=Will be added along with the implementation for memtx NO_CHANGELOG=See NO_DOC
1 parent 3e94c27 commit 333dfd7

File tree

7 files changed

+94
-2
lines changed

7 files changed

+94
-2
lines changed

extra/exports

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ box_index_tuple_position
7373
box_info_lsn
7474
box_init_latest_dd_version_id
7575
box_insert
76+
box_insert_arrow
7677
box_iproto_override
7778
box_iproto_send
7879
box_is_ro

src/box/box.cc

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4011,6 +4011,19 @@ box_upsert(uint32_t space_id, uint32_t index_id, const char *tuple,
40114011
return box_process1(&request, result);
40124012
}
40134013

4014+
API_EXPORT int
4015+
box_insert_arrow(uint32_t space_id, struct ArrowArray *array,
4016+
struct ArrowSchema *schema)
4017+
{
4018+
struct request request;
4019+
memset(&request, 0, sizeof(request));
4020+
request.type = IPROTO_INSERT_ARROW;
4021+
request.space_id = space_id;
4022+
request.arrow_array = array;
4023+
request.arrow_schema = schema;
4024+
return box_process1(&request, NULL);
4025+
}
4026+
40144027
/**
40154028
* Trigger space truncation by bumping a counter
40164029
* in _truncate space.

src/box/box.h

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -666,6 +666,31 @@ box_upsert(uint32_t space_id, uint32_t index_id, const char *tuple,
666666
const char *tuple_end, const char *ops, const char *ops_end,
667667
int index_base, box_tuple_t **result);
668668

669+
struct ArrowArray;
670+
struct ArrowSchema;
671+
672+
/**
673+
* Executes a batch insert request.
674+
*
675+
* A record batch from the Arrow `array` is inserted into the space columns,
676+
* whose names are provided by the Arrow `schema`. Column types in the schema
677+
* must match the types of the corresponding fields in the space format.
678+
*
679+
* If a column is nullable in space format, it can be omitted. All non-nullable
680+
* columns (including primary key parts) must be present in the batch.
681+
*
682+
* This function does not release neither `array` nor `schema`.
683+
*
684+
* \param space_id space identifier
685+
* \param array input data in ArrowArray format
686+
* \param schema definition of the input data in ArrowSchema format
687+
* \retval 0 on success
688+
* \retval -1 on error (check box_error_last())
689+
*/
690+
API_EXPORT int
691+
box_insert_arrow(uint32_t space_id, struct ArrowArray *array,
692+
struct ArrowSchema *schema);
693+
669694
/**
670695
* Truncate space.
671696
*

src/box/space.c

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1344,13 +1344,34 @@ after_old_tuple_lookup:;
13441344

13451345
/**
13461346
* Executes an IPROTO_INSERT_ARROW request. The request contains the data in
1347-
* Arrow columnar format.
1347+
* Arrow columnar format that can appear in two guises:
1348+
* 1. in-memory data structures (arrow_array and arrow_schema);
1349+
* 2. serialized for interprocess communication (arrow_ipc and arrow_ipc_end).
13481350
* Can return nonzero in case of error (diag is set).
13491351
*/
13501352
static int
13511353
space_execute_insert_arrow(struct space *space, struct txn *txn,
13521354
struct request *request)
13531355
{
1356+
if (request->arrow_array != NULL) {
1357+
assert(request->arrow_schema != NULL);
1358+
assert(request->arrow_ipc == NULL);
1359+
assert(request->arrow_ipc_end == NULL);
1360+
struct ArrowArray *array = request->arrow_array;
1361+
struct ArrowSchema *schema = request->arrow_schema;
1362+
1363+
if (space->vtab->execute_insert_arrow(
1364+
space, txn, array, schema) != 0)
1365+
return -1;
1366+
struct region *txn_region = tx_region_acquire(txn);
1367+
int rc = arrow_ipc_encode(
1368+
array, schema, txn_region, &request->arrow_ipc,
1369+
&request->arrow_ipc_end);
1370+
tx_region_release(txn, TX_ALLOC_SYSTEM);
1371+
return rc;
1372+
}
1373+
1374+
assert(request->arrow_schema == NULL);
13541375
assert(request->arrow_ipc != NULL);
13551376
assert(request->arrow_ipc_end != NULL);
13561377
struct ArrowArray array;

src/box/xrow.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,10 @@ struct request {
211211
/** Insert/replace/upsert tuple or proc argument or update operations. */
212212
const char *tuple;
213213
const char *tuple_end;
214+
/** The data in in-memory Arrow format. */
215+
struct ArrowArray *arrow_array;
216+
/** Arrow schema for @arrow_array. */
217+
struct ArrowSchema *arrow_schema;
214218
/** The data in serialized Arrow format. */
215219
const char *arrow_ipc;
216220
/** End of @arrow_ipc. */

test/app-tap/module_api.c

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
#include <lua.h>
1616
#include <lauxlib.h>
1717

18+
#include "arrow/abi.h"
19+
1820
#define STR2(x) #x
1921
#define STR(x) STR2(x)
2022

@@ -3362,6 +3364,24 @@ test_fiber_basic_api(lua_State *L)
33623364
return 1;
33633365
}
33643366

3367+
static int
3368+
test_box_insert_arrow(struct lua_State *L)
3369+
{
3370+
fail_unless(lua_gettop(L) == 1);
3371+
fail_unless(lua_isnumber(L, 1));
3372+
uint32_t space_id = lua_tointeger(L, 1);
3373+
struct ArrowSchema schema;
3374+
struct ArrowArray array;
3375+
memset(&schema, 0, sizeof(schema));
3376+
memset(&array, 0, sizeof(array));
3377+
3378+
int rc = box_insert_arrow(space_id, &array, &schema);
3379+
fail_unless(rc == -1);
3380+
check_diag("ClientError", "memtx does not support arrow format");
3381+
lua_pushboolean(L, 1);
3382+
return 1;
3383+
}
3384+
33653385
LUA_API int
33663386
luaopen_module_api(lua_State *L)
33673387
{
@@ -3420,6 +3440,7 @@ luaopen_module_api(lua_State *L)
34203440
{"box_iproto_send", test_box_iproto_send},
34213441
{"box_iproto_override_set", test_box_iproto_override_set},
34223442
{"box_iproto_override_reset", test_box_iproto_override_reset},
3443+
{"box_insert_arrow", test_box_insert_arrow},
34233444
{NULL, NULL}
34243445
};
34253446
luaL_register(L, "module_api", lib);

test/app-tap/module_api.test.lua

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -670,8 +670,14 @@ local function test_box_ibuf(test, module)
670670
test:ok(module.box_ibuf(require('buffer').ibuf()), "box_ibuf API")
671671
end
672672

673+
local function test_box_insert_arrow(test, module)
674+
test:plan(1)
675+
test:ok(module.box_insert_arrow(box.space.test.id),
676+
"box_insert_arrow API")
677+
end
678+
673679
require('tap').test("module_api", function(test)
674-
test:plan(51)
680+
test:plan(52)
675681
local status, module = pcall(require, 'module_api')
676682
test:is(status, true, "module")
677683
test:ok(status, "module is loaded")
@@ -709,6 +715,7 @@ require('tap').test("module_api", function(test)
709715
test:test("box_iproto_send", test_box_iproto_send, module)
710716
test:test("box_iproto_override", test_box_iproto_override, module)
711717
test:test("box_ibuf", test_box_ibuf, module)
718+
test:test("box_insert_arrow", test_box_insert_arrow, module)
712719

713720
space:drop()
714721
end)

0 commit comments

Comments
 (0)