[table_store] Refactor Table Compaction to Reduce Memory Fragmentation.
Summary:
Using the new `table/internal` utilities, this diff refactors the Table implementation to always compact to the compaction size (previously, it never split hot batches into smaller batches).
In addition, `BatchSlice`s are replaced by a `Cursor` object that keeps track of where in the table a consumer is. The previous `BatchSlice` interface was complicated; the new `Cursor` interface is much cleaner and also made this refactor easier.

The reduction in memory fragmentation from this change is significant. The first image below shows memory utilization statistics over the course of multiple table store compaction cycles before this change; the second shows the same after this change.
{F183775}
{F183776}
As seen in the images, memory utilization before the change dropped to about 50% after the first two compactions. After this change, memory utilization after the first two compactions stays above 80%.

There is a cost to this change: writes to the table store are now considerably more expensive. Previously a single write took about 0.5us; now it takes about 1us. However, compaction time is reduced significantly as well, so the expected net effect is a small amortized performance decrease. The memory savings are significant enough that this minor performance decrease should be acceptable.

Here are the benchmarks:
{P182}
And the p-values for these benchmarks:
{P181}
Keep in mind that a low p-value only means there is a statistically significant difference between the two runs; the sign of the difference has to be taken from the benchmark means above.

Test Plan: Added more table tests (and refactored existing ones to use the new Cursor). Ran microbenchmarks as well as the integrated memory study shown above. Tested on a Skaffold-deployed cluster to make sure things work as normal.

Reviewers: nserrino, philkuz, vihang, zasgar

Reviewed By: nserrino

Signed-off-by: James Bartlett <jamesbartlett@pixielabs.ai>

Differential Revision: https://phab.corp.pixielabs.ai/D11199

