From 480882d418629e9f3d07bad1638f3a597443cb4f Mon Sep 17 00:00:00 2001
From: James Bartlett
Date: Wed, 23 Mar 2022 13:26:42 -0700
Subject: [PATCH] [table_store] Refactor Table Compaction to Reduce Memory Fragmentation.

Summary:
Using the new `table/internal` utilities, this diff refactors the Table implementation to always compact to the compaction size (previously, it never split hot batches into smaller batches). In addition, `BatchSlice`s are replaced by a `Cursor` object that keeps track of where in the table a consumer is. The previous BatchSlice interface was complicated; the new Cursor interface is much cleaner and also made this refactor easier.

The reduction in memory fragmentation from this change is significant. The first image below shows memory utilization statistics over the course of multiple table store compaction cycles before this change. The next image shows the same after this change.

{F183775}

{F183776}

As seen in the images, memory utilization before the change dropped to about 50% after the first two compactions. After this change, memory utilization after the first two compactions stays above 80%.

This change has a cost: writes to the table store are now considerably more expensive (a single write previously took about 0.5us; it now takes about 1us). However, compaction time is also reduced significantly, so while some amortized performance decrease is expected with this diff, the memory savings are significant enough that the minor overall slowdown should be acceptable.

Here are the benchmarks:
{P182}
And the p-values for these benchmarks:
{P181}
Keep in mind that a low p-value only means there is a statistically significant difference between the two runs; the sign of the difference has to be taken from the benchmark means above.

Test Plan: Added more table tests (and refactored existing ones to use the new Cursor). Ran microbenchmarks as well as the integrated memory study shown above. Tested on a skaffolded cluster to make sure things work as normal.
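For reference, here is a minimal sketch of the new Cursor read path (mirroring the updated tests; `table`, the column indices, and the time bounds below are placeholders):

  using Cursor = table_store::Table::Cursor;

  // Read everything currently in the table, batch by batch.
  Cursor cursor(table);
  while (!cursor.Done()) {
    auto rb = cursor.GetNextRowBatch({0, 1}).ConsumeValueOrDie();
    // ... consume rb ...
  }

  // Or read a time-bounded range instead of the whole table.
  Cursor::StartSpec start;
  start.type = Cursor::StartSpec::StartType::StartAtTime;
  start.start_time = start_time;
  Cursor::StopSpec stop;
  stop.type = Cursor::StopSpec::StopType::StopAtTime;
  stop.stop_time = stop_time;
  Cursor bounded_cursor(table, start, stop);

Infinite streams instead use StopSpec::StopType::Infinite and poll cursor.NextBatchReady() between reads.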
Reviewers: nserrino, philkuz, vihang, zasgar Reviewed By: nserrino Signed-off-by: James Bartlett Differential Revision: https://phab.corp.pixielabs.ai/D11199 GitOrigin-RevId: 2789884fe4a244bdcba56b9379e627f9c2cb4fc8 --- src/carnot/carnot_test.cc | 10 +- src/carnot/exec/exec_graph_test.cc | 79 +- src/carnot/exec/memory_sink_node_test.cc | 13 +- src/carnot/exec/memory_source_node.cc | 68 +- src/carnot/exec/memory_source_node.h | 9 +- src/carnot/exec/memory_source_node_test.cc | 8 +- src/table_store/table/BUILD.bazel | 1 + src/table_store/table/table.cc | 903 ++++++--------------- src/table_store/table/table.h | 400 ++++----- src/table_store/table/table_benchmark.cc | 88 +- src/table_store/table/table_test.cc | 414 +++++----- 11 files changed, 742 insertions(+), 1251 deletions(-) diff --git a/src/carnot/carnot_test.cc b/src/carnot/carnot_test.cc index a70b30747d3..ad29ee66c48 100644 --- a/src/carnot/carnot_test.cc +++ b/src/carnot/carnot_test.cc @@ -352,10 +352,8 @@ TEST_F(CarnotTest, range_test_multiple_rbs) { std::vector col0_out1; std::vector col1_out1; std::vector col2_out1; - auto slice = big_table_->FirstBatch(); - auto batch = - big_table_->GetRowBatchSlice(slice, {0}, arrow::default_memory_pool()).ConsumeValueOrDie(); - + table_store::Table::Cursor cursor(big_table_.get()); + auto batch = cursor.GetNextRowBatch({0}).ConsumeValueOrDie(); for (int64_t i = 0; i < batch->ColumnAt(0)->length(); i++) { if (CarnotTestUtils::big_test_col1[i].val >= 2 && CarnotTestUtils::big_test_col1[i].val < 6) { col0_out1.emplace_back(CarnotTestUtils::big_test_col1[i].val); @@ -373,9 +371,7 @@ TEST_F(CarnotTest, range_test_multiple_rbs) { std::vector col0_out2; std::vector col1_out2; std::vector col2_out2; - auto next_batch = - big_table_->GetRowBatchSlice(big_table_->NextBatch(slice), {0}, arrow::default_memory_pool()) - .ConsumeValueOrDie(); + auto next_batch = cursor.GetNextRowBatch({0}).ConsumeValueOrDie(); for (int64_t i = batch->ColumnAt(0)->length(); i < batch->ColumnAt(0)->length() + next_batch->ColumnAt(0)->length(); i++) { if (CarnotTestUtils::big_test_col1[i].val >= start_time && diff --git a/src/carnot/exec/exec_graph_test.cc b/src/carnot/exec/exec_graph_test.cc index 72e1c04e91b..a341a033d4a 100644 --- a/src/carnot/exec/exec_graph_test.cc +++ b/src/carnot/exec/exec_graph_test.cc @@ -191,18 +191,11 @@ TEST_P(ExecGraphExecuteTest, execute) { auto output_table = exec_state_->table_store()->GetTable("output"); std::vector out_in1 = {4.8, 16.4, 26.4}; std::vector out_in2 = {14.8, 12.4}; - auto slice = output_table->FirstBatch(); - EXPECT_TRUE( - output_table->GetRowBatchSlice(slice, std::vector({0}), arrow::default_memory_pool()) - .ConsumeValueOrDie() - ->ColumnAt(0) - ->Equals(types::ToArrow(out_in1, arrow::default_memory_pool()))); - slice = output_table->NextBatch(slice); - EXPECT_TRUE( - output_table->GetRowBatchSlice(slice, std::vector({0}), arrow::default_memory_pool()) - .ConsumeValueOrDie() - ->ColumnAt(0) - ->Equals(types::ToArrow(out_in2, arrow::default_memory_pool()))); + table_store::Table::Cursor cursor(output_table); + EXPECT_TRUE(cursor.GetNextRowBatch({0}).ConsumeValueOrDie()->ColumnAt(0)->Equals( + types::ToArrow(out_in1, arrow::default_memory_pool()))); + EXPECT_TRUE(cursor.GetNextRowBatch({0}).ConsumeValueOrDie()->ColumnAt(0)->Equals( + types::ToArrow(out_in2, arrow::default_memory_pool()))); } std::vector> calls_to_execute = { @@ -279,18 +272,11 @@ TEST_F(ExecGraphTest, execute_time) { auto output_table = exec_state_->table_store()->GetTable("output"); std::vector out_in1 = 
{4.8, 16.4, 26.4}; std::vector out_in2 = {14.8, 12.4}; - auto slice = output_table->FirstBatch(); - EXPECT_TRUE( - output_table->GetRowBatchSlice(slice, std::vector({0}), arrow::default_memory_pool()) - .ConsumeValueOrDie() - ->ColumnAt(0) - ->Equals(types::ToArrow(out_in1, arrow::default_memory_pool()))); - slice = output_table->NextBatch(slice); - EXPECT_TRUE( - output_table->GetRowBatchSlice(slice, std::vector({0}), arrow::default_memory_pool()) - .ConsumeValueOrDie() - ->ColumnAt(0) - ->Equals(types::ToArrow(out_in2, arrow::default_memory_pool()))); + table_store::Table::Cursor cursor(output_table); + EXPECT_TRUE(cursor.GetNextRowBatch({0}).ConsumeValueOrDie()->ColumnAt(0)->Equals( + types::ToArrow(out_in1, arrow::default_memory_pool()))); + EXPECT_TRUE(cursor.GetNextRowBatch({0}).ConsumeValueOrDie()->ColumnAt(0)->Equals( + types::ToArrow(out_in2, arrow::default_memory_pool()))); } TEST_F(ExecGraphTest, two_limits_dont_interfere) { @@ -349,16 +335,11 @@ TEST_F(ExecGraphTest, two_limits_dont_interfere) { std::vector out_col1 = {1, 2}; std::vector out_col2 = {true, false}; std::vector out_col3 = {1.4, 6.2}; - auto out_rb1 = - output_table1 - ->GetRowBatchSlice(output_table1->FirstBatch(), std::vector({0, 1, 2}), - arrow::default_memory_pool()) - .ConsumeValueOrDie(); - auto out_rb2 = - output_table2 - ->GetRowBatchSlice(output_table2->FirstBatch(), std::vector({0, 1, 2}), - arrow::default_memory_pool()) - .ConsumeValueOrDie(); + table_store::Table::Cursor cursor1(output_table1); + table_store::Table::Cursor cursor2(output_table2); + + auto out_rb1 = cursor1.GetNextRowBatch(std::vector({0, 1, 2})).ConsumeValueOrDie(); + auto out_rb2 = cursor2.GetNextRowBatch(std::vector({0, 1, 2})).ConsumeValueOrDie(); EXPECT_TRUE(out_rb1->ColumnAt(0)->Equals(types::ToArrow(out_col1, arrow::default_memory_pool()))); EXPECT_TRUE(out_rb1->ColumnAt(1)->Equals(types::ToArrow(out_col2, arrow::default_memory_pool()))); EXPECT_TRUE(out_rb1->ColumnAt(2)->Equals(types::ToArrow(out_col3, arrow::default_memory_pool()))); @@ -421,10 +402,8 @@ TEST_F(ExecGraphTest, limit_w_multiple_srcs) { std::vector out_col1 = {1, 2}; std::vector out_col2 = {true, false}; std::vector out_col3 = {1.4, 6.2}; - auto out_rb = output_table - ->GetRowBatchSlice(output_table->FirstBatch(), std::vector({0, 1, 2}), - arrow::default_memory_pool()) - .ConsumeValueOrDie(); + table_store::Table::Cursor cursor(output_table); + auto out_rb = cursor.GetNextRowBatch(std::vector({0, 1, 2})).ConsumeValueOrDie(); EXPECT_TRUE(out_rb->ColumnAt(0)->Equals(types::ToArrow(out_col1, arrow::default_memory_pool()))); EXPECT_TRUE(out_rb->ColumnAt(1)->Equals(types::ToArrow(out_col2, arrow::default_memory_pool()))); EXPECT_TRUE(out_rb->ColumnAt(2)->Equals(types::ToArrow(out_col3, arrow::default_memory_pool()))); @@ -485,10 +464,8 @@ TEST_F(ExecGraphTest, two_sequential_limits) { std::vector out_col1 = {1, 2}; std::vector out_col2 = {true, false}; std::vector out_col3 = {1.4, 6.2}; - auto out_rb = output_table - ->GetRowBatchSlice(output_table->FirstBatch(), std::vector({0, 1, 2}), - arrow::default_memory_pool()) - .ConsumeValueOrDie(); + table_store::Table::Cursor cursor(output_table); + auto out_rb = cursor.GetNextRowBatch({0, 1, 2}).ConsumeValueOrDie(); EXPECT_TRUE(out_rb->ColumnAt(0)->Equals(types::ToArrow(out_col1, arrow::default_memory_pool()))); EXPECT_TRUE(out_rb->ColumnAt(1)->Equals(types::ToArrow(out_col2, arrow::default_memory_pool()))); EXPECT_TRUE(out_rb->ColumnAt(2)->Equals(types::ToArrow(out_col3, arrow::default_memory_pool()))); @@ -549,18 +526,12 
@@ TEST_F(ExecGraphTest, execute_with_two_limits) { auto output_table_1 = exec_state_->table_store()->GetTable("output1"); auto output_table_2 = exec_state_->table_store()->GetTable("output2"); std::vector out_in1 = {1.4, 6.2}; - EXPECT_TRUE(output_table_1 - ->GetRowBatchSlice(output_table_1->FirstBatch(), std::vector({2}), - arrow::default_memory_pool()) - .ConsumeValueOrDie() - ->ColumnAt(0) - ->Equals(types::ToArrow(out_in1, arrow::default_memory_pool()))); - EXPECT_TRUE(output_table_2 - ->GetRowBatchSlice(output_table_2->FirstBatch(), std::vector({2}), - arrow::default_memory_pool()) - .ConsumeValueOrDie() - ->ColumnAt(0) - ->Equals(types::ToArrow(out_in1, arrow::default_memory_pool()))); + table_store::Table::Cursor cursor1(output_table_1); + EXPECT_TRUE(cursor1.GetNextRowBatch({2}).ConsumeValueOrDie()->ColumnAt(0)->Equals( + types::ToArrow(out_in1, arrow::default_memory_pool()))); + table_store::Table::Cursor cursor2(output_table_2); + EXPECT_TRUE(cursor2.GetNextRowBatch({2}).ConsumeValueOrDie()->ColumnAt(0)->Equals( + types::ToArrow(out_in1, arrow::default_memory_pool()))); } class YieldingExecGraphTest : public BaseExecGraphTest { diff --git a/src/carnot/exec/memory_sink_node_test.cc b/src/carnot/exec/memory_sink_node_test.cc index ed9d61b17ac..d0a50b4ac85 100644 --- a/src/carnot/exec/memory_sink_node_test.cc +++ b/src/carnot/exec/memory_sink_node_test.cc @@ -85,8 +85,8 @@ TEST_F(MemorySinkNodeTest, basic) { false, 0); auto table = exec_state_->table_store()->GetTable("cpu_15s"); - auto slice = table->FirstBatch(); - auto batch_or_s = table->GetRowBatchSlice(slice, {0, 1}, arrow::default_memory_pool()); + table_store::Table::Cursor cursor(table); + auto batch_or_s = cursor.GetNextRowBatch({0, 1}); EXPECT_OK(batch_or_s); auto batch = batch_or_s.ConsumeValueOrDie(); EXPECT_EQ(types::DataType::INT64, batch->desc().type(0)); @@ -103,8 +103,9 @@ TEST_F(MemorySinkNodeTest, basic) { false, 0) .Close(); - slice = table->NextBatch(slice); - batch_or_s = table->GetRowBatchSlice(slice, {0, 1}, arrow::default_memory_pool()); + // Update stop spec of the cursor to include the new row batch. 
+ cursor.UpdateStopSpec(table_store::Table::Cursor::StopSpec{}); + batch_or_s = cursor.GetNextRowBatch({0, 1}); EXPECT_OK(batch_or_s); batch = batch_or_s.ConsumeValueOrDie(); EXPECT_TRUE(batch->ColumnAt(0)->Equals(col1_rb2_arrow)); @@ -146,8 +147,8 @@ TEST_F(MemorySinkNodeTest, zero_row_row_batch_not_eos) { .Close(); auto table = exec_state_->table_store()->GetTable("cpu_15s"); - auto slice = table->FirstBatch(); - auto batch_or_s = table->GetRowBatchSlice(slice, {0, 1}, arrow::default_memory_pool()); + table_store::Table::Cursor cursor(table); + auto batch_or_s = cursor.GetNextRowBatch({0, 1}); EXPECT_OK(batch_or_s); auto batch = batch_or_s.ConsumeValueOrDie(); EXPECT_TRUE(batch->ColumnAt(0)->Equals(col1_rb2_arrow)); diff --git a/src/carnot/exec/memory_source_node.cc b/src/carnot/exec/memory_source_node.cc index 452ae073a94..4c97ac3022c 100644 --- a/src/carnot/exec/memory_source_node.cc +++ b/src/carnot/exec/memory_source_node.cc @@ -17,6 +17,7 @@ */ #include "src/carnot/exec/memory_source_node.h" +#include "src/table_store/table/table.h" #include #include @@ -31,6 +32,9 @@ namespace px { namespace carnot { namespace exec { +using StartSpec = Table::Cursor::StartSpec; +using StopSpec = Table::Cursor::StopSpec; + std::string MemorySourceNode::DebugStringImpl() { return absl::Substitute("Exec::MemorySourceNode: ", plan_node_->TableName(), output_descriptor_->DebugString()); @@ -57,21 +61,25 @@ Status MemorySourceNode::OpenImpl(ExecState* exec_state) { return error::NotFound("Table '$0' not found", plan_node_->TableName()); } + StartSpec start_spec; if (plan_node_->HasStartTime()) { - PL_ASSIGN_OR_RETURN(current_batch_, table_->FindBatchSliceGreaterThanOrEqual( - plan_node_->start_time(), exec_state->exec_mem_pool())); + start_spec.type = StartSpec::StartType::StartAtTime; + start_spec.start_time = plan_node_->start_time(); } else { - current_batch_ = table_->FirstBatch(); + start_spec.type = StartSpec::StartType::CurrentStartOfTable; } + StopSpec stop_spec; if (plan_node_->HasStopTime()) { - PL_ASSIGN_OR_RETURN(stop_, table_->FindStopPositionForTime(plan_node_->stop_time(), - exec_state->exec_mem_pool())); + stop_spec.type = StopSpec::StopType::StopAtTime; + stop_spec.stop_time = plan_node_->stop_time(); + } else if (infinite_stream_) { + stop_spec.type = StopSpec::StopType::Infinite; } else { // Determine table_end at Open() time because Stirling may be pushing to the table - stop_ = table_->End(); + stop_spec.type = StopSpec::StopType::CurrentEndOfTable; } - current_batch_ = table_->SliceIfPastStop(current_batch_, stop_); + cursor_ = std::make_unique(table_, start_spec, stop_spec); return Status::OK(); } @@ -81,44 +89,28 @@ Status MemorySourceNode::CloseImpl(ExecState*) { return Status::OK(); } -StatusOr> MemorySourceNode::GetNextRowBatch(ExecState* exec_state) { +StatusOr> MemorySourceNode::GetNextRowBatch(ExecState*) { DCHECK(table_ != nullptr); - if (infinite_stream_ && wait_for_valid_next_) { - // If it's an infinite_stream that has read out all the current data in the table, we have to - // keep around the last batch the infinite stream output and keep checking if the next batch - // after that is valid so that when stirling writes more data we are able to access it. 
- stop_ = table_->End(); - auto next_batch = table_->NextBatch(current_batch_, stop_); - if (!next_batch.IsValid()) { - return RowBatch::WithZeroRows(*output_descriptor_, /* eow */ false, /* eos */ false); - } - current_batch_ = next_batch; - wait_for_valid_next_ = false; - } - - if (!current_batch_.IsValid()) { - return RowBatch::WithZeroRows(*output_descriptor_, /* eow */ !infinite_stream_, - /* eos */ !infinite_stream_); + if (!cursor_->NextBatchReady()) { + // If the NextBatch is not ready, but the cursor is not yet exhausted, then we need to output + // 0-row row batches, while we wait for more data to be added. This currently only occurs in the + // case of an infinite stream. In the future, it should also occur when a stop time is set in + // the future, but this is not yet supported by Table. + // If the cursor is exhausted, then we return a 0-row row batch with eow=eos=true. + return RowBatch::WithZeroRows(*output_descriptor_, /* eow */ cursor_->Done(), + /* eos */ cursor_->Done()); } - PL_ASSIGN_OR_RETURN( - auto row_batch, - table_->GetRowBatchSlice(current_batch_, plan_node_->Columns(), exec_state->exec_mem_pool())); + PL_ASSIGN_OR_RETURN(auto row_batch, cursor_->GetNextRowBatch(plan_node_->Columns())); rows_processed_ += row_batch->num_rows(); bytes_processed_ += row_batch->NumBytes(); - auto next_batch = table_->NextBatch(current_batch_, stop_); - if (infinite_stream_ && !next_batch.IsValid()) { - wait_for_valid_next_ = true; - } else { - current_batch_ = next_batch; - } // If infinite stream is set, we don't send Eow or Eos. Infinite streams therefore never cause // HasBatchesRemaining to be false. Instead the outer loop that calls GenerateNext() is // responsible for managing whether we continue the stream or end it. - if (!current_batch_.IsValid() && !infinite_stream_) { + if (cursor_->Done() && !infinite_stream_) { row_batch->set_eow(true); row_batch->set_eos(true); } @@ -131,13 +123,7 @@ Status MemorySourceNode::GenerateNextImpl(ExecState* exec_state) { return Status::OK(); } -bool MemorySourceNode::InfiniteStreamNextBatchReady() { - if (!wait_for_valid_next_) { - return current_batch_.IsValid(); - } - auto next_batch = table_->NextBatch(current_batch_); - return next_batch.IsValid(); -} +bool MemorySourceNode::InfiniteStreamNextBatchReady() { return cursor_->NextBatchReady(); } bool MemorySourceNode::NextBatchReady() { // Next batch is ready if we haven't seen an eow and if it's an infinite_stream that has batches diff --git a/src/carnot/exec/memory_source_node.h b/src/carnot/exec/memory_source_node.h index fb5d5d6929d..174dd525317 100644 --- a/src/carnot/exec/memory_source_node.h +++ b/src/carnot/exec/memory_source_node.h @@ -29,12 +29,14 @@ #include "src/common/base/base.h" #include "src/common/base/status.h" #include "src/table_store/schema/row_batch.h" +#include "src/table_store/table/table.h" #include "src/table_store/table_store.h" namespace px { namespace carnot { namespace exec { +using table_store::Table; using table_store::schema::RowBatch; class MemorySourceNode : public SourceNode { @@ -58,11 +60,8 @@ class MemorySourceNode : public SourceNode { // Whether this memory source will stream infinitely. Can be stopped by the // exec_state_->keep_running() call in exec_graph. bool infinite_stream_ = false; - // An infinite stream will set this to true once its exceeded the current data in the table, and - // then will keep checking for new data. 
- bool wait_for_valid_next_ = false; - table_store::BatchSlice current_batch_; - table_store::Table::StopPosition stop_; + + std::unique_ptr cursor_; std::unique_ptr plan_node_; table_store::Table* table_ = nullptr; diff --git a/src/carnot/exec/memory_source_node_test.cc b/src/carnot/exec/memory_source_node_test.cc index 7fd10840a4d..91026486378 100644 --- a/src/carnot/exec/memory_source_node_test.cc +++ b/src/carnot/exec/memory_source_node_test.cc @@ -398,8 +398,8 @@ TEST_F(MemorySourceNodeTest, table_compact_between_open_and_exec) { EXPECT_OK(cpu_table_->CompactHotToCold(arrow::default_memory_pool())); tester.GenerateNextResult().ExpectRowBatch( - RowBatchBuilder(output_rd, 1, /*eow*/ false, /*eos*/ false) - .AddColumn({3}) + RowBatchBuilder(output_rd, 2, /*eow*/ false, /*eos*/ false) + .AddColumn({3, 5}) .get()); EXPECT_TRUE(tester.node()->HasBatchesRemaining()); @@ -407,8 +407,8 @@ TEST_F(MemorySourceNodeTest, table_compact_between_open_and_exec) { EXPECT_OK(cpu_table_->CompactHotToCold(arrow::default_memory_pool())); tester.GenerateNextResult().ExpectRowBatch( - RowBatchBuilder(output_rd, 2, /*eow*/ true, /*eos*/ true) - .AddColumn({5, 6}) + RowBatchBuilder(output_rd, 1, /*eow*/ true, /*eos*/ true) + .AddColumn({6}) .get()); EXPECT_FALSE(tester.node()->HasBatchesRemaining()); tester.Close(); diff --git a/src/table_store/table/BUILD.bazel b/src/table_store/table/BUILD.bazel index 113c6cdcde8..8e3b7195496 100644 --- a/src/table_store/table/BUILD.bazel +++ b/src/table_store/table/BUILD.bazel @@ -33,6 +33,7 @@ pl_cc_library( "//src/shared/types:cc_library", "//src/table_store/schema:cc_library", "//src/table_store/schemapb:schema_pl_cc_proto", + "//src/table_store/table/internal:cc_library", "@com_github_apache_arrow//:arrow", ], ) diff --git a/src/table_store/table/table.cc b/src/table_store/table/table.cc index e53c55a1548..395d829df9f 100644 --- a/src/table_store/table/table.cc +++ b/src/table_store/table/table.cc @@ -21,16 +21,21 @@ #include #include #include +#include #include #include #include #include +#include "internal/store_with_row_accounting.h" #include "src/common/base/base.h" #include "src/common/base/status.h" #include "src/shared/types/arrow_adapter.h" #include "src/shared/types/type_utils.h" #include "src/table_store/schema/relation.h" +#include "src/table_store/table/internal/batch_size_accountant.h" +#include "src/table_store/table/internal/record_or_row_batch.h" +#include "src/table_store/table/internal/types.h" #include "src/table_store/table/table.h" // Note: this value is not used in most cases. 
@@ -43,61 +48,107 @@ DEFINE_int32(table_store_table_size_limit, namespace px { namespace table_store { -ArrowArrayCompactor::ArrowArrayCompactor(const schema::Relation& rel, arrow::MemoryPool* mem_pool) - : output_columns_(rel.NumColumns()), column_types_(rel.col_types()) { - for (auto col_type : column_types_) { - builders_.push_back(types::MakeArrowBuilder(col_type, mem_pool)); +Table::Cursor::Cursor(const Table* table, StartSpec start, StopSpec stop) + : table_(table), hints_(internal::BatchHints{}) { + AdvanceToStart(start); + StopStateFromSpec(std::move(stop)); +} + +void Table::Cursor::AdvanceToStart(const StartSpec& start) { + switch (start.type) { + case StartSpec::StartType::StartAtTime: { + last_read_row_id_ = table_->FindRowIDFromTimeFirstGreaterThanOrEqual(start.start_time) - 1; + break; + } + case StartSpec::StartType::CurrentStartOfTable: { + if (table_->FirstRowID() == -1) { + last_read_row_id_ = -1; + } else { + last_read_row_id_ = table_->FirstRowID() - 1; + } + break; + } } } -template <> -Status ArrowArrayCompactor::AppendColumnTyped( - int64_t col_idx, std::shared_ptr arr) { - auto builder_untyped = builders_[col_idx].get(); - auto builder = static_cast(builder_untyped); - auto typed_arr = std::static_pointer_cast(arr); - auto size = types::GetArrowArrayBytes(typed_arr.get()); - PL_RETURN_IF_ERROR(builder->Reserve(typed_arr->length())); - PL_RETURN_IF_ERROR(builder->ReserveData(size)); - for (int i = 0; i < typed_arr->length(); ++i) { - builder->UnsafeAppend(typed_arr->GetString(i)); - } - bytes_ += size; - return Status::OK(); +void Table::Cursor::StopStateFromSpec(StopSpec&& stop) { + stop_.spec = std::move(stop); + switch (stop_.spec.type) { + case StopSpec::StopType::CurrentEndOfTable: { + if (table_->LastRowID() == -1) { + stop_.stop_row_id = -1; + } else { + stop_.stop_row_id = table_->LastRowID() + 1; + } + break; + } + case StopSpec::StopType::StopAtTime: { + // TODO(james): this needs to be changed to support stopping at the provided time regardless + // of what's currently in the table. + stop_.stop_row_id = table_->FindRowIDFromTimeFirstGreaterThan(stop_.spec.stop_time); + break; + } + default: + // Ignore StopType::Infinte, because it doesn't require stop_row_id. + break; + } } -Status ArrowArrayCompactor::AppendColumn(int64_t col_idx, std::shared_ptr arr) { -#define TYPE_CASE(_dt_) PL_RETURN_IF_ERROR(AppendColumnTyped<_dt_>(col_idx, arr)); - PL_SWITCH_FOREACH_DATATYPE(column_types_[col_idx], TYPE_CASE); -#undef TYPE_CASE - return Status::OK(); +bool Table::Cursor::NextBatchReady() { + // TODO(james): this needs to be changed to support stopping at the provided time regardless + // of what's currently in the table. 
+ if (stop_.spec.type != StopSpec::StopType::Infinite) { + return !Done(); + } + return table_->LastRowID() > last_read_row_id_; } -Status ArrowArrayCompactor::Finish() { - for (const auto& [col_idx, col_type] : Enumerate(column_types_)) { -#define TYPE_CASE(_dt_) PL_RETURN_IF_ERROR(FinishTyped<_dt_>(col_idx)); - PL_SWITCH_FOREACH_DATATYPE(col_type, TYPE_CASE); -#undef TYPE_CASE +bool Table::Cursor::Done() { + if (stop_.spec.type == StopSpec::StopType::Infinite) { + return false; } - return Status::OK(); + auto next_row_id = last_read_row_id_ + 1; + return next_row_id >= stop_.stop_row_id; +} + +void Table::Cursor::UpdateStopSpec(Cursor::StopSpec stop) { StopStateFromSpec(std::move(stop)); } + +internal::RowID* Table::Cursor::LastReadRowID() { return &last_read_row_id_; } + +internal::BatchHints* Table::Cursor::Hints() { return &hints_; } + +std::optional Table::Cursor::StopRowID() const { + if (stop_.spec.type == StopSpec::StopType::Infinite) { + return std::nullopt; + } + return stop_.stop_row_id; +} + +StatusOr> Table::Cursor::GetNextRowBatch( + const std::vector& cols) { + return table_->GetNextRowBatch(this, cols); } Table::Table(std::string_view table_name, const schema::Relation& relation, size_t max_table_size, - size_t min_cold_batch_size) + size_t compacted_batch_size) : metrics_(&(GetMetricsRegistry()), std::string(table_name)), rel_(relation), max_table_size_(max_table_size), - min_cold_batch_size_(min_cold_batch_size), - ring_capacity_(max_table_size / min_cold_batch_size) { - absl::MutexLock gen_lock(&generation_lock_); - absl::MutexLock cold_lock(&cold_lock_); - absl::MutexLock hot_lock(&hot_lock_); + compacted_batch_size_(compacted_batch_size), + // TODO(james): move mem_pool into constructor. + compactor_(rel_, arrow::default_memory_pool()) { + absl::base_internal::SpinLockHolder cold_lock(&cold_lock_); + absl::base_internal::SpinLockHolder hot_lock(&hot_lock_); for (const auto& [i, col_name] : Enumerate(rel_.col_names())) { if (col_name == "time_" && rel_.GetColumnType(i) == types::DataType::TIME64NS) { time_col_idx_ = i; } - cold_column_buffers_.emplace_back(ring_capacity_); } + batch_size_accountant_ = internal::BatchSizeAccountant::Create(rel_, compacted_batch_size_); + hot_store_ = std::make_unique>( + rel_, time_col_idx_); + cold_store_ = std::make_unique>( + rel_, time_col_idx_); } Status Table::ToProto(table_store::schemapb::Table* table_proto) const { @@ -107,11 +158,10 @@ Status Table::ToProto(table_store::schemapb::Table* table_proto) const { col_selector.push_back(i); } - for (auto it = FirstBatch(); it.IsValid(); it = NextBatch(it)) { - PL_ASSIGN_OR_RETURN(auto cur_rb, - GetRowBatchSlice(it, col_selector, arrow::default_memory_pool())); - auto next = NextBatch(it); - auto eos = !next.IsValid(); + Cursor cursor(this); + while (!cursor.Done()) { + PL_ASSIGN_OR_RETURN(auto cur_rb, cursor.GetNextRowBatch(col_selector)); + auto eos = cursor.Done(); cur_rb->set_eow(eos); cur_rb->set_eos(eos); PL_RETURN_IF_ERROR(cur_rb->ToProto(table_proto->add_row_batches())); @@ -121,21 +171,32 @@ Status Table::ToProto(table_store::schemapb::Table* table_proto) const { return Status::OK(); } -StatusOr> Table::GetRowBatchSlice( - const BatchSlice& slice, const std::vector& cols, arrow::MemoryPool* mem_pool) const { - if (!slice.IsValid()) - return error::InvalidArgument("GetRowBatchSlice called on invalid BatchSlice"); - // Get column types for row descriptor. 
- std::vector rb_types; - for (int64_t col_idx : cols) { - DCHECK(static_cast(col_idx) < rel_.NumColumns()); - rb_types.push_back(rel_.col_types()[col_idx]); - } - - auto batch_size = slice.Size(); - auto output_rb = std::make_unique(schema::RowDescriptor(rb_types), batch_size); - PL_RETURN_IF_ERROR(AddBatchSliceToRowBatch(slice, cols, output_rb.get(), mem_pool)); - return output_rb; +StatusOr> Table::GetNextRowBatch( + Cursor* cursor, const std::vector& cols) const { + DCHECK(!cursor->Done()) << "Calling GetNextRowBatch on an exhausted Cursor"; + absl::base_internal::SpinLockHolder cold_lock(&cold_lock_); + PL_ASSIGN_OR_RETURN(auto rb, + cold_store_->GetNextRowBatch(cursor->LastReadRowID(), cursor->Hints(), + cursor->StopRowID(), cols)); + if (rb == nullptr) { + absl::base_internal::SpinLockHolder hot_lock(&hot_lock_); + PL_ASSIGN_OR_RETURN(rb, hot_store_->GetNextRowBatch(cursor->LastReadRowID(), cursor->Hints(), + cursor->StopRowID(), cols)); + if (rb == nullptr && hot_store_->Size() > 0) { + // If the cursor was pointing to an expired row batch, update the cursor to point to the start + // of the table, then try to get the next row batch. + *cursor->LastReadRowID() = hot_store_->FirstRowID() - 1; + if (!cursor->Done()) { + PL_ASSIGN_OR_RETURN(rb, + hot_store_->GetNextRowBatch(cursor->LastReadRowID(), cursor->Hints(), + cursor->StopRowID(), cols)); + } + } + } + if (rb == nullptr) { + return error::InvalidArgument("Data after Cursor is not in the table."); + } + return rb; } Status Table::ExpireRowBatches(int64_t row_batch_size) { @@ -145,15 +206,18 @@ Status Table::ExpireRowBatches(int64_t row_batch_size) { } int64_t bytes; { - absl::base_internal::SpinLockHolder lock(&stats_lock_); - bytes = cold_bytes_ + hot_bytes_; + absl::base_internal::SpinLockHolder hot_lock(&hot_lock_); + bytes = batch_size_accountant_->HotBytes() + batch_size_accountant_->ColdBytes(); } while (bytes + row_batch_size > max_table_size_) { PL_RETURN_IF_ERROR(ExpireBatch()); + { + absl::base_internal::SpinLockHolder hot_lock(&hot_lock_); + bytes = batch_size_accountant_->HotBytes() + batch_size_accountant_->ColdBytes(); + } { absl::base_internal::SpinLockHolder lock(&stats_lock_); batches_expired_++; - bytes = cold_bytes_ + hot_bytes_; } } return Status::OK(); @@ -164,30 +228,10 @@ Status Table::WriteRowBatch(const schema::RowBatch& rb) { if (rb.num_columns() == 0 || rb.ColumnAt(0)->length() == 0) { return Status::OK(); } - int64_t rb_bytes = 0; - // Check for matching types - auto received_num_columns = rb.num_columns(); - auto expected_num_columns = rel_.NumColumns(); - CHECK_EQ(expected_num_columns, received_num_columns) - << absl::StrFormat("Table schema mismatch: expected=%u received=%u)", expected_num_columns, - received_num_columns); - - for (int i = 0; i < rb.num_columns(); ++i) { - auto received_type = rb.desc().type(i); - auto expected_type = rel_.col_types().at(i); - DCHECK_EQ(expected_type, received_type) - << absl::StrFormat("Type mismatch [column=%u]: expected=%s received=%s", i, - ToString(expected_type), ToString(received_type)); -#define TYPE_CASE(_dt_) rb_bytes += types::GetArrowArrayBytes<_dt_>(rb.columns().at(i).get()); - PL_SWITCH_FOREACH_DATATYPE(received_type, TYPE_CASE); -#undef TYPE_CASE - } - - PL_RETURN_IF_ERROR(ExpireRowBatches(rb_bytes)); - PL_RETURN_IF_ERROR(WriteHot(rb)); - absl::base_internal::SpinLockHolder lock(&stats_lock_); - hot_bytes_ += rb_bytes; - ++batches_added_; + + internal::RecordOrRowBatch record_or_row_batch(rb); + + 
PL_RETURN_IF_ERROR(WriteHot(std::move(record_or_row_batch))); return Status::OK(); } @@ -198,122 +242,106 @@ Status Table::TransferRecordBatch( return Status::OK(); } - // Check for matching types - auto received_num_columns = record_batch->size(); - auto expected_num_columns = rel_.NumColumns(); - CHECK_EQ(expected_num_columns, received_num_columns) - << absl::StrFormat("Table schema mismatch: expected=%u received=%u)", expected_num_columns, - received_num_columns); + auto record_batch_w_cache = internal::RecordBatchWithCache{ + std::move(record_batch), + std::vector(rel_.NumColumns()), + std::vector(rel_.NumColumns(), false), + }; + internal::RecordOrRowBatch record_or_row_batch(std::move(record_batch_w_cache)); + + PL_RETURN_IF_ERROR(WriteHot(std::move(record_or_row_batch))); + return Status::OK(); +} - uint32_t i = 0; - int64_t rb_bytes = 0; - for (const auto& col : *record_batch) { - auto received_type = col->data_type(); - auto expected_type = rel_.col_types().at(i); - DCHECK_EQ(expected_type, received_type) - << absl::StrFormat("Type mismatch [column=%u]: expected=%s received=%s", i, - ToString(expected_type), ToString(received_type)); - rb_bytes += col->Bytes(); - ++i; - } +Status Table::WriteHot(internal::RecordOrRowBatch&& record_or_row_batch) { + // See BatchSizeAccountantNonMutableState for an explanation of the thread safety and necessity of + // NonMutableState. + auto batch_stats = internal::BatchSizeAccountant::CalcBatchStats( + ABSL_TS_UNCHECKED_READ(batch_size_accountant_)->NonMutableState(), record_or_row_batch); + + PL_RETURN_IF_ERROR(ExpireRowBatches(batch_stats.bytes)); - PL_RETURN_IF_ERROR(ExpireRowBatches(rb_bytes)); - PL_RETURN_IF_ERROR(WriteHot(std::move(record_batch))); + absl::base_internal::SpinLockHolder hot_lock(&hot_lock_); + auto batch_length = record_or_row_batch.Length(); + batch_size_accountant_->NewHotBatch(std::move(batch_stats)); + hot_store_->EmplaceBack(next_row_id_, std::move(record_or_row_batch)); + next_row_id_ += batch_length; absl::base_internal::SpinLockHolder lock(&stats_lock_); - hot_bytes_ += rb_bytes; ++batches_added_; - return Status::OK(); } -static inline bool IntervalComparatorLowerBound(const std::pair interval, - int64_t val) { - return interval.second < val; +Table::RowID Table::FirstRowID() const { + absl::base_internal::SpinLockHolder cold_lock(&cold_lock_); + if (cold_store_->Size() > 0) { + return cold_store_->FirstRowID(); + } + absl::base_internal::SpinLockHolder hot_lock(&hot_lock_); + if (hot_store_->Size() > 0) { + return hot_store_->FirstRowID(); + } + return -1; } -static inline bool IntervalComparatorUpperBound(int64_t val, - const std::pair interval) { - return val < interval.first; +Table::RowID Table::LastRowID() const { + absl::base_internal::SpinLockHolder cold_lock(&cold_lock_); + absl::base_internal::SpinLockHolder hot_lock(&hot_lock_); + if (hot_store_->Size() > 0) { + return hot_store_->LastRowID(); + } + if (cold_store_->Size() > 0) { + return cold_store_->LastRowID(); + } + return -1; } -StatusOr Table::FindBatchSliceGreaterThanOrEqual(int64_t time, - arrow::MemoryPool* mem_pool) const { - if (time_col_idx_ == -1) { - return error::InvalidArgument( - "Cannot call FindBatchSliceGreaterThanOrEqual on table without a time column."); +Table::RowID Table::FindRowIDFromTimeFirstGreaterThanOrEqual(Time time) const { + absl::base_internal::SpinLockHolder cold_lock(&cold_lock_); + auto optional_row_id = cold_store_->FindRowIDFromTimeFirstGreaterThanOrEqual(time); + if (optional_row_id.has_value()) { + return 
optional_row_id.value(); } - absl::MutexLock gen_lock(&generation_lock_); - { - absl::MutexLock cold_lock(&cold_lock_); - auto it = - std::lower_bound(cold_time_.begin(), cold_time_.end(), time, IntervalComparatorLowerBound); - if (it != cold_time_.end()) { - auto index = std::distance(cold_time_.begin(), it); - auto ring_index = RingIndexUnlocked(index); - auto time_col = cold_column_buffers_[time_col_idx_][ring_index]; - auto row_offset = types::SearchArrowArrayGreaterThanOrEqual( - time_col.get(), time); - auto row_ids = cold_row_ids_[index]; - return BatchSlice::Cold(ring_index, row_offset, time_col->length() - 1, generation_, - row_ids.first + row_offset, row_ids.second); - } + absl::base_internal::SpinLockHolder hot_lock(&hot_lock_); + optional_row_id = hot_store_->FindRowIDFromTimeFirstGreaterThanOrEqual(time); + if (optional_row_id.has_value()) { + return optional_row_id.value(); } - // If the time wasn't found in the cold batches, we look in the hot batches. - absl::MutexLock hot_lock(&hot_lock_); - auto it = - std::lower_bound(hot_time_.begin(), hot_time_.end(), time, IntervalComparatorLowerBound); - if (it == hot_time_.end()) { - return BatchSlice::Invalid(); - } - auto index = std::distance(hot_time_.begin(), it); - - ArrowArrayPtr time_col; - if (std::holds_alternative(hot_batches_[index])) { - auto record_batch_ptr = std::get_if(&hot_batches_[index]); - time_col = GetHotColumnUnlocked(record_batch_ptr, time_col_idx_, mem_pool); - } else { - auto row_batch = std::get(hot_batches_[index]); - time_col = row_batch.columns().at(time_col_idx_); - } - - auto row_offset = - types::SearchArrowArrayGreaterThanOrEqual(time_col.get(), time); - auto row_ids = hot_row_ids_[index]; - return BatchSlice::Hot(index, row_offset, time_col->length() - 1, generation_, - row_ids.first + row_offset, row_ids.second); + return next_row_id_; } -StatusOr Table::FindStopPositionForTime(int64_t time, - arrow::MemoryPool* mem_pool) const { - if (time_col_idx_ == -1) { - return error::InvalidArgument( - "Cannot call FindStopPositionForTime on table without a time column."); +Table::RowID Table::FindRowIDFromTimeFirstGreaterThan(Time time) const { + absl::base_internal::SpinLockHolder cold_lock(&cold_lock_); + auto optional_row_id = cold_store_->FindRowIDFromTimeFirstGreaterThan(time); + if (optional_row_id.has_value()) { + return optional_row_id.value(); } - auto stop = FindStopTime(time, mem_pool); - if (stop == -1) { - // If all the data is after the stop time then we return the first unique row identifier in the - // table, which will cause no results to be returned. 
- return FirstBatch().uniq_row_start_idx; + absl::base_internal::SpinLockHolder hot_lock(&hot_lock_); + optional_row_id = hot_store_->FindRowIDFromTimeFirstGreaterThan(time); + if (optional_row_id.has_value()) { + return optional_row_id.value(); } - return stop + 1; + return next_row_id_; } schema::Relation Table::GetRelation() const { return rel_; } TableStats Table::GetTableStats() const { TableStats info; - auto num_batches = NumBatches(); int64_t min_time = -1; + int64_t num_batches = 0; + int64_t hot_bytes = 0; + int64_t cold_bytes = 0; { - absl::MutexLock cold_lock(&cold_lock_); - if (time_col_idx_ != -1 && !cold_time_.empty()) { - min_time = cold_time_[0].first; - } else { - absl::MutexLock hot_lock(&hot_lock_); - if (time_col_idx_ != -1 && !hot_time_.empty()) { - min_time = hot_time_[0].first; - } + absl::base_internal::SpinLockHolder cold_lock(&cold_lock_); + min_time = cold_store_->MinTime(); + num_batches += cold_store_->Size(); + absl::base_internal::SpinLockHolder hot_lock(&hot_lock_); + num_batches += hot_store_->Size(); + hot_bytes = batch_size_accountant_->HotBytes(); + cold_bytes = batch_size_accountant_->ColdBytes(); + if (min_time == -1) { + min_time = hot_store_->MinTime(); } } absl::base_internal::SpinLockHolder lock(&stats_lock_); @@ -321,8 +349,8 @@ TableStats Table::GetTableStats() const { info.batches_added = batches_added_; info.batches_expired = batches_expired_; info.num_batches = num_batches; - info.bytes = hot_bytes_ + cold_bytes_; - info.cold_bytes = cold_bytes_; + info.bytes = hot_bytes + cold_bytes; + info.cold_bytes = cold_bytes; info.compacted_batches = compacted_batches_; info.max_table_size = max_table_size_; info.min_time = min_time; @@ -330,209 +358,78 @@ TableStats Table::GetTableStats() const { return info; } -Status Table::UpdateTimeRowIndices(types::ColumnWrapperRecordBatch* record_batch) { - auto batch_length = record_batch->at(0)->Size(); - DCHECK_GT(batch_length, 0); - if (time_col_idx_ != -1) { - auto first_time = record_batch->at(time_col_idx_)->Get(0); - auto last_time = record_batch->at(time_col_idx_)->Get(batch_length - 1); - hot_time_.emplace_back(first_time.val, last_time.val); - } - auto first_row_id = next_row_id_; - next_row_id_ += batch_length; - hot_row_ids_.emplace_back(first_row_id, next_row_id_ - 1); - return Status::OK(); -} +Status Table::CompactSingleBatchUnlocked(arrow::MemoryPool*) { + const auto& compaction_spec = batch_size_accountant_->GetNextCompactedBatchSpec(); -Status Table::WriteHot(RecordBatchPtr record_batch) { - absl::MutexLock hot_lock(&hot_lock_); - PL_RETURN_IF_ERROR(UpdateTimeRowIndices(record_batch.get())); - auto rb = RecordBatchWithCache{ - std::move(record_batch), - std::vector(rel_.NumColumns()), - std::vector(rel_.NumColumns(), false), - }; - hot_batches_.emplace_back(std::move(rb)); - return Status::OK(); -} + PL_RETURN_IF_ERROR( + compactor_.Reserve(compaction_spec.num_rows, compaction_spec.variable_col_bytes)); -Status Table::UpdateTimeRowIndices(const schema::RowBatch& rb) { - auto batch_length = rb.ColumnAt(0)->length(); - DCHECK_GT(batch_length, 0); - if (time_col_idx_ != -1) { - auto time_col = rb.ColumnAt(time_col_idx_); - auto first_time = types::GetValueFromArrowArray(time_col.get(), 0); - auto last_time = - types::GetValueFromArrowArray(time_col.get(), batch_length - 1); - hot_time_.emplace_back(first_time, last_time); - } - auto first_row_id = next_row_id_; - next_row_id_ += batch_length; - hot_row_ids_.emplace_back(first_row_id, next_row_id_ - 1); - return Status::OK(); -} - -Status 
Table::WriteHot(const schema::RowBatch& rb) { - absl::MutexLock hot_lock(&hot_lock_); - PL_RETURN_IF_ERROR(UpdateTimeRowIndices(rb)); - hot_batches_.emplace_back(rb); - return Status::OK(); -} + RowID first_row_id = -1; + for (auto hot_slice : compaction_spec.hot_slices) { + if (first_row_id == -1) { + first_row_id = hot_store_->FirstRowID() + hot_slice.start_row; + } -Status Table::CompactSingleBatch(arrow::MemoryPool* mem_pool) { - ArrowArrayCompactor builder(rel_, mem_pool); - int64_t first_time = -1; - int64_t last_time = -1; - int64_t first_row_id = -1; - int64_t last_row_id = -1; - absl::MutexLock gen_lock(&generation_lock_); - // We first get the necessary batches to compact from hot storage. Then we compact those batches - // into one batch. Then we push that batch into cold storage. - { - absl::MutexLock hot_lock(&hot_lock_); - for (auto it = hot_batches_.begin(); it != hot_batches_.end();) { - if (builder.Size() >= min_cold_batch_size_) { - break; - } - std::vector> column_arrays; - if (std::holds_alternative(*it)) { - auto record_batch_ptr = std::get_if(&(*it)); - for (int64_t col_idx = 0; col_idx < static_cast(rel_.NumColumns()); ++col_idx) { - if (record_batch_ptr->cache_validity[col_idx]) { - PL_RETURN_IF_ERROR( - builder.AppendColumn(col_idx, record_batch_ptr->arrow_cache[col_idx])); - } else { - PL_RETURN_IF_ERROR(builder.AppendColumn( - col_idx, record_batch_ptr->record_batch->at(col_idx)->ConvertToArrow(mem_pool))); - } - } - } else { - auto row_batch = std::get(*it); - for (auto [col_idx, col] : Enumerate(row_batch.columns())) { - PL_RETURN_IF_ERROR(builder.AppendColumn(col_idx, col)); - } - } - auto row_ids = hot_row_ids_.front(); - if (first_row_id == -1) { - first_row_id = row_ids.first; - } - last_row_id = row_ids.second; - - it = hot_batches_.erase(it); - hot_row_ids_.pop_front(); - - if (time_col_idx_ != -1) { - auto times = hot_time_.front(); - if (first_time == -1) { - first_time = times.first; - } - last_time = times.second; - hot_time_.pop_front(); - } + compactor_.UnsafeAppendBatchSlice(hot_store_->front(), hot_slice.start_row, hot_slice.end_row); + if (hot_slice.last_slice_for_batch) { + hot_store_->PopFront(); } } - PL_RETURN_IF_ERROR(builder.Finish()); - { - absl::MutexLock cold_lock(&cold_lock_); - PL_RETURN_IF_ERROR(AdvanceRingBufferUnlocked()); - for (const auto& [col_idx, col] : Enumerate(builder.output_columns())) { - cold_column_buffers_[col_idx][ring_back_idx_] = col; - } - cold_row_ids_.emplace_back(first_row_id, last_row_id); - if (time_col_idx_ != -1) { - cold_time_.emplace_back(first_time, last_time); - } + + PL_ASSIGN_OR_RETURN(std::vector out_columns, compactor_.Finish()); + + cold_store_->EmplaceBack(first_row_id, out_columns); + + auto num_rows_to_remove = batch_size_accountant_->FinishCompactedBatch(); + if (num_rows_to_remove > 0) { + hot_store_->RemovePrefix(num_rows_to_remove); } + { absl::base_internal::SpinLockHolder stat_lock(&stats_lock_); - hot_bytes_ -= builder.Size(); - cold_bytes_ += builder.Size(); compacted_batches_++; } - generation_++; return Status::OK(); } Status Table::CompactHotToCold(arrow::MemoryPool* mem_pool) { - for (size_t i = 0; i < kMaxBatchesPerCompactionCall; ++i) { - { - absl::base_internal::SpinLockHolder stats_lock(&stats_lock_); - if (hot_bytes_ < min_cold_batch_size_) { - return Status::OK(); - } + bool next_ready = false; + { + absl::base_internal::SpinLockHolder hot_lock(&hot_lock_); + next_ready = batch_size_accountant_->CompactedBatchReady(); + } + while (next_ready) { + 
absl::base_internal::SpinLockHolder cold_lock(&cold_lock_); + absl::base_internal::SpinLockHolder hot_lock(&hot_lock_); + // We have to check CompactedBatchReady() again, in case hot batches were expired since the last + // check. + if (!batch_size_accountant_->CompactedBatchReady()) { + break; } - PL_RETURN_IF_ERROR(CompactSingleBatch(mem_pool)); + PL_RETURN_IF_ERROR(CompactSingleBatchUnlocked(mem_pool)); + next_ready = batch_size_accountant_->CompactedBatchReady(); } - return Status::OK(); } StatusOr Table::ExpireCold() { - int64_t rb_bytes = 0; - { - absl::MutexLock gen_lock(&generation_lock_); - absl::MutexLock cold_lock(&cold_lock_); - if (RingSizeUnlocked() == 0) { - return false; - } - cold_row_ids_.pop_front(); - if (time_col_idx_ != -1) cold_time_.pop_front(); - - for (size_t col_idx = 0; col_idx < rel_.NumColumns(); col_idx++) { -#define TYPE_CASE(_dt_) \ - rb_bytes += types::GetArrowArrayBytes<_dt_>(cold_column_buffers_[col_idx][ring_front_idx_].get()); - PL_SWITCH_FOREACH_DATATYPE(rel_.GetColumnType(col_idx), TYPE_CASE); -#undef TYPE_CASE - cold_column_buffers_[col_idx][ring_front_idx_].reset(); - } - if (ring_front_idx_ == ring_back_idx_) { - // The batch we are expiring is the last batch in the ring buffer, so we reset the indices. - ring_front_idx_ = 0; - ring_back_idx_ = -1; - } else { - ring_front_idx_ = (ring_front_idx_ + 1) % ring_capacity_; - } - generation_++; + absl::base_internal::SpinLockHolder cold_lock(&cold_lock_); + if (cold_store_->Size() == 0) { + return false; } - absl::base_internal::SpinLockHolder lock(&stats_lock_); - cold_bytes_ -= rb_bytes; + cold_store_->PopFront(); + absl::base_internal::SpinLockHolder hot_lock(&hot_lock_); + batch_size_accountant_->ExpireColdBatch(); return true; } Status Table::ExpireHot() { - RecordOrRowBatch record_or_row_batch; - { - absl::MutexLock gen_lock(&generation_lock_); - absl::MutexLock hot_lock(&hot_lock_); - if (hot_batches_.size() == 0) { - return error::InvalidArgument("Failed to expire row batch, no row batches in table"); - } - if (time_col_idx_ != -1) hot_time_.pop_front(); - hot_row_ids_.pop_front(); - record_or_row_batch = std::move(hot_batches_.front()); - hot_batches_.pop_front(); - // Expire the first hot batch invalidates all hot indices, so we have to increase the - // generation. - generation_++; - } - int64_t rb_bytes = 0; - if (std::holds_alternative(record_or_row_batch)) { - auto record_batch = std::move(std::get(record_or_row_batch)); - for (const auto& col : *record_batch.record_batch) { - rb_bytes += col->Bytes(); - } - } else { - auto row_batch = std::get(record_or_row_batch); - for (const auto& [col_idx, col] : Enumerate(row_batch.columns())) { -#define TYPE_CASE(_dt_) rb_bytes += types::GetArrowArrayBytes<_dt_>(col.get()); - PL_SWITCH_FOREACH_DATATYPE(rel_.GetColumnType(col_idx), TYPE_CASE); -#undef TYPE_CASE - } - } - { - absl::base_internal::SpinLockHolder lock(&stats_lock_); - hot_bytes_ -= rb_bytes; + absl::base_internal::SpinLockHolder hot_lock(&hot_lock_); + if (hot_store_->Size() == 0) { + return error::InvalidArgument("Failed to expire row batch, no row batches in table"); } + hot_store_->PopFront(); + batch_size_accountant_->ExpireHotBatch(); return Status::OK(); } @@ -545,307 +442,5 @@ Status Table::ExpireBatch() { // batch. 
return ExpireHot(); } - -Status Table::AddBatchSliceToRowBatch(const BatchSlice& slice, const std::vector& cols, - schema::RowBatch* output_rb, - arrow::MemoryPool* mem_pool) const { - absl::MutexLock gen_lock(&generation_lock_); - PL_RETURN_IF_ERROR(UpdateSliceUnlocked(slice)); - // After this point, as long as gen_lock is held, the unsafe properties of slice are valid. - if (!slice.unsafe_is_hot) { - absl::MutexLock cold_lock(&cold_lock_); - for (auto col_idx : cols) { - auto arr = cold_column_buffers_[col_idx][slice.unsafe_batch_index]->Slice( - slice.unsafe_row_start, slice.unsafe_row_end + 1 - slice.unsafe_row_start); - PL_RETURN_IF_ERROR(output_rb->AddColumn(arr)); - } - return Status::OK(); - } - - absl::MutexLock hot_lock(&hot_lock_); - if (std::holds_alternative(hot_batches_[slice.unsafe_batch_index])) { - auto record_batch_ptr = - std::get_if(&hot_batches_[slice.unsafe_batch_index]); - for (auto col_idx : cols) { - if (record_batch_ptr->cache_validity[col_idx]) { - auto arr = record_batch_ptr->arrow_cache[col_idx]->Slice( - slice.unsafe_row_start, slice.unsafe_row_end + 1 - slice.unsafe_row_start); - PL_RETURN_IF_ERROR(output_rb->AddColumn(arr)); - continue; - } - // Arrow array wasn't in cache, Convert to arrow and then add to cache. - auto arr = record_batch_ptr->record_batch->at(col_idx)->ConvertToArrow(mem_pool); - record_batch_ptr->arrow_cache[col_idx] = arr; - record_batch_ptr->cache_validity[col_idx] = true; - PL_RETURN_IF_ERROR(output_rb->AddColumn( - arr->Slice(slice.unsafe_row_start, slice.unsafe_row_end + 1 - slice.unsafe_row_start))); - } - } else { - const auto& row_batch = std::get(hot_batches_[slice.unsafe_batch_index]); - for (auto col_idx : cols) { - auto arr = row_batch.ColumnAt(col_idx)->Slice( - slice.unsafe_row_start, slice.unsafe_row_end + 1 - slice.unsafe_row_start); - PL_RETURN_IF_ERROR(output_rb->AddColumn(arr)); - } - } - return Status::OK(); -} - -int64_t Table::NumBatches() const { - absl::MutexLock gen_lock(&generation_lock_); - absl::MutexLock cold_lock(&cold_lock_); - absl::MutexLock hot_lock(&hot_lock_); - return RingSizeUnlocked() + hot_batches_.size(); -} - -BatchSlice Table::FirstBatch() const { - absl::MutexLock gen_lock(&generation_lock_); - { - absl::MutexLock cold_lock(&cold_lock_); - if (ring_back_idx_ != -1) { - auto row_ids = cold_row_ids_.front(); - return BatchSlice::Cold(ring_front_idx_, 0, ColdBatchLengthUnlocked(ring_front_idx_) - 1, - generation_, row_ids); - } - } - // No cold batches, return first hot batch or invalid if there are no hot batches. - absl::MutexLock hot_lock(&hot_lock_); - if (hot_batches_.size() == 0) { - return BatchSlice::Invalid(); - } - auto row_ids = hot_row_ids_.front(); - return BatchSlice::Hot(0, 0, HotBatchLengthUnlocked(0) - 1, generation_, row_ids); -} - -int64_t Table::End() const { - absl::MutexLock hot_lock(&hot_lock_); - return next_row_id_; -} - -BatchSlice Table::NextBatch(const BatchSlice& slice, int64_t stop_row_id) const { - auto next_slice = NextBatchWithoutStop(slice); - return SliceIfPastStop(next_slice, stop_row_id); -} - -BatchSlice Table::NextBatchWithoutStop(const BatchSlice& slice) const { - absl::MutexLock gen_lock(&generation_lock_); - auto status = UpdateSliceUnlocked(slice); - if (!status.ok()) { - return BatchSlice::Invalid(); - } - if (!slice.unsafe_is_hot) { - absl::MutexLock cold_lock(&cold_lock_); - auto batch_length = ColdBatchLengthUnlocked(slice.unsafe_batch_index); - // We first check if the previous slice had already output all the rows in its batch. 
If it - // didn't then we need to output a batch with the remaining rows. - if (slice.unsafe_row_end < batch_length - 1) { - auto new_batch_size = batch_length - slice.unsafe_row_end; - return BatchSlice::Cold(slice.unsafe_batch_index, slice.unsafe_row_end + 1, batch_length - 1, - generation_, slice.uniq_row_end_idx + 1, - slice.uniq_row_end_idx + new_batch_size - 1); - } - - auto next_ring_index = RingNextAddrUnlocked(slice.unsafe_batch_index); - if (next_ring_index == -1) { - absl::MutexLock hot_lock(&hot_lock_); - // This is the last cold batch so return the first hot batch. If there are no hot batches - // return an invalid batch. - if (hot_batches_.size() == 0) { - return BatchSlice::Invalid(); - } - return BatchSlice::Hot(0, 0, HotBatchLengthUnlocked(0) - 1, generation_, - hot_row_ids_.front()); - } - - // At this point we can just take the next cold batch. - return BatchSlice::Cold(next_ring_index, 0, ColdBatchLengthUnlocked(next_ring_index) - 1, - generation_, cold_row_ids_[RingVectorIndexUnlocked(next_ring_index)]); - } - - absl::MutexLock hot_lock(&hot_lock_); - auto batch_length = HotBatchLengthUnlocked(slice.unsafe_batch_index); - if (slice.unsafe_row_end < batch_length - 1) { - auto new_batch_size = batch_length - slice.unsafe_row_end; - return BatchSlice::Hot(slice.unsafe_batch_index, slice.unsafe_row_end + 1, batch_length - 1, - generation_, slice.uniq_row_end_idx + 1, - slice.uniq_row_end_idx + new_batch_size - 1); - } - - auto next_index = slice.unsafe_batch_index + 1; - if (next_index >= static_cast(hot_batches_.size())) { - return BatchSlice::Invalid(); - } - auto next_length = HotBatchLengthUnlocked(next_index); - return BatchSlice::Hot(next_index, 0, next_length - 1, generation_, hot_row_ids_[next_index]); -} - -int64_t Table::FindStopTime(int64_t time, arrow::MemoryPool* mem_pool) const { - absl::MutexLock gen_lock(&generation_lock_); - { - absl::MutexLock hot_lock(&hot_lock_); - auto it = - std::upper_bound(hot_time_.begin(), hot_time_.end(), time, IntervalComparatorUpperBound); - if (it != hot_time_.begin()) { - it--; - auto index = std::distance(hot_time_.begin(), it); - ArrowArrayPtr time_col; - if (std::holds_alternative(hot_batches_[index])) { - auto record_batch_ptr = std::get_if(&hot_batches_[index]); - time_col = GetHotColumnUnlocked(record_batch_ptr, time_col_idx_, mem_pool); - } else { - auto row_batch = std::get(hot_batches_[index]); - time_col = row_batch.columns().at(time_col_idx_); - } - auto row_offset = - types::SearchArrowArrayLessThanOrEqual(time_col.get(), time); - return hot_row_ids_[index].first + row_offset; - } - } - absl::MutexLock cold_lock(&cold_lock_); - auto it = - std::upper_bound(cold_time_.begin(), cold_time_.end(), time, IntervalComparatorUpperBound); - if (it == cold_time_.begin()) { - return -1; - } - it--; - auto index = it - cold_time_.begin(); - auto ring_index = RingIndexUnlocked(index); - auto time_col = cold_column_buffers_[time_col_idx_][ring_index]; - auto row_offset = - types::SearchArrowArrayLessThanOrEqual(time_col.get(), time); - return cold_row_ids_[index].first + row_offset; -} - -int64_t Table::ColdBatchLengthUnlocked(int64_t index) const { - return cold_column_buffers_[0].at(index)->length(); -} -int64_t Table::HotBatchLengthUnlocked(int64_t index) const { - if (std::holds_alternative(hot_batches_[index])) { - auto record_batch_ptr = std::get_if(&hot_batches_[index]); - return record_batch_ptr->record_batch->at(0)->Size(); - } else { - auto row_batch = std::get(hot_batches_[index]); - return row_batch.num_rows(); - } 
-} - -Table::ArrowArrayPtr Table::GetHotColumnUnlocked(const RecordBatchWithCache* record_batch_ptr, - int64_t col_idx, - arrow::MemoryPool* mem_pool) const { - if (record_batch_ptr->cache_validity[col_idx]) { - return record_batch_ptr->arrow_cache[col_idx]; - } - auto arrow_array_sptr = record_batch_ptr->record_batch->at(col_idx)->ConvertToArrow(mem_pool); - record_batch_ptr->arrow_cache[col_idx] = arrow_array_sptr; - record_batch_ptr->cache_validity[col_idx] = true; - return arrow_array_sptr; -} - -BatchSlice Table::SliceIfPastStop(const BatchSlice& slice, int64_t stop_row_id) const { - if (!slice.IsValid()) { - return slice; - } - - if (slice.uniq_row_start_idx >= stop_row_id) { - return BatchSlice::Invalid(); - } - if (slice.uniq_row_end_idx >= stop_row_id) { - auto new_slice = slice; - new_slice.uniq_row_end_idx = stop_row_id - 1; - // Force update of unsafe properties. - new_slice.generation = -1; - return new_slice; - } - return slice; -} - -int64_t Table::RingVectorIndexUnlocked(int64_t ring_index) const { - // This function assumes the ring_index is valid. If it's not valid the resulting vector index - // will also not be valid. - if (ring_index >= ring_front_idx_) { - return ring_index - ring_front_idx_; - } - return ring_index + (ring_capacity_ - ring_front_idx_); -} - -int64_t Table::RingIndexUnlocked(int64_t vector_index) const { - // This function assumes the vector_index is valid. If it's not valid the resulting ring index - // will also not be valid. - return (ring_front_idx_ + vector_index) % ring_capacity_; -} - -int64_t Table::RingSizeUnlocked() const { - if (ring_back_idx_ == -1) { - return 0; - } - if (ring_front_idx_ <= ring_back_idx_) { - return 1 + (ring_back_idx_ - ring_front_idx_); - } - return (ring_back_idx_ + 1) + (ring_capacity_ - ring_front_idx_); -} - -int64_t Table::RingNextAddrUnlocked(int64_t ring_index) const { - // This function assumes the passed in ring_index is valid, and then checks that the next index is - // valid. - if (ring_back_idx_ == -1 || ring_index == ring_back_idx_) { - return -1; - } - return (ring_index + 1) % ring_capacity_; -} - -Status Table::AdvanceRingBufferUnlocked() { - auto next_ring_back_idx = (ring_back_idx_ + 1) % ring_capacity_; - if (ring_back_idx_ != -1 && next_ring_back_idx == ring_front_idx_) { - return error::Internal( - "Failed to add new cold batch, ring buffer alreadRingNextAddrUnlocked full"); - } - ring_back_idx_ = next_ring_back_idx; - return Status::OK(); -} - -Status Table::UpdateSliceUnlocked(const BatchSlice& slice) const { - if (slice.generation == generation_) { - return Status::OK(); - } - { - absl::MutexLock cold_lock(&cold_lock_); - auto it = std::lower_bound(cold_row_ids_.begin(), cold_row_ids_.end(), slice.uniq_row_start_idx, - IntervalComparatorLowerBound); - - if (it != cold_row_ids_.end()) { - if (slice.uniq_row_end_idx < it->first) { - // All data in this slice has been expired from the table. 
- return error::InvalidArgument( - "Requested RowBatch Slice has already been expired from the table"); - } - auto vector_index = std::distance(cold_row_ids_.begin(), it); - auto ring_index = RingIndexUnlocked(vector_index); - slice.unsafe_is_hot = false; - slice.unsafe_batch_index = ring_index; - slice.unsafe_row_start = slice.uniq_row_start_idx - it->first; - slice.unsafe_row_end = slice.uniq_row_end_idx - it->first; - slice.generation = generation_; - return Status::OK(); - } - } - absl::MutexLock hot_lock(&hot_lock_); - auto it = std::lower_bound(hot_row_ids_.begin(), hot_row_ids_.end(), slice.uniq_row_start_idx, - IntervalComparatorLowerBound); - if (it == hot_row_ids_.end()) { - return error::InvalidArgument("Request RowBatch Slice is not in bounds of the table"); - } - if (slice.uniq_row_end_idx < it->first) { - // All data in this slice has been expired from the table. - return error::InvalidArgument( - "Requested RowBatch Slice has already been expired from the table"); - } - slice.unsafe_is_hot = true; - slice.unsafe_batch_index = std::distance(hot_row_ids_.begin(), it); - slice.unsafe_row_start = slice.uniq_row_start_idx - it->first; - slice.unsafe_row_end = slice.uniq_row_end_idx - it->first; - slice.generation = generation_; - return Status::OK(); -} - } // namespace table_store } // namespace px diff --git a/src/table_store/table/table.h b/src/table_store/table/table.h index 8b6253fd15d..91280135ca3 100644 --- a/src/table_store/table/table.h +++ b/src/table_store/table/table.h @@ -24,6 +24,7 @@ #include #include #include +#include #include #include #include @@ -40,6 +41,11 @@ #include "src/table_store/schema/row_batch.h" #include "src/table_store/schema/row_descriptor.h" #include "src/table_store/schemapb/schema.pb.h" +#include "src/table_store/table/internal/arrow_array_compactor.h" +#include "src/table_store/table/internal/batch_size_accountant.h" +#include "src/table_store/table/internal/record_or_row_batch.h" +#include "src/table_store/table/internal/store_with_row_accounting.h" +#include "src/table_store/table/internal/types.h" #include "src/table_store/table/table_metrics.h" DECLARE_int32(table_store_table_size_limit); @@ -60,136 +66,48 @@ struct TableStats { int64_t min_time; }; -struct BatchSlice { - // All properties with the unsafe_ prefix should not be touched except inside of Table with the - // proper lock held. 
- mutable bool unsafe_is_hot; - mutable int64_t unsafe_batch_index; - mutable int64_t unsafe_row_start; - mutable int64_t unsafe_row_end; - - mutable int64_t generation = -1; - int64_t uniq_row_start_idx = -1; - int64_t uniq_row_end_idx = -1; - - int64_t Size() const { return uniq_row_end_idx - uniq_row_start_idx + 1; } - bool IsValid() const { return uniq_row_start_idx != -1 && uniq_row_end_idx != -1; } - static BatchSlice Invalid() { return BatchSlice{false, -1, -1, -1}; } - static BatchSlice Cold(int64_t cold_index, int64_t row_start, int64_t row_end, int64_t generation, - std::pair row_ids) { - return BatchSlice{false, cold_index, row_start, row_end, - generation, row_ids.first, row_ids.second}; - } - static BatchSlice Cold(int64_t cold_index, int64_t row_start, int64_t row_end, int64_t generation, - int64_t uniq_row_start_idx, int64_t uniq_row_end_idx) { - return BatchSlice{false, cold_index, row_start, row_end, - generation, uniq_row_start_idx, uniq_row_end_idx}; - } - static BatchSlice Hot(int64_t hot_index, int64_t row_start, int64_t row_end, int64_t generation, - std::pair row_ids) { - return BatchSlice{true, hot_index, row_start, row_end, - generation, row_ids.first, row_ids.second}; - } - static BatchSlice Hot(int64_t hot_index, int64_t row_start, int64_t row_end, int64_t generation, - int64_t uniq_row_start_idx, int64_t uniq_row_end_idx) { - return BatchSlice{true, hot_index, row_start, row_end, - generation, uniq_row_start_idx, uniq_row_end_idx}; - } -}; - -class ArrowArrayCompactor { - public: - ArrowArrayCompactor(const schema::Relation& rel, arrow::MemoryPool* mem_pool); - Status AppendColumn(int64_t col_idx, std::shared_ptr arr); - - Status Finish(); - const std::vector>& output_columns() const { - return output_columns_; - } - int64_t Size() const { return bytes_; } - - private: - int64_t bytes_ = 0; - std::vector> output_columns_; - std::vector> builders_; - std::vector column_types_; - - template - Status AppendColumnTyped(int64_t col_idx, std::shared_ptr arr) { - auto builder_untyped = builders_[col_idx].get(); - auto builder = static_cast::arrow_builder_type*>( - builder_untyped); - auto typed_arr = - std::static_pointer_cast::arrow_array_type>(arr); - PL_RETURN_IF_ERROR(builder->Reserve(typed_arr->length())); - for (int i = 0; i < typed_arr->length(); ++i) { - builder->UnsafeAppend(typed_arr->Value(i)); - } - bytes_ += types::GetArrowArrayBytes(typed_arr.get()); - return Status::OK(); - } - - template - Status FinishTyped(int64_t col_idx) { - auto builder_untyped = builders_[col_idx].get(); - auto builder = static_cast::arrow_builder_type*>( - builder_untyped); - PL_RETURN_IF_ERROR(builder->Finish(&output_columns_[col_idx])); - return Status::OK(); - } -}; - /** * Table stores data in two separate partitions, hot and cold. Hot data is "hot" from the * perspective of writes, in other words data is first written to the hot partitiion, and then later * moved to the cold partition. Reads can hit both hot and cold data. Hot data can be written in * RecordBatch format (i.e. for writes from stirling) or schema::RowBatch format (i.e. for writes - * from MemorySinkNodes, which are not currently used). Hot data is stored in a deque, while cold - * data is stored in a ring buffer. Hot data is eventually converted to arrow arrays either during - * compaction and transfer to cold or during a read. 
If the conversion happens on read then we store - * the arrow array in a cache with the hot batch so that future reads, before this batch is - * transferred to cold, don't also need to convert to arrow. + * from MemorySinkNodes, which are not currently used). Both stores use a wrapper around std::deque + * to store the data while keeping track of row and time indexes (see `StoreWithRowTimeAccounting` + * and `Time and Row Indexing` below). * * Synchronization Scheme: - * The hot and cold partitions are synchronized separately with spinlocks. Additionally, the - * generation of the store is protected by a spinlock. + * The hot and cold partitions are synchronized separately with spinlocks. * * Compaction Scheme: - * Hot batches are compacted into batches of minimum size min_cold_batch_size_ bytes. The compaction - * routine should be called periodically but that is not the responsibility of this class. + * Hot batches are compacted into batches of size roughly `compacted_batch_size_` +/- the size of a + * single row. The compaction routine should be called periodically but that is not the + * responsibility of this class. * * Time and Row Indexing: - * The first and last values of the time columns for each batch are stored as intervals in - * (hot/cold)_time_, which internally maintains a sorted list for O(logN) time lookup. Additionally, - * this class supports multiple batches with the same timestamps. In order to support this, we keep - * track of an incrementing identifier for each row (we store only the identifiers of the first and - * last rows in a batch). This allows us to support returning only valid data even if a compaction - * occurs in the middle of query execution. For example, suppose we have two batches of data all - * with the same timestamps. Suppose a query reads through all this data. Now after reading the - * first batch, compaction is called and both batches are compacted together and put into cold - * storage. Then if the query were to naively try to access the "second" batch since it already saw - * the "first" batch, there wouldn't be any more data and the query will have skipped all the rows - * in what was initially the "second" batch. Instead, the BatchSlice object stores the unique row - * identifiers of the first and last row of that batch, so that when NextBatch is called on that - * batch it can work out that it needs to return a slice of the batch with the original "second" - * batch's data. + * The first and last values of the time columns for each batch are stored in + * `StoreWithRowTimeAccounting` which internally maintains a sorted list for O(logN) time lookup. + * See that class for more details. Additionally, this class supports multiple batches with the same + * timestamps. In order to support this, we keep track of an incrementing identifier for each row + * (we store only the identifiers of the first and last rows in a batch). This allows us to support + * returning only valid data even if a compaction occurs in the middle of query execution. For + * example, suppose we have two batches of data all with the same timestamps. Suppose a query reads + * through all this data. Now after reading the first batch, compaction is called and both batches + * are compacted together and put into cold storage. Then if the query were to naively try to access + * the "second" batch since it already saw the "first" batch, there wouldn't be any more data and + * the query will have skipped all the rows in what was initially the "second" batch. 
Instead, the + * Cursor stores the unique row identifier of the last read row, so + * that when GetNextRowBatch is called on the cursor it can work out that it needs to return a slice + * of the batch with the original "second" batch's data. */ class Table : public NotCopyable { - using RecordBatchPtr = std::unique_ptr; - using ArrowArrayPtr = std::shared_ptr; - using ColumnBuffer = std::vector; - using TimeInterval = std::pair; - using RowIDInterval = std::pair; - - struct RecordBatchWithCache { - RecordBatchPtr record_batch; - // Whenever we have to convert a hot batch to an arrow array, we store the arrow array in - // this cache. Compaction will eventually take these arrow arrays and move them into cold. - mutable std::vector arrow_cache; - mutable std::vector cache_validity; - }; - - using RecordOrRowBatch = std::variant; + using RecordBatchPtr = internal::RecordBatchPtr; + using ArrowArrayPtr = internal::ArrowArrayPtr; + using ColdBatch = internal::ColdBatch; + using Time = internal::Time; + using TimeInterval = internal::TimeInterval; + using RowID = internal::RowID; + using RowIDInterval = internal::RowIDInterval; + using BatchID = internal::BatchID; static inline constexpr int64_t kDefaultColdBatchMinSize = 64 * 1024; @@ -203,6 +121,82 @@ class Table : public NotCopyable { new Table(table_name, relation, FLAGS_table_store_table_size_limit)); } + /** + * Cursor allows iterating the table, while guaranteeing that no row is returned twice (even when + * compactions occur between accesses). {Start,Stop}Spec specify what rows the cursor should begin + * and end at when iterating the cursor. + */ + class Cursor { + public: + /** + * StartSpec defines where a Cursor should begin within the table. Current options are to start + * at a given time, or start at the first row currently in the table. + */ + struct StartSpec { + enum StartType { + StartAtTime, + CurrentStartOfTable, + }; + StartType type = CurrentStartOfTable; + Time start_time = -1; + }; + + /** + * StopSpec defines when a Cursor should stop and be considered exhausted. Current options are + * to stop at a given time, stop at the last row currently in the table, or infinite (i.e. the + * Cursor never becomes exhausted). + */ + struct StopSpec { + enum StopType { + // Currently, StopAtTime will stop at either the time provided or the current end of the + // table, whichever comes first. + // TODO(james): make StopAtTime stop at the provided time, regardless of whether there is + // currently data for it. + StopAtTime, + CurrentEndOfTable, + Infinite, + }; + StopType type = CurrentEndOfTable; + Time stop_time = -1; + }; + + explicit Cursor(const Table* table) : Cursor(table, StartSpec{}, StopSpec{}) {} + Cursor(const Table* table, StartSpec start, StopSpec stop); + + // In the case of StopType == Infinite or StopType == StopAtTime, this returns whether the table + // has the next batch ready. In the case of StopType == CurrentEndOfTable, this returns !Done(). + // Note that `NextBatchReady() == true` doesn't guarantee that `GetNextRowBatch` will succeed. + // For instance, the desired row batch could have been expired between the call to + // `NextBatchReady()` and `GetNextRowBatch(...)`, and then the row batch after the expired one + // is past the stopping condition. In this case `GetNextRowBatch(...)` will return an error. + bool NextBatchReady(); + StatusOr> GetNextRowBatch(const std::vector& cols); + // In the case of StopType == Infinite, this function always returns false. 
+ bool Done(); + // Change the StopSpec of the cursor. + void UpdateStopSpec(StopSpec stop); + + private: + void AdvanceToStart(const StartSpec& start); + void StopStateFromSpec(StopSpec&& stop); + + // The following methods are made private so that they are only accessible from Table. + internal::RowID* LastReadRowID(); + internal::BatchHints* Hints(); + std::optional StopRowID() const; + + struct StopState { + StopSpec spec; + RowID stop_row_id; + }; + const Table* table_; + internal::BatchHints hints_; + RowID last_read_row_id_; + StopState stop_; + + friend class Table; + }; + /** * @brief Construct a new Table object along with its columns. Can be used to create * a table (along with columns) based on a subscription message from Stirling. @@ -216,18 +210,46 @@ class Table : public NotCopyable { : Table(table_name, relation, max_table_size, kDefaultColdBatchMinSize) {} Table(std::string_view table_name, const schema::Relation& relation, size_t max_table_size, - size_t min_cold_batch_size); + size_t compacted_batch_size_); /** - * Get a RowBatch of data corresponding to the passed in BatchSlice. - * @param slice the BatchSlice to get the data for. + * Get a RowBatch of data corresponding to the next data after the given cursor. + * @param cursor the Table::Cursor to get the next row batch after. * @param cols a vector of column indices to get data for. - * @param mem_pool the arrow memory pool to use if the slice is in hot storage. * @return a unique ptr to a RowBatch with the requested data. */ - StatusOr> GetRowBatchSlice(const BatchSlice& slice, - const std::vector& cols, - arrow::MemoryPool* mem_pool) const; + StatusOr> GetNextRowBatch( + Cursor* cursor, const std::vector& cols) const; + + /** + * Get the unique identifier of the first row in the table. + * If all the data is expired from the table, this returns the last row id that was in the table. + * @return unique identifier of the first row. + */ + RowID FirstRowID() const; + + /** + * Get the unique identifier of the last row in the table. + * If all the data is expired from the table, this returns the last row id that was in the table. + * @return unique identifier of the last row. + */ + RowID LastRowID() const; + + /** + * Find the unique identifier of the first row for which its corresponding time is greater than or + * equal to the given time. + * @param time the time to search for. + * @return unique identifier of the first row with time greater than or equal to the given time. + */ + RowID FindRowIDFromTimeFirstGreaterThanOrEqual(Time time) const; + + /** + * Find the unique identifier of the first row for which its corresponding time is greater than + * the given time. + * @param time the time to search for. + * @return unique identifier of the first row with time greater than the given time. + */ + RowID FindRowIDFromTimeFirstGreaterThan(Time time) const; /** * Writes a row batch to the table. @@ -246,23 +268,6 @@ class Table : public NotCopyable { schema::Relation GetRelation() const; StatusOr> GetTableAsRecordBatches() const; - /** - * @param time the timestamp to search for. - * @param mem_pool the arrow memory pool. - * @return the BatchSlice of the first row with timestamp greater than or equal to the given time, - * until the end of its corresponding row batch. - */ - StatusOr FindBatchSliceGreaterThanOrEqual(int64_t time, - arrow::MemoryPool* mem_pool) const; - - /** - * @param time the timestamp to search for. - * @param mem_pool the arrow memory pool. 
- * @return the BatchSlice of the last row with timestamp less than or equal to the given time, - * until the end of its corresponding row batch. - */ - StatusOr FindStopPositionForTime(int64_t time, arrow::MemoryPool* mem_pool) const; - /** * Covert the table and store in passed in proto. * @param table_proto The table proto to write to. @@ -273,42 +278,7 @@ class Table : public NotCopyable { TableStats GetTableStats() const; /** - * Gets the BatchSlice corresponding to the next batch after the given batch. - * The BatchSlice will be cut short to ensure it doesn't extend past the given StopPosition. - * If there are no more batches or the next BatchSlice would be entirely beyond the given - * StopPosition then an Invalid BatchSlice is returned. Using this function iteratively will - * ensure that all rows are seen even if a table compaction occurs during the iteration. This - * means in some cases NextBatch will return a BatchSlice with the same batch index as the passed - * in batch, but with a different slice of rows. - * - * @param slice The BatchSlice to get the next batch for. - * @param stop The StopPosition (unique row identifier) that the NextBatch should not extend - * beyond. - * @return A BatchSlice corresponding to the next slice of rows after the given slice but before - * the stop position. - */ - BatchSlice NextBatch(const BatchSlice& slice, StopPosition stop) const; - BatchSlice NextBatch(const BatchSlice& slice) const { return NextBatch(slice, End()); } - /** - * @returns a BatchSlice corresponding to the first batch in the table. - */ - BatchSlice FirstBatch() const; - /** - * @return A stop position 1 row past the end of the table. - */ - StopPosition End() const; - - /** - * Reduces the extent of a BatchSlice to ensure that it doesn't include rows past the given stop - * position. - * @param slice the BatchSlice to cut short if its past the stop position. - * @param stop the StopPosition to cut short at. - * @return a new BatchSlice that doesn't extend past the StopPosition. - */ - BatchSlice SliceIfPastStop(const BatchSlice& slice, StopPosition stop) const; - - /** - * Compacts hot batches into min_cold_batch_size_ sized cold batches. Each call to + * Compacts hot batches into compacted_batch_size_ sized cold batches. Each call to * CompactHotToCold will create a maximum of kMaxBatchesPerCompactionCall cold batches. * @param mem_pool arrow MemoryPool to be used for creating new cold batches. */ @@ -316,7 +286,6 @@ class Table : public NotCopyable { private: TableMetrics metrics_; - Status ExpireRowBatches(int64_t row_batch_size); schema::Relation rel_; @@ -327,74 +296,35 @@ class Table : public NotCopyable { int64_t batches_added_ ABSL_GUARDED_BY(stats_lock_) = 0; int64_t compacted_batches_ ABSL_GUARDED_BY(stats_lock_) = 0; int64_t max_table_size_ = 0; - int64_t min_cold_batch_size_; - - mutable absl::Mutex hot_lock_; - std::deque hot_batches_ ABSL_GUARDED_BY(hot_lock_); + const int64_t compacted_batch_size_; + mutable absl::base_internal::SpinLock hot_lock_; + std::unique_ptr> hot_store_ + ABSL_GUARDED_BY(hot_lock_); - mutable absl::Mutex cold_lock_; - std::vector cold_column_buffers_ ABSL_GUARDED_BY(cold_lock_); - - // The generation lock must be held during compaction and - // expiration, and anytime one would like to access the unsafe_ attributes of BatchSlice. - mutable absl::Mutex generation_lock_; - // Generation of the HotColdDataStore is incremented whenever a change to the store would - // invalidate some BatchSlice', eg. during compaction or hot expiration. 
- int64_t generation_ ABSL_GUARDED_BY(generation_lock_); - - // We store ring buffer properties at the table level rather than for each individual Column. - int64_t ring_front_idx_ ABSL_GUARDED_BY(cold_lock_) = 0; - int64_t ring_back_idx_ ABSL_GUARDED_BY(cold_lock_) = -1; - int64_t ring_capacity_ ABSL_GUARDED_BY(cold_lock_); + mutable absl::base_internal::SpinLock cold_lock_; + std::unique_ptr> cold_store_ + ABSL_GUARDED_BY(cold_lock_); + std::deque cold_batch_bytes_ ABSL_GUARDED_BY(cold_lock_); // Counter to assign a unique row ID to each row. Synchronized by hot_lock_ since its only // accessed on a hot write. int64_t next_row_id_ ABSL_GUARDED_BY(hot_lock_) = 0; - std::deque hot_row_ids_ ABSL_GUARDED_BY(hot_lock_); - std::deque hot_time_ ABSL_GUARDED_BY(hot_lock_); - std::deque cold_row_ids_ ABSL_GUARDED_BY(cold_lock_); - std::deque cold_time_ ABSL_GUARDED_BY(cold_lock_); - int64_t time_col_idx_ = -1; - Status WriteHot(RecordBatchPtr record_batch); - Status WriteHot(const schema::RowBatch& rb); - Status UpdateTimeRowIndices(const schema::RowBatch& rb) ABSL_EXCLUSIVE_LOCKS_REQUIRED(hot_lock_); - Status UpdateTimeRowIndices(types::ColumnWrapperRecordBatch* record_batch) - ABSL_EXCLUSIVE_LOCKS_REQUIRED(hot_lock_); + Status WriteHot(internal::RecordOrRowBatch&& record_or_row_batch); Status ExpireBatch(); Status ExpireHot(); StatusOr ExpireCold(); - Status CompactSingleBatch(arrow::MemoryPool* mem_pool); - - Status AddBatchSliceToRowBatch(const BatchSlice& slice, const std::vector& cols, - schema::RowBatch* output_rb, arrow::MemoryPool* mem_pool) const; - ArrowArrayPtr GetHotColumnUnlocked(const RecordBatchWithCache* record_batch_ptr, int64_t col_idx, - arrow::MemoryPool* mem_pool) const - ABSL_EXCLUSIVE_LOCKS_REQUIRED(hot_lock_); - - int64_t NumBatches() const; - int64_t ColdBatchLengthUnlocked(int64_t ring_index) const - ABSL_EXCLUSIVE_LOCKS_REQUIRED(cold_lock_); - int64_t HotBatchLengthUnlocked(int64_t hot_index) const ABSL_EXCLUSIVE_LOCKS_REQUIRED(hot_lock_); - - // Returns the unique identifier of the last row less than or equal to the given time. - int64_t FindStopTime(int64_t time, arrow::MemoryPool* mem_pool) const; - - // Returns the index into cold_row_ids_ or cold_time_ given the ring buffer location. - int64_t RingVectorIndexUnlocked(int64_t ring_index) const - ABSL_EXCLUSIVE_LOCKS_REQUIRED(cold_lock_); - // Returns the index into the ring buffer given a vector index into cold_row_ids_ or cold_time_. 
- int64_t RingIndexUnlocked(int64_t vector_index) const ABSL_EXCLUSIVE_LOCKS_REQUIRED(cold_lock_); - int64_t RingSizeUnlocked() const ABSL_EXCLUSIVE_LOCKS_REQUIRED(cold_lock_); - int64_t RingNextAddrUnlocked(int64_t ring_index) const ABSL_EXCLUSIVE_LOCKS_REQUIRED(cold_lock_); - Status AdvanceRingBufferUnlocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(cold_lock_); - - Status UpdateSliceUnlocked(const BatchSlice& slice) const - ABSL_EXCLUSIVE_LOCKS_REQUIRED(generation_lock_); - - BatchSlice NextBatchWithoutStop(const BatchSlice& slice) const; + Status ExpireRowBatches(int64_t row_batch_size); + Status CompactSingleBatchUnlocked(arrow::MemoryPool* mem_pool) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(cold_lock_) ABSL_EXCLUSIVE_LOCKS_REQUIRED(hot_lock_); + + std::unique_ptr batch_size_accountant_ ABSL_GUARDED_BY(hot_lock_); + + internal::ArrowArrayCompactor compactor_; + + friend class Cursor; }; } // namespace table_store diff --git a/src/table_store/table/table_benchmark.cc b/src/table_store/table/table_benchmark.cc index af434853842..c73210dfbd9 100644 --- a/src/table_store/table/table_benchmark.cc +++ b/src/table_store/table/table_benchmark.cc @@ -37,10 +37,16 @@ static inline std::unique_ptr MakeTable(int64_t max_size, int64_t compact return std::make_unique
("test_table", rel, max_size, compaction_size); } -static inline std::unique_ptr MakeHotBatch(int64_t batch_size) { +static inline std::unique_ptr MakeHotBatch(int64_t batch_size, + int64_t* time_counter) { std::vector col1_vals(batch_size, 0); std::vector col2_vals(batch_size, 1.234); + for (size_t i = 0; i < col1_vals.size(); ++i) { + col1_vals[i] = *time_counter; + (*time_counter)++; + } + auto wrapper_batch = std::make_unique(); auto col_wrapper1 = std::make_shared(batch_size); col_wrapper1->Clear(); @@ -54,27 +60,31 @@ static inline std::unique_ptr MakeHotBatch(int6 return wrapper_batch; } -static inline void FillTableHot(Table* table, int64_t table_size, int64_t batch_length) { +static inline int64_t FillTableHot(Table* table, int64_t table_size, int64_t batch_length) { int64_t batch_size = batch_length * sizeof(int64_t) + batch_length * sizeof(double); + int64_t time_counter = 0; for (int64_t i = 0; i < (table_size / batch_size); ++i) { - auto batch = MakeHotBatch(batch_length); + auto batch = MakeHotBatch(batch_length, &time_counter); PL_CHECK_OK(table->TransferRecordBatch(std::move(batch))); } + return time_counter; } -static inline void FillTableCold(Table* table, int64_t table_size, int64_t batch_length) { +static inline int64_t FillTableCold(Table* table, int64_t table_size, int64_t batch_length) { int64_t batch_size = batch_length * sizeof(int64_t) + batch_length * sizeof(double); + int64_t time_counter = 0; for (int64_t i = 0; i < (table_size / batch_size); ++i) { - auto batch = MakeHotBatch(batch_length); + auto batch = MakeHotBatch(batch_length, &time_counter); PL_CHECK_OK(table->TransferRecordBatch(std::move(batch))); // Run compaction every time to ensure that all batches get put into cold. PL_CHECK_OK(table->CompactHotToCold(arrow::default_memory_pool())); } + return time_counter; } -static inline void ReadFullTable(Table* table) { - for (auto slice = table->FirstBatch(); slice.IsValid(); slice = table->NextBatch(slice)) { - benchmark::DoNotOptimize(table->GetRowBatchSlice(slice, {0, 1}, arrow::default_memory_pool())); +static inline void ReadFullTable(Table::Cursor* cursor) { + while (!cursor->Done()) { + benchmark::DoNotOptimize(cursor->GetNextRowBatch({0, 1})); } } @@ -88,11 +98,14 @@ static void BM_TableReadAllHot(benchmark::State& state) { CHECK_EQ(table->GetTableStats().bytes, table_size); + Table::Cursor cursor(table.get()); + for (auto _ : state) { - ReadFullTable(table.get()); + ReadFullTable(&cursor); state.PauseTiming(); table = MakeTable(table_size, compaction_size); FillTableHot(table.get(), table_size, batch_length); + cursor = Table::Cursor(table.get()); state.ResumeTiming(); } @@ -107,35 +120,48 @@ static void BM_TableReadAllCold(benchmark::State& state) { auto table = MakeTable(table_size, compaction_size); FillTableCold(table.get(), table_size, batch_length); CHECK_EQ(table->GetTableStats().bytes, table_size); + Table::Cursor cursor(table.get()); for (auto _ : state) { - ReadFullTable(table.get()); + ReadFullTable(&cursor); + + state.PauseTiming(); + cursor = Table::Cursor(table.get()); + state.ResumeTiming(); } state.SetBytesProcessed(state.iterations() * table_size); } +Table::Cursor GetLastBatchCursor(Table* table, int64_t last_time, int64_t batch_length, + const std::vector& cols) { + Table::Cursor cursor(table, + Table::Cursor::StartSpec{Table::Cursor::StartSpec::StartType::StartAtTime, + last_time - 2 * batch_length}, + Table::Cursor::StopSpec{}); + // Advance the cursor so that it points to the last batch and has BatchHints set. 
+ cursor.GetNextRowBatch(cols); + return cursor; +} + // NOLINTNEXTLINE : runtime/references. static void BM_TableReadLastBatchAllHot(benchmark::State& state) { int64_t table_size = 4 * 1024 * 1024; int64_t compaction_size = 64 * 1024; int64_t batch_length = 256; auto table = MakeTable(table_size, compaction_size); - FillTableHot(table.get(), table_size, batch_length); + auto last_time = FillTableHot(table.get(), table_size, batch_length); CHECK_EQ(table->GetTableStats().bytes, table_size); - auto last_slice = table->FirstBatch(); - while (table->NextBatch(last_slice).IsValid()) { - last_slice = table->NextBatch(last_slice); - } + auto last_batch_cursor = GetLastBatchCursor(table.get(), last_time, batch_length, {0, 1}); for (auto _ : state) { - benchmark::DoNotOptimize( - table->GetRowBatchSlice(last_slice, {0, 1}, arrow::default_memory_pool())); + benchmark::DoNotOptimize(last_batch_cursor.GetNextRowBatch({0, 1})); state.PauseTiming(); table = MakeTable(table_size, compaction_size); - FillTableHot(table.get(), table_size, batch_length); + last_time = FillTableHot(table.get(), table_size, batch_length); + last_batch_cursor = GetLastBatchCursor(table.get(), last_time, batch_length, {0, 1}); state.ResumeTiming(); } @@ -149,17 +175,16 @@ static void BM_TableReadLastBatchAllCold(benchmark::State& state) { int64_t compaction_size = 64 * 1024; int64_t batch_length = 256; auto table = MakeTable(table_size, compaction_size); - FillTableCold(table.get(), table_size, batch_length); + auto last_time = FillTableCold(table.get(), table_size, batch_length); CHECK_EQ(table->GetTableStats().bytes, table_size); - auto last_slice = table->FirstBatch(); - while (table->NextBatch(last_slice).IsValid()) { - last_slice = table->NextBatch(last_slice); - } + auto last_batch_cursor = GetLastBatchCursor(table.get(), last_time, batch_length, {0, 1}); for (auto _ : state) { - benchmark::DoNotOptimize( - table->GetRowBatchSlice(last_slice, {0, 1}, arrow::default_memory_pool())); + benchmark::DoNotOptimize(last_batch_cursor.GetNextRowBatch({0, 1})); + state.PauseTiming(); + last_batch_cursor = GetLastBatchCursor(table.get(), last_time, batch_length, {0, 1}); + state.ResumeTiming(); } int64_t batch_size = batch_length * sizeof(int64_t) + batch_length * sizeof(double); @@ -175,7 +200,8 @@ static void BM_TableWriteEmpty(benchmark::State& state) { for (auto _ : state) { state.PauseTiming(); - auto batch = MakeHotBatch(batch_length); + int64_t time_counter = 0; + auto batch = MakeHotBatch(batch_length, &time_counter); state.ResumeTiming(); PL_CHECK_OK(table->TransferRecordBatch(std::move(batch))); state.PauseTiming(); @@ -199,7 +225,8 @@ static void BM_TableWriteFull(benchmark::State& state) { for (auto _ : state) { state.PauseTiming(); - auto batch = MakeHotBatch(batch_length); + int64_t time_counter = 0; + auto batch = MakeHotBatch(batch_length, &time_counter); state.ResumeTiming(); PL_CHECK_OK(table->TransferRecordBatch(std::move(batch))); } @@ -282,12 +309,9 @@ static void BM_TableThreaded(benchmark::State& state) { int64_t batch_counter = 0; while (batch_counter < (num_batches / num_read_threads)) { - auto slice = table_ptr->FirstBatch(); - if (!slice.IsValid()) { - continue; - } + Table::Cursor cursor(table_ptr.get()); auto start = std::chrono::high_resolution_clock::now(); - auto batch_or_s = table_ptr->GetRowBatchSlice(slice, {0}, arrow::default_memory_pool()); + auto batch_or_s = cursor.GetNextRowBatch({0}); auto end = std::chrono::high_resolution_clock::now(); if (!batch_or_s.ok()) { continue; diff --git 
a/src/table_store/table/table_test.cc b/src/table_store/table/table_test.cc index 3ad3bc08ce3..d9ab4686403 100644 --- a/src/table_store/table/table_test.cc +++ b/src/table_store/table/table_test.cc @@ -57,11 +57,6 @@ std::shared_ptr
TestTable() { } // namespace -static inline BatchSlice BatchSliceFromRowIds(int64_t uniq_row_start_idx, - int64_t uniq_row_end_idx) { - return BatchSlice{false, -1, -1, -1, -1, uniq_row_start_idx, uniq_row_end_idx}; -} - TEST(TableTest, basic_test) { schema::Relation rel({types::DataType::BOOLEAN, types::DataType::INT64}, {"col1", "col2"}); @@ -82,28 +77,15 @@ TEST(TableTest, basic_test) { EXPECT_OK(rb2.AddColumn(types::ToArrow(col2_in2, arrow::default_memory_pool()))); EXPECT_OK(table.WriteRowBatch(rb2)); - auto actual_rb1 = table - .GetRowBatchSlice(table.FirstBatch(), std::vector({0, 1}), - arrow::default_memory_pool()) - .ConsumeValueOrDie(); + Table::Cursor cursor(table_ptr.get()); + + auto actual_rb1 = cursor.GetNextRowBatch(std::vector({0, 1})).ConsumeValueOrDie(); EXPECT_TRUE( actual_rb1->ColumnAt(0)->Equals(types::ToArrow(col1_in1, arrow::default_memory_pool()))); EXPECT_TRUE( actual_rb1->ColumnAt(1)->Equals(types::ToArrow(col2_in1, arrow::default_memory_pool()))); - auto slice = BatchSliceFromRowIds(1, 2); - auto rb1_sliced = - table.GetRowBatchSlice(slice, std::vector({0, 1}), arrow::default_memory_pool()) - .ConsumeValueOrDie(); - EXPECT_TRUE(rb1_sliced->ColumnAt(0)->Equals( - types::ToArrow(std::vector({false, true}), arrow::default_memory_pool()))); - EXPECT_TRUE(rb1_sliced->ColumnAt(1)->Equals( - types::ToArrow(std::vector({2, 3}), arrow::default_memory_pool()))); - - slice = BatchSliceFromRowIds(3, 4); - auto actual_rb2 = - table.GetRowBatchSlice(slice, std::vector({0, 1}), arrow::default_memory_pool()) - .ConsumeValueOrDie(); + auto actual_rb2 = cursor.GetNextRowBatch(std::vector({0, 1})).ConsumeValueOrDie(); EXPECT_TRUE( actual_rb2->ColumnAt(0)->Equals(types::ToArrow(col1_in2, arrow::default_memory_pool()))); EXPECT_TRUE( @@ -124,7 +106,7 @@ TEST(TableTest, bytes_test) { auto col2_rb1_arrow = types::ToArrow(col2_rb1, arrow::default_memory_pool()); EXPECT_OK(rb1.AddColumn(col1_rb1_arrow)); EXPECT_OK(rb1.AddColumn(col2_rb1_arrow)); - int64_t rb1_size = 3 * sizeof(int64_t) + 12 * sizeof(char); + int64_t rb1_size = 3 * sizeof(int64_t) + 12 * sizeof(char) + 3 * sizeof(uint32_t); EXPECT_OK(table.WriteRowBatch(rb1)); EXPECT_EQ(table.GetTableStats().bytes, rb1_size); @@ -136,7 +118,7 @@ TEST(TableTest, bytes_test) { auto col2_rb2_arrow = types::ToArrow(col2_rb2, arrow::default_memory_pool()); EXPECT_OK(rb2.AddColumn(col1_rb2_arrow)); EXPECT_OK(rb2.AddColumn(col2_rb2_arrow)); - int64_t rb2_size = 2 * sizeof(int64_t) + 3 * sizeof(char); + int64_t rb2_size = 2 * sizeof(int64_t) + 3 * sizeof(char) + 2 * sizeof(uint32_t); EXPECT_OK(table.WriteRowBatch(rb2)); EXPECT_EQ(table.GetTableStats().bytes, rb1_size + rb2_size); @@ -156,7 +138,7 @@ TEST(TableTest, bytes_test) { } wrapper_batch_1->push_back(col_wrapper_1); wrapper_batch_1->push_back(col_wrapper_2); - int64_t rb3_size = 3 * sizeof(int64_t) + 9 * sizeof(char); + int64_t rb3_size = 3 * sizeof(int64_t) + 9 * sizeof(char) + 3 * sizeof(uint32_t); EXPECT_OK(table.TransferRecordBatch(std::move(wrapper_batch_1))); @@ -174,7 +156,7 @@ TEST(TableTest, bytes_test_w_compaction) { auto col2_rb1_arrow = types::ToArrow(col2_rb1, arrow::default_memory_pool()); EXPECT_OK(rb1.AddColumn(col1_rb1_arrow)); EXPECT_OK(rb1.AddColumn(col2_rb1_arrow)); - int64_t rb1_size = 3 * sizeof(int64_t) + 12 * sizeof(char); + int64_t rb1_size = 3 * sizeof(int64_t) + 12 * sizeof(char) + 3 * sizeof(uint32_t); schema::RowBatch rb2(rd, 2); std::vector col1_rb2 = {4, 5}; @@ -183,7 +165,7 @@ TEST(TableTest, bytes_test_w_compaction) { auto col2_rb2_arrow = 
types::ToArrow(col2_rb2, arrow::default_memory_pool()); EXPECT_OK(rb2.AddColumn(col1_rb2_arrow)); EXPECT_OK(rb2.AddColumn(col2_rb2_arrow)); - int64_t rb2_size = 2 * sizeof(int64_t) + 3 * sizeof(char); + int64_t rb2_size = 2 * sizeof(int64_t) + 3 * sizeof(char) + 2 * sizeof(uint32_t); std::vector time_hot_col1 = {1, 5, 3}; std::vector time_hot_col2 = {"test", "abc", "de"}; @@ -200,10 +182,10 @@ TEST(TableTest, bytes_test_w_compaction) { } wrapper_batch_1->push_back(col_wrapper_1); wrapper_batch_1->push_back(col_wrapper_2); - int64_t rb3_size = 3 * sizeof(int64_t) + 9 * sizeof(char); + int64_t rb3_size = 3 * sizeof(int64_t) + 9 * sizeof(char) + 3 * sizeof(uint32_t); - // Make minimum batch size rb1_size + rb2_size so that compaction causes 2 of the 3 batches to be - // compacted into cold. + // Make minimum batch size rb1_size + rb2_size so that compaction causes 2 of the 3 batches to + // be compacted into cold. std::shared_ptr
table_ptr = std::make_shared
("test_table", rel, 128 * 1024, rb1_size + rb2_size); Table& table = *table_ptr; @@ -225,7 +207,7 @@ TEST(TableTest, expiry_test) { auto rd = schema::RowDescriptor({types::DataType::INT64, types::DataType::STRING}); schema::Relation rel(rd.types(), {"col1", "col2"}); - Table table("test_table", rel, 60); + Table table("test_table", rel, 80); schema::RowBatch rb1(rd, 3); std::vector col1_rb1 = {4, 5, 10}; @@ -234,7 +216,7 @@ TEST(TableTest, expiry_test) { auto col2_rb1_arrow = types::ToArrow(col2_rb1, arrow::default_memory_pool()); EXPECT_OK(rb1.AddColumn(col1_rb1_arrow)); EXPECT_OK(rb1.AddColumn(col2_rb1_arrow)); - int64_t rb1_size = 3 * sizeof(int64_t) + 12 * sizeof(char); + int64_t rb1_size = 3 * sizeof(int64_t) + 12 * sizeof(char) + 3 * sizeof(uint32_t); EXPECT_OK(table.WriteRowBatch(rb1)); EXPECT_EQ(table.GetTableStats().bytes, rb1_size); @@ -246,7 +228,7 @@ TEST(TableTest, expiry_test) { auto col2_rb2_arrow = types::ToArrow(col2_rb2, arrow::default_memory_pool()); EXPECT_OK(rb2.AddColumn(col1_rb2_arrow)); EXPECT_OK(rb2.AddColumn(col2_rb2_arrow)); - int64_t rb2_size = 2 * sizeof(int64_t) + 3 * sizeof(char); + int64_t rb2_size = 2 * sizeof(int64_t) + 3 * sizeof(char) + 2 * sizeof(uint32_t); EXPECT_OK(table.WriteRowBatch(rb2)); EXPECT_EQ(table.GetTableStats().bytes, rb1_size + rb2_size); @@ -258,10 +240,10 @@ TEST(TableTest, expiry_test) { auto col2_rb3_arrow = types::ToArrow(col2_rb3, arrow::default_memory_pool()); EXPECT_OK(rb3.AddColumn(col1_rb3_arrow)); EXPECT_OK(rb3.AddColumn(col2_rb3_arrow)); - int64_t rb3_size = 2 * sizeof(int64_t) + 27 * sizeof(char); + int64_t rb3_size = 2 * sizeof(int64_t) + 27 * sizeof(char) + 2 * sizeof(uint32_t); EXPECT_OK(table.WriteRowBatch(rb3)); - EXPECT_EQ(table.GetTableStats().bytes, rb3_size); + EXPECT_EQ(table.GetTableStats().bytes, rb2_size + rb3_size); std::vector time_hot_col1 = {1}; std::vector time_hot_col2 = {"a"}; @@ -278,7 +260,7 @@ TEST(TableTest, expiry_test) { } wrapper_batch_1->push_back(col_wrapper_1); wrapper_batch_1->push_back(col_wrapper_2); - int64_t rb4_size = 1 * sizeof(int64_t) + 1 * sizeof(char); + int64_t rb4_size = 1 * sizeof(int64_t) + 1 * sizeof(char) + 1 * sizeof(uint32_t); EXPECT_OK(table.TransferRecordBatch(std::move(wrapper_batch_1))); @@ -299,7 +281,7 @@ TEST(TableTest, expiry_test) { } wrapper_batch_1_2->push_back(col_wrapper_1_2); wrapper_batch_1_2->push_back(col_wrapper_2_2); - int64_t rb5_size = 5 * sizeof(int64_t) + 20 * sizeof(char); + int64_t rb5_size = 5 * sizeof(int64_t) + 20 * sizeof(char) + 5 * sizeof(uint32_t); EXPECT_OK(table.TransferRecordBatch(std::move(wrapper_batch_1_2))); @@ -317,7 +299,7 @@ TEST(TableTest, expiry_test_w_compaction) { auto col2_rb1_arrow = types::ToArrow(col2_rb1, arrow::default_memory_pool()); EXPECT_OK(rb1.AddColumn(col1_rb1_arrow)); EXPECT_OK(rb1.AddColumn(col2_rb1_arrow)); - int64_t rb1_size = 3 * sizeof(int64_t) + 12 * sizeof(char); + int64_t rb1_size = 3 * sizeof(int64_t) + 12 * sizeof(char) + 3 * sizeof(uint32_t); schema::RowBatch rb2(rd, 2); std::vector col1_rb2 = {4, 5}; @@ -326,7 +308,7 @@ TEST(TableTest, expiry_test_w_compaction) { auto col2_rb2_arrow = types::ToArrow(col2_rb2, arrow::default_memory_pool()); EXPECT_OK(rb2.AddColumn(col1_rb2_arrow)); EXPECT_OK(rb2.AddColumn(col2_rb2_arrow)); - int64_t rb2_size = 2 * sizeof(int64_t) + 3 * sizeof(char); + int64_t rb2_size = 2 * sizeof(int64_t) + 3 * sizeof(char) + 2 * sizeof(uint32_t); schema::RowBatch rb3(rd, 2); std::vector col1_rb3 = {4, 5}; @@ -335,7 +317,7 @@ TEST(TableTest, expiry_test_w_compaction) { auto col2_rb3_arrow = 
types::ToArrow(col2_rb3, arrow::default_memory_pool()); EXPECT_OK(rb3.AddColumn(col1_rb3_arrow)); EXPECT_OK(rb3.AddColumn(col2_rb3_arrow)); - int64_t rb3_size = 2 * sizeof(int64_t) + 27 * sizeof(char); + int64_t rb3_size = 2 * sizeof(int64_t) + 27 * sizeof(char) + 2 * sizeof(uint32_t); std::vector time_hot_col1 = {1}; std::vector time_hot_col2 = {"a"}; @@ -352,7 +334,7 @@ TEST(TableTest, expiry_test_w_compaction) { } wrapper_batch_1->push_back(col_wrapper_1); wrapper_batch_1->push_back(col_wrapper_2); - int64_t rb4_size = 1 * sizeof(int64_t) + 1 * sizeof(char); + int64_t rb4_size = 1 * sizeof(int64_t) + 1 * sizeof(char) + 1 * sizeof(uint32_t); std::vector time_hot_col1_2 = {1, 2, 3, 4, 5}; std::vector time_hot_col2_2 = {"abcdef", "ghi", "jklmno", "pqr", "tu"}; @@ -369,9 +351,9 @@ TEST(TableTest, expiry_test_w_compaction) { } wrapper_batch_1_2->push_back(col_wrapper_1_2); wrapper_batch_1_2->push_back(col_wrapper_2_2); - int64_t rb5_size = 5 * sizeof(int64_t) + 20 * sizeof(char); + int64_t rb5_size = 5 * sizeof(int64_t) + 20 * sizeof(char) + 5 * sizeof(uint32_t); - Table table("test_table", rel, 60, 40); + Table table("test_table", rel, 80, 40); EXPECT_OK(table.WriteRowBatch(rb1)); EXPECT_EQ(table.GetTableStats().bytes, rb1_size); @@ -380,7 +362,7 @@ TEST(TableTest, expiry_test_w_compaction) { EXPECT_EQ(table.GetTableStats().bytes, rb1_size + rb2_size); EXPECT_OK(table.WriteRowBatch(rb3)); - EXPECT_EQ(table.GetTableStats().bytes, rb3_size); + EXPECT_EQ(table.GetTableStats().bytes, rb2_size + rb3_size); EXPECT_OK(table.TransferRecordBatch(std::move(wrapper_batch_1))); EXPECT_EQ(table.GetTableStats().bytes, rb3_size + rb4_size); @@ -424,7 +406,8 @@ TEST(TableTest, write_row_batch) { EXPECT_OK(table.WriteRowBatch(rb1)); - auto rb_or_s = table.GetRowBatchSlice(table.FirstBatch(), {0, 1}, arrow::default_memory_pool()); + Table::Cursor cursor(table_ptr.get()); + auto rb_or_s = cursor.GetNextRowBatch({0, 1}); ASSERT_OK(rb_or_s); auto actual_rb = rb_or_s.ConsumeValueOrDie(); EXPECT_TRUE(actual_rb->ColumnAt(0)->Equals(col1_rb1_arrow)); @@ -461,17 +444,13 @@ TEST(TableTest, hot_batches_test) { rb_wrapper_2->push_back(col2_in2_wrapper); EXPECT_OK(table.TransferRecordBatch(std::move(rb_wrapper_2))); - auto slice = table.FirstBatch(); - auto rb1 = - table.GetRowBatchSlice(slice, std::vector({0, 1}), arrow::default_memory_pool()) - .ConsumeValueOrDie(); + Table::Cursor cursor(table_ptr.get()); + auto rb1 = cursor.GetNextRowBatch({0, 1}).ConsumeValueOrDie(); EXPECT_TRUE(rb1->ColumnAt(0)->Equals(types::ToArrow(col1_in1, arrow::default_memory_pool()))); EXPECT_TRUE(rb1->ColumnAt(1)->Equals(types::ToArrow(col2_in1, arrow::default_memory_pool()))); - slice = table.NextBatch(slice); - auto rb2 = - table.GetRowBatchSlice(slice, std::vector({0, 1}), arrow::default_memory_pool()) - .ConsumeValueOrDie(); + auto rb2 = cursor.GetNextRowBatch({0, 1}).ConsumeValueOrDie(); + ASSERT_NE(rb2, nullptr); EXPECT_TRUE(rb2->ColumnAt(0)->Equals(types::ToArrow(col1_in2, arrow::default_memory_pool()))); EXPECT_TRUE(rb2->ColumnAt(1)->Equals(types::ToArrow(col2_in2, arrow::default_memory_pool()))); } @@ -502,29 +481,24 @@ TEST(TableTest, hot_batches_w_compaction_test) { rb_wrapper_2->push_back(col1_in2_wrapper); rb_wrapper_2->push_back(col2_in2_wrapper); - Table table("test_table", rel, 128 * 1024, rb1_size + 1); + Table table("test_table", rel, 128 * 1024, rb1_size); EXPECT_OK(table.TransferRecordBatch(std::move(rb_wrapper_1))); EXPECT_OK(table.TransferRecordBatch(std::move(rb_wrapper_2))); - auto slice = table.FirstBatch(); - auto 
rb1 = - table.GetRowBatchSlice(slice, std::vector({0, 1}), arrow::default_memory_pool()) - .ConsumeValueOrDie(); + Table::Cursor cursor(&table); + auto rb1 = cursor.GetNextRowBatch({0, 1}).ConsumeValueOrDie(); EXPECT_TRUE(rb1->ColumnAt(0)->Equals(types::ToArrow(col1_in1, arrow::default_memory_pool()))); EXPECT_TRUE(rb1->ColumnAt(1)->Equals(types::ToArrow(col2_in1, arrow::default_memory_pool()))); EXPECT_OK(table.CompactHotToCold(arrow::default_memory_pool())); - slice = table.NextBatch(slice); - auto rb2 = - table.GetRowBatchSlice(slice, std::vector({0, 1}), arrow::default_memory_pool()) - .ConsumeValueOrDie(); + auto rb2 = cursor.GetNextRowBatch({0, 1}).ConsumeValueOrDie(); EXPECT_TRUE(rb2->ColumnAt(0)->Equals(types::ToArrow(col1_in2, arrow::default_memory_pool()))); EXPECT_TRUE(rb2->ColumnAt(1)->Equals(types::ToArrow(col2_in2, arrow::default_memory_pool()))); } -TEST(TableTest, find_batch_slice_greater_or_eq) { +TEST(TableTest, find_rowid_from_time_first_greater_than_or_equal) { schema::Relation rel(std::vector({types::DataType::TIME64NS}), std::vector({"time_"})); std::shared_ptr
table_ptr = Table::Create("test_table", rel); @@ -579,48 +553,28 @@ TEST(TableTest, find_batch_slice_greater_or_eq) { wrapper_batch->push_back(col_wrapper); EXPECT_OK(table.TransferRecordBatch(std::move(wrapper_batch))); - auto batch_slice = - table.FindBatchSliceGreaterThanOrEqual(0, arrow::default_memory_pool()).ConsumeValueOrDie(); - EXPECT_EQ(0, batch_slice.uniq_row_start_idx); - EXPECT_EQ(3, batch_slice.uniq_row_end_idx); - - batch_slice = - table.FindBatchSliceGreaterThanOrEqual(5, arrow::default_memory_pool()).ConsumeValueOrDie(); - EXPECT_EQ(3, batch_slice.uniq_row_start_idx); - EXPECT_EQ(3, batch_slice.uniq_row_end_idx); - - batch_slice = - table.FindBatchSliceGreaterThanOrEqual(6, arrow::default_memory_pool()).ConsumeValueOrDie(); - EXPECT_EQ(3, batch_slice.uniq_row_start_idx); - EXPECT_EQ(3, batch_slice.uniq_row_end_idx); - - batch_slice = - table.FindBatchSliceGreaterThanOrEqual(8, arrow::default_memory_pool()).ConsumeValueOrDie(); - EXPECT_EQ(4, batch_slice.uniq_row_start_idx); - EXPECT_EQ(6, batch_slice.uniq_row_end_idx); - - batch_slice = - table.FindBatchSliceGreaterThanOrEqual(10, arrow::default_memory_pool()).ConsumeValueOrDie(); - EXPECT_EQ(9, batch_slice.uniq_row_start_idx); - EXPECT_EQ(9, batch_slice.uniq_row_end_idx); - - batch_slice = - table.FindBatchSliceGreaterThanOrEqual(13, arrow::default_memory_pool()).ConsumeValueOrDie(); - EXPECT_EQ(10, batch_slice.uniq_row_start_idx); - EXPECT_EQ(12, batch_slice.uniq_row_end_idx); - - batch_slice = - table.FindBatchSliceGreaterThanOrEqual(21, arrow::default_memory_pool()).ConsumeValueOrDie(); - EXPECT_EQ(13, batch_slice.uniq_row_start_idx); - EXPECT_EQ(15, batch_slice.uniq_row_end_idx); - - batch_slice = - table.FindBatchSliceGreaterThanOrEqual(24, arrow::default_memory_pool()).ConsumeValueOrDie(); - EXPECT_EQ(-1, batch_slice.uniq_row_start_idx); - EXPECT_EQ(-1, batch_slice.uniq_row_end_idx); + EXPECT_EQ(0, table.FindRowIDFromTimeFirstGreaterThanOrEqual(0)); + + EXPECT_EQ(3, table.FindRowIDFromTimeFirstGreaterThanOrEqual(5)); + + EXPECT_EQ(3, table.FindRowIDFromTimeFirstGreaterThanOrEqual(6)); + + EXPECT_EQ(4, table.FindRowIDFromTimeFirstGreaterThanOrEqual(8)); + + EXPECT_EQ(9, table.FindRowIDFromTimeFirstGreaterThanOrEqual(10)); + + EXPECT_EQ(10, table.FindRowIDFromTimeFirstGreaterThanOrEqual(13)); + + EXPECT_EQ(13, table.FindRowIDFromTimeFirstGreaterThanOrEqual(21)); + + // If the time is not in the table it returns the RowID after the end of the table (which is the + // number of rows that have been added to the table). + EXPECT_EQ(time_batch_1.size() + time_batch_2.size() + time_batch_3.size() + time_batch_4.size() + + time_batch_5.size() + time_batch_6.size(), + table.FindRowIDFromTimeFirstGreaterThanOrEqual(24)); } -TEST(TableTest, find_batch_slice_greater_or_eq_w_compaction) { +TEST(TableTest, find_rowid_from_time_first_greater_than_or_equal_with_compaction) { schema::Relation rel(std::vector({types::DataType::TIME64NS}), std::vector({"time_"})); int64_t compaction_size = 4 * sizeof(int64_t); @@ -640,15 +594,10 @@ TEST(TableTest, find_batch_slice_greater_or_eq_w_compaction) { wrapper_batch->push_back(col_wrapper); EXPECT_OK(table.TransferRecordBatch(std::move(wrapper_batch))); + // Run Compaction. 
EXPECT_OK(table.CompactHotToCold(arrow::default_memory_pool())); - auto batch_slice = - table.FindBatchSliceGreaterThanOrEqual(0, arrow::default_memory_pool()).ConsumeValueOrDie(); - EXPECT_EQ(0, batch_slice.uniq_row_start_idx); - EXPECT_EQ(3, batch_slice.uniq_row_end_idx); - batch_slice = - table.FindBatchSliceGreaterThanOrEqual(5, arrow::default_memory_pool()).ConsumeValueOrDie(); - EXPECT_EQ(3, batch_slice.uniq_row_start_idx); - EXPECT_EQ(3, batch_slice.uniq_row_end_idx); + EXPECT_EQ(0, table.FindRowIDFromTimeFirstGreaterThanOrEqual(0)); + EXPECT_EQ(3, table.FindRowIDFromTimeFirstGreaterThanOrEqual(5)); wrapper_batch = std::make_unique(); col_wrapper = std::make_shared(3); @@ -664,18 +613,13 @@ TEST(TableTest, find_batch_slice_greater_or_eq_w_compaction) { wrapper_batch->push_back(col_wrapper); EXPECT_OK(table.TransferRecordBatch(std::move(wrapper_batch))); + // Run Compaction. EXPECT_OK(table.CompactHotToCold(arrow::default_memory_pool())); - batch_slice = - table.FindBatchSliceGreaterThanOrEqual(6, arrow::default_memory_pool()).ConsumeValueOrDie(); - EXPECT_EQ(3, batch_slice.uniq_row_start_idx); + EXPECT_EQ(3, table.FindRowIDFromTimeFirstGreaterThanOrEqual(6)); - batch_slice = - table.FindBatchSliceGreaterThanOrEqual(8, arrow::default_memory_pool()).ConsumeValueOrDie(); - EXPECT_EQ(4, batch_slice.uniq_row_start_idx); + EXPECT_EQ(4, table.FindRowIDFromTimeFirstGreaterThanOrEqual(8)); - batch_slice = - table.FindBatchSliceGreaterThanOrEqual(10, arrow::default_memory_pool()).ConsumeValueOrDie(); - EXPECT_EQ(9, batch_slice.uniq_row_start_idx); + EXPECT_EQ(9, table.FindRowIDFromTimeFirstGreaterThanOrEqual(10)); wrapper_batch = std::make_unique(); col_wrapper = std::make_shared(3); @@ -697,20 +641,18 @@ TEST(TableTest, find_batch_slice_greater_or_eq_w_compaction) { col_wrapper->AppendFromVector(time_batch_6); wrapper_batch->push_back(col_wrapper); EXPECT_OK(table.TransferRecordBatch(std::move(wrapper_batch))); + // Run Compaction. EXPECT_OK(table.CompactHotToCold(arrow::default_memory_pool())); - batch_slice = - table.FindBatchSliceGreaterThanOrEqual(13, arrow::default_memory_pool()).ConsumeValueOrDie(); - EXPECT_EQ(10, batch_slice.uniq_row_start_idx); + EXPECT_EQ(10, table.FindRowIDFromTimeFirstGreaterThanOrEqual(13)); - batch_slice = - table.FindBatchSliceGreaterThanOrEqual(21, arrow::default_memory_pool()).ConsumeValueOrDie(); - EXPECT_EQ(13, batch_slice.uniq_row_start_idx); + EXPECT_EQ(13, table.FindRowIDFromTimeFirstGreaterThanOrEqual(21)); - batch_slice = - table.FindBatchSliceGreaterThanOrEqual(24, arrow::default_memory_pool()).ConsumeValueOrDie(); - EXPECT_EQ(-1, batch_slice.uniq_row_start_idx); - EXPECT_EQ(-1, batch_slice.uniq_row_end_idx); + // If the time is not in the table it returns the RowID after the end of the table (which is the + // number of rows that have been added to the table). 
+ EXPECT_EQ(time_batch_1.size() + time_batch_2.size() + time_batch_3.size() + time_batch_4.size() + + time_batch_5.size() + time_batch_6.size(), + table.FindRowIDFromTimeFirstGreaterThanOrEqual(24)); } TEST(TableTest, ToProto) { @@ -719,54 +661,54 @@ TEST(TableTest, ToProto) { EXPECT_OK(table->ToProto(&table_proto)); std::string expected = R"( -relation { - columns { - column_name: "col1" - column_type: FLOAT64 - column_semantic_type: ST_NONE - } - columns { - column_name: "col2" - column_type: INT64 - column_semantic_type: ST_NONE - } -} -row_batches { - cols { - float64_data { - data: 0.5 - data: 1.2 - data: 5.3 - } - } - cols { - int64_data { - data: 1 - data: 2 - data: 3 - } - } - eow: false - eos: false - num_rows: 3 -} -row_batches { - cols { - float64_data { - data: 0.1 - data: 5.1 - } - } - cols { - int64_data { - data: 5 - data: 6 - } - } - eow: true - eos: true - num_rows: 2 -})"; + relation { + columns { + column_name: "col1" + column_type: FLOAT64 + column_semantic_type: ST_NONE + } + columns { + column_name: "col2" + column_type: INT64 + column_semantic_type: ST_NONE + } + } + row_batches { + cols { + float64_data { + data: 0.5 + data: 1.2 + data: 5.3 + } + } + cols { + int64_data { + data: 1 + data: 2 + data: 3 + } + } + eow: false + eos: false + num_rows: 3 + } + row_batches { + cols { + float64_data { + data: 0.1 + data: 5.1 + } + } + cols { + int64_data { + data: 5 + data: 6 + } + } + eow: true + eos: true + num_rows: 2 + })"; google::protobuf::util::MessageDifferencer differ; table_store::schemapb::Table expected_proto; @@ -842,13 +784,17 @@ TEST(TableTest, threaded) { EXPECT_OK(table_ptr->CompactHotToCold(arrow::default_memory_pool())); }); + // Create the cursor before the write thread starts, to ensure that we get every row of the table. + Table::Cursor cursor(table_ptr.get(), Table::Cursor::StartSpec{}, + Table::Cursor::StopSpec{Table::Cursor::StopSpec::StopType::Infinite}); + std::thread writer_thread([table_ptr, done, max_time_counter]() { std::default_random_engine gen; std::uniform_int_distribution dist(256, 1024); int64_t time_counter = 0; - // This RAII wrapper around done, will notify threads waiting on done when it goes out of scope - // if done->Notify hasn't been called yet. This way if the writer thread dies for some reason - // the test will fail immediately instead of timing out. + // This RAII wrapper around done, will notify threads waiting on done when it goes out of + // scope if done->Notify hasn't been called yet. This way if the writer thread dies for some + // reason the test will fail immediately instead of timing out. NotifyOnDeath notifier(done.get()); while (time_counter < max_time_counter) { int64_t batch_size = dist(gen); @@ -869,47 +815,44 @@ TEST(TableTest, threaded) { done->Notify(); }); - std::thread reader_thread([table_ptr, done, max_time_counter]() { + std::thread reader_thread([table_ptr, done, max_time_counter, &cursor]() { int64_t time_counter = 0; - auto slice = table_ptr->FirstBatch(); - while (!slice.IsValid()) { - slice = table_ptr->FirstBatch(); + + // Wait for the writer thread to push some data to the table. + while (!cursor.NextBatchReady()) { + std::this_thread::sleep_for(std::chrono::microseconds{50}); } - // Loop over slices whilst the writer is still writing and we haven't seen all of the data yet. + // Loop over slices whilst the writer is still writing and we haven't seen all of the data + // yet. 
while (time_counter < max_time_counter &&
- !done->WaitForNotificationWithTimeout(absl::Milliseconds(1))) {
- EXPECT_TRUE(slice.IsValid());
- auto batch =
- table_ptr->GetRowBatchSlice(slice, {0}, arrow::default_memory_pool()).ConsumeValueOrDie();
+ !done->WaitForNotificationWithTimeout(absl::Microseconds(1))) {
+ EXPECT_TRUE(cursor.NextBatchReady());
+ auto batch = cursor.GetNextRowBatch({0}).ConsumeValueOrDie();
 auto time_col = std::static_pointer_cast(batch->ColumnAt(0));
 for (int i = 0; i < time_col->length(); ++i) {
 EXPECT_EQ(time_counter, time_col->Value(i));
 time_counter++;
 }
- // If the reader gets ahead of the writer we have to wait for the writer to write more data.
- auto next_slice = table_ptr->NextBatch(slice);
- if (time_counter < max_time_counter) {
- // We check the done notifcation here so that if the writer fails to write all the data for
- // some reason we still exit.
- while (!next_slice.IsValid() &&
- !done->WaitForNotificationWithTimeout(absl::Milliseconds(1))) {
- next_slice = table_ptr->NextBatch(slice);
- }
+ // If the reader gets ahead of the writer we have to wait for the writer to write more
+ // data. We check the done notification here so that if the writer fails to write all the data
+ // for some reason, we still exit.
+ while (time_counter < max_time_counter && !cursor.NextBatchReady() &&
+ !done->WaitForNotificationWithTimeout(absl::Milliseconds(1))) { }
- slice = next_slice;
 }
+ // Now that the writer is finished, move the stop of the cursor to the current end of the table.
+ cursor.UpdateStopSpec(Table::Cursor::StopSpec{Table::Cursor::StopSpec::CurrentEndOfTable});
+
 // Once the writer is finished, we loop over the remaining data in the table.
- while (time_counter < max_time_counter && slice.IsValid()) {
- auto batch =
- table_ptr->GetRowBatchSlice(slice, {0}, arrow::default_memory_pool()).ConsumeValueOrDie();
+ while (time_counter < max_time_counter && !cursor.Done()) {
+ auto batch = cursor.GetNextRowBatch({0}).ConsumeValueOrDie();
 auto time_col = std::static_pointer_cast(batch->ColumnAt(0));
 for (int i = 0; i < time_col->length(); ++i) {
- EXPECT_EQ(time_counter, time_col->Value(i));
+ ASSERT_EQ(time_counter, time_col->Value(i));
 time_counter++;
 }
- slice = table_ptr->NextBatch(slice);
 }
 EXPECT_EQ(time_counter, max_time_counter);
@@ -920,11 +863,14 @@ TEST(TableTest, threaded) {
 reader_thread.join();
 }
+// This test was added when `NextBatch` and `BatchSlice`'s were still around, and there was a bug with
+// generation handling of `BatchSlice`'s. Maintained so as not to decrease test coverage, but this
+// bug should no longer even be plausible.
 TEST(TableTest, NextBatch_generation_bug) {
 auto rd = schema::RowDescriptor({types::DataType::INT64, types::DataType::STRING});
 schema::Relation rel(rd.types(), {"col1", "col2"});
- int64_t rb1_size = 3 * sizeof(int64_t) + 12 * sizeof(char);
+ int64_t rb1_size = 3 * sizeof(int64_t) + 12 * sizeof(char) + 3 * sizeof(uint32_t);
 Table table("test_table", rel, rb1_size, rb1_size);
 schema::RowBatch rb1(rd, 3);
@@ -938,16 +884,58 @@
 EXPECT_OK(table.WriteRowBatch(rb1));
 EXPECT_OK(table.CompactHotToCold(arrow::default_memory_pool()));
- auto slice = table.FirstBatch();
+ Table::Cursor cursor(&table, Table::Cursor::StartSpec{}, Table::Cursor::StopSpec{});
 // Force cold expiration.
 EXPECT_OK(table.WriteRowBatch(rb1));
- // Call NextBatch on slice, which before the bug fix updated it's generation when it shouldn't
- // have.
- table.NextBatch(slice); - // GetRowBatchSlice should return invalidargument since the slice was expired. Prior to the bug - // fix this would segfault. - EXPECT_NOT_OK(table.GetRowBatchSlice(slice, {0, 1}, arrow::default_memory_pool())); + // GetNextRowBatch should return invalidargument since the batch was expired. + EXPECT_NOT_OK(cursor.GetNextRowBatch({0, 1})); } +TEST(TableTest, GetNextRowBatch_after_expiry) { + schema::Relation rel({types::DataType::BOOLEAN, types::DataType::INT64}, {"col1", "col2"}); + + std::vector col1_in1 = {true, false, true}; + auto col1_in1_wrapper = + types::ColumnWrapper::FromArrow(types::ToArrow(col1_in1, arrow::default_memory_pool())); + std::vector col1_in2 = {false, false}; + auto col1_in2_wrapper = + types::ColumnWrapper::FromArrow(types::ToArrow(col1_in2, arrow::default_memory_pool())); + + std::vector col2_in1 = {1, 2, 3}; + auto col2_in1_wrapper = + types::ColumnWrapper::FromArrow(types::ToArrow(col2_in1, arrow::default_memory_pool())); + std::vector col2_in2 = {5, 6}; + auto col2_in2_wrapper = + types::ColumnWrapper::FromArrow(types::ToArrow(col2_in2, arrow::default_memory_pool())); + + auto rb_wrapper_1 = std::make_unique(); + rb_wrapper_1->push_back(col1_in1_wrapper); + rb_wrapper_1->push_back(col2_in1_wrapper); + int64_t rb1_size = 3 * sizeof(bool) + 3 * sizeof(int64_t); + + auto rb_wrapper_2 = std::make_unique(); + rb_wrapper_2->push_back(col1_in2_wrapper); + rb_wrapper_2->push_back(col2_in2_wrapper); + int64_t rb2_size = 2 * sizeof(bool) + 2 * sizeof(int64_t); + + Table table("test_table", rel, rb1_size + rb2_size, rb1_size); + + EXPECT_OK(table.TransferRecordBatch(std::move(rb_wrapper_1))); + EXPECT_OK(table.TransferRecordBatch(std::move(rb_wrapper_2))); + + Table::Cursor cursor(&table); + + // This write will expire the first batch. + auto rb_wrapper_1_copy = std::make_unique(); + rb_wrapper_1_copy->push_back(col1_in1_wrapper); + rb_wrapper_1_copy->push_back(col2_in1_wrapper); + EXPECT_OK(table.TransferRecordBatch(std::move(rb_wrapper_1_copy))); + + // GetNextRowBatch should return the second row batch since the first one was expired by the third + // write. + auto rb1 = cursor.GetNextRowBatch({0, 1}).ConsumeValueOrDie(); + EXPECT_TRUE(rb1->ColumnAt(0)->Equals(types::ToArrow(col1_in2, arrow::default_memory_pool()))); + EXPECT_TRUE(rb1->ColumnAt(1)->Equals(types::ToArrow(col2_in2, arrow::default_memory_pool()))); +} } // namespace table_store } // namespace px
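
For reference, a minimal usage sketch of the new Cursor API introduced in table.h above. The helper functions (`ReadAllRows`, `TailFromTime`), the column set `{0}`, and the `start_time` parameter are illustrative assumptions; only the Cursor, StartSpec, and StopSpec calls themselves come from this diff.

#include <cstdint>

#include "src/table_store/table/table.h"

namespace px {
namespace table_store {

// Sketch: read every row batch currently in the table, requesting column 0 only.
// The default StartSpec/StopSpec start at the current start of the table and stop at
// its current end, so Done() eventually becomes true.
void ReadAllRows(const Table* table) {
  Table::Cursor cursor(table);
  while (!cursor.Done()) {
    auto batch_or_s = cursor.GetNextRowBatch({0});
    if (!batch_or_s.ok()) {
      // The desired batch may have been expired between Done() and this call.
      break;
    }
    auto batch = batch_or_s.ConsumeValueOrDie();
    // ... process batch->ColumnAt(0) ...
  }
}

// Sketch: tail the table starting at start_time with an Infinite stop, so the cursor
// never becomes exhausted; NextBatchReady() gates each read while a writer keeps appending.
void TailFromTime(const Table* table, int64_t start_time) {
  Table::Cursor cursor(
      table,
      Table::Cursor::StartSpec{Table::Cursor::StartSpec::StartType::StartAtTime, start_time},
      Table::Cursor::StopSpec{Table::Cursor::StopSpec::StopType::Infinite});
  while (cursor.NextBatchReady()) {
    auto batch_or_s = cursor.GetNextRowBatch({0});
    if (!batch_or_s.ok()) {
      break;
    }
    auto batch = batch_or_s.ConsumeValueOrDie();
    // ... process batch ...
  }
}

}  // namespace table_store
}  // namespace px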