Skip to content

Commit bc3fc36

Browse files
committed
DX-64328 Array types for Gandiva (apache#58)
Add List input and output types for Gandiva functions. Add new reference implementations for array_contains and array_remove, tested via integration with Dremio. int32, int64, double and float list types have been tested. Support List types in function specification and llvm code generation. Pass back function type information through the expression registry. See 1p here: https://docs.google.com/document/d/1exwXdUUnk5FqZLzVZyTdhqgwxTk0u9bL54aLVNM5Tas/edit
1 parent 21a45e1 commit bc3fc36

40 files changed

+2282
-109
lines changed

cpp/src/arrow/buffer.h

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -521,10 +521,27 @@ class ARROW_EXPORT ResizableBuffer : public MutableBuffer {
521521
return Reserve(sizeof(T) * new_nb_elements);
522522
}
523523

524+
public:
525+
uint8_t* offsetBuffer;
526+
int64_t offsetCapacity;
527+
uint8_t* validityBuffer;
528+
uint8_t* outerValidityBuffer;
529+
524530
protected:
525-
ResizableBuffer(uint8_t* data, int64_t size) : MutableBuffer(data, size) {}
531+
ResizableBuffer(uint8_t* data, int64_t size) : MutableBuffer(data, size) {
532+
offsetBuffer = nullptr;
533+
offsetCapacity = 0;
534+
validityBuffer = nullptr;
535+
outerValidityBuffer = nullptr;
536+
537+
}
526538
ResizableBuffer(uint8_t* data, int64_t size, std::shared_ptr<MemoryManager> mm)
527-
: MutableBuffer(data, size, std::move(mm)) {}
539+
: MutableBuffer(data, size, std::move(mm)) {
540+
offsetBuffer = nullptr;
541+
offsetCapacity = 0;
542+
validityBuffer = nullptr;
543+
outerValidityBuffer = nullptr;
544+
}
528545
};
529546

530547
/// \defgroup buffer-allocation-functions Functions for allocating buffers

