Skip to content

Commit

Permalink
[Enhancement] enable load spill by default (StarRocks#55361)
Browse files Browse the repository at this point in the history
Signed-off-by: luohaha <18810541851@163.com>
  • Loading branch information
luohaha authored Jan 26, 2025
1 parent e52cb9e commit 9cc700a
Show file tree
Hide file tree
Showing 8 changed files with 117 additions and 23 deletions.
2 changes: 1 addition & 1 deletion be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1547,7 +1547,7 @@ CONF_mInt32(merge_commit_txn_state_expire_time_sec, "1800");
CONF_mInt32(merge_commit_txn_state_poll_interval_ms, "2000");
CONF_mInt32(merge_commit_txn_state_poll_max_fail_times, "2");

CONF_mBool(enable_load_spill, "false");
CONF_mBool(enable_load_spill, "true");
// Max chunk bytes which allow to spill per flush. Default is 10MB.
CONF_mInt64(load_spill_max_chunk_bytes, "10485760");
// Max merge input bytes during spill merge. Default is 1024MB.
Expand Down
1 change: 1 addition & 0 deletions be/test/storage/lake/async_delta_writer_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,7 @@ TEST_F(LakeAsyncDeltaWriterTest, test_flush) {
.set_partition_id(_partition_id)
.set_mem_tracker(_mem_tracker.get())
.set_schema_id(_tablet_schema->id())
.set_profile(&_dummy_runtime_profile)
.build());
ASSERT_OK(delta_writer->open());
delta_writer->write(&chunk0, indexes.data(), indexes.size(), [&](const Status& st) { ASSERT_OK(st); });
Expand Down
6 changes: 6 additions & 0 deletions be/test/storage/lake/compaction_task_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ class LakeCompactionTest : public TestBase, testing::WithParamInterface<Compacti
config::min_cumulative_compaction_num_singleton_deltas = _min_cumulative_compaction_num_singleton_deltas;
}

RuntimeProfile _dummy_runtime_profile{"dummy"};

private:
bool _enable_size_tiered_compaction_strategy = config::enable_size_tiered_compaction_strategy;
int64_t _vertical_compaction_max_columns_per_group = config::vertical_compaction_max_columns_per_group;
Expand Down Expand Up @@ -154,6 +156,7 @@ TEST_P(LakeDuplicateKeyCompactionTest, test1) {
.set_partition_id(_partition_id)
.set_mem_tracker(_mem_tracker.get())
.set_schema_id(_tablet_schema->id())
.set_profile(&_dummy_runtime_profile)
.build());
ASSERT_OK(delta_writer->open());
ASSERT_OK(delta_writer->write(chunk0, indexes.data(), indexes.size()));
Expand Down Expand Up @@ -292,6 +295,7 @@ TEST_P(LakeDuplicateKeyOverlapSegmentsCompactionTest, test) {
.set_partition_id(_partition_id)
.set_mem_tracker(_mem_tracker.get())
.set_schema_id(_tablet_schema->id())
.set_profile(&_dummy_runtime_profile)
.build());
ASSERT_OK(delta_writer->open());
for (int j = 0; j < i + 1; ++j) {
Expand Down Expand Up @@ -451,6 +455,7 @@ TEST_P(LakeUniqueKeyCompactionTest, test1) {
.set_partition_id(_partition_id)
.set_mem_tracker(_mem_tracker.get())
.set_schema_id(_tablet_schema->id())
.set_profile(&_dummy_runtime_profile)
.build());
ASSERT_OK(delta_writer->open());
ASSERT_OK(delta_writer->write(chunk0, indexes.data(), indexes.size()));
Expand Down Expand Up @@ -574,6 +579,7 @@ TEST_P(LakeUniqueKeyCompactionWithDeleteTest, test_base_compaction_with_delete)
.set_partition_id(_partition_id)
.set_mem_tracker(_mem_tracker.get())
.set_schema_id(_tablet_schema->id())
.set_profile(&_dummy_runtime_profile)
.build());
ASSERT_OK(delta_writer->open());
ASSERT_OK(delta_writer->write(chunk0, indexes.data(), indexes.size()));
Expand Down
40 changes: 25 additions & 15 deletions be/test/storage/lake/delta_writer_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ class LakeDeltaWriterTest : public TestBase {
std::shared_ptr<TabletSchema> _tablet_schema;
std::shared_ptr<Schema> _schema;
int64_t _partition_id = 456;
RuntimeProfile _dummy_runtime_profile{"dummy"};
};

