Skip to content

Commit

Permalink
[Enhancement] Refactor lake compaction task cancel checker in BE to ensure it can stop asap (backport #54832) (#55633)
Browse files Browse the repository at this point in the history

Co-authored-by: Drake Wang <wxl24life@gmail.com>
  • Loading branch information
mergify[bot] and wxl24life authored Feb 7, 2025
1 parent a236afa commit 6dcb0f8
Show file tree
Hide file tree
Showing 10 changed files with 116 additions and 100 deletions.
2 changes: 1 addition & 1 deletion be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1027,7 +1027,7 @@ CONF_mInt64(lake_metadata_cache_limit, /*2GB=*/"2147483648");
CONF_mBool(lake_print_delete_log, "false");
CONF_mInt64(lake_compaction_stream_buffer_size_bytes, "1048576"); // 1MB
// The interval to check whether lake compaction is valid. Set to <= 0 to disable the check.
CONF_mInt32(lake_compaction_check_valid_interval_minutes, "30"); // 30 minutes
CONF_mInt32(lake_compaction_check_valid_interval_minutes, "10"); // 10 minutes
// Used to ensure service availability in extreme situations by sacrificing a certain degree of correctness
CONF_mBool(experimental_lake_ignore_lost_segment, "false");
CONF_mInt64(experimental_lake_wait_per_put_ms, "0");
Expand Down
103 changes: 53 additions & 50 deletions be/src/storage/lake/compaction_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,57 @@ void CompactionTaskCallback::finish_task(std::unique_ptr<CompactionTaskContext>&
}
}

// Decides whether the compaction tasks sharing this callback may keep running.
//
// Returns the error from has_error() if one is already recorded, Status::Aborted if the
// leader FE reports the transaction as no longer valid, and Status::OK() in every other
// case: the periodic check is disabled, it is not due yet, another thread currently holds
// the check mutex, or the FE could not be asked (the failure is only logged).
Status CompactionTaskCallback::is_txn_still_valid() {
    // A failure already recorded on this callback ends the check immediately.
    RETURN_IF_ERROR(has_error());

    // A non-positive interval disables the periodic validation.
    const auto interval_sec = 60L * config::lake_compaction_check_valid_interval_minutes;
    if (interval_sec <= 0) {
        return Status::OK();
    }

    // Non-blocking acquire: if the mutex is taken, another thread is checking the txn.
    std::unique_lock<std::mutex> check_guard(_txn_valid_check_mutex, std::try_to_lock);
    if (!check_guard.owns_lock()) {
        return Status::OK();
    }

    // Re-evaluate the schedule now that the lock is held.
    const auto current = time(nullptr);
    const bool check_due = current > _last_check_time && (current - _last_check_time) >= interval_sec;
    if (!check_due) {
        return Status::OK();
    }

    // Ask the FE whether this compaction transaction is still valid.
#ifndef BE_TEST
    TNetworkAddress leader = get_master_address();
    const bool leader_known = !leader.hostname.empty() && leader.port > 0;
    if (!leader_known) {
        LOG(WARNING) << "fail to validate compaction transaction " << _request->txn_id()
                     << ", error: leader FE address not found";
    } else {
        TReportLakeCompactionRequest req;
        req.__set_txn_id(_request->txn_id());
        TReportLakeCompactionResponse resp;
        auto rpc_status = ThriftRpcHelper::rpc<FrontendServiceClient>(
                leader.hostname, leader.port,
                [&req, &resp](FrontendServiceConnection& client) { client->reportLakeCompaction(resp, req); },
                3000 /* timeout 3 seconds */);
        if (!rpc_status.ok()) {
            LOG(WARNING) << "fail to validate compaction transaction " << _request->txn_id()
                         << ", error: " << rpc_status;
        } else if (!resp.valid) {
            // Record the abort on the callback so every tablet of this request sees it.
            LOG(WARNING) << "abort invalid compaction transaction " << _request->txn_id();
            Status aborted = Status::Aborted("compaction validation failed");
            update_status(aborted);
            return aborted; // should cancel compaction
        }
    }
#endif
    // Refresh the timestamp even when the FE could not be asked, so the next attempt
    // waits for a full interval.
    _last_check_time = time(nullptr);
    return Status::OK();
}

