Skip to content

Commit

Permalink
[SR-4336] bugfix: wrong _compaction_running value after BE restart (S…
Browse files Browse the repository at this point in the history
  • Loading branch information
decster authored Sep 6, 2021
1 parent 96245d8 commit e1b4fcc
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 6 deletions.
46 changes: 40 additions & 6 deletions be/src/storage/tablet_updates.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -232,8 +232,9 @@ Status TabletUpdates::_load_from_pb(const TabletUpdatesPB& updates) {
std::lock_guard lg(_rowset_stats_lock);
_rowset_stats.emplace(rsid, std::move(stats));
}
VLOG(1) << "load tablet " << _debug_string(false, true);
l2.unlock(); // _rowsets_lock
_update_total_stats(_versions[_apply_version_idx]->rowsets);
VLOG(1) << "load tablet " << _debug_string(false, true);
_try_commit_pendings_unlocked();
_check_for_apply();

Expand Down Expand Up @@ -627,9 +628,15 @@ void TabletUpdates::do_apply() {
_apply_rowset_commit(*version_info_apply);
}
StarRocksMetrics::instance()->update_rowset_commit_apply_duration_us.increment(duration_ns / 1000);
} else {
} else if (version_info_apply->compaction) {
// compaction
// _compaction_running may be false after BE restart, reset it to true
_compaction_running = true;
_apply_compaction_commit(*version_info_apply);
_compaction_running = false;
} else {
LOG(ERROR) << "bad EditVersionInfo tablet:" << _tablet.tablet_id();
_set_error();
}
first = false;
if (_error) {
Expand Down Expand Up @@ -820,9 +827,12 @@ void TabletUpdates::_apply_rowset_commit(const EditVersionInfo& version_info) {
_apply_version_idx++;
_apply_version_changed.notify_all();
}
_update_total_stats(version_info.rowsets);
int64_t t_write = MonotonicMillis();

size_t del_percent = _cur_total_rows == 0 ? 0 : (_cur_total_dels * 100) / _cur_total_rows;
LOG(INFO) << "apply_rowset_commit finish. tablet:" << tablet_id << " version:" << version_info.version.to_string()
<< " total #del/#rows" << _cur_total_dels << "/" << _cur_total_rows << " " << del_percent << "%"
<< " rowset:" << rowset_id << " #seg:" << rowset->num_segments() << " #row:" << rowset->num_rows()
<< " #del:" << old_total_del << "+" << new_del << "=" << total_del << " #dv:" << ndelvec
<< delvec_change_info << " duration:" << t_write - t_start << "ms"
Expand Down Expand Up @@ -1148,10 +1158,14 @@ void TabletUpdates::_apply_compaction_commit(const EditVersionInfo& version_info
DCHECK_LE(iter->second->num_dels, iter->second->num_rows);
}
}
_update_total_stats(version_info.rowsets);
int64_t t_write = MonotonicMillis();
size_t del_percent = _cur_total_rows == 0 ? 0 : (_cur_total_dels * 100) / _cur_total_rows;
LOG(INFO) << "apply_compaction_commit finish tablet:" << tablet_id
<< " version:" << version_info.version.to_string() << " rowset:" << rowset_id << " #row:" << total_rows
<< " #del:" << total_deletes << " #delvec:" << delvecs.size() << " duration:" << t_write - t_start << "ms"
<< " version:" << version_info.version.to_string() << " total #del/#rows" << _cur_total_dels << "/"
<< del_percent << "%"
<< " rowset:" << rowset_id << " #row:" << total_rows << " #del:" << total_deletes
<< " #delvec:" << delvecs.size() << " duration:" << t_write - t_start << "ms"
<< Substitute("($0/$1/$2)", t_load - t_start, t_index_delvec - t_load, t_write - t_index_delvec);
VLOG(1) << "update compaction apply " << _debug_string(true, true);
}
Expand Down Expand Up @@ -1410,6 +1424,7 @@ Status TabletUpdates::compaction(MemTracker* mem_tracker) {
string msg = Substitute("compaction got negative score: tablet=$0 score=$1", _tablet.tablet_id(), total_score);
DCHECK(false) << msg;
LOG(WARNING) << msg;
_compaction_running = false;
return Status::OK();
}
if (total_valid_rowsets - info->inputs.size() <= 3) {
Expand All @@ -1426,7 +1441,9 @@ Status TabletUpdates::compaction(MemTracker* mem_tracker) {
<< " bytes:" << PrettyPrinter::print(total_bytes, TUnit::BYTES) << "->"
<< PrettyPrinter::print(total_bytes_after_compaction, TUnit::BYTES) << "(estimate)";
Status st = _do_compaction(&info, mem_tracker, true);
_compaction_running = false;
if (!st.ok()) {
_compaction_running = false;
}
return st;
}

Expand All @@ -1437,7 +1454,7 @@ void TabletUpdates::_calc_compaction_score(RowsetStats* stats) {
}
// TODO(cbl): estimate read/write cost, currently just use fixed value
const int64_t cost_record_write = 1;
const int64_t cost_record_read = 5;
const int64_t cost_record_read = 4;
// use double to prevent overflow
int64_t delete_bytes = (int64_t)(stats->byte_size * (double)stats->num_dels / stats->num_rows);
stats->compaction_score = _compaction_cost_seek + (cost_record_read + cost_record_write) * delete_bytes -
Expand Down Expand Up @@ -2093,4 +2110,21 @@ Status TabletUpdates::clear_meta() {
return Status::OK();
}

void TabletUpdates::_update_total_stats(const std::vector<uint32_t>& rowsets) {
size_t nrow = 0;
size_t ndel = 0;
{
std::lock_guard l3(_rowset_stats_lock);
for (auto rid : rowsets) {
auto itr = _rowset_stats.find(rid);
if (itr != _rowset_stats.end()) {
nrow += itr->second->num_rows;
ndel += itr->second->num_dels;
}
}
}
_cur_total_rows = nrow;
_cur_total_dels = ndel;
}

} // namespace starrocks
7 changes: 7 additions & 0 deletions be/src/storage/tablet_updates.h
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,8 @@ class TabletUpdates {

void _clear_rowset_del_vec_cache(const Rowset& rowset);

void _update_total_stats(const std::vector<uint32_t>& rowsets);

private:
Tablet& _tablet;

Expand Down Expand Up @@ -297,6 +299,11 @@ class TabletUpdates {
// used to for compaction policy.
// protected by |_rowset_stats_lock|.
OrderedMap<uint32_t, std::unique_ptr<RowsetStats>> _rowset_stats;
// stats for current applied version's total rows/total deletes
// it will be updated after new EditVersion applied
// currently just for debug/logging purpose
size_t _cur_total_rows = 0;
size_t _cur_total_dels = 0;

// state used in compaction process
std::unique_ptr<vectorized::CompactionState> _compaction_state;
Expand Down

0 comments on commit e1b4fcc

Please sign in to comment.