Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
358d7a4
matching in strategies.scala
octaviansima Feb 10, 2021
7b95d90
Separate IN PR (#124)
Chenyu-Shi Jan 30, 2021
87e821a
Merge branch 'bnlj-only' into left-outer
octaviansima Feb 23, 2021
4158b40
delete Join.cpp
octaviansima Feb 23, 2021
0bfd2e1
Merge branch 'master' into left-outer
octaviansima Feb 25, 2021
83f239e
cleanup
octaviansima Feb 25, 2021
5981151
Merge branch 'master' into left-outer (C++ conflicts not yet resolved)
octaviansima Feb 26, 2021
51518e4
code works but it looks terrible
octaviansima Feb 26, 2021
49343c4
is_left_existence
octaviansima Feb 26, 2021
3ae240c
renaming for clarity
octaviansima Feb 26, 2021
c312a6d
calls to write_output_rows
octaviansima Feb 26, 2021
b0c3cea
write_output_rows optional argument
octaviansima Feb 26, 2021
022c888
renaming
octaviansima Feb 26, 2021
ae1f96c
cleanup
octaviansima Feb 26, 2021
39a03c1
is left anti or outer in join_expr_eval
octaviansima Mar 1, 2021
c4c7f4c
left outer broadcast works
octaviansima Mar 1, 2021
a17d512
still works, better
octaviansima Mar 1, 2021
97fbc2f
put code in EncryptAddDummyRows
octaviansima Mar 1, 2021
8fb04f6
equi join works too
octaviansima Mar 1, 2021
2f2a1fd
ignore empty case for equi joins
octaviansima Mar 1, 2021
cf79530
right outer 2 failing, but didn't start matching
octaviansima Mar 1, 2021
aa537ca
some refactoring
octaviansima Mar 1, 2021
f6fa2d1
more refactoring
octaviansima Mar 1, 2021
26d48eb
tests are passing
octaviansima Mar 1, 2021
b8a8fc8
cleanup done
octaviansima Mar 1, 2021
8c6752b
Merge branch 'master' into left-outer
octaviansima Mar 1, 2021
aa3a577
more clean up
octaviansima Mar 1, 2021
a5af1ec
addnulls for equi join as well, right outer join 2 failing occasionally
octaviansima Mar 2, 2021
5b5208c
left outer join 2 failing sporadically as well in multi case, is because
octaviansima Mar 2, 2021
a3dd8ce
wip
octaviansima Mar 2, 2021
8819c33
using last_primary_of_group instead of foreign
octaviansima Mar 2, 2021
46096f8
comments, attempt to explain
octaviansima Mar 2, 2021
9ed1b2a
mend
octaviansima Mar 2, 2021
1642a26
no more primary/foreign
octaviansima Mar 2, 2021
a461179
left outer join 2 passing
octaviansima Mar 2, 2021
7c7ffb0
primary/foreign switching back in operators.scala
octaviansima Mar 2, 2021
f2c618a
EncryptedAddDummyRowsExec no longer splitting one partition into two
octaviansima Mar 2, 2021
c0d4862
left outer join 1 (empty foreign table) passes
octaviansima Mar 2, 2021
8d7b3ce
13 passes
octaviansima Mar 2, 2021
ae5cd35
move null row creation into enclave
octaviansima Mar 2, 2021
ffb7341
cleanup
octaviansima Mar 2, 2021
01422a8
throw error if ExistenceJoin or FullOuter
octaviansima Mar 3, 2021
5ea7cd8
dummy row first
octaviansima Mar 3, 2021
7a7414a
Merge branch 'master' into left-outer
octaviansima Mar 4, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 93 additions & 0 deletions src/enclave/Enclave/BroadcastNestedLoopJoin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,99 @@ void broadcast_nested_loop_join(uint8_t *join_expr, size_t join_expr_length, uin
FlatbuffersJoinExprEvaluator join_expr_eval(join_expr, join_expr_length);
const tuix::JoinType join_type = join_expr_eval.get_join_type();

switch(join_type) {
case tuix::JoinType_LeftSemi:
case tuix::JoinType_LeftAnti:
default_join(join_expr, join_expr_length,
outer_rows, outer_rows_length,
inner_rows, inner_rows_length,
output_rows, output_rows_length);
break;
case tuix::JoinType_LeftOuter:
case tuix::JoinType_RightOuter:
outer_join(join_expr, join_expr_length,
outer_rows, outer_rows_length,
inner_rows, inner_rows_length,
output_rows, output_rows_length);
break;
default:
throw std::runtime_error(
std::string("Join type not supported: ")
+ std::string(to_string(join_type)));
}
}

void outer_join(
uint8_t *join_expr, size_t join_expr_length,
uint8_t *outer_rows, size_t outer_rows_length,
uint8_t *inner_rows, size_t inner_rows_length,
uint8_t **output_rows, size_t *output_rows_length) {

FlatbuffersJoinExprEvaluator join_expr_eval(join_expr, join_expr_length);
const tuix::JoinType join_type = join_expr_eval.get_join_type();

RowReader outer_r(BufferRefView<tuix::EncryptedBlocks>(outer_rows, outer_rows_length));
RowWriter w;

FlatbuffersTemporaryRow last_inner;

while (outer_r.has_next()) {
const tuix::Row *outer = outer_r.next();
bool o_i_match = false;
RowReader inner_r(BufferRefView<tuix::EncryptedBlocks>(inner_rows, inner_rows_length));

// Use peek() to get the schema of the inner table
const tuix::Row *inner = inner_r.peek();

while (inner_r.has_next()) {
inner = inner_r.next();
bool condition_met = join_expr_eval.is_right_join() ?
join_expr_eval.eval_condition(inner, outer) :
join_expr_eval.eval_condition(outer, inner);
if (!inner->is_dummy() && condition_met) {
switch(join_type) {
case tuix::JoinType_LeftOuter:
w.append(outer, inner);
break;
case tuix::JoinType_RightOuter:
w.append(inner, outer);
break;
default:
break;
}
o_i_match |= condition_met;
}
}

switch(join_type) {
case tuix::JoinType_LeftOuter:
if (!o_i_match) {
// Values of inner (right) do not matter: they are all set to null
w.append(outer, inner, false, true);
}
break;
case tuix::JoinType_RightOuter:
if (!o_i_match) {
// Values of inner (left) do not matter: they are all set to null
w.append(inner, outer, true, false);
}
break;
default:
break;
}
}
w.output_buffer(output_rows, output_rows_length);
}

void default_join(
uint8_t *join_expr, size_t join_expr_length,
uint8_t *outer_rows, size_t outer_rows_length,
uint8_t *inner_rows, size_t inner_rows_length,
uint8_t **output_rows, size_t *output_rows_length) {

FlatbuffersJoinExprEvaluator join_expr_eval(join_expr, join_expr_length);
const tuix::JoinType join_type = join_expr_eval.get_join_type();

RowReader outer_r(BufferRefView<tuix::EncryptedBlocks>(outer_rows, outer_rows_length));
RowWriter w;

Expand Down
21 changes: 17 additions & 4 deletions src/enclave/Enclave/BroadcastNestedLoopJoin.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,20 @@
#include <cstddef>
#include <cstdint>

void broadcast_nested_loop_join(uint8_t *join_expr, size_t join_expr_length, uint8_t *outer_rows,
size_t outer_rows_length, uint8_t *inner_rows,
size_t inner_rows_length, uint8_t **output_rows,
size_t *output_rows_length);
void broadcast_nested_loop_join(
uint8_t *join_expr, size_t join_expr_length,
uint8_t *outer_rows, size_t outer_rows_length,
uint8_t *inner_rows, size_t inner_rows_length,
uint8_t **output_rows, size_t *output_rows_length);

void outer_join(
uint8_t *join_expr, size_t join_expr_length,
uint8_t *outer_rows, size_t outer_rows_length,
uint8_t *inner_rows, size_t inner_rows_length,
uint8_t **output_rows, size_t *output_rows_length);

void default_join(
uint8_t *join_expr, size_t join_expr_length,
uint8_t *outer_rows, size_t outer_rows_length,
uint8_t *inner_rows, size_t inner_rows_length,
uint8_t **output_rows, size_t *output_rows_length);
9 changes: 9 additions & 0 deletions src/enclave/Enclave/ExpressionEvaluation.h
Original file line number Diff line number Diff line change
Expand Up @@ -1683,6 +1683,15 @@ class FlatbuffersJoinExprEvaluator {

tuix::JoinType get_join_type() { return join_type; }

bool is_right_join() {
return join_type == tuix::JoinType_RightOuter;
}

bool is_outer_join() {
return join_type == tuix::JoinType_LeftOuter ||
join_type == tuix::JoinType_RightOuter;
}

private:
flatbuffers::FlatBufferBuilder builder;
tuix::JoinType join_type;
Expand Down
4 changes: 4 additions & 0 deletions src/enclave/Enclave/FlatbuffersReaders.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ const tuix::Row *RowReader::next() {
return block_reader.next();
}

const tuix::Row *RowReader::peek() {
return block_reader.peek();
}

void RowReader::init_block_reader() {
if (block_idx < encrypted_blocks->blocks()->size()) {
block_reader.reset(encrypted_blocks->blocks()->Get(block_idx));
Expand Down
6 changes: 6 additions & 0 deletions src/enclave/Enclave/FlatbuffersReaders.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ class EncryptedBlockToRowReader {

const tuix::Row *next() { return rows->rows()->Get(row_idx++); }

const tuix::Row *peek() {
return rows->rows()->Get(row_idx);
}

flatbuffers::Vector<flatbuffers::Offset<tuix::Row>>::const_iterator begin() {
return rows->rows()->begin();
}
Expand Down Expand Up @@ -47,6 +51,8 @@ class RowReader {
bool has_next();
/** Access the next Row. Invalidates any previously-returned Row pointers. */
const tuix::Row *next();
/** Access the next Row without incrementing the reader. */
const tuix::Row *peek();

private:
void init_block_reader();
Expand Down
14 changes: 7 additions & 7 deletions src/enclave/Enclave/FlatbuffersWriters.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,32 +9,32 @@ void RowWriter::clear() {
finished = false;
}

void RowWriter::append(const tuix::Row *row) {
rows_vector.push_back(flatbuffers_copy(row, builder));
void RowWriter::append(const tuix::Row *row, bool force_null) {
rows_vector.push_back(flatbuffers_copy(row, builder, force_null));
total_num_rows++;
maybe_finish_block();
}

void RowWriter::append(const std::vector<const tuix::Field *> &row_fields) {
void RowWriter::append(const std::vector<const tuix::Field *> &row_fields, bool force_null) {
flatbuffers::uoffset_t num_fields = row_fields.size();
std::vector<flatbuffers::Offset<tuix::Field>> field_values(num_fields);
for (flatbuffers::uoffset_t i = 0; i < num_fields; i++) {
field_values[i] = flatbuffers_copy<tuix::Field>(row_fields[i], builder);
field_values[i] = flatbuffers_copy<tuix::Field>(row_fields[i], builder, force_null);
}
rows_vector.push_back(tuix::CreateRowDirect(builder, &field_values));
total_num_rows++;
maybe_finish_block();
}

void RowWriter::append(const tuix::Row *row1, const tuix::Row *row2) {
void RowWriter::append(const tuix::Row *row1, const tuix::Row *row2, bool row1_force_null, bool row2_force_null) {
flatbuffers::uoffset_t num_fields = row1->field_values()->size() + row2->field_values()->size();
std::vector<flatbuffers::Offset<tuix::Field>> field_values(num_fields);
flatbuffers::uoffset_t i = 0;
for (auto it = row1->field_values()->begin(); it != row1->field_values()->end(); ++it, ++i) {
field_values[i] = flatbuffers_copy<tuix::Field>(*it, builder);
field_values[i] = flatbuffers_copy<tuix::Field>(*it, builder, row1_force_null);
}
for (auto it = row2->field_values()->begin(); it != row2->field_values()->end(); ++it, ++i) {
field_values[i] = flatbuffers_copy<tuix::Field>(*it, builder);
field_values[i] = flatbuffers_copy<tuix::Field>(*it, builder, row2_force_null);
}
rows_vector.push_back(tuix::CreateRowDirect(builder, &field_values));
total_num_rows++;
Expand Down
9 changes: 4 additions & 5 deletions src/enclave/Enclave/FlatbuffersWriters.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,13 @@ class RowWriter {
void clear();

/** Append the given Row. */
void append(const tuix::Row *row);
void append(const tuix::Row *row, bool force_null = false);

/** Append the given `Field`s as a Row. */
void append(const std::vector<const tuix::Field *> &row_fields);
void append(const std::vector<const tuix::Field *> &row_fields, bool force_null = false);

/** Concatenate the fields of the two given `Row`s and append the resulting
* single Row. */
void append(const tuix::Row *row1, const tuix::Row *row2);
/** Concatenate the fields of the two given `Row`s and append the resulting single Row. */
void append(const tuix::Row *row1, const tuix::Row *row2, bool row1_force_null = false, bool row2_force_null = false);

/** Expose the stored rows as a buffer. */
UntrustedBufferRef<tuix::EncryptedBlocks> output_buffer();
Expand Down
94 changes: 65 additions & 29 deletions src/enclave/Enclave/NonObliviousSortMergeJoin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,27 @@ void test_rows_same_group(FlatbuffersJoinExprEvaluator &join_expr_eval, const tu
}
}

void write_output_rows(RowWriter &group, RowWriter &w) {
auto group_buffer = group.output_buffer();
RowReader group_reader(group_buffer.view());

while (group_reader.has_next()) {
const tuix::Row *row = group_reader.next();
w.append(row);
}
void write_output_rows(RowWriter &input,
RowWriter &output,
tuix::JoinType join_type,
const tuix::Row *foreign_row = nullptr) {
auto input_buffer = input.output_buffer();
RowReader input_reader(input_buffer.view());

while (input_reader.has_next()) {
const tuix::Row *row = input_reader.next();
if (foreign_row == nullptr) {
output.append(row);
} else if (join_type == tuix::JoinType_LeftOuter) {
output.append(row, foreign_row, false, true);
} else if (join_type == tuix::JoinType_RightOuter) {
output.append(foreign_row, row, true, false);
} else {
throw std::runtime_error(
std::string("write_output_rows should not take a foreign row with join type ")
+ to_string(join_type));
}
}
}

/**
Expand Down Expand Up @@ -61,32 +74,43 @@ void non_oblivious_sort_merge_join(uint8_t *join_expr, size_t join_expr_length,
RowWriter w;

RowWriter primary_group;
FlatbuffersTemporaryRow last_primary_of_group;
RowWriter primary_matched_rows,
primary_unmatched_rows; // This is only used for left semi/anti join
primary_unmatched_rows; // These are used for all joins but inner
FlatbuffersTemporaryRow last_primary_of_group;

// Used for outer rows to get the schema of the foreign table.
// A "dummy" row with the desired schema is added for each partition,
// so last_foreign_row.get() is guaranteed to not be null.
FlatbuffersTemporaryRow last_foreign_row;

while (r.has_next()) {
const tuix::Row *current = r.next();
if (current->is_dummy()) {
last_foreign_row.set(current);
continue;
}

if (join_expr_eval.is_primary(current)) {
if (last_primary_of_group.get() &&
join_expr_eval.is_same_group(last_primary_of_group.get(), current)) {

// Add this primary row to the current group
// If this is a left semi/anti join, also add the rows to
// primary_unmatched_rows
primary_group.append(current);
if (join_type == tuix::JoinType_LeftSemi || join_type == tuix::JoinType_LeftAnti) {
if (join_type != tuix::JoinType_Inner) {
primary_unmatched_rows.append(current);
}
last_primary_of_group.set(current);

} else {
// If a new primary group is encountered
if (join_type == tuix::JoinType_LeftSemi) {
write_output_rows(primary_matched_rows, w);
write_output_rows(primary_matched_rows, w, join_type);
} else if (join_type == tuix::JoinType_LeftAnti) {
write_output_rows(primary_unmatched_rows, w);
write_output_rows(primary_unmatched_rows, w, join_type);
} else if (join_expr_eval.is_outer_join()) {
// Dummy row is always guaranteed to be the first row, so last_foreign_row.get() cannot
// be null.
write_output_rows(primary_unmatched_rows, w, join_type, last_foreign_row.get());
}

primary_group.clear();
Expand All @@ -98,20 +122,27 @@ void non_oblivious_sort_merge_join(uint8_t *join_expr, size_t join_expr_length,
last_primary_of_group.set(current);
}
} else {
if (last_primary_of_group.get() &&
join_expr_eval.is_same_group(last_primary_of_group.get(), current)) {
if (join_type == tuix::JoinType_Inner) {
last_foreign_row.set(current);
if (last_primary_of_group.get()
&& join_expr_eval.is_same_group(last_primary_of_group.get(), current)) {
if (join_type == tuix::JoinType_Inner || join_expr_eval.is_outer_join()) {
auto primary_group_buffer = primary_group.output_buffer();
RowReader primary_group_reader(primary_group_buffer.view());

while (primary_group_reader.has_next()) {
const tuix::Row *primary = primary_group_reader.next();
test_rows_same_group(join_expr_eval, primary, current);

if (join_expr_eval.eval_condition(primary, current)) {
w.append(primary, current);
if (join_expr_eval.is_right_join()) {
w.append(current, primary);
} else {
w.append(primary, current);
}
}
}
} else if (join_type == tuix::JoinType_LeftSemi || join_type == tuix::JoinType_LeftAnti) {
}
if (join_type != tuix::JoinType_Inner) {
auto primary_unmatched_rows_buffer = primary_unmatched_rows.output_buffer();
RowReader primary_unmatched_rows_reader(primary_unmatched_rows_buffer.view());
RowWriter new_primary_unmatched_rows;
Expand All @@ -128,20 +159,25 @@ void non_oblivious_sort_merge_join(uint8_t *join_expr, size_t join_expr_length,

// Reset primary_unmatched_rows
primary_unmatched_rows.clear();
auto new_primary_unmatched_rows_buffer = new_primary_unmatched_rows.output_buffer();
RowReader new_primary_unmatched_rows_reader(new_primary_unmatched_rows_buffer.view());
while (new_primary_unmatched_rows_reader.has_next()) {
primary_unmatched_rows.append(new_primary_unmatched_rows_reader.next());
}
write_output_rows(new_primary_unmatched_rows, primary_unmatched_rows, join_type);
}
}
}
}

if (join_type == tuix::JoinType_LeftSemi) {
write_output_rows(primary_matched_rows, w);
} else if (join_type == tuix::JoinType_LeftAnti) {
write_output_rows(primary_unmatched_rows, w);
switch (join_type) {
case tuix::JoinType_LeftSemi:
write_output_rows(primary_matched_rows, w, join_type);
break;
case tuix::JoinType_LeftAnti:
write_output_rows(primary_unmatched_rows, w, join_type);
break;
case tuix::JoinType_LeftOuter:
case tuix::JoinType_RightOuter:
write_output_rows(primary_unmatched_rows, w, join_type, last_foreign_row.get());
break;
default:
break;
}

w.output_buffer(output_rows, output_rows_length);
Expand Down
Loading