Skip to content

Commit 0068b6a

Browse files
author
Pindikura Ravindra
committed
ARROW-3459: [C++][Gandiva] support for string o/p
- If the output vectors aren't provided, allow resizable data buffers. - If the output vectors are provided, assert that the data buffer is resizable. - Use a C++ function to write to string-like output buffers; it checks the capacity and updates the offsets vector.
1 parent dff73a4 commit 0068b6a

File tree

14 files changed

+314
-50
lines changed

14 files changed

+314
-50
lines changed

cpp/src/gandiva/annotator.cc

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -31,30 +31,35 @@ FieldDescriptorPtr Annotator::CheckAndAddInputFieldDescriptor(FieldPtr field) {
3131
return found->second;
3232
}
3333

34-
auto desc = MakeDesc(field);
34+
auto desc = MakeDesc(field, false /*is_output*/);
3535
in_name_to_desc_[field->name()] = desc;
3636
return desc;
3737
}
3838

3939
FieldDescriptorPtr Annotator::AddOutputFieldDescriptor(FieldPtr field) {
40-
auto desc = MakeDesc(field);
40+
auto desc = MakeDesc(field, true /*is_output*/);
4141
out_descs_.push_back(desc);
4242
return desc;
4343
}
4444

45-
FieldDescriptorPtr Annotator::MakeDesc(FieldPtr field) {
45+
FieldDescriptorPtr Annotator::MakeDesc(FieldPtr field, bool is_output) {
4646
int data_idx = buffer_count_++;
4747
int validity_idx = buffer_count_++;
4848
int offsets_idx = FieldDescriptor::kInvalidIdx;
4949
if (arrow::is_binary_like(field->type()->id())) {
5050
offsets_idx = buffer_count_++;
5151
}
52-
return std::make_shared<FieldDescriptor>(field, data_idx, validity_idx, offsets_idx);
52+
int data_buffer_ptr_idx = FieldDescriptor::kInvalidIdx;
53+
if (is_output) {
54+
data_buffer_ptr_idx = buffer_count_++;
55+
}
56+
return std::make_shared<FieldDescriptor>(field, data_idx, validity_idx, offsets_idx,
57+
data_buffer_ptr_idx);
5358
}
5459

5560
void Annotator::PrepareBuffersForField(const FieldDescriptor& desc,
5661
const arrow::ArrayData& array_data,
57-
EvalBatch* eval_batch) {
62+
EvalBatch* eval_batch, bool is_output) {
5863
int buffer_idx = 0;
5964

6065
// The validity buffer is optional. Use nullptr if it does not have one.
@@ -74,7 +79,12 @@ void Annotator::PrepareBuffersForField(const FieldDescriptor& desc,
7479

7580
uint8_t* data_buf = const_cast<uint8_t*>(array_data.buffers[buffer_idx]->data());
7681
eval_batch->SetBuffer(desc.data_idx(), data_buf);
77-
++buffer_idx;
82+
if (is_output) {
83+
// pass in the Buffer object for output data buffers. Can be used for resizing.
84+
uint8_t* data_buf_ptr =
85+
reinterpret_cast<uint8_t*>(array_data.buffers[buffer_idx].get());
86+
eval_batch->SetBuffer(desc.data_buffer_ptr_idx(), data_buf_ptr);
87+
}
7888
}
7989

8090
EvalBatchPtr Annotator::PrepareEvalBatch(const arrow::RecordBatch& record_batch,
@@ -92,14 +102,14 @@ EvalBatchPtr Annotator::PrepareEvalBatch(const arrow::RecordBatch& record_batch,
92102
}
93103

94104
PrepareBuffersForField(*(found->second), *(record_batch.column(i))->data(),
95-
eval_batch.get());
105+
eval_batch.get(), false /*is_output*/);
96106
}
97107

98108
// Fill in the entries for the output fields.
99109
int idx = 0;
100110
for (auto& arraydata : out_vector) {
101111
const FieldDescriptorPtr& desc = out_descs_.at(idx);
102-
PrepareBuffersForField(*desc, *arraydata, eval_batch.get());
112+
PrepareBuffersForField(*desc, *arraydata, eval_batch.get(), true /*is_output*/);
103113
++idx;
104114
}
105115
return eval_batch;

cpp/src/gandiva/annotator.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,12 +54,13 @@ class GANDIVA_EXPORT Annotator {
5454

5555
private:
5656
/// Annotate a field and return the descriptor.
57-
FieldDescriptorPtr MakeDesc(FieldPtr field);
57+
FieldDescriptorPtr MakeDesc(FieldPtr field, bool is_output);
5858

5959
/// Populate eval_batch by extracting the raw buffers from the arrow array, whose
6060
/// contents are represented by the annotated descriptor 'desc'.
6161
void PrepareBuffersForField(const FieldDescriptor& desc,
62-
const arrow::ArrayData& array_data, EvalBatch* eval_batch);
62+
const arrow::ArrayData& array_data, EvalBatch* eval_batch,
63+
bool is_output);
6364

6465
/// The list of input/output buffers (includes bitmap buffers, value buffers and
6566
/// offset buffers).

cpp/src/gandiva/annotator_test.cc

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ TEST_F(TestAnnotator, TestAdd) {
7373
EXPECT_EQ(desc_sum->field(), field_sum);
7474
EXPECT_EQ(desc_sum->data_idx(), 4);
7575
EXPECT_EQ(desc_sum->validity_idx(), 5);
76+
EXPECT_EQ(desc_sum->data_buffer_ptr_idx(), 6);
7677

7778
// prepare record batch
7879
int num_records = 100;
@@ -85,7 +86,7 @@ TEST_F(TestAnnotator, TestAdd) {
8586

8687
auto arrow_sum = MakeInt32Array(num_records);
8788
EvalBatchPtr batch = annotator.PrepareEvalBatch(*record_batch, {arrow_sum->data()});
88-
EXPECT_EQ(batch->GetNumBuffers(), 6);
89+
EXPECT_EQ(batch->GetNumBuffers(), 7);
8990

9091
auto buffers = batch->GetBufferArray();
9192
EXPECT_EQ(buffers[desc_a->validity_idx()], arrow_v0->data()->buffers.at(0)->data());
@@ -94,6 +95,8 @@ TEST_F(TestAnnotator, TestAdd) {
9495
EXPECT_EQ(buffers[desc_b->data_idx()], arrow_v1->data()->buffers.at(1)->data());
9596
EXPECT_EQ(buffers[desc_sum->validity_idx()], arrow_sum->data()->buffers.at(0)->data());
9697
EXPECT_EQ(buffers[desc_sum->data_idx()], arrow_sum->data()->buffers.at(1)->data());
98+
EXPECT_EQ(buffers[desc_sum->data_buffer_ptr_idx()],
99+
reinterpret_cast<uint8_t*>(arrow_sum->data()->buffers.at(1).get()));
97100

98101
auto bitmaps = batch->GetLocalBitMapArray();
99102
EXPECT_EQ(bitmaps, nullptr);

cpp/src/gandiva/expr_validator.cc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,12 @@ Status ExprValidator::Visit(const IfNode& node) {
8989
auto then_node_ret_type = node.then_node()->return_type();
9090
auto else_node_ret_type = node.else_node()->return_type();
9191

92+
// condition must be of boolean type.
93+
ARROW_RETURN_IF(
94+
!node.condition()->return_type()->Equals(arrow::boolean()),
95+
Status::ExpressionValidationError("condition must be of boolean type, found type ",
96+
node.condition()->return_type()->ToString()));
97+
9298
// Then-branch return type must match.
9399
ARROW_RETURN_IF(!if_node_ret_type->Equals(*then_node_ret_type),
94100
Status::ExpressionValidationError(

cpp/src/gandiva/field_descriptor.h

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,12 @@ class FieldDescriptor {
3131
static const int kInvalidIdx = -1;
3232

3333
FieldDescriptor(FieldPtr field, int data_idx, int validity_idx = kInvalidIdx,
34-
int offsets_idx = kInvalidIdx)
34+
int offsets_idx = kInvalidIdx, int data_buffer_ptr_idx = kInvalidIdx)
3535
: field_(field),
3636
data_idx_(data_idx),
3737
validity_idx_(validity_idx),
38-
offsets_idx_(offsets_idx) {}
38+
offsets_idx_(offsets_idx),
39+
data_buffer_ptr_idx_(data_buffer_ptr_idx) {}
3940

4041
/// Index of validity array in the array-of-buffers
4142
int validity_idx() const { return validity_idx_; }
@@ -46,18 +47,24 @@ class FieldDescriptor {
4647
/// Index of offsets array in the array-of-buffers
4748
int offsets_idx() const { return offsets_idx_; }
4849

50+
/// Index of data buffer pointer in the array-of-buffers
51+
int data_buffer_ptr_idx() const { return data_buffer_ptr_idx_; }
52+
4953
FieldPtr field() const { return field_; }
5054

5155
const std::string& Name() const { return field_->name(); }
5256
DataTypePtr Type() const { return field_->type(); }
5357

5458
bool HasOffsetsIdx() const { return offsets_idx_ != kInvalidIdx; }
5559

60+
bool HasDataBufferPtrIdx() const { return data_buffer_ptr_idx_ != kInvalidIdx; }
61+
5662
private:
5763
FieldPtr field_;
5864
int data_idx_;
5965
int validity_idx_;
6066
int offsets_idx_;
67+
int data_buffer_ptr_idx_;
6168
};
6269

6370
} // namespace gandiva

cpp/src/gandiva/gdv_function_stubs.cc

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,31 @@ bool gdv_fn_in_expr_lookup_utf8(int64_t ptr, const char* data, int data_len,
7272
reinterpret_cast<gandiva::InHolder<std::string>*>(ptr);
7373
return holder->HasValue(std::string(data, data_len));
7474
}
75+
76+
int32_t gdv_fn_populate_varlen_vector(int64_t context_ptr, int8_t* data_ptr,
77+
int32_t* offsets, int64_t slot,
78+
const char* entry_buf, int32_t entry_len) {
79+
auto buffer = reinterpret_cast<arrow::ResizableBuffer*>(data_ptr);
80+
int32_t offset = static_cast<int32_t>(buffer->size());
81+
82+
// This also sets the size in the buffer.
83+
auto status = buffer->Resize(offset + entry_len, false /*shrink*/);
84+
if (!status.ok()) {
85+
gandiva::ExecutionContext* context =
86+
reinterpret_cast<gandiva::ExecutionContext*>(context_ptr);
87+
88+
context->set_error_msg(status.message().c_str());
89+
return -1;
90+
}
91+
92+
// append the new entry.
93+
memcpy(buffer->mutable_data() + offset, entry_buf, entry_len);
94+
95+
// update offsets buffer.
96+
offsets[slot] = offset;
97+
offsets[slot + 1] = offset + entry_len;
98+
return 0;
99+
}
75100
}
76101

77102
namespace gandiva {
@@ -135,6 +160,18 @@ void ExportedStubFunctions::AddMappings(Engine* engine) const {
135160
engine->AddGlobalMappingForFunc("gdv_fn_in_expr_lookup_utf8",
136161
types->i1_type() /*return_type*/, args,
137162
reinterpret_cast<void*>(gdv_fn_in_expr_lookup_utf8));
163+
164+
// gdv_fn_populate_varlen_vector
165+
args = {types->i64_type(), // int64_t execution_context
166+
types->i8_ptr_type(), // int8_t* data ptr
167+
types->i32_ptr_type(), // int32_t* offsets ptr
168+
types->i64_type(), // int64_t slot
169+
types->i8_ptr_type(), // const char* entry_buf
170+
types->i32_type()}; // int32_t entry__len
171+
172+
engine->AddGlobalMappingForFunc("gdv_fn_populate_varlen_vector",
173+
types->i32_type() /*return_type*/, args,
174+
reinterpret_cast<void*>(gdv_fn_populate_varlen_vector));
138175
}
139176

140177
} // namespace gandiva

cpp/src/gandiva/jni/jni_common.cc

Lines changed: 43 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -632,6 +632,32 @@ JNIEXPORT jlong JNICALL Java_org_apache_arrow_gandiva_evaluator_JniWrapper_build
632632
return module_id;
633633
}
634634

635+
///
636+
/// \brief Resizable buffer which resizes by doing a callback into java.
637+
///
638+
class JavaResizableBuffer : public arrow::ResizableBuffer {
639+
public:
640+
JavaResizableBuffer(uint8_t* buffer, int32_t len) : ResizableBuffer(buffer, len) {
641+
size_ = 0;
642+
}
643+
644+
Status Resize(const int64_t new_size, bool shrink_to_fit) override {
645+
if (shrink_to_fit == true) {
646+
return Status::NotImplemented("shrink not implemented");
647+
} else if (new_size < capacity()) {
648+
size_ = new_size;
649+
return Status::OK();
650+
} else {
651+
// TODO: callback into java to re-alloc the buffer.
652+
return Status::NotImplemented("buffer expand not implemented");
653+
}
654+
}
655+
656+
Status Reserve(const int64_t new_capacity) override {
657+
return Status::NotImplemented("reserve not implemented");
658+
}
659+
};
660+
635661
#define CHECK_OUT_BUFFER_IDX_AND_BREAK(idx, len) \
636662
if (idx >= len) { \
637663
status = gandiva::Status::Invalid("insufficient number of out_buf_addrs"); \
@@ -710,20 +736,31 @@ Java_org_apache_arrow_gandiva_evaluator_JniWrapper_evaluateProjector(
710736
int buf_idx = 0;
711737
int sz_idx = 0;
712738
for (FieldPtr field : ret_types) {
739+
std::vector<std::shared_ptr<arrow::Buffer>> buffers;
740+
713741
CHECK_OUT_BUFFER_IDX_AND_BREAK(buf_idx, out_bufs_len);
714742
uint8_t* validity_buf = reinterpret_cast<uint8_t*>(out_bufs[buf_idx++]);
715743
jlong bitmap_sz = out_sizes[sz_idx++];
716-
std::shared_ptr<arrow::MutableBuffer> bitmap_buf =
717-
std::make_shared<arrow::MutableBuffer>(validity_buf, bitmap_sz);
744+
buffers.push_back(std::make_shared<arrow::MutableBuffer>(validity_buf, bitmap_sz));
745+
746+
if (arrow::is_binary_like(field->type()->id())) {
747+
CHECK_OUT_BUFFER_IDX_AND_BREAK(buf_idx, out_bufs_len);
748+
uint8_t* offsets_buf = reinterpret_cast<uint8_t*>(out_bufs[buf_idx++]);
749+
jlong offsets_sz = out_sizes[sz_idx++];
750+
buffers.push_back(
751+
std::make_shared<arrow::MutableBuffer>(offsets_buf, offsets_sz));
752+
}
718753

719754
CHECK_OUT_BUFFER_IDX_AND_BREAK(buf_idx, out_bufs_len);
720755
uint8_t* value_buf = reinterpret_cast<uint8_t*>(out_bufs[buf_idx++]);
721756
jlong data_sz = out_sizes[sz_idx++];
722-
std::shared_ptr<arrow::MutableBuffer> data_buf =
723-
std::make_shared<arrow::MutableBuffer>(value_buf, data_sz);
757+
if (arrow::is_binary_like(field->type()->id())) {
758+
buffers.push_back(std::make_shared<JavaResizableBuffer>(value_buf, data_sz));
759+
} else {
760+
buffers.push_back(std::make_shared<arrow::MutableBuffer>(value_buf, data_sz));
761+
}
724762

725-
auto array_data =
726-
arrow::ArrayData::Make(field->type(), output_row_count, {bitmap_buf, data_buf});
763+
auto array_data = arrow::ArrayData::Make(field->type(), output_row_count, buffers);
727764
output.push_back(array_data);
728765
}
729766
status = holder->projector()->Evaluate(*in_batch, selection_vector.get(), output);

cpp/src/gandiva/llvm_generator.cc

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,14 @@ llvm::Value* LLVMGenerator::GetValidityReference(llvm::Value* arg_addrs, int idx
155155
return ir_builder()->CreateIntToPtr(load, types()->i64_ptr_type(), name + "_varray");
156156
}
157157

158+
/// Get reference to data array at specified index in the args list.
159+
llvm::Value* LLVMGenerator::GetDataBufferPtrReference(llvm::Value* arg_addrs, int idx,
160+
FieldPtr field) {
161+
const std::string& name = field->name();
162+
llvm::Value* load = LoadVectorAtIndex(arg_addrs, idx, name);
163+
return ir_builder()->CreateIntToPtr(load, types()->i8_ptr_type(), name + "_buf_ptr");
164+
}
165+
158166
/// Get reference to data array at specified index in the args list.
159167
llvm::Value* LLVMGenerator::GetDataReference(llvm::Value* arg_addrs, int idx,
160168
FieldPtr field) {
@@ -293,6 +301,10 @@ Status LLVMGenerator::CodeGenExprValue(DexPtr value_expr, FieldDescriptorPtr out
293301
builder->SetInsertPoint(loop_entry);
294302
llvm::Value* output_ref =
295303
GetDataReference(arg_addrs, output->data_idx(), output->field());
304+
llvm::Value* output_buffer_ptr_ref = GetDataBufferPtrReference(
305+
arg_addrs, output->data_buffer_ptr_idx(), output->field());
306+
llvm::Value* output_offset_ref =
307+
GetOffsetsReference(arg_addrs, output->offsets_idx(), output->field());
296308

297309
// Loop body
298310
builder->SetInsertPoint(loop_body);
@@ -323,13 +335,21 @@ Status LLVMGenerator::CodeGenExprValue(DexPtr value_expr, FieldDescriptorPtr out
323335

324336
// save the value in the output vector.
325337
builder->SetInsertPoint(loop_body_tail);
338+
326339
auto output_type_id = output->Type()->id();
327340
if (output_type_id == arrow::Type::BOOL) {
328341
SetPackedBitValue(output_ref, loop_var, output_value->data());
329342
} else if (arrow::is_primitive(output_type_id) ||
330343
output_type_id == arrow::Type::DECIMAL) {
331344
llvm::Value* slot_offset = builder->CreateGEP(output_ref, loop_var);
332345
builder->CreateStore(output_value->data(), slot_offset);
346+
} else if (arrow::is_binary_like(output_type_id)) {
347+
// Var-len output. Make a function call to populate the data.
348+
// if there is an error, the fn sets it in the context. And, will be returned at the
349+
// end of this row batch.
350+
AddFunctionCall("gdv_fn_populate_varlen_vector", types()->i32_type(),
351+
{arg_context_ptr, output_buffer_ptr_ref, output_offset_ref, loop_var,
352+
output_value->data(), output_value->length()});
333353
} else {
334354
return Status::NotImplemented("output type ", output->Type()->ToString(),
335355
" not supported");

cpp/src/gandiva/llvm_generator.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,9 @@ class GANDIVA_EXPORT LLVMGenerator {
180180
/// Generate code to load the vector at specified index and cast it as offsets array.
181181
llvm::Value* GetOffsetsReference(llvm::Value* arg_addrs, int idx, FieldPtr field);
182182

183+
/// Generate code to load the vector at specified index and cast it as buffer pointer.
184+
llvm::Value* GetDataBufferPtrReference(llvm::Value* arg_addrs, int idx, FieldPtr field);
185+
183186
/// Generate code for the value array of one expression.
184187
Status CodeGenExprValue(DexPtr value_expr, FieldDescriptorPtr output, int suffix_idx,
185188
llvm::Function** fn,

0 commit comments

Comments
 (0)