CompactionScheduler::CompactionScheduler(TabletManager* tablet_mgr)
: _tablet_mgr(tablet_mgr),
_limiter(config::compact_threads),
Expand Down Expand Up @@ -181,18 +232,16 @@ void CompactionScheduler::compact(::google::protobuf::RpcController* controller,
// thread to avoid blocking other transactions, but if there are idle threads, they will steal
// tasks from busy threads to execute.
auto cb = std::make_shared<CompactionTaskCallback>(this, request, response, done);
bool is_checker = true; // make the first tablet as checker
std::vector<std::unique_ptr<CompactionTaskContext>> contexts_vec;
for (auto tablet_id : request->tablet_ids()) {
auto context = std::make_unique<CompactionTaskContext>(request->txn_id(), tablet_id, request->version(),
request->force_base_compaction(), is_checker, cb);
request->force_base_compaction(), cb);
{
std::lock_guard l(_contexts_lock);
_contexts.Append(context.get());
}
contexts_vec.push_back(std::move(context));
// DO NOT touch `context` from here!
is_checker = false;
}
// initialize last check time, compact request is received right after FE sends it, so consider it valid now
cb->set_last_check_time(time(nullptr));
Expand Down Expand Up @@ -301,53 +350,7 @@ void CompactionScheduler::thread_task(int id) {
}

Status compaction_should_cancel(CompactionTaskContext* context) {
RETURN_IF_ERROR(context->callback->has_error());

int64_t check_interval_seconds = 60LL * config::lake_compaction_check_valid_interval_minutes;
if (!context->is_checker || check_interval_seconds <= 0) {
return Status::OK();
}

int64_t now = time(nullptr);
int64_t last_check_time = context->callback->last_check_time();
if (now > last_check_time && (now - last_check_time) >= check_interval_seconds) {
// ask FE whether this compaction transaction is still valid
#ifndef BE_TEST
TNetworkAddress master_addr = get_master_address();
if (master_addr.hostname.size() > 0 && master_addr.port > 0) {
TReportLakeCompactionRequest request;
request.__set_txn_id(context->txn_id);
TReportLakeCompactionResponse result;
auto status = ThriftRpcHelper::rpc<FrontendServiceClient>(
master_addr.hostname, master_addr.port,
[&request, &result](FrontendServiceConnection& client) {
client->reportLakeCompaction(result, request);
},
3000 /* timeout 3 seconds */);
if (status.ok()) {
if (!result.valid) {
// notify all tablets in this compaction request
LOG(WARNING) << "validate compaction transaction " << context->txn_id << " for tablet "
<< context->tablet_id << ", abort invalid compaction";
Status rs = Status::Aborted("compaction validation failed");
context->callback->update_status(rs);
return rs; // should cancel compaction
} else {
// everything is fine
}
} else {
LOG(WARNING) << "fail to validate compaction transaction " << context->txn_id << " for tablet "
<< context->tablet_id << ", error: " << status;
}
} else {
LOG(WARNING) << "fail to validate compaction transaction " << context->txn_id << " for tablet "
<< context->tablet_id << ", error: leader FE address not found";
}
#endif
// update check time, if check rpc failed, wait next round
context->callback->set_last_check_time(now);
}
return Status::OK();
return context->callback->is_txn_still_valid();
}