cpp/src/gandiva/CMakeLists.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ set_source_files_properties(${GANDIVA_PRECOMPILED_CC_PATH} PROPERTIES GENERATED
4545

4646
set(SRC_FILES
4747
annotator.cc
48+
array_ops.cc
4849
bitmap_accumulator.cc
4950
cache.cc
5051
cast_time.cc
@@ -68,6 +69,7 @@ set(SRC_FILES
6869
function_ir_builder.cc
6970
function_registry.cc
7071
function_registry_arithmetic.cc
72+
function_registry_array.cc
7173
function_registry_datetime.cc
7274
function_registry_hash.cc
7375
function_registry_math_ops.cc
@@ -244,6 +246,7 @@ endfunction()
244246

245247
add_gandiva_test(internals-test
246248
SOURCES
249+
array_ops_test.cc
247250
bitmap_accumulator_test.cc
248251
cache_test.cc
249252
engine_llvm_test.cc

cpp/src/gandiva/annotator.cc

Lines changed: 78 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -46,15 +46,27 @@ FieldDescriptorPtr Annotator::MakeDesc(FieldPtr field, bool is_output) {
4646
int data_idx = buffer_count_++;
4747
int validity_idx = buffer_count_++;
4848
int offsets_idx = FieldDescriptor::kInvalidIdx;
49+
int child_offsets_idx = FieldDescriptor::kInvalidIdx;
4950
if (arrow::is_binary_like(field->type()->id())) {
5051
offsets_idx = buffer_count_++;
5152
}
53+
54+
if (field->type()->id() == arrow::Type::LIST) {
55+
offsets_idx = buffer_count_++;
56+
if (arrow::is_binary_like(field->type()->field(0)->type()->id())) {
57+
child_offsets_idx = buffer_count_++;
58+
}
59+
}
5260
int data_buffer_ptr_idx = FieldDescriptor::kInvalidIdx;
5361
if (is_output) {
5462
data_buffer_ptr_idx = buffer_count_++;
5563
}
64+
int child_valid_buffer_ptr_idx = FieldDescriptor::kInvalidIdx;
65+
if (field->type()->id() == arrow::Type::LIST) {
66+
child_valid_buffer_ptr_idx = buffer_count_++;
67+
}
5668
return std::make_shared<FieldDescriptor>(field, data_idx, validity_idx, offsets_idx,
57-
data_buffer_ptr_idx);
69+
data_buffer_ptr_idx, child_offsets_idx, child_valid_buffer_ptr_idx);
5870
}
5971

6072
int Annotator::AddHolderPointer(void* holder) {
@@ -80,17 +92,76 @@ void Annotator::PrepareBuffersForField(const FieldDescriptor& desc,
8092
if (desc.HasOffsetsIdx()) {
8193
uint8_t* offsets_buf = const_cast<uint8_t*>(array_data.buffers[buffer_idx]->data());
8294
eval_batch->SetBuffer(desc.offsets_idx(), offsets_buf, array_data.offset);
83-
++buffer_idx;
95+
96+
if (desc.HasChildOffsetsIdx()) {
97+
if (is_output) {
98+
// if list field is output field, we should put buffer pointer into eval batch
99+
// for resizing
100+
uint8_t* child_offsets_buf = reinterpret_cast<uint8_t*>(
101+
array_data.child_data.at(0)->buffers[buffer_idx].get());
102+
eval_batch->SetBuffer(desc.child_data_offsets_idx(), child_offsets_buf,
103+
array_data.child_data.at(0)->offset);
104+
105+
uint8_t* child_valid_buf = reinterpret_cast<uint8_t*>(
106+
array_data.child_data.at(0)->buffers[0].get());
107+
eval_batch->SetBuffer(desc.child_data_validity_idx(), child_valid_buf,
108+
array_data.child_data.at(0)->offset);
109+
110+
} else {
111+
// if list field is input field, just put buffer data into eval batch
112+
uint8_t* child_offsets_buf = const_cast<uint8_t*>(
113+
array_data.child_data.at(0)->buffers[buffer_idx]->data());
114+
eval_batch->SetBuffer(desc.child_data_offsets_idx(), child_offsets_buf,
115+
array_data.child_data.at(0)->offset);
116+
117+
uint8_t* child_valid_buf = const_cast<uint8_t*>(
118+
array_data.child_data.at(0)->buffers[0]->data());
119+
eval_batch->SetBuffer(desc.child_data_offsets_idx(), child_valid_buf,
120+
array_data.child_data.at(0)->offset);
121+
}
122+
}
123+
if (array_data.type->id() != arrow::Type::LIST ||
124+
arrow::is_binary_like(array_data.type->field(0)->type()->id())) {
125+
// primitive type list data buffer index is 1
126+
// binary like type list data buffer index is 2
127+
++buffer_idx;
128+
}
129+
}
130+
131+
int const childDataIndex = 0;
132+
if (array_data.type->id() != arrow::Type::LIST) {
133+
uint8_t* data_buf = const_cast<uint8_t*>(array_data.buffers[buffer_idx]->data());
134+
eval_batch->SetBuffer(desc.data_idx(), data_buf, array_data.offset);
135+
} else {
136+
uint8_t* data_buf =
137+
const_cast<uint8_t*>(array_data.child_data.at(childDataIndex)->buffers[buffer_idx]->data());
138+
eval_batch->SetBuffer(desc.data_idx(), data_buf, array_data.child_data.at(0)->offset);
139+
140+
int const childDataBufferIndex = 0;
141+
if (array_data.child_data.at(childDataIndex)->buffers[childDataBufferIndex] ) {
142+
uint8_t* child_valid_buf = const_cast<uint8_t*>(
143+
array_data.child_data.at(childDataIndex)->buffers[childDataBufferIndex]->data());
144+
eval_batch->SetBuffer(desc.child_data_validity_idx(), child_valid_buf, 0);
145+
}
146+
84147
}
85148

86-
uint8_t* data_buf = const_cast<uint8_t*>(array_data.buffers[buffer_idx]->data());
87-
eval_batch->SetBuffer(desc.data_idx(), data_buf, array_data.offset);
88149
if (is_output) {
89150
// pass in the Buffer object for output data buffers. Can be used for resizing.
90-
uint8_t* data_buf_ptr =
91-
reinterpret_cast<uint8_t*>(array_data.buffers[buffer_idx].get());
92-
eval_batch->SetBuffer(desc.data_buffer_ptr_idx(), data_buf_ptr, array_data.offset);
151+
152+
if (array_data.type->id() != arrow::Type::LIST) {
153+
uint8_t* data_buf_ptr =
154+
reinterpret_cast<uint8_t*>(array_data.buffers[buffer_idx].get());
155+
eval_batch->SetBuffer(desc.data_buffer_ptr_idx(), data_buf_ptr, array_data.offset);
156+
} else {
157+
// list data buffer is in child data buffer
158+
uint8_t* data_buf_ptr = reinterpret_cast<uint8_t*>(
159+
array_data.child_data.at(0)->buffers[buffer_idx].get());
160+
eval_batch->SetBuffer(desc.data_buffer_ptr_idx(), data_buf_ptr,
161+
array_data.child_data.at(0)->offset);
162+
}
93163
}
164+
94165
}
95166

96167
EvalBatchPtr Annotator::PrepareEvalBatch(const arrow::RecordBatch& record_batch,
@@ -106,7 +177,6 @@ EvalBatchPtr Annotator::PrepareEvalBatch(const arrow::RecordBatch& record_batch,
106177
// skip columns not involved in the expression.
107178
continue;
108179
}
109-
110180
PrepareBuffersForField(*(found->second), *(record_batch.column_data(i)),
111181
eval_batch.get(), false /*is_output*/);
112182
}

0 commit comments

Comments
 (0)