Skip to content

Commit

Permalink
[Refactor] Refactor Starrocks LOG to reduce the log file size(part2) (StarRocks#52129)
Browse files Browse the repository at this point in the history

Signed-off-by: sevev <qiangzh95@gmail.com>
  • Loading branch information
sevev authored Oct 24, 2024
1 parent aeafef0 commit 16ffabe
Show file tree
Hide file tree
Showing 14 changed files with 186 additions and 147 deletions.
42 changes: 23 additions & 19 deletions be/src/agent/agent_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ static void alter_tablet(const TAlterTabletReqV2& agent_task_req, int64_t signat
strings::Substitute("[Alter Job:$0, tablet:$1]: ", agent_task_req.job_id, agent_task_req.base_tablet_id);
if (status == STARROCKS_SUCCESS) {
g_report_version.fetch_add(1, std::memory_order_relaxed);
LOG(INFO) << alter_msg_head << "alter finished. signature: " << signature;
VLOG(1) << alter_msg_head << "alter finished. signature: " << signature;
}

// Return result to fe
Expand Down Expand Up @@ -272,7 +272,7 @@ void run_alter_tablet_task(const std::shared_ptr<AlterTabletAgentTaskRequest>& a
int64_t signatrue = agent_task_req->signature;
std::string alter_msg_head = strings::Substitute("[Alter Job:$0, tablet:$1]: ", agent_task_req->task_req.job_id,
agent_task_req->task_req.base_tablet_id);
LOG(INFO) << alter_msg_head << "get alter table task, signature: " << agent_task_req->signature;
VLOG(1) << alter_msg_head << "get alter table task, signature: " << agent_task_req->signature;
bool is_task_req_expired = false;
if (agent_task_req->isset.recv_time) {
int64_t time_elapsed = time(nullptr) - agent_task_req->recv_time;
Expand Down Expand Up @@ -435,22 +435,23 @@ void run_storage_medium_migrate_task(const std::shared_ptr<StorageMediumMigrateT
if (src_storage_medium == storage_medium) {
// status code is ok
LOG(INFO) << "tablet is already on specified storage medium. "
<< "storage_medium=" << storage_medium;
<< "storage_medium=" << storage_medium << ", signature:" << agent_task_req->signature;
break;
}

uint32_t count = StorageEngine::instance()->available_storage_medium_type_count();
if (count <= 1) {
LOG(INFO) << "available storage medium type count is less than 1, "
<< "no need to migrate. count=" << count;
<< "no need to migrate. count=" << count << ", signature:" << agent_task_req->signature;
status_code = TStatusCode::RUNTIME_ERROR;
break;
}

// get a random store of specified storage medium
auto stores = StorageEngine::instance()->get_stores_for_create_tablet(storage_medium);
if (stores.empty()) {
LOG(WARNING) << "fail to get path for migration. storage_medium=" << storage_medium;
LOG(WARNING) << "get available store for migratio failed. storage_medium=" << storage_medium
<< ", signature:" << agent_task_req->signature;
status_code = TStatusCode::RUNTIME_ERROR;
break;
}
Expand Down Expand Up @@ -507,8 +508,8 @@ void run_check_consistency_task(const std::shared_ptr<CheckConsistencyTaskReques
LOG(WARNING) << "check consistency failed. status: " << res << ", signature: " << agent_task_req->signature;
status_code = TStatusCode::RUNTIME_ERROR;
} else {
LOG(INFO) << "check consistency success. status:" << res << ", signature:" << agent_task_req->signature
<< ", checksum:" << checksum;
VLOG(1) << "check consistency success. status:" << res << ", signature:" << agent_task_req->signature
<< ", checksum:" << checksum;
}
}

Expand Down Expand Up @@ -649,14 +650,15 @@ void run_update_schema_task(const std::shared_ptr<UpdateSchemaTaskRequest>& agen

finish_task(finish_task_request);
remove_task_info(agent_task_req->task_type, agent_task_req->signature);
LOG(INFO) << "Update schema task signature=" << agent_task_req->signature << " error_tablets["
<< error_tablet_ids.size() << "):" << JoinInts(error_tablet_ids, ",");
LOG_IF(INFO, error_tablet_ids.size() > 0)
<< "Update schema task signature=" << agent_task_req->signature << " error_tablets["
<< error_tablet_ids.size() << "):" << JoinInts(error_tablet_ids, ",");
}

void run_upload_task(const std::shared_ptr<UploadAgentTaskRequest>& agent_task_req, ExecEnv* exec_env) {
const TUploadReq& upload_request = agent_task_req->task_req;

LOG(INFO) << "Got upload task signature=" << agent_task_req->signature << " job id=" << upload_request.job_id;
VLOG(1) << "Got upload task signature=" << agent_task_req->signature << " job id=" << upload_request.job_id;

std::map<int64_t, std::vector<std::string>> tablet_files;
SnapshotLoader loader(exec_env, upload_request.job_id, agent_task_req->signature);
Expand Down Expand Up @@ -684,12 +686,13 @@ void run_upload_task(const std::shared_ptr<UploadAgentTaskRequest>& agent_task_r
finish_task(finish_task_request);
remove_task_info(agent_task_req->task_type, agent_task_req->signature);

LOG(INFO) << "Uploaded task signature=" << agent_task_req->signature << " job id=" << upload_request.job_id;
LOG(INFO) << "Finished uploaded task signature=" << agent_task_req->signature
<< " job id=" << upload_request.job_id;
}

void run_download_task(const std::shared_ptr<DownloadAgentTaskRequest>& agent_task_req, ExecEnv* exec_env) {
const TDownloadReq& download_request = agent_task_req->task_req;
LOG(INFO) << "Got download task signature=" << agent_task_req->signature << " job id=" << download_request.job_id;
VLOG(1) << "Got download task signature=" << agent_task_req->signature << " job id=" << download_request.job_id;

TStatusCode::type status_code = TStatusCode::OK;
std::vector<std::string> error_msgs;
Expand Down Expand Up @@ -719,12 +722,13 @@ void run_download_task(const std::shared_ptr<DownloadAgentTaskRequest>& agent_ta
finish_task(finish_task_request);
remove_task_info(agent_task_req->task_type, agent_task_req->signature);

LOG(INFO) << "Downloaded task signature=" << agent_task_req->signature << " job id=" << download_request.job_id;
LOG(INFO) << "Finished downloaded task signature=" << agent_task_req->signature
<< " job id=" << download_request.job_id;
}

void run_make_snapshot_task(const std::shared_ptr<SnapshotAgentTaskRequest>& agent_task_req, ExecEnv* exec_env) {
const TSnapshotRequest& snapshot_request = agent_task_req->task_req;
LOG(INFO) << "Got snapshot task signature=" << agent_task_req->signature;
VLOG(1) << "Got snapshot task signature=" << agent_task_req->signature;

TStatusCode::type status_code = TStatusCode::OK;
std::vector<std::string> error_msgs;
Expand Down Expand Up @@ -778,7 +782,7 @@ void run_make_snapshot_task(const std::shared_ptr<SnapshotAgentTaskRequest>& age
void run_release_snapshot_task(const std::shared_ptr<ReleaseSnapshotAgentTaskRequest>& agent_task_req,
ExecEnv* exec_env) {
const TReleaseSnapshotRequest& release_snapshot_request = agent_task_req->task_req;
LOG(INFO) << "Got release snapshot task signature=" << agent_task_req->signature;
VLOG(1) << "Got release snapshot task signature=" << agent_task_req->signature;

TStatusCode::type status_code = TStatusCode::OK;
std::vector<std::string> error_msgs;
Expand Down Expand Up @@ -827,7 +831,7 @@ AgentStatus move_dir(TTabletId tablet_id, TSchemaHash schema_hash, const std::st

void run_move_dir_task(const std::shared_ptr<MoveDirAgentTaskRequest>& agent_task_req, ExecEnv* exec_env) {
const TMoveDirReq& move_dir_req = agent_task_req->task_req;
LOG(INFO) << "Got move dir task signature=" << agent_task_req->signature << " job id=" << move_dir_req.job_id;
VLOG(1) << "Got move dir task signature=" << agent_task_req->signature << " job id=" << move_dir_req.job_id;

TStatusCode::type status_code = TStatusCode::OK;
std::vector<std::string> error_msgs;
Expand All @@ -854,7 +858,7 @@ void run_update_meta_info_task(const std::shared_ptr<UpdateTabletMetaInfoAgentTa
ExecEnv* exec_env) {
const TUpdateTabletMetaInfoReq& update_tablet_meta_req = agent_task_req->task_req;

LOG(INFO) << "get update tablet meta task, signature:" << agent_task_req->signature;
VLOG(1) << "get update tablet meta task, signature:" << agent_task_req->signature;

auto update_manager = StorageEngine::instance()->update_manager();
TStatusCode::type status_code = TStatusCode::OK;
Expand Down Expand Up @@ -909,7 +913,7 @@ void run_update_meta_info_task(const std::shared_ptr<UpdateTabletMetaInfoAgentTa
{
auto curr_binlog_config = tablet->tablet_meta()->get_binlog_config();
if (curr_binlog_config != nullptr) {
LOG(INFO) << "current binlog config:" << curr_binlog_config->to_string();
VLOG(1) << "current binlog config:" << curr_binlog_config->to_string();
}
}

Expand Down Expand Up @@ -963,7 +967,7 @@ AgentStatus drop_auto_increment_map(TTableId table_id) {
void run_drop_auto_increment_map_task(const std::shared_ptr<DropAutoIncrementMapAgentTaskRequest>& agent_task_req,
ExecEnv* exec_env) {
const TDropAutoIncrementMapReq& drop_auto_increment_map_req = agent_task_req->task_req;
LOG(INFO) << "drop auto increment map task tableid=" << drop_auto_increment_map_req.table_id;
VLOG(1) << "drop auto increment map task tableid=" << drop_auto_increment_map_req.table_id;

TStatusCode::type status_code = TStatusCode::OK;
std::vector<std::string> error_msgs;
Expand Down
20 changes: 10 additions & 10 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,8 @@ void TaskWorkerPool<AgentTaskRequest>::submit_task(const TAgentTaskRequest& task
// Set the receiving time of task so that we can determine whether it is timed out later
auto new_task = _convert_task(task, time(nullptr));
size_t task_count = _push_task(std::move(new_task));
LOG(INFO) << "Submit task success. type=" << type_str << ", signature=" << signature
<< ", task_count_in_queue=" << task_count;
VLOG(1) << "Submit task success. type=" << type_str << ", signature=" << signature
<< ", task_count_in_queue=" << task_count;
} else {
LOG(INFO) << "Submit task failed, already exists type=" << type_str << ", signature=" << signature;
}
Expand Down Expand Up @@ -523,8 +523,8 @@ void* PublishVersionTaskWorkerPool::_worker_thread_callback(void* arg_this) {
}

const auto& publish_version_task = *priority_tasks.top();
LOG(INFO) << "get publish version task txn_id: " << publish_version_task.task_req.transaction_id
<< " priority queue size: " << priority_tasks.size();
VLOG(1) << "get publish version task txn_id: " << publish_version_task.task_req.transaction_id
<< " priority queue size: " << priority_tasks.size();
bool enable_sync_publish = publish_version_task.task_req.enable_sync_publish;
if (enable_sync_publish) {
wait_time = 0;
Expand Down Expand Up @@ -554,9 +554,9 @@ void* PublishVersionTaskWorkerPool::_worker_thread_callback(void* arg_this) {
remove_task_info(finish_task_request.task_type, finish_task_request.signature);
}
int64_t t2 = MonotonicMillis();
LOG(INFO) << "batch flush " << finish_task_requests.size()
<< " txn publish task(s). #dir:" << affected_dirs.size() << " flush:" << t1 - t0
<< "ms finish_task_rpc:" << t2 - t1 << "ms";
VLOG(1) << "batch flush " << finish_task_requests.size()
<< " txn publish task(s). #dir:" << affected_dirs.size() << " flush:" << t1 - t0
<< "ms finish_task_rpc:" << t2 - t1 << "ms";
finish_task_requests.clear();
affected_dirs.clear();
batch_publish_latency = 0;
Expand All @@ -573,8 +573,8 @@ void* PublishVersionTaskWorkerPool::_worker_thread_callback(void* arg_this) {
StorageEngine::instance()->wake_finish_publish_vesion_thread();
affected_dirs.clear();
batch_publish_latency = 0;
LOG(INFO) << "batch submit " << finish_task_size << " finish publish version task "
<< "txn publish task(s). #dir:" << affected_dirs.size() << " flush:" << t1 - t0 << "ms";
VLOG(1) << "batch submit " << finish_task_size << " finish publish version task "
<< "txn publish task(s). #dir:" << affected_dirs.size() << " flush:" << t1 - t0 << "ms";
}
}
}
Expand Down Expand Up @@ -713,7 +713,7 @@ void* ReportOlapTableTaskWorkerPool::_worker_thread_callback(void* arg_this) {
LOG(WARNING) << "Fail to report olap table state to " << master_address.hostname << ":"
<< master_address.port << ", err=" << status;
} else {
LOG(INFO) << "Report tablets successfully, report version: " << report_version;
VLOG(1) << "Report tablets successfully, report version: " << report_version;
}

// wait for notifying until timeout
Expand Down
2 changes: 2 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1483,4 +1483,6 @@ CONF_mBool(enable_bit_unpack_simd, "true");

CONF_mInt32(max_committed_without_schema_rowset, "1000");

CONF_mInt32(apply_version_slow_log_sec, "30");

} // namespace starrocks::config
12 changes: 6 additions & 6 deletions be/src/exec/pipeline/pipeline_driver_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -346,8 +346,8 @@ void GlobalDriverExecutor::report_exec_state(QueryContext* query_ctx, FragmentCo
auto status = ExecStateReporter::report_exec_status(*params, exec_env, fe_addr);
if (!status.ok()) {
if (status.is_not_found()) {
LOG(INFO) << "[Driver] Fail to report exec state due to query not found: fragment_instance_id="
<< print_id(fragment_id);
VLOG(1) << "[Driver] Fail to report exec state due to query not found: fragment_instance_id="
<< print_id(fragment_id);
} else {
LOG(WARNING) << "[Driver] Fail to report exec state: fragment_instance_id=" << print_id(fragment_id)
<< ", status: " << status.to_string() << ", retry_times=" << retry_times;
Expand All @@ -357,8 +357,8 @@ void GlobalDriverExecutor::report_exec_state(QueryContext* query_ctx, FragmentCo
}
}
} else {
LOG(INFO) << "[Driver] Succeed to report exec state: fragment_instance_id=" << print_id(fragment_id)
<< ", is_done=" << params->done;
VLOG(1) << "[Driver] Succeed to report exec state: fragment_instance_id=" << print_id(fragment_id)
<< ", is_done=" << params->done;
}
break;
}
Expand Down Expand Up @@ -402,7 +402,7 @@ void GlobalDriverExecutor::report_audit_statistics(QueryContext* query_ctx, Frag
<< print_id(fragment_id) << ", status: " << status.to_string();
}
} else {
LOG(INFO) << "[Driver] Succeed to report audit statistics: fragment_instance_id=" << print_id(fragment_id);
VLOG(1) << "[Driver] Succeed to report audit statistics: fragment_instance_id=" << print_id(fragment_id);
}
};
auto st = this->_audit_statistics_reporter->submit(std::move(report_task));
Expand Down Expand Up @@ -444,7 +444,7 @@ void GlobalDriverExecutor::report_epoch(ExecEnv* exec_env, QueryContext* query_c
<< ", status: " << status.to_string();
}
} else {
LOG(INFO) << "[Driver] Succeed to report epoch exec state: query_id=" << print_id(query_id);
VLOG(1) << "[Driver] Succeed to report epoch exec state: query_id=" << print_id(query_id);
}
};

Expand Down
8 changes: 4 additions & 4 deletions be/src/runtime/plan_fragment_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,9 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request) {
const TPlanFragmentExecParams& params = request.params;
_query_id = params.query_id;

LOG(INFO) << "Prepare(): query_id=" << print_id(_query_id)
<< " fragment_instance_id=" << print_id(params.fragment_instance_id)
<< " backend_num=" << request.backend_num;
VLOG(1) << "Prepare(): query_id=" << print_id(_query_id)
<< " fragment_instance_id=" << print_id(params.fragment_instance_id)
<< " backend_num=" << request.backend_num;

DCHECK(_runtime_state->chunk_size() > 0);

Expand Down Expand Up @@ -189,7 +189,7 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request) {
}

Status PlanFragmentExecutor::open() {
LOG(INFO) << "Open(): fragment_instance_id=" << print_id(_runtime_state->fragment_instance_id());
VLOG(1) << "Open(): fragment_instance_id=" << print_id(_runtime_state->fragment_instance_id());
tls_thread_status.set_query_id(_runtime_state->query_id());

// Only register profile report worker for broker load and insert into here,
Expand Down
10 changes: 5 additions & 5 deletions be/src/service/internal_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -443,9 +443,9 @@ Status PInternalServiceImplBase<T>::_exec_plan_fragment(brpc::Controller* cntl,
}

bool is_pipeline = t_request.__isset.is_pipeline && t_request.is_pipeline;
LOG(INFO) << "exec plan fragment, fragment_instance_id=" << print_id(t_request.params.fragment_instance_id)
<< ", coord=" << t_request.coord << ", backend=" << t_request.backend_num
<< ", is_pipeline=" << is_pipeline << ", chunk_size=" << t_request.query_options.batch_size;
VLOG(1) << "exec plan fragment, fragment_instance_id=" << print_id(t_request.params.fragment_instance_id)
<< ", coord=" << t_request.coord << ", backend=" << t_request.backend_num << ", is_pipeline=" << is_pipeline
<< ", chunk_size=" << t_request.query_options.batch_size;
if (is_pipeline) {
return _exec_plan_fragment_by_pipeline(t_request, t_request);
} else {
Expand Down Expand Up @@ -541,8 +541,8 @@ void PInternalServiceImplBase<T>::_cancel_plan_fragment(google::protobuf::RpcCon

auto query_ctx = _exec_env->query_context_mgr()->get(query_id);
if (!query_ctx) {
LOG(INFO) << strings::Substitute("QueryContext already destroyed: query_id=$0, fragment_instance_id=$1",
print_id(query_id), print_id(tid));
VLOG(1) << strings::Substitute("QueryContext already destroyed: query_id=$0, fragment_instance_id=$1",
print_id(query_id), print_id(tid));
st.to_protobuf(result->mutable_status());
return;
}
Expand Down
Loading

0 comments on commit 16ffabe

Please sign in to comment.