Status CompactionScheduler::do_compaction(std::unique_ptr<CompactionTaskContext> context) {
Expand Down
11 changes: 8 additions & 3 deletions be/src/storage/lake/compaction_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,15 +71,18 @@ class CompactionTaskCallback {
bool allow_partial_success() const;

void set_last_check_time(int64_t now) {
std::lock_guard l(_mtx);
std::lock_guard l(_txn_valid_check_mutex);
_last_check_time = now;
}

int64_t last_check_time() const {
std::lock_guard l(_mtx);
int64_t TEST_get_last_check_time() const {
std::lock_guard l(_txn_valid_check_mutex);
return _last_check_time;
}

// Checks whether the compaction transaction of this callback is still valid in FE.
// Returns the recorded error if has_error() fails and Status::Aborted if the FE reports
// the transaction as invalid. Returns Status::OK() otherwise, including when the check
// is disabled (interval <= 0), not yet due, already being performed by another thread,
// or the FE could not be asked.
Status is_txn_still_valid();

private:
const static int64_t kDefaultTimeoutMs = 24L * 60 * 60 * 1000; // 1 day

Expand All @@ -93,6 +96,8 @@ class CompactionTaskCallback {
// compaction's last check time in second, initialized when first put into task queue,
// used to help check whether it's valid periodically, task's in queue time is considered
int64_t _last_check_time;
// Protects _last_check_time. is_txn_still_valid() acquires it without blocking, so at
// most one thread at a time sends the validation RPC to the FE while the others return
// immediately.
mutable std::mutex _txn_valid_check_mutex;
std::vector<std::unique_ptr<CompactionTaskContext>> _contexts;
};

Expand Down
5 changes: 1 addition & 4 deletions be/src/storage/lake/compaction_task_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,11 @@ struct CompactionTaskStats {
// Context of a single tablet compaction task.
struct CompactionTaskContext : public butil::LinkNode<CompactionTaskContext> {
explicit CompactionTaskContext(int64_t txn_id_, int64_t tablet_id_, int64_t version_, bool force_base_compaction_,
bool is_checker_, std::shared_ptr<CompactionTaskCallback> cb_)
std::shared_ptr<CompactionTaskCallback> cb_)
: txn_id(txn_id_),
tablet_id(tablet_id_),
version(version_),
force_base_compaction(force_base_compaction_),
is_checker(is_checker_),
callback(std::move(cb_)) {}

#ifndef NDEBUG
Expand All @@ -82,8 +81,6 @@ struct CompactionTaskContext : public butil::LinkNode<CompactionTaskContext> {
std::atomic<int64_t> finish_time{0};
std::atomic<bool> skipped{false};
std::atomic<int> runs{0};
// the first tablet of a compaction request, will ask FE periodically to see if compaction is valid
bool is_checker;
Status status;
Progress progress;
int64_t enqueue_time_sec; // time point when put into queue
Expand Down
38 changes: 26 additions & 12 deletions be/test/storage/lake/compaction_scheduler_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,17 +49,15 @@ class LakeCompactionSchedulerTest : public TestBase {

TEST_F(LakeCompactionSchedulerTest, test_task_queue) {
CompactionScheduler::WrapTaskQueues queue(10);
auto ctx =
std::make_unique<CompactionTaskContext>(100 /* txn_id */, 101 /* tablet_id */, 1 /* version */,
false /* force_base_compaction */, false /* is_checker */, nullptr);
auto ctx = std::make_unique<CompactionTaskContext>(100 /* txn_id */, 101 /* tablet_id */, 1 /* version */,
false /* force_base_compaction */, nullptr);
queue.set_target_size(5);
ASSERT_EQ(5, queue.target_size());
queue.put_by_txn_id(ctx->txn_id, ctx);

std::vector<std::unique_ptr<CompactionTaskContext>> v;
auto ctx2 =
std::make_unique<CompactionTaskContext>(101 /* txn_id */, 102 /* tablet_id */, 1 /* version */,
false /* force_base_compaction */, false /* is_checker */, nullptr);
auto ctx2 = std::make_unique<CompactionTaskContext>(101 /* txn_id */, 102 /* tablet_id */, 1 /* version */,
false /* force_base_compaction */, nullptr);
v.push_back(std::move(ctx2));
queue.put_by_txn_id(101 /* txn_id */, v);
}
Expand Down Expand Up @@ -113,26 +111,42 @@ TEST_F(LakeCompactionSchedulerTest, test_compaction_cancel) {
{
auto cb = std::make_shared<CompactionTaskCallback>(nullptr, &request, &response, nullptr);
CompactionTaskContext ctx(100 /* txn_id */, 101 /* tablet_id */, 1 /* version */,
false /* force_base_compaction */, false /* is_checker */, cb);
false /* force_base_compaction */, cb);
cb->update_status(Status::Aborted("aborted for test"));
EXPECT_FALSE(compaction_should_cancel(&ctx).ok());
}

// not checker
// not valid time interval, should return early
{
auto check_interval = config::lake_compaction_check_valid_interval_minutes;
config::lake_compaction_check_valid_interval_minutes = -1;
auto cb = std::make_shared<CompactionTaskCallback>(nullptr, &request, &response, nullptr);
CompactionTaskContext ctx(100 /* txn_id */, 101 /* tablet_id */, 1 /* version */,
false /* force_base_compaction */, false /* is_checker */, cb);
false /* force_base_compaction */, cb);
EXPECT_TRUE(compaction_should_cancel(&ctx).ok());
config::lake_compaction_check_valid_interval_minutes = check_interval;
}

// is checker
// try_lock succeed and check time not satisfied
{
auto check_interval = config::lake_compaction_check_valid_interval_minutes;
config::lake_compaction_check_valid_interval_minutes = 24 * 60; // set to a big value
auto cb = std::make_shared<CompactionTaskCallback>(nullptr, &request, &response, nullptr);
CompactionTaskContext ctx(100 /* txn_id */, 101 /* tablet_id */, 1 /* version */,
false /* force_base_compaction */, true /* is_checker */, cb);
cb->set_last_check_time(0);
false /* force_base_compaction */, cb);

cb->set_last_check_time(time(nullptr));
EXPECT_TRUE(compaction_should_cancel(&ctx).ok());
config::lake_compaction_check_valid_interval_minutes = check_interval;

// give another try, should acquire the lock successfully
// try_lock succeed and check time satisfied, should cancel succeed
check_interval = config::lake_compaction_check_valid_interval_minutes;
auto last_check_time_val = time(nullptr) - 60 * check_interval;
cb->set_last_check_time(last_check_time_val);
EXPECT_TRUE(compaction_should_cancel(&ctx).ok());
// make sure _last_check_time value is updated
EXPECT_TRUE(cb->TEST_get_last_check_time() > last_check_time_val);
}
}

Expand Down
3 changes: 1 addition & 2 deletions be/test/storage/lake/compaction_task_context_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class CompactionTaskContextTest : public testing::Test {
protected:
// Implement a mock version of CompactionTaskCallback if needed
std::shared_ptr<CompactionTaskCallback> callback;
CompactionTaskContext context{123, 456, 789, false, false, callback};
CompactionTaskContext context{123, 456, 789, false, callback};

void SetUp() override {
// Initialize your context or mock callback here if necessary
Expand All @@ -54,7 +54,6 @@ TEST_F(CompactionTaskContextTest, test_constructor) {
EXPECT_EQ(123, context.txn_id);
EXPECT_EQ(456, context.tablet_id);
EXPECT_EQ(789, context.version);
EXPECT_EQ(false, context.is_checker);
}

TEST_F(CompactionTaskContextTest, test_calculation) {
Expand Down
12 changes: 6 additions & 6 deletions be/test/storage/lake/compaction_task_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ TEST_P(LakeDuplicateKeyCompactionTest, test1) {
ASSERT_EQ(kChunkSize * 3, read(version));

auto txn_id = next_id();
auto task_context = std::make_unique<CompactionTaskContext>(txn_id, tablet_id, version, false, false, nullptr);
auto task_context = std::make_unique<CompactionTaskContext>(txn_id, tablet_id, version, false, nullptr);
ASSIGN_OR_ABORT(auto task, _tablet_mgr->compact(task_context.get()));
check_task(task);
ASSERT_OK(task->execute(CompactionTask::kNoCancelFn));
Expand Down Expand Up @@ -203,7 +203,7 @@ TEST_P(LakeDuplicateKeyCompactionTest, test_empty_tablet) {

auto txn_id = next_id();
auto tablet_id = _tablet_metadata->id();
auto task_context = std::make_unique<CompactionTaskContext>(txn_id, tablet_id, version, false, false, nullptr);
auto task_context = std::make_unique<CompactionTaskContext>(txn_id, tablet_id, version, false, nullptr);
ASSIGN_OR_ABORT(auto task, _tablet_mgr->compact(task_context.get()));
ASSERT_OK(task->execute(CompactionTask::kNoCancelFn));
EXPECT_EQ(100, task_context->progress.value());
Expand Down Expand Up @@ -309,7 +309,7 @@ TEST_P(LakeDuplicateKeyOverlapSegmentsCompactionTest, test) {
// Cancelled compaction task
{
auto txn_id = next_id();
auto task_context = std::make_unique<CompactionTaskContext>(txn_id, tablet_id, version, false, false, nullptr);
auto task_context = std::make_unique<CompactionTaskContext>(txn_id, tablet_id, version, false, nullptr);
ASSIGN_OR_ABORT(auto task, _tablet_mgr->compact(task_context.get()));
check_task(task);
auto st = task->execute(CompactionTask::kCancelledFn);
Expand All @@ -319,7 +319,7 @@ TEST_P(LakeDuplicateKeyOverlapSegmentsCompactionTest, test) {
// Completed compaction task without error
{
auto txn_id = next_id();
auto task_context = std::make_unique<CompactionTaskContext>(txn_id, tablet_id, version, false, false, nullptr);
auto task_context = std::make_unique<CompactionTaskContext>(txn_id, tablet_id, version, false, nullptr);
ASSIGN_OR_ABORT(auto task, _tablet_mgr->compact(task_context.get()));
check_task(task);
ASSERT_OK(task->execute(CompactionTask::kNoCancelFn));
Expand Down Expand Up @@ -463,7 +463,7 @@ TEST_P(LakeUniqueKeyCompactionTest, test1) {
ASSERT_EQ(kChunkSize, read(version));

auto txn_id = next_id();
auto task_context = std::make_unique<CompactionTaskContext>(txn_id, tablet_id, version, false, false, nullptr);
auto task_context = std::make_unique<CompactionTaskContext>(txn_id, tablet_id, version, false, nullptr);
ASSIGN_OR_ABORT(auto task, _tablet_mgr->compact(task_context.get()));
check_task(task);
ASSERT_OK(task->execute(CompactionTask::kNoCancelFn));
Expand Down Expand Up @@ -610,7 +610,7 @@ TEST_P(LakeUniqueKeyCompactionWithDeleteTest, test_base_compaction_with_delete)
}

auto txn_id = next_id();
auto task_context = std::make_unique<CompactionTaskContext>(txn_id, tablet_id, version, false, false, nullptr);
auto task_context = std::make_unique<CompactionTaskContext>(txn_id, tablet_id, version, false, nullptr);
ASSIGN_OR_ABORT(auto task, _tablet_mgr->compact(task_context.get()));
check_task(task);
ASSERT_OK(task->execute(CompactionTask::kNoCancelFn));
Expand Down
4 changes: 2 additions & 2 deletions be/test/storage/lake/lake_primary_key_consistency_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -544,8 +544,8 @@ class LakePrimaryKeyConsistencyTest : public TestBase, testing::WithParamInterfa

Status compact_op() {
auto txn_id = next_id();
auto task_context = std::make_unique<CompactionTaskContext>(txn_id, _tablet_metadata->id(), _version, false,
false, nullptr);
auto task_context =
std::make_unique<CompactionTaskContext>(txn_id, _tablet_metadata->id(), _version, false, nullptr);
ASSIGN_OR_RETURN(auto task, _tablet_mgr->compact(task_context.get()));
RETURN_IF_ERROR(task->execute(CompactionTask::kNoCancelFn));
RETURN_IF_ERROR(publish_single_version(_tablet_metadata->id(), _version + 1, txn_id));
Expand Down
Loading

0 comments on commit 6dcb0f8

Please sign in to comment.