Skip to content

Commit 4fb5d3f

Browse files
committed
Add --use_zero_copy_filter flag, which allows PEMs to perform zero-copy filters but with significant batching. This trades off memory allocations for increased compute time (decreases data locality and batch efficiency).
Signed-off-by: Dom Del Nano <ddelnano@gmail.com>
1 parent 1f326fb commit 4fb5d3f

File tree

4 files changed

+73
-35
lines changed

4 files changed

+73
-35
lines changed

src/carnot/exec/filter_node.cc

Lines changed: 61 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@
3939
#include "src/shared/types/types.h"
4040
#include "src/shared/types/typespb/wrapper/types_pb_wrapper.h"
4141

42+
// Selects the code path taken by FilterNode::ConsumeNextImpl. When true, the predicate is
// evaluated and each contiguous run of matching rows is forwarded as a slice of the input
// row batch. When false (the default), the selected input columns are forwarded as-is and the
// predicate is not evaluated in this node.
DEFINE_bool(use_zero_copy_filter, false, "Uses a non optimized, but zero-copy filter that can run on PEMs. If this setting is disabled, this assumes that filters must run on Kelvin");
43+
4244
namespace px {
4345
namespace carnot {
4446
namespace exec {
@@ -132,42 +134,66 @@ Status PredicateCopyValues<types::STRING>(const types::BoolValueColumnWrapper& p
132134
/**
 * Consumes one input row batch and forwards the result to the child nodes.
 *
 * When --use_zero_copy_filter is set, the predicate expression is evaluated over `rb` and every
 * maximal contiguous run of rows whose predicate is true is sent downstream as its own row
 * batch, produced with RowBatch::UnsafeSlice. If no row matches, a single zero-row batch is sent
 * instead. The end-of-window / end-of-stream markers of `rb` are attached to the last batch sent.
 *
 * When the flag is not set, the predicate is not evaluated: the columns listed in
 * plan_node_->selected_cols() are forwarded unchanged in a single batch.
 *
 * @param exec_state Execution state handed to the expression evaluator and to the children.
 * @param rb The input row batch.
 * @return Status::OK() on success, otherwise the first error reported while evaluating the
 * predicate, building the output, or sending a batch to the children.
 */
Status FilterNode::ConsumeNextImpl(ExecState* exec_state, const RowBatch& rb, size_t) {
  // Current implementation does not merge across row batches, we should
  // consider this for cases where the filter has really low selectivity.
  if (FLAGS_use_zero_copy_filter) {
    PX_ASSIGN_OR_RETURN(auto pred_col, evaluator_->EvaluateSingleExpression(
                                           exec_state, rb, *plan_node_->expression()));

    // Verify that the type of the column is boolean.
    DCHECK_EQ(pred_col->data_type(), types::BOOLEAN) << "Predicate expression must be a boolean";

    const types::BoolValueColumnWrapper& pred_col_wrapper =
        *static_cast<types::BoolValueColumnWrapper*>(pred_col.get());
    const size_t num_pred = pred_col_wrapper.Size();

    DCHECK_EQ(static_cast<size_t>(rb.num_rows()), num_pred);

    // NOTE(review): the slices below carry every column of `rb` (see RowBatch::UnsafeSlice),
    // whereas the pass-through branch emits only plan_node_->selected_cols() under
    // output_descriptor_ -- confirm that downstream nodes accept this.
    std::vector<RowBatch> output_rbs;
    // Index of the first row of the run of matching rows currently being scanned, or -1 when
    // the scan is not inside such a run.
    int64_t segment_start = -1;
    for (size_t idx = 0; idx < num_pred; ++idx) {
      if (pred_col_wrapper[idx].val) {
        if (segment_start == -1) {
          segment_start = static_cast<int64_t>(idx);
        }
      } else if (segment_start != -1) {
        // A non-matching row closes the current run: emit rows [segment_start, idx).
        output_rbs.emplace_back(
            rb.UnsafeSlice(segment_start, static_cast<int64_t>(idx) - segment_start));
        segment_start = -1;
      }
    }

    // A run that extends to the last row is still open here and must always be flushed.
    // Flushing it only when no earlier segment had been emitted would drop the trailing
    // matching rows (e.g. for the predicate column [true, false, true]).
    if (segment_start != -1) {
      output_rbs.emplace_back(
          rb.UnsafeSlice(segment_start, static_cast<int64_t>(num_pred) - segment_start));
    }

    if (output_rbs.empty()) {
      // No rows matched the filter.
      PX_ASSIGN_OR_RETURN(auto empty_rb,
                          RowBatch::WithZeroRows(*output_descriptor_, rb.eow(), rb.eos()));
      output_rbs.push_back(*empty_rb);
    }

    // Only the final batch may carry the end-of-window / end-of-stream markers of the input.
    RowBatch& last_rb = output_rbs.back();
    last_rb.set_eow(rb.eow());
    last_rb.set_eos(rb.eos());

    for (auto& output_rb : output_rbs) {
      PX_RETURN_IF_ERROR(SendRowBatchToChildren(exec_state, output_rb));
    }
  } else {
    // Pass-through: the predicate is not evaluated, so every input row is forwarded.
    size_t num_output_records = rb.num_rows();
    RowBatch output_rb(*output_descriptor_, num_output_records);
    DCHECK_EQ(output_descriptor_->size(), plan_node_->selected_cols().size());

    for (const auto& [output_col_idx, input_col_idx] : Enumerate(plan_node_->selected_cols())) {
      auto input_col = rb.ColumnAt(input_col_idx);
      PX_RETURN_IF_ERROR(output_rb.AddColumn(input_col));
    }

    output_rb.set_eow(rb.eow());
    output_rb.set_eos(rb.eos());
    PX_RETURN_IF_ERROR(SendRowBatchToChildren(exec_state, output_rb));
  }
  return Status::OK();
}
173199

src/carnot/exec/filter_node.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
#include "src/common/base/status.h"
3333
#include "src/table_store/table_store.h"
3434

35+
// Declares the --use_zero_copy_filter flag so that code including this header can read
// FLAGS_use_zero_copy_filter.
DECLARE_bool(use_zero_copy_filter);
36+
3537
namespace px {
3638
namespace carnot {
3739
namespace exec {

src/table_store/schema/row_batch.cc

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,15 @@ StatusOr<std::unique_ptr<RowBatch>> RowBatch::Slice(int64_t offset, int64_t leng
265265
return output_rb;
266266
}
267267

268+
// Builds a RowBatch of `length` rows that uses this batch's descriptor and holds, for each
// column, the result of slicing that column with (offset, length).
//
// "Unsafe" because the status returned by AddColumn() is deliberately discarded: a column that
// fails to be added is skipped without any signal to the caller.
// NOTE(review): the column Slice() call presumably returns a view that shares the underlying
// buffers (zero-copy) and performs no bounds checking -- confirm against the column type.
RowBatch RowBatch::UnsafeSlice(int64_t offset, int64_t length) const {
  RowBatch sliced(desc(), length);
  for (int64_t col_idx = 0; col_idx < num_columns(); ++col_idx) {
    PX_UNUSED(sliced.AddColumn(ColumnAt(col_idx)->Slice(offset, length)));
  }
  return sliced;
}
276+
268277
} // namespace schema
269278
} // namespace table_store
270279
} // namespace px

src/table_store/schema/row_batch.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ class RowBatch {
7575
* @return StatusOr<std::unique_ptr<RowBatch>>
7676
*/
7777
StatusOr<std::unique_ptr<RowBatch>> Slice(int64_t offset, int64_t length) const;
78+
/**
 * Returns a row batch containing the slice of every column of this batch selected by
 * (offset, length). Unlike Slice(), the result is returned by value instead of being wrapped
 * in a StatusOr, so failures while building the slice are not reported to the caller.
 * NOTE(review): presumably the caller must guarantee that the requested range lies within
 * this batch -- confirm.
 * @param offset the row offset at which the slice starts.
 * @param length the number of rows in the slice.
 * @return RowBatch
 */
RowBatch UnsafeSlice(int64_t offset, int64_t length) const;
7879

7980
/**
8081
* Adds the given column to the row batch, given that it correctly fits the schema.

0 commit comments

Comments
 (0)