TEST_F(LakeDeltaWriterTest, test_build) {
Expand Down Expand Up @@ -154,6 +155,7 @@ TEST_F(LakeDeltaWriterTest, test_open) {
.set_partition_id(_partition_id)
.set_mem_tracker(_mem_tracker.get())
.set_schema_id(_tablet_schema->id())
.set_profile(&_dummy_runtime_profile)
.build());
ASSERT_OK(delta_writer->open());
delta_writer->close();
Expand All @@ -180,6 +182,7 @@ TEST_F(LakeDeltaWriterTest, test_write) {
.set_mem_tracker(_mem_tracker.get())
.set_schema_id(_tablet_schema->id())
.set_immutable_tablet_size(1)
.set_profile(&_dummy_runtime_profile)
.build());
ASSERT_OK(delta_writer->open());

Expand All @@ -202,18 +205,16 @@ TEST_F(LakeDeltaWriterTest, test_write) {
ASSERT_FALSE(txnlog->has_op_compaction());
ASSERT_FALSE(txnlog->has_op_schema_change());
ASSERT_TRUE(txnlog->op_write().has_rowset());
ASSERT_EQ(2, txnlog->op_write().rowset().segments_size());
ASSERT_TRUE(txnlog->op_write().rowset().overlapped());
ASSERT_EQ(1, txnlog->op_write().rowset().segments_size());
ASSERT_FALSE(txnlog->op_write().rowset().overlapped());
ASSERT_EQ(2 * kChunkSize, txnlog->op_write().rowset().num_rows());
ASSERT_GT(txnlog->op_write().rowset().data_size(), 0);

// Check segment file
ASSIGN_OR_ABORT(auto fs, FileSystem::CreateSharedFromString(kTestDirectory));
auto path0 = _tablet_mgr->segment_location(tablet_id, txnlog->op_write().rowset().segments(0));
auto path1 = _tablet_mgr->segment_location(tablet_id, txnlog->op_write().rowset().segments(1));

ASSIGN_OR_ABORT(auto seg0, Segment::open(fs, FileInfo{path0}, 0, _tablet_schema));
ASSIGN_OR_ABORT(auto seg1, Segment::open(fs, FileInfo{path1}, 1, _tablet_schema));

OlapReaderStatistics statistics;
SegmentReadOptions opts;
Expand All @@ -226,18 +227,17 @@ TEST_F(LakeDeltaWriterTest, test_write) {
ASSIGN_OR_ABORT(auto seg_iter, segment->new_iterator(*_schema, opts));
auto read_chunk_ptr = ChunkHelper::new_chunk(*_schema, 1024);
ASSERT_OK(seg_iter->get_next(read_chunk_ptr.get()));
ASSERT_EQ(kChunkSize, read_chunk_ptr->num_rows());
for (int i = 0; i < kChunkSize; i++) {
EXPECT_EQ(i, read_chunk_ptr->get(i)[0].get_int32());
EXPECT_EQ(i * 3, read_chunk_ptr->get(i)[1].get_int32());
ASSERT_EQ(kChunkSize * 2, read_chunk_ptr->num_rows());
for (int i = 0; i < kChunkSize * 2; i += 2) {
EXPECT_EQ(i / 2, read_chunk_ptr->get(i)[0].get_int32());
EXPECT_EQ((i / 2) * 3, read_chunk_ptr->get(i)[1].get_int32());
}
read_chunk_ptr->reset();
ASSERT_TRUE(seg_iter->get_next(read_chunk_ptr.get()).is_end_of_file());
seg_iter->close();
};

check_segment(seg0);
check_segment(seg1);
}

TEST_F(LakeDeltaWriterTest, test_write_without_schema_file) {
Expand Down Expand Up @@ -269,6 +269,7 @@ TEST_F(LakeDeltaWriterTest, test_write_without_schema_file) {
.set_partition_id(_partition_id)
.set_mem_tracker(_mem_tracker.get())
.set_schema_id(_tablet_schema->id())
.set_profile(&_dummy_runtime_profile)
.build());

ASSERT_OK(delta_writer->open());
Expand Down Expand Up @@ -304,6 +305,7 @@ TEST_F(LakeDeltaWriterTest, test_close) {
.set_partition_id(_partition_id)
.set_mem_tracker(_mem_tracker.get())
.set_schema_id(_tablet_schema->id())
.set_profile(&_dummy_runtime_profile)
.build());
ASSERT_OK(delta_writer->open());

Expand Down Expand Up @@ -343,6 +345,7 @@ TEST_F(LakeDeltaWriterTest, test_finish_without_write_txn_log) {
.set_partition_id(_partition_id)
.set_mem_tracker(_mem_tracker.get())
.set_schema_id(_tablet_schema->id())
.set_profile(&_dummy_runtime_profile)
.build());
ASSERT_OK(delta_writer->open());

Expand Down Expand Up @@ -375,6 +378,7 @@ TEST_F(LakeDeltaWriterTest, test_empty_write) {
.set_partition_id(_partition_id)
.set_mem_tracker(_mem_tracker.get())
.set_schema_id(_tablet_schema->id())
.set_profile(&_dummy_runtime_profile)
.build());
ASSERT_OK(delta_writer->open());
ASSERT_OK(delta_writer->finish_with_txnlog());
Expand Down Expand Up @@ -404,6 +408,7 @@ TEST_F(LakeDeltaWriterTest, test_negative_txn_id) {
.set_partition_id(_partition_id)
.set_mem_tracker(_mem_tracker.get())
.set_schema_id(_tablet_schema->id())
.set_profile(&_dummy_runtime_profile)
.build());
ASSERT_OK(delta_writer->open());
ASSERT_ERROR(delta_writer->finish_with_txnlog());
Expand All @@ -429,6 +434,7 @@ TEST_F(LakeDeltaWriterTest, test_memory_limit_unreached) {
.set_partition_id(_partition_id)
.set_mem_tracker(_mem_tracker.get())
.set_schema_id(_tablet_schema->id())
.set_profile(&_dummy_runtime_profile)
.build());
ASSERT_OK(delta_writer->open());

Expand Down Expand Up @@ -479,6 +485,7 @@ TEST_F(LakeDeltaWriterTest, test_reached_memory_limit) {
.set_partition_id(_partition_id)
.set_mem_tracker(_mem_tracker.get())
.set_schema_id(_tablet_schema->id())
.set_profile(&_dummy_runtime_profile)
.build());
ASSERT_OK(delta_writer->open());

Expand All @@ -502,8 +509,8 @@ TEST_F(LakeDeltaWriterTest, test_reached_memory_limit) {
ASSERT_FALSE(txnlog->has_op_compaction());
ASSERT_FALSE(txnlog->has_op_schema_change());
ASSERT_TRUE(txnlog->op_write().has_rowset());
ASSERT_EQ(3, txnlog->op_write().rowset().segments_size());
ASSERT_TRUE(txnlog->op_write().rowset().overlapped());
ASSERT_EQ(1, txnlog->op_write().rowset().segments_size());
ASSERT_FALSE(txnlog->op_write().rowset().overlapped());
ASSERT_EQ(3 * kChunkSize, txnlog->op_write().rowset().num_rows());
ASSERT_GT(txnlog->op_write().rowset().data_size(), 0);
}
Expand Down Expand Up @@ -531,6 +538,7 @@ TEST_F(LakeDeltaWriterTest, test_reached_parent_memory_limit) {
.set_partition_id(_partition_id)
.set_mem_tracker(_mem_tracker.get())
.set_schema_id(_tablet_schema->id())
.set_profile(&_dummy_runtime_profile)
.build());
ASSERT_OK(delta_writer->open());

Expand All @@ -553,8 +561,8 @@ TEST_F(LakeDeltaWriterTest, test_reached_parent_memory_limit) {
ASSERT_FALSE(txnlog->has_op_compaction());
ASSERT_FALSE(txnlog->has_op_schema_change());
ASSERT_TRUE(txnlog->op_write().has_rowset());
ASSERT_EQ(3, txnlog->op_write().rowset().segments_size());
ASSERT_TRUE(txnlog->op_write().rowset().overlapped());
ASSERT_EQ(1, txnlog->op_write().rowset().segments_size());
ASSERT_FALSE(txnlog->op_write().rowset().overlapped());
ASSERT_EQ(3 * kChunkSize, txnlog->op_write().rowset().num_rows());
ASSERT_GT(txnlog->op_write().rowset().data_size(), 0);
}
Expand Down Expand Up @@ -583,6 +591,7 @@ TEST_F(LakeDeltaWriterTest, test_memtable_full) {
.set_partition_id(_partition_id)
.set_mem_tracker(_mem_tracker.get())
.set_schema_id(_tablet_schema->id())
.set_profile(&_dummy_runtime_profile)
.build());
ASSERT_OK(delta_writer->open());

Expand All @@ -604,8 +613,8 @@ TEST_F(LakeDeltaWriterTest, test_memtable_full) {
ASSERT_FALSE(txnlog->has_op_compaction());
ASSERT_FALSE(txnlog->has_op_schema_change());
ASSERT_TRUE(txnlog->op_write().has_rowset());
ASSERT_EQ(3, txnlog->op_write().rowset().segments_size());
ASSERT_TRUE(txnlog->op_write().rowset().overlapped());
ASSERT_EQ(1, txnlog->op_write().rowset().segments_size());
ASSERT_FALSE(txnlog->op_write().rowset().overlapped());
ASSERT_EQ(3 * kChunkSize, txnlog->op_write().rowset().num_rows());
ASSERT_GT(txnlog->op_write().rowset().data_size(), 0);
}
Expand All @@ -632,6 +641,7 @@ TEST_F(LakeDeltaWriterTest, test_write_oom) {
.set_partition_id(_partition_id)
.set_mem_tracker(_mem_tracker.get())
.set_schema_id(_tablet_schema->id())
.set_profile(&_dummy_runtime_profile)
.set_immutable_tablet_size(1)
.build());
ASSERT_OK(delta_writer->open());
Expand Down
6 changes: 6 additions & 0 deletions be/test/storage/lake/lake_primary_key_consistency_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,7 @@ class LakePrimaryKeyConsistencyTest : public TestBase, testing::WithParamInterfa
.set_mem_tracker(_mem_tracker.get())
.set_schema_id(_tablet_schema->id())
.set_slot_descriptors(&_slot_pointers)
.set_profile(&_dummy_runtime_profile)
.build());
RETURN_IF_ERROR(delta_writer->open());
size_t upsert_size = _random_generator->random() % MaxUpsert;
Expand Down Expand Up @@ -425,6 +426,7 @@ class LakePrimaryKeyConsistencyTest : public TestBase, testing::WithParamInterfa
.set_schema_id(_tablet_schema->id())
.set_slot_descriptors(&_partial_slot_pointers)
.set_partial_update_mode(mode)
.set_profile(&_dummy_runtime_profile)
.build());
RETURN_IF_ERROR(delta_writer->open());
size_t upsert_size = _random_generator->random() % MaxUpsert;
Expand Down Expand Up @@ -461,6 +463,7 @@ class LakePrimaryKeyConsistencyTest : public TestBase, testing::WithParamInterfa
.set_schema_id(_tablet_schema->id())
.set_slot_descriptors(&_slot_pointers)
.set_merge_condition(merge_condition)
.set_profile(&_dummy_runtime_profile)
.build());
RETURN_IF_ERROR(delta_writer->open());
size_t upsert_size = _random_generator->random() % MaxUpsert;
Expand Down Expand Up @@ -492,6 +495,7 @@ class LakePrimaryKeyConsistencyTest : public TestBase, testing::WithParamInterfa
.set_mem_tracker(_mem_tracker.get())
.set_schema_id(_tablet_schema->id())
.set_slot_descriptors(&_slot_pointers)
.set_profile(&_dummy_runtime_profile)
.build());
RETURN_IF_ERROR(delta_writer->open());
size_t upsert_size = _random_generator->random() % MaxUpsert;
Expand Down Expand Up @@ -529,6 +533,7 @@ class LakePrimaryKeyConsistencyTest : public TestBase, testing::WithParamInterfa
.set_mem_tracker(_mem_tracker.get())
.set_schema_id(_tablet_schema->id())
.set_slot_descriptors(&_slot_pointers)
.set_profile(&_dummy_runtime_profile)
.build());
RETURN_IF_ERROR(delta_writer->open());
RETURN_IF_ERROR(
Expand Down Expand Up @@ -639,6 +644,7 @@ class LakePrimaryKeyConsistencyTest : public TestBase, testing::WithParamInterfa
std::vector<SlotDescriptor*> _partial_slot_pointers;
Chunk::SlotHashMap _slot_cid_map;
int64_t _version = 0;
RuntimeProfile _dummy_runtime_profile{"dummy"};

int _seed = 0;
int _run_second = 0;
Expand Down
Loading

0 comments on commit 9cc700a

Please sign in to comment.