Skip to content

Commit

Permalink
[Enhancement] Optimize Tablet Report (StarRocks#54848)
Browse files Browse the repository at this point in the history
## Why I'm doing:
In StarRocks, FE will periodically diff the tablets in BE and the tablets recorded in metadata, and then process the inconsistent tablets. The current implementation is that BE reports the full number of tablets to the FE Leader regularly (default 1 minute), and the Leader maintains a reporting queue, and then retrieves one BE tablet from the queue each time for single-threaded processing. For large-scale clusters, the speed of FE processing usually cannot keep up with the speed of BE reporting, resulting in the existence of all BE tablets in the reporting queue, which causes memory waste. This optimization uses the Leader's active pull mode to control the tablets in the reporting queue within a BE range.

## What I'm doing:
![image](https://github.com/user-attachments/assets/bfab21dc-9740-4e13-aa8c-a483a867d341)

After optimization, a new TabletController daemon is added to regularly pull the full number of tablets from the Backend. The pull condition is
1. For a certain BE, it has been more than collect_tablet_inverval_seconds since the last pull.
2. The processing queue of ReportHandler is empty.

BE still retains the ability to actively report tablets to FE Leader, but only for emergency situations, such as disk corruption and the need to immediately remove replicas from FE metadata.

## Test(a cluster with 5 million tablets)
after optimization
![image](https://github.com/user-attachments/assets/1eea3c3e-a430-4711-8b8c-62695222ad6d)

before optimization
![image](https://github.com/user-attachments/assets/d658afbd-74c9-40ac-8274-021d508f348c)

We can see the GC time has become smoother.

Signed-off-by: gengjun-git <gengjun@starrocks.com>
  • Loading branch information
gengjun-git authored Jan 23, 2025
1 parent e5375da commit 40edf17
Show file tree
Hide file tree
Showing 19 changed files with 325 additions and 48 deletions.
19 changes: 7 additions & 12 deletions be/src/agent/heartbeat_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
#include <sstream>

#include "agent/master_info.h"
#include "agent/task_worker_pool.h"
#include "common/process_exit.h"
#include "common/status.h"
#include "gen_cpp/HeartbeatService.h"
Expand Down Expand Up @@ -105,15 +106,6 @@ void HeartbeatServer::heartbeat(THeartbeatResult& heartbeat_result, const TMaste
bool r = update_master_info(master_info);
LOG_IF(WARNING, !r) << "Fail to update master info, maybe the master info has been updated by another thread "
"with a larger epoch";
} else if (*res == kNeedUpdateAndReport) {
LOG(INFO) << "Updating master info: " << print_master_info(master_info);
bool r = update_master_info(master_info);
LOG_IF(WARNING, !r) << "Fail to update master info, maybe the master info has been updated by another thread "
"with a larger epoch";
if (r) {
LOG(INFO) << "Master FE is changed or restarted. report tablet and disk info immediately";
_olap_engine->trigger_report();
}
} else {
DCHECK_EQ(kUnchanged, *res);
// nothing to do
Expand All @@ -127,6 +119,12 @@ void HeartbeatServer::heartbeat(THeartbeatResult& heartbeat_result, const TMaste
_olap_engine->decommission_disks(master_info.decommissioned_disks);
}

if (master_info.__isset.stop_regular_tablet_report) {
ReportOlapTableTaskWorkerPool::set_regular_report_stopped(master_info.stop_regular_tablet_report);
} else {
ReportOlapTableTaskWorkerPool::set_regular_report_stopped(false);
}

static auto num_hardware_cores = static_cast<int32_t>(CpuInfo::num_cores());
if (res.ok()) {
heartbeat_result.backend_info.__set_be_port(config::be_port);
Expand Down Expand Up @@ -270,9 +268,6 @@ StatusOr<HeartbeatServer::CmpResult> HeartbeatServer::compare_master_info(const
heartbeat_flags->update(master_info.heartbeat_flags);
}

if (curr_master_info->network_address != master_info.network_address) {
return kNeedUpdateAndReport;
}
if (*curr_master_info != master_info) {
return kNeedUpdate;
}
Expand Down
31 changes: 21 additions & 10 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,27 @@
namespace starrocks {

namespace {
static void wait_for_notify_small_steps(int32_t timeout_sec, bool from_report_tablet_thread,
const std::function<bool()>& stop_waiting) {
auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(timeout_sec);
static void wait_for_disk_report_notify(const std::function<bool()>& stop_waiting) {
auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(config::report_disk_state_interval_seconds);
bool notified = false;
do {
// take 1 second per step
notified = StorageEngine::instance()->wait_for_report_notify(1, from_report_tablet_thread);
notified = StorageEngine::instance()->wait_for_report_notify(1, false);
} while (!notified && std::chrono::steady_clock::now() < deadline && !stop_waiting());
}

static void wait_for_tablet_report_notify(const std::function<bool()>& stop_waiting) {
auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(config::report_tablet_interval_seconds);
bool notified = false;
do {
// take 1 second per step
notified = StorageEngine::instance()->wait_for_report_notify(1, true);
} while (!notified
// if the regular report is stopped, there will be no deadline
&& (ReportOlapTableTaskWorkerPool::is_regular_report_stopped() ||
std::chrono::steady_clock::now() < deadline) &&
!stop_waiting());
}
} // namespace

const size_t PUBLISH_VERSION_BATCH_SIZE = 10;
Expand Down Expand Up @@ -665,8 +677,7 @@ void* ReportDiskStateTaskWorkerPool::_worker_thread_callback(void* arg_this) {
}

// wait for notifying until timeout
wait_for_notify_small_steps(config::report_disk_state_interval_seconds, false,
[&] { return worker_pool_this->_stopped.load(); });
wait_for_disk_report_notify([&] { return worker_pool_this->_stopped.load(); });
}

return nullptr;
Expand Down Expand Up @@ -695,8 +706,7 @@ void* ReportOlapTableTaskWorkerPool::_worker_thread_callback(void* arg_this) {
if (!st_report.ok()) {
LOG(WARNING) << "Fail to report all tablets info, err=" << st_report.to_string();
// wait for notifying until timeout
wait_for_notify_small_steps(config::report_tablet_interval_seconds, true,
[&] { return worker_pool_this->_stopped.load(); });
wait_for_tablet_report_notify([&] { return worker_pool_this->_stopped.load(); });
continue;
}
int64_t max_compaction_score =
Expand All @@ -717,13 +727,14 @@ void* ReportOlapTableTaskWorkerPool::_worker_thread_callback(void* arg_this) {
}

// wait for notifying until timeout
wait_for_notify_small_steps(config::report_tablet_interval_seconds, true,
[&] { return worker_pool_this->_stopped.load(); });
wait_for_tablet_report_notify([&] { return worker_pool_this->_stopped.load(); });
}

return nullptr;
}

std::atomic<bool> ReportOlapTableTaskWorkerPool::_regular_report_stopped(false);

void* ReportWorkgroupTaskWorkerPool::_worker_thread_callback(void* arg_this) {
auto* worker_pool_this = (ReportWorkgroupTaskWorkerPool*)arg_this;

Expand Down
6 changes: 6 additions & 0 deletions be/src/agent/task_worker_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,13 @@ class ReportOlapTableTaskWorkerPool : public TaskWorkerPool<AgentTaskRequestWith
_callback_function = _worker_thread_callback;
}

static void set_regular_report_stopped(bool stop) { _regular_report_stopped.store(stop); }

static bool is_regular_report_stopped() { return _regular_report_stopped.load(); }

private:
static std::atomic<bool> _regular_report_stopped;

static void* _worker_thread_callback(void* arg_this);

AgentTaskRequestPtr _convert_task(const TAgentTaskRequest& task, time_t recv_time) override {
Expand Down
13 changes: 13 additions & 0 deletions be/src/service/service_be/backend_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "backend_service.h"

#include "agent/agent_server.h"
#include "agent/task_worker_pool.h"
#include "storage/storage_engine.h"
#include "storage/tablet_manager.h"

Expand Down Expand Up @@ -66,4 +67,16 @@ void BackendService::publish_cluster_state(TAgentResult& result, const TAgentPub
_agent_server->publish_cluster_state(result, request);
}

void BackendService::get_tablets_info(TGetTabletsInfoResult& result_, const TGetTabletsInfoRequest& request) {
result_.__set_report_version(curr_report_version());
result_.__isset.tablets = true;
TStatus t_status;
Status st_report = StorageEngine::instance()->tablet_manager()->report_all_tablets_info(&result_.tablets);
if (!st_report.ok()) {
LOG(WARNING) << "Fail to get all tablets info, err=" << st_report.to_string();
}
st_report.to_thrift(&t_status);
result_.status = t_status;
}

} // namespace starrocks
2 changes: 2 additions & 0 deletions be/src/service/service_be/backend_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ class BackendService : public BackendServiceBase {

void get_tablet_stat(TTabletStatResult& result) override;

void get_tablets_info(TGetTabletsInfoResult& result_, const TGetTabletsInfoRequest& request) override;

private:
AgentServer* _agent_server;
};
Expand Down
4 changes: 2 additions & 2 deletions be/test/agent/heartbeat_server_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ TEST(HeartbeatServerTest, test_print_master_info_with_token_null) {
"cluster_id=12345, epoch=100, token=<null>, backend_ip=192.168.1.1, "
"http_port=<null>, heartbeat_flags=<null>, backend_id=<null>, "
"min_active_txn_id=0, run_mode=<null>, disabled_disks=<null>, "
"decommissioned_disks=<null>, encrypted=<null>)";
"decommissioned_disks=<null>, encrypted=<null>, stop_regular_tablet_report=<null>)";

EXPECT_EQ(server.print_master_info(master_info), expected_output);
}
Expand All @@ -71,7 +71,7 @@ TEST(HeartbeatServerTest, test_print_master_info_with_token_hidden) {
"cluster_id=12345, epoch=100, token=<hidden>, backend_ip=192.168.1.1, "
"http_port=<null>, heartbeat_flags=<null>, backend_id=<null>, "
"min_active_txn_id=0, run_mode=<null>, disabled_disks=<null>, "
"decommissioned_disks=<null>, encrypted=<null>)";
"decommissioned_disks=<null>, encrypted=<null>, stop_regular_tablet_report=<null>)";

EXPECT_EQ(server.print_master_info(master_info), expected_output);
}
Expand Down
6 changes: 6 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/common/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -1493,6 +1493,12 @@ public class Config extends ConfigBase {
@ConfField
public static int tablet_stat_update_interval_second = 300; // 5 min

@ConfField(mutable = true, comment = "time interval to collect tablet info from backend")
public static long tablet_collect_interval_seconds = 60;

@ConfField(mutable = true, comment = "Timeout for calling BE get_tablets_info rpc")
public static int tablet_collect_timeout_seconds = 60;

/**
* The tryLock timeout configuration of globalStateMgr lock.
* Normally it does not need to change, unless you need to test something.
Expand Down
10 changes: 1 addition & 9 deletions fe/fe-core/src/main/java/com/starrocks/leader/LeaderImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@
import com.starrocks.load.DeleteJob;
import com.starrocks.load.OlapDeleteJob;
import com.starrocks.load.loadv2.SparkLoadJob;
import com.starrocks.memory.MemoryUsageTracker;
import com.starrocks.rpc.ThriftConnectionPool;
import com.starrocks.rpc.ThriftRPCRequestExecutor;
import com.starrocks.server.GlobalStateMgr;
Expand Down Expand Up @@ -175,13 +174,6 @@
public class LeaderImpl {
private static final Logger LOG = LogManager.getLogger(LeaderImpl.class);

private final ReportHandler reportHandler = new ReportHandler();

public LeaderImpl() {
reportHandler.start();
MemoryUsageTracker.registerMemoryTracker("Report", reportHandler);
}

public TMasterResult finishTask(TFinishTaskRequest request) {
// if current node is not master, reject the request
TMasterResult result = new TMasterResult();
Expand Down Expand Up @@ -873,7 +865,7 @@ public TMasterResult report(TReportRequest request) throws TException {
result.setStatus(status);
return result;
}
return reportHandler.handleReport(request);
return GlobalStateMgr.getCurrentState().getReportHandler().handleReport(request);
}

private void finishAlterTask(AgentTask task) {
Expand Down
46 changes: 31 additions & 15 deletions fe/fe-core/src/main/java/com/starrocks/leader/ReportHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import com.starrocks.catalog.TabletMeta;
import com.starrocks.clone.TabletChecker;
import com.starrocks.clone.TabletSchedCtx;
import com.starrocks.common.CloseableLock;
import com.starrocks.common.Config;
import com.starrocks.common.FeConstants;
import com.starrocks.common.InternalErrorCode;
Expand All @@ -79,9 +80,6 @@
import com.starrocks.common.util.concurrent.lock.Locker;
import com.starrocks.datacache.DataCacheMetrics;
import com.starrocks.memory.MemoryTrackable;
import com.starrocks.metric.GaugeMetric;
import com.starrocks.metric.Metric.MetricUnit;
import com.starrocks.metric.MetricRepo;
import com.starrocks.persist.BackendTabletsInfo;
import com.starrocks.persist.BatchDeleteReplicaInfo;
import com.starrocks.persist.ReplicaPersistInfo;
Expand Down Expand Up @@ -142,12 +140,14 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;

public class ReportHandler extends Daemon implements MemoryTrackable {
@Override
public List<Pair<List<Object>, Long>> getSamples() {
synchronized (pendingTaskMap) {
try (CloseableLock ignored = CloseableLock.lock(lock.readLock())) {
List<Pair<List<Object>, Long>> result = new ArrayList<>();
for (Map<Long, ReportTask> taskMap : pendingTaskMap.values()) {
result.add(Pair.create(taskMap.values()
Expand All @@ -162,7 +162,7 @@ public List<Pair<List<Object>, Long>> getSamples() {

@Override
public Map<String, Long> estimateCount() {
synchronized (pendingTaskMap) {
try (CloseableLock ignored = CloseableLock.lock(lock.readLock())) {
long count = 0;
for (Map<Long, ReportTask> taskMap : pendingTaskMap.values()) {
count += taskMap.size();
Expand Down Expand Up @@ -193,6 +193,8 @@ public enum ReportType {

private final Map<ReportType, Map<Long, ReportTask>> pendingTaskMap = Maps.newHashMap();

private final ReadWriteLock lock = new ReentrantReadWriteLock();

/**
* Record the mapping of <tablet id, backend id> to the to be dropped time of tablet.
* We will delay the drop of tablet based on configuration `tablet_report_drop_tablet_delay_sec`
Expand All @@ -208,14 +210,6 @@ public enum ReportType {

public ReportHandler() {
super("ReportHandler");
GaugeMetric<Long> gaugeQueueSize = new GaugeMetric<Long>(
"report_queue_size", MetricUnit.NOUNIT, "report queue size") {
@Override
public Long getValue() {
return (long) reportQueue.size();
}
};
MetricRepo.addMetric(gaugeQueueSize);
pendingTaskMap.put(ReportType.TABLET_REPORT, Maps.newHashMap());
pendingTaskMap.put(ReportType.DISK_REPORT, Maps.newHashMap());
pendingTaskMap.put(ReportType.TASK_REPORT, Maps.newHashMap());
Expand Down Expand Up @@ -373,7 +367,7 @@ private void buildErrorResult(TStatus tStatus, String msg) {
}

private void putToQueue(ReportTask reportTask) throws Exception {
synchronized (pendingTaskMap) {
try (CloseableLock ignored = CloseableLock.lock(lock.writeLock())) {
if (!pendingTaskMap.containsKey(reportTask.type)) {
throw new Exception("Unknown report task type" + reportTask.toString());
}
Expand Down Expand Up @@ -427,6 +421,13 @@ public ReportTask(long beId, ReportType type, Map<TTaskType, Set<Long>> tasks,
this.dataCacheMetrics = dataCacheMetrics;
}

public ReportTask(long beId, ReportType type, Map<Long, TTablet> tablets, long reportVersion) {
this.beId = beId;
this.type = type;
this.tablets = tablets;
this.reportVersion = reportVersion;
}

@Override
protected void exec() {
if (tasks != null) {
Expand Down Expand Up @@ -2184,13 +2185,28 @@ private static void addReplica(long tabletId, TTabletInfo backendTabletInfo, lon
}
}

public int getPendingTabletReportTaskCnt() {
try (CloseableLock ignored = CloseableLock.lock(lock.readLock())) {
Map<Long, ReportTask> tasks = pendingTaskMap.get(ReportType.TABLET_REPORT);
return tasks == null ? 0 : tasks.size();
}
}

public void putTabletReportTask(long beId, long reportVersion, Map<Long, TTablet> tablets) throws Exception {
putToQueue(new ReportTask(beId, ReportType.TABLET_REPORT, tablets, reportVersion));
}

public int getReportQueueSize() {
return reportQueue.size();
}

@Override
protected void runOneCycle() {
while (true) {
try {
Pair<Long, ReportType> pair = reportQueue.take();
ReportTask task = null;
synchronized (pendingTaskMap) {
try (CloseableLock ignored = CloseableLock.lock(lock.writeLock())) {
// using the lastest task
task = pendingTaskMap.get(pair.second).get(pair.first);
if (task == null) {
Expand Down
Loading

0 comments on commit 40edf17

Please sign in to comment.