GitOrigin-RevId: 2789884
JamesMBartlett authored and copybaranaut committed Apr 18, 2022
1 parent 2d76524 commit 480882d
Showing 11 changed files with 742 additions and 1,251 deletions.
10 changes: 3 additions & 7 deletions src/carnot/carnot_test.cc
@@ -352,10 +352,8 @@ TEST_F(CarnotTest, range_test_multiple_rbs) {
std::vector<types::Time64NSValue> col0_out1;
std::vector<types::Float64Value> col1_out1;
std::vector<types::Int64Value> col2_out1;
auto slice = big_table_->FirstBatch();
auto batch =
big_table_->GetRowBatchSlice(slice, {0}, arrow::default_memory_pool()).ConsumeValueOrDie();

table_store::Table::Cursor cursor(big_table_.get());
auto batch = cursor.GetNextRowBatch({0}).ConsumeValueOrDie();
for (int64_t i = 0; i < batch->ColumnAt(0)->length(); i++) {
if (CarnotTestUtils::big_test_col1[i].val >= 2 && CarnotTestUtils::big_test_col1[i].val < 6) {
col0_out1.emplace_back(CarnotTestUtils::big_test_col1[i].val);
@@ -373,9 +371,7 @@ TEST_F(CarnotTest, range_test_multiple_rbs) {
std::vector<types::Time64NSValue> col0_out2;
std::vector<types::Float64Value> col1_out2;
std::vector<types::Int64Value> col2_out2;
auto next_batch =
big_table_->GetRowBatchSlice(big_table_->NextBatch(slice), {0}, arrow::default_memory_pool())
.ConsumeValueOrDie();
auto next_batch = cursor.GetNextRowBatch({0}).ConsumeValueOrDie();
for (int64_t i = batch->ColumnAt(0)->length();
i < batch->ColumnAt(0)->length() + next_batch->ColumnAt(0)->length(); i++) {
if (CarnotTestUtils::big_test_col1[i].val >= start_time &&
79 changes: 25 additions & 54 deletions src/carnot/exec/exec_graph_test.cc
@@ -191,18 +191,11 @@ TEST_P(ExecGraphExecuteTest, execute) {
auto output_table = exec_state_->table_store()->GetTable("output");
std::vector<types::Float64Value> out_in1 = {4.8, 16.4, 26.4};
std::vector<types::Float64Value> out_in2 = {14.8, 12.4};
auto slice = output_table->FirstBatch();
EXPECT_TRUE(
output_table->GetRowBatchSlice(slice, std::vector<int64_t>({0}), arrow::default_memory_pool())
.ConsumeValueOrDie()
->ColumnAt(0)
->Equals(types::ToArrow(out_in1, arrow::default_memory_pool())));
slice = output_table->NextBatch(slice);
EXPECT_TRUE(
output_table->GetRowBatchSlice(slice, std::vector<int64_t>({0}), arrow::default_memory_pool())
.ConsumeValueOrDie()
->ColumnAt(0)
->Equals(types::ToArrow(out_in2, arrow::default_memory_pool())));
table_store::Table::Cursor cursor(output_table);
EXPECT_TRUE(cursor.GetNextRowBatch({0}).ConsumeValueOrDie()->ColumnAt(0)->Equals(
types::ToArrow(out_in1, arrow::default_memory_pool())));
EXPECT_TRUE(cursor.GetNextRowBatch({0}).ConsumeValueOrDie()->ColumnAt(0)->Equals(
types::ToArrow(out_in2, arrow::default_memory_pool())));
}

std::vector<std::tuple<int32_t>> calls_to_execute = {
@@ -279,18 +272,11 @@ TEST_F(ExecGraphTest, execute_time) {
auto output_table = exec_state_->table_store()->GetTable("output");
std::vector<types::Float64Value> out_in1 = {4.8, 16.4, 26.4};
std::vector<types::Float64Value> out_in2 = {14.8, 12.4};
auto slice = output_table->FirstBatch();
EXPECT_TRUE(
output_table->GetRowBatchSlice(slice, std::vector<int64_t>({0}), arrow::default_memory_pool())
.ConsumeValueOrDie()
->ColumnAt(0)
->Equals(types::ToArrow(out_in1, arrow::default_memory_pool())));
slice = output_table->NextBatch(slice);
EXPECT_TRUE(
output_table->GetRowBatchSlice(slice, std::vector<int64_t>({0}), arrow::default_memory_pool())
.ConsumeValueOrDie()
->ColumnAt(0)
->Equals(types::ToArrow(out_in2, arrow::default_memory_pool())));
table_store::Table::Cursor cursor(output_table);
EXPECT_TRUE(cursor.GetNextRowBatch({0}).ConsumeValueOrDie()->ColumnAt(0)->Equals(
types::ToArrow(out_in1, arrow::default_memory_pool())));
EXPECT_TRUE(cursor.GetNextRowBatch({0}).ConsumeValueOrDie()->ColumnAt(0)->Equals(
types::ToArrow(out_in2, arrow::default_memory_pool())));
}

TEST_F(ExecGraphTest, two_limits_dont_interfere) {
@@ -349,16 +335,11 @@ TEST_F(ExecGraphTest, two_limits_dont_interfere) {
std::vector<types::Int64Value> out_col1 = {1, 2};
std::vector<types::BoolValue> out_col2 = {true, false};
std::vector<types::Float64Value> out_col3 = {1.4, 6.2};
auto out_rb1 =
output_table1
->GetRowBatchSlice(output_table1->FirstBatch(), std::vector<int64_t>({0, 1, 2}),
arrow::default_memory_pool())
.ConsumeValueOrDie();
auto out_rb2 =
output_table2
->GetRowBatchSlice(output_table2->FirstBatch(), std::vector<int64_t>({0, 1, 2}),
arrow::default_memory_pool())
.ConsumeValueOrDie();
table_store::Table::Cursor cursor1(output_table1);
table_store::Table::Cursor cursor2(output_table2);

auto out_rb1 = cursor1.GetNextRowBatch(std::vector<int64_t>({0, 1, 2})).ConsumeValueOrDie();
auto out_rb2 = cursor2.GetNextRowBatch(std::vector<int64_t>({0, 1, 2})).ConsumeValueOrDie();
EXPECT_TRUE(out_rb1->ColumnAt(0)->Equals(types::ToArrow(out_col1, arrow::default_memory_pool())));
EXPECT_TRUE(out_rb1->ColumnAt(1)->Equals(types::ToArrow(out_col2, arrow::default_memory_pool())));
EXPECT_TRUE(out_rb1->ColumnAt(2)->Equals(types::ToArrow(out_col3, arrow::default_memory_pool())));
@@ -421,10 +402,8 @@ TEST_F(ExecGraphTest, limit_w_multiple_srcs) {
std::vector<types::Int64Value> out_col1 = {1, 2};
std::vector<types::BoolValue> out_col2 = {true, false};
std::vector<types::Float64Value> out_col3 = {1.4, 6.2};
auto out_rb = output_table
->GetRowBatchSlice(output_table->FirstBatch(), std::vector<int64_t>({0, 1, 2}),
arrow::default_memory_pool())
.ConsumeValueOrDie();
table_store::Table::Cursor cursor(output_table);
auto out_rb = cursor.GetNextRowBatch(std::vector<int64_t>({0, 1, 2})).ConsumeValueOrDie();
EXPECT_TRUE(out_rb->ColumnAt(0)->Equals(types::ToArrow(out_col1, arrow::default_memory_pool())));
EXPECT_TRUE(out_rb->ColumnAt(1)->Equals(types::ToArrow(out_col2, arrow::default_memory_pool())));
EXPECT_TRUE(out_rb->ColumnAt(2)->Equals(types::ToArrow(out_col3, arrow::default_memory_pool())));
@@ -485,10 +464,8 @@ TEST_F(ExecGraphTest, two_sequential_limits) {
std::vector<types::Int64Value> out_col1 = {1, 2};
std::vector<types::BoolValue> out_col2 = {true, false};
std::vector<types::Float64Value> out_col3 = {1.4, 6.2};
auto out_rb = output_table
->GetRowBatchSlice(output_table->FirstBatch(), std::vector<int64_t>({0, 1, 2}),
arrow::default_memory_pool())
.ConsumeValueOrDie();
table_store::Table::Cursor cursor(output_table);
auto out_rb = cursor.GetNextRowBatch({0, 1, 2}).ConsumeValueOrDie();
EXPECT_TRUE(out_rb->ColumnAt(0)->Equals(types::ToArrow(out_col1, arrow::default_memory_pool())));
EXPECT_TRUE(out_rb->ColumnAt(1)->Equals(types::ToArrow(out_col2, arrow::default_memory_pool())));
EXPECT_TRUE(out_rb->ColumnAt(2)->Equals(types::ToArrow(out_col3, arrow::default_memory_pool())));
@@ -549,18 +526,12 @@ TEST_F(ExecGraphTest, execute_with_two_limits) {
auto output_table_1 = exec_state_->table_store()->GetTable("output1");
auto output_table_2 = exec_state_->table_store()->GetTable("output2");
std::vector<types::Float64Value> out_in1 = {1.4, 6.2};
EXPECT_TRUE(output_table_1
->GetRowBatchSlice(output_table_1->FirstBatch(), std::vector<int64_t>({2}),
arrow::default_memory_pool())
.ConsumeValueOrDie()
->ColumnAt(0)
->Equals(types::ToArrow(out_in1, arrow::default_memory_pool())));
EXPECT_TRUE(output_table_2
->GetRowBatchSlice(output_table_2->FirstBatch(), std::vector<int64_t>({2}),
arrow::default_memory_pool())
.ConsumeValueOrDie()
->ColumnAt(0)
->Equals(types::ToArrow(out_in1, arrow::default_memory_pool())));
table_store::Table::Cursor cursor1(output_table_1);
EXPECT_TRUE(cursor1.GetNextRowBatch({2}).ConsumeValueOrDie()->ColumnAt(0)->Equals(
types::ToArrow(out_in1, arrow::default_memory_pool())));
table_store::Table::Cursor cursor2(output_table_2);
EXPECT_TRUE(cursor2.GetNextRowBatch({2}).ConsumeValueOrDie()->ColumnAt(0)->Equals(
types::ToArrow(out_in1, arrow::default_memory_pool())));
}

class YieldingExecGraphTest : public BaseExecGraphTest {
13 changes: 7 additions & 6 deletions src/carnot/exec/memory_sink_node_test.cc
@@ -85,8 +85,8 @@ TEST_F(MemorySinkNodeTest, basic) {
false, 0);

auto table = exec_state_->table_store()->GetTable("cpu_15s");
auto slice = table->FirstBatch();
auto batch_or_s = table->GetRowBatchSlice(slice, {0, 1}, arrow::default_memory_pool());
table_store::Table::Cursor cursor(table);
auto batch_or_s = cursor.GetNextRowBatch({0, 1});
EXPECT_OK(batch_or_s);
auto batch = batch_or_s.ConsumeValueOrDie();
EXPECT_EQ(types::DataType::INT64, batch->desc().type(0));
@@ -103,8 +103,9 @@ TEST_F(MemorySinkNodeTest, basic) {
false, 0)
.Close();

slice = table->NextBatch(slice);
batch_or_s = table->GetRowBatchSlice(slice, {0, 1}, arrow::default_memory_pool());
// Update stop spec of the cursor to include the new row batch.
cursor.UpdateStopSpec(table_store::Table::Cursor::StopSpec{});
batch_or_s = cursor.GetNextRowBatch({0, 1});
EXPECT_OK(batch_or_s);
batch = batch_or_s.ConsumeValueOrDie();
EXPECT_TRUE(batch->ColumnAt(0)->Equals(col1_rb2_arrow));
@@ -146,8 +147,8 @@ TEST_F(MemorySinkNodeTest, zero_row_row_batch_not_eos) {
.Close();

auto table = exec_state_->table_store()->GetTable("cpu_15s");
auto slice = table->FirstBatch();
auto batch_or_s = table->GetRowBatchSlice(slice, {0, 1}, arrow::default_memory_pool());
table_store::Table::Cursor cursor(table);
auto batch_or_s = cursor.GetNextRowBatch({0, 1});
EXPECT_OK(batch_or_s);
auto batch = batch_or_s.ConsumeValueOrDie();
EXPECT_TRUE(batch->ColumnAt(0)->Equals(col1_rb2_arrow));
68 changes: 27 additions & 41 deletions src/carnot/exec/memory_source_node.cc
@@ -17,6 +17,7 @@
*/

#include "src/carnot/exec/memory_source_node.h"
#include "src/table_store/table/table.h"

#include <limits>
#include <string>
@@ -31,6 +32,9 @@ namespace px {
namespace carnot {
namespace exec {

using StartSpec = Table::Cursor::StartSpec;
using StopSpec = Table::Cursor::StopSpec;

std::string MemorySourceNode::DebugStringImpl() {
return absl::Substitute("Exec::MemorySourceNode: <name: $0, output: $1>", plan_node_->TableName(),
output_descriptor_->DebugString());
@@ -57,21 +61,25 @@ Status MemorySourceNode::OpenImpl(ExecState* exec_state) {
return error::NotFound("Table '$0' not found", plan_node_->TableName());
}

StartSpec start_spec;
if (plan_node_->HasStartTime()) {
PL_ASSIGN_OR_RETURN(current_batch_, table_->FindBatchSliceGreaterThanOrEqual(
plan_node_->start_time(), exec_state->exec_mem_pool()));
start_spec.type = StartSpec::StartType::StartAtTime;
start_spec.start_time = plan_node_->start_time();
} else {
current_batch_ = table_->FirstBatch();
start_spec.type = StartSpec::StartType::CurrentStartOfTable;
}

StopSpec stop_spec;
if (plan_node_->HasStopTime()) {
PL_ASSIGN_OR_RETURN(stop_, table_->FindStopPositionForTime(plan_node_->stop_time(),
exec_state->exec_mem_pool()));
stop_spec.type = StopSpec::StopType::StopAtTime;
stop_spec.stop_time = plan_node_->stop_time();
} else if (infinite_stream_) {
stop_spec.type = StopSpec::StopType::Infinite;
} else {
// Determine table_end at Open() time because Stirling may be pushing to the table
stop_ = table_->End();
stop_spec.type = StopSpec::StopType::CurrentEndOfTable;
}
current_batch_ = table_->SliceIfPastStop(current_batch_, stop_);
cursor_ = std::make_unique<Table::Cursor>(table_, start_spec, stop_spec);

return Status::OK();
}
@@ -81,44 +89,28 @@ Status MemorySourceNode::CloseImpl(ExecState*) {
return Status::OK();
}

StatusOr<std::unique_ptr<RowBatch>> MemorySourceNode::GetNextRowBatch(ExecState* exec_state) {
StatusOr<std::unique_ptr<RowBatch>> MemorySourceNode::GetNextRowBatch(ExecState*) {
DCHECK(table_ != nullptr);

if (infinite_stream_ && wait_for_valid_next_) {
// If it's an infinite_stream that has read out all the current data in the table, we have to
// keep around the last batch the infinite stream output and keep checking if the next batch
// after that is valid so that when stirling writes more data we are able to access it.
stop_ = table_->End();
auto next_batch = table_->NextBatch(current_batch_, stop_);
if (!next_batch.IsValid()) {
return RowBatch::WithZeroRows(*output_descriptor_, /* eow */ false, /* eos */ false);
}
current_batch_ = next_batch;
wait_for_valid_next_ = false;
}

if (!current_batch_.IsValid()) {
return RowBatch::WithZeroRows(*output_descriptor_, /* eow */ !infinite_stream_,
/* eos */ !infinite_stream_);
if (!cursor_->NextBatchReady()) {
// If the NextBatch is not ready, but the cursor is not yet exhausted, then we need to output
// 0-row row batches, while we wait for more data to be added. This currently only occurs in the
// case of an infinite stream. In the future, it should also occur when a stop time is set in
// the future, but this is not yet supported by Table.
// If the cursor is exhausted, then we return a 0-row row batch with eow=eos=true.
return RowBatch::WithZeroRows(*output_descriptor_, /* eow */ cursor_->Done(),
/* eos */ cursor_->Done());
}

PL_ASSIGN_OR_RETURN(
auto row_batch,
table_->GetRowBatchSlice(current_batch_, plan_node_->Columns(), exec_state->exec_mem_pool()));
PL_ASSIGN_OR_RETURN(auto row_batch, cursor_->GetNextRowBatch(plan_node_->Columns()));

rows_processed_ += row_batch->num_rows();
bytes_processed_ += row_batch->NumBytes();
auto next_batch = table_->NextBatch(current_batch_, stop_);
if (infinite_stream_ && !next_batch.IsValid()) {
wait_for_valid_next_ = true;
} else {
current_batch_ = next_batch;
}

// If infinite stream is set, we don't send Eow or Eos. Infinite streams therefore never cause
// HasBatchesRemaining to be false. Instead the outer loop that calls GenerateNext() is
// responsible for managing whether we continue the stream or end it.
if (!current_batch_.IsValid() && !infinite_stream_) {
if (cursor_->Done() && !infinite_stream_) {
row_batch->set_eow(true);
row_batch->set_eos(true);
}
@@ -131,13 +123,7 @@ Status MemorySourceNode::GenerateNextImpl(ExecState* exec_state) {
return Status::OK();
}

bool MemorySourceNode::InfiniteStreamNextBatchReady() {
if (!wait_for_valid_next_) {
return current_batch_.IsValid();
}
auto next_batch = table_->NextBatch(current_batch_);
return next_batch.IsValid();
}
bool MemorySourceNode::InfiniteStreamNextBatchReady() { return cursor_->NextBatchReady(); }

bool MemorySourceNode::NextBatchReady() {
// Next batch is ready if we haven't seen an eow and if it's an infinite_stream that has batches
9 changes: 4 additions & 5 deletions src/carnot/exec/memory_source_node.h
@@ -29,12 +29,14 @@
#include "src/common/base/base.h"
#include "src/common/base/status.h"
#include "src/table_store/schema/row_batch.h"
#include "src/table_store/table/table.h"
#include "src/table_store/table_store.h"

namespace px {
namespace carnot {
namespace exec {

using table_store::Table;
using table_store::schema::RowBatch;

class MemorySourceNode : public SourceNode {
@@ -58,11 +60,8 @@ class MemorySourceNode : public SourceNode {
// Whether this memory source will stream infinitely. Can be stopped by the
// exec_state_->keep_running() call in exec_graph.
bool infinite_stream_ = false;
// An infinite stream will set this to true once its exceeded the current data in the table, and
// then will keep checking for new data.
bool wait_for_valid_next_ = false;
table_store::BatchSlice current_batch_;
table_store::Table::StopPosition stop_;

std::unique_ptr<Table::Cursor> cursor_;

std::unique_ptr<plan::MemorySourceOperator> plan_node_;
table_store::Table* table_ = nullptr;
8 changes: 4 additions & 4 deletions src/carnot/exec/memory_source_node_test.cc
@@ -398,17 +398,17 @@ TEST_F(MemorySourceNodeTest, table_compact_between_open_and_exec) {
EXPECT_OK(cpu_table_->CompactHotToCold(arrow::default_memory_pool()));

tester.GenerateNextResult().ExpectRowBatch(
RowBatchBuilder(output_rd, 1, /*eow*/ false, /*eos*/ false)
.AddColumn<types::Time64NSValue>({3})
RowBatchBuilder(output_rd, 2, /*eow*/ false, /*eos*/ false)
.AddColumn<types::Time64NSValue>({3, 5})
.get());
EXPECT_TRUE(tester.node()->HasBatchesRemaining());

// Force a second compaction to check between Exec and a subsequent Exec.
EXPECT_OK(cpu_table_->CompactHotToCold(arrow::default_memory_pool()));

tester.GenerateNextResult().ExpectRowBatch(
RowBatchBuilder(output_rd, 2, /*eow*/ true, /*eos*/ true)
.AddColumn<types::Time64NSValue>({5, 6})
RowBatchBuilder(output_rd, 1, /*eow*/ true, /*eos*/ true)
.AddColumn<types::Time64NSValue>({6})
.get());
EXPECT_FALSE(tester.node()->HasBatchesRemaining());
tester.Close();
1 change: 1 addition & 0 deletions src/table_store/table/BUILD.bazel
@@ -33,6 +33,7 @@ pl_cc_library(
"//src/shared/types:cc_library",
"//src/table_store/schema:cc_library",
"//src/table_store/schemapb:schema_pl_cc_proto",
"//src/table_store/table/internal:cc_library",
"@com_github_apache_arrow//:arrow",
],
)
