Skip to content

Commit

Permalink
[BugFix] Fix backend crash due to _immutable_partition_ids unsafe con…
Browse files Browse the repository at this point in the history
…current access (StarRocks#55085)

Signed-off-by: meegoo <meegoo.sr@gmail.com>
  • Loading branch information
meegoo authored Jan 26, 2025
1 parent 380af2d commit b4b3ce8
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 9 deletions.
9 changes: 5 additions & 4 deletions be/src/runtime/lake_tablets_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ class LakeTabletsChannel : public TabletsChannel {
bool _is_incremental_channel{false};
lake::DeltaWriterFinishMode _finish_mode{lake::DeltaWriterFinishMode::kWriteTxnLog};
TxnLogCollector _txn_log_collector;
std::set<int64_t> _immutable_partition_ids;

std::map<string, string> _column_to_expr_value;

// Profile counters
Expand Down Expand Up @@ -530,7 +530,8 @@ void LakeTabletsChannel::add_chunk(Chunk* chunk, const PTabletWriterAddChunkRequ
response->add_immutable_tablet_ids(tablet_id);
response->add_immutable_partition_ids(writer->partition_id());
immutable_tablet_ids.insert(tablet_id);
_immutable_partition_ids.insert(writer->partition_id());

_insert_immutable_partition(writer->partition_id());
}
}

Expand Down Expand Up @@ -602,7 +603,7 @@ static void null_callback(const Status& status) {
}

void LakeTabletsChannel::_flush_stale_memtables() {
if (_immutable_partition_ids.empty() && config::stale_memtable_flush_time_sec <= 0) {
if (_is_immutable_partition_empty() && config::stale_memtable_flush_time_sec <= 0) {
return;
}
bool high_mem_usage = false;
Expand All @@ -616,7 +617,7 @@ void LakeTabletsChannel::_flush_stale_memtables() {
bool log_flushed = false;
auto last_write_ts = writer->last_write_ts();
if (last_write_ts > 0) {
if (_immutable_partition_ids.count(writer->partition_id()) > 0) {
if (_has_immutable_partition(writer->partition_id())) {
if (high_mem_usage) {
log_flushed = true;
writer->flush(null_callback);
Expand Down
7 changes: 4 additions & 3 deletions be/src/runtime/local_tablets_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -388,8 +388,9 @@ void LocalTabletsChannel::add_chunk(Chunk* chunk, const PTabletWriterAddChunkReq
if (writer->is_immutable() && immutable_tablet_ids.count(tablet_id) == 0) {
response->add_immutable_tablet_ids(tablet_id);
response->add_immutable_partition_ids(writer->partition_id());
_immutable_partition_ids.insert(writer->partition_id());
immutable_tablet_ids.insert(tablet_id);

_insert_immutable_partition(writer->partition_id());
}
}

Expand Down Expand Up @@ -466,7 +467,7 @@ void LocalTabletsChannel::add_chunk(Chunk* chunk, const PTabletWriterAddChunkReq
}

void LocalTabletsChannel::_flush_stale_memtables() {
if (_immutable_partition_ids.empty() && config::stale_memtable_flush_time_sec <= 0) {
if (_is_immutable_partition_empty() && config::stale_memtable_flush_time_sec <= 0) {
return;
}
bool high_mem_usage = false;
Expand All @@ -489,7 +490,7 @@ void LocalTabletsChannel::_flush_stale_memtables() {
bool need_flush = false;
auto last_write_ts = writer->last_write_ts();
if (last_write_ts > 0) {
if (_immutable_partition_ids.count(writer->partition_id()) > 0) {
if (_has_immutable_partition(writer->partition_id())) {
if (high_mem_usage) {
// immutable tablet flush stale memtable immediately when high mem usage
need_flush = true;
Expand Down
2 changes: 0 additions & 2 deletions be/src/runtime/local_tablets_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -227,8 +227,6 @@ class LocalTabletsChannel : public TabletsChannel {
mutable bthread::Mutex _status_lock;
Status _status = Status::OK();

std::set<int64_t> _immutable_partition_ids;

std::map<string, string> _column_to_expr_value;

// Profile counters
Expand Down
21 changes: 21 additions & 0 deletions be/src/runtime/tablets_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@

#pragma once

#include <bthread/mutex.h>

#include <atomic>
#include <set>
#include <sstream>
#include <string>

Expand Down Expand Up @@ -71,13 +74,31 @@ class TabletsChannel {
virtual void update_profile() = 0;

protected:
bool _is_immutable_partition_empty() const {
std::lock_guard l(_immutable_partition_ids_lock);
return _immutable_partition_ids.empty();
}

bool _has_immutable_partition(int64_t partition_id) const {
std::lock_guard l(_immutable_partition_ids_lock);
return _immutable_partition_ids.count(partition_id) > 0;
}

void _insert_immutable_partition(int64_t partition_id) {
std::lock_guard l(_immutable_partition_ids_lock);
_immutable_partition_ids.insert(partition_id);
}

// counter of remaining senders
std::atomic<int> _num_remaining_senders = 0;

// counter of initial senders
std::atomic<int> _num_initial_senders = 0;

std::unordered_map<int64_t, std::atomic<int>> _tablet_id_to_num_remaining_senders;

mutable bthread::Mutex _immutable_partition_ids_lock;
std::set<int64_t> _immutable_partition_ids;
};

struct TabletsChannelKey {
Expand Down

0 comments on commit b4b3ce8

Please sign in to comment.