Skip to content

Commit

Permalink
[Enhancement] Support audit statistics for insert statement (StarRock…
Browse files Browse the repository at this point in the history
…s#29901)

* [Enhancement] Support audit statistics for insert statement

Signed-off-by: liuyehcf <1559500551@qq.com>

* update test

Signed-off-by: liuyehcf <1559500551@qq.com>

* update 2

Signed-off-by: liuyehcf <1559500551@qq.com>

---------

Signed-off-by: liuyehcf <1559500551@qq.com>
  • Loading branch information
liuyehcf authored Aug 28, 2023
1 parent 7fe1597 commit 728573e
Show file tree
Hide file tree
Showing 22 changed files with 396 additions and 1 deletion.
1 change: 1 addition & 0 deletions be/src/exec/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ set(EXEC_FILES
pipeline/pipeline_driver_queue.cpp
pipeline/pipeline_driver_poller.cpp
pipeline/pipeline_driver.cpp
pipeline/audit_statistics_reporter.cpp
pipeline/exec_state_reporter.cpp
pipeline/driver_limiter.cpp
pipeline/fragment_context.cpp
Expand Down
77 changes: 77 additions & 0 deletions be/src/exec/pipeline/audit_statistics_reporter.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "exec/pipeline/audit_statistics_reporter.h"

#include <thrift/Thrift.h>
#include <thrift/protocol/TDebugProtocol.h>

#include "agent/master_info.h"
#include "gen_cpp/FrontendService_types.h"
#include "runtime/client_cache.h"
#include "runtime/exec_env.h"
#include "service/backend_options.h"

namespace starrocks::pipeline {

using apache::thrift::TException;
using apache::thrift::TProcessor;
using apache::thrift::transport::TTransportException;

// Sends `params` to the frontend at `fe_addr` through the reportAuditStatistics thrift RPC.
//
// A client is obtained from exec_env->frontend_client_cache(). If the first attempt fails with a
// transport error other than TIMED_OUT, the connection is reopened and the call is retried once.
//
// Returns:
//  - the client-acquisition status if no client could be obtained;
//  - the reopen() status if reopening the connection fails;
//  - InternalError if the RPC times out or raises any other thrift exception;
//  - otherwise the status carried by the RPC result.
Status AuditStatisticsReporter::report_audit_statistics(const TReportAuditStatisticsParams& params, ExecEnv* exec_env,
                                                        const TNetworkAddress& fe_addr) {
    Status fe_status;
    FrontendServiceConnection coord(exec_env->frontend_client_cache(), fe_addr, &fe_status);
    if (!fe_status.ok()) {
        LOG(WARNING) << "Couldn't get a client for " << fe_addr;
        return fe_status;
    }

    // Logs an RPC failure and converts it into the Status handed back to the caller.
    auto rpc_failure = [&fe_addr](const TException& e) {
        std::stringstream msg;
        msg << "ReportAuditStatistics() to " << fe_addr << " failed:\n" << e.what();
        LOG(WARNING) << msg.str();
        return Status::InternalError(msg.str());
    };

    TReportAuditStatisticsResult res;

    try {
        try {
            coord->reportAuditStatistics(res, params);
        } catch (const TTransportException& e) {
            if (e.getType() == TTransportException::TTransportExceptionType::TIMED_OUT) {
                // A timed-out request is not retried.
                return rpc_failure(e);
            }

            // Any other transport failure: reopen the connection and retry once.
            Status reopen_status = coord.reopen();
            if (!reopen_status.ok()) {
                return reopen_status;
            }
            // An exception thrown by the retry is handled by the outer catch below.
            coord->reportAuditStatistics(res, params);
        }

        return Status(res.status);
    } catch (const TException& e) {
        return rpc_failure(e);
    }
}
} // namespace starrocks::pipeline
37 changes: 37 additions & 0 deletions be/src/exec/pipeline/audit_statistics_reporter.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <memory>

#include "exec/pipeline/fragment_context.h"
#include "exec/pipeline/pipeline_fwd.h"
#include "gen_cpp/FrontendService.h"
#include "gen_cpp/InternalService_types.h"
#include "gen_cpp/Types_types.h"
#include "runtime/exec_env.h"
#include "runtime/runtime_state.h"
#include "service/backend_options.h"
#include "util/threadpool.h"

namespace starrocks::pipeline {
// Helper that reports a query's audit statistics to a frontend (FE) over thrift.
// The class holds no data members; the reporting entry point is a static function.
class AuditStatisticsReporter {
public:
    // Defaulted inline so that constructing an instance never depends on an out-of-line definition.
    AuditStatisticsReporter() = default;

    // Sends `params` to the frontend at `fe_addr`, using a client taken from `exec_env`'s
    // frontend client cache. Returns the resulting status of the report attempt.
    static Status report_audit_statistics(const TReportAuditStatisticsParams& params, ExecEnv* exec_env,
                                          const TNetworkAddress& fe_addr);
};
} // namespace starrocks::pipeline
2 changes: 2 additions & 0 deletions be/src/exec/pipeline/exec_state_reporter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ Status ExecStateReporter::report_exec_status(const TReportExecStatusParams& para
msg << "ReportExecStatus() to " << fe_addr << " failed:\n" << e.what();
LOG(WARNING) << msg.str();
rpc_status = Status::InternalError(msg.str());
return rpc_status;
}
}

Expand All @@ -171,6 +172,7 @@ Status ExecStateReporter::report_exec_status(const TReportExecStatusParams& para
msg << "ReportExecStatus() to " << fe_addr << " failed:\n" << e.what();
LOG(WARNING) << msg.str();
rpc_status = Status::InternalError(msg.str());
return rpc_status;
}
return rpc_status;
}
Expand Down
2 changes: 2 additions & 0 deletions be/src/exec/pipeline/olap_table_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include "exec/pipeline/olap_table_sink_operator.h"

#include "exec/pipeline/pipeline_driver_executor.h"
#include "exec/tablet_sink.h"
#include "exprs/expr.h"
#include "runtime/buffer_control_block.h"
Expand Down Expand Up @@ -137,6 +138,7 @@ Status OlapTableSinkOperator::set_cancelled(RuntimeState* state) {
Status OlapTableSinkOperator::set_finishing(RuntimeState* state) {
_is_finished = true;

state->exec_env()->wg_driver_executor()->report_audit_statistics(state->query_ctx(), state->fragment_ctx());
if (_is_open_done && !_automatic_partition_chunk) {
// sink's open already finish, we can try_close
return _sink->try_close(state);
Expand Down
32 changes: 32 additions & 0 deletions be/src/exec/pipeline/pipeline_driver_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,38 @@ void GlobalDriverExecutor::report_exec_state(QueryContext* query_ctx, FragmentCo
this->_exec_state_reporter->submit(std::move(report_task));
}

// Reports the final statistics of `query_ctx` to the frontend recorded in `fragment_ctx`.
// The RPC is issued synchronously via AuditStatisticsReporter; the outcome is only logged,
// nothing is returned to the caller.
void GlobalDriverExecutor::report_audit_statistics(QueryContext* query_ctx, FragmentContext* fragment_ctx) {
    // NOTE(review): kept ahead of the FE-address guard in case final_query_statistic() has side
    // effects -- confirm, and move it below the guard if it has none.
    auto query_statistics = query_ctx->final_query_statistic();

    const auto& fe_addr = fragment_ctx->fe_addr();
    if (fe_addr.hostname.empty()) {
        // query executed by external connectors, like spark and flink connector,
        // does not need to report audit statistics to FE, so return if fe addr is empty.
        // Checked before the thrift params are built so that no work is done for nothing.
        return;
    }

    const auto& fragment_id = fragment_ctx->fragment_instance_id();

    TReportAuditStatisticsParams params;
    params.__set_query_id(fragment_ctx->query_id());
    params.__set_fragment_instance_id(fragment_id);
    query_statistics->to_params(&params);

    auto exec_env = fragment_ctx->runtime_state()->exec_env();

    const auto status = AuditStatisticsReporter::report_audit_statistics(params, exec_env, fe_addr);
    if (status.ok()) {
        LOG(INFO) << "[Driver] Succeed to report audit statistics: fragment_instance_id=" << print_id(fragment_id);
        return;
    }

    if (status.is_not_found()) {
        // Logged at INFO rather than WARNING, unlike every other failure.
        LOG(INFO) << "[Driver] Fail to report audit statistics due to query not found: fragment_instance_id="
                  << print_id(fragment_id);
    } else {
        LOG(WARNING) << "[Driver] Fail to report audit statistics fragment_instance_id=" << print_id(fragment_id)
                     << ", status: " << status.to_string();
    }
}

size_t GlobalDriverExecutor::activate_parked_driver(const ImmutableDriverPredicateFunc& predicate_func) {
return _blocked_driver_poller->activate_parked_driver(predicate_func);
}
Expand Down
4 changes: 4 additions & 0 deletions be/src/exec/pipeline/pipeline_driver_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <memory>
#include <unordered_map>

#include "exec/pipeline/audit_statistics_reporter.h"
#include "exec/pipeline/exec_state_reporter.h"
#include "exec/pipeline/pipeline_driver.h"
#include "exec/pipeline/pipeline_driver_poller.h"
Expand Down Expand Up @@ -51,6 +52,8 @@ class DriverExecutor {
virtual void report_exec_state(QueryContext* query_ctx, FragmentContext* fragment_ctx, const Status& status,
bool done, bool attach_profile) = 0;

virtual void report_audit_statistics(QueryContext* query_ctx, FragmentContext* fragment_ctx) = 0;

virtual void iterate_immutable_blocking_driver(const IterateImmutableDriverFunc& call) const = 0;

virtual size_t activate_parked_driver(const ImmutableDriverPredicateFunc& predicate_func) = 0;
Expand All @@ -75,6 +78,7 @@ class GlobalDriverExecutor final : public FactoryMethod<DriverExecutor, GlobalDr
void close() override;
void report_exec_state(QueryContext* query_ctx, FragmentContext* fragment_ctx, const Status& status, bool done,
bool attach_profile) override;
void report_audit_statistics(QueryContext* query_ctx, FragmentContext* fragment_ctx) override;

void iterate_immutable_blocking_driver(const IterateImmutableDriverFunc& call) const override;

Expand Down
2 changes: 2 additions & 0 deletions be/src/exec/pipeline/sink/export_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "exec/data_sink.h"
#include "exec/file_builder.h"
#include "exec/pipeline/fragment_context.h"
#include "exec/pipeline/pipeline_driver_executor.h"
#include "exec/pipeline/sink/sink_io_buffer.h"
#include "exec/plain_text_builder.h"
#include "formats/csv/converter.h"
Expand Down Expand Up @@ -191,6 +192,7 @@ bool ExportSinkOperator::is_finished() const {
}

// Reports the query's audit statistics through the workgroup driver executor, then asks the
// export sink buffer to start finishing and returns that buffer's status.
Status ExportSinkOperator::set_finishing(RuntimeState* state) {
    auto query_ctx = state->query_ctx();
    auto fragment_ctx = state->fragment_ctx();
    state->exec_env()->wg_driver_executor()->report_audit_statistics(query_ctx, fragment_ctx);

    return _export_sink_buffer->set_finishing();
}

Expand Down
3 changes: 3 additions & 0 deletions be/src/exec/pipeline/sink/iceberg_table_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <utility>

#include "exec/parquet_builder.h"
#include "exec/pipeline/pipeline_driver_executor.h"

namespace starrocks::pipeline {

Expand Down Expand Up @@ -60,6 +61,8 @@ bool IcebergTableSinkOperator::is_finished() const {
}

Status IcebergTableSinkOperator::set_finishing(RuntimeState* state) {
state->exec_env()->wg_driver_executor()->report_audit_statistics(state->query_ctx(), state->fragment_ctx());

for (const auto& writer : _partition_writers) {
if (!writer.second->closed()) {
writer.second->close(state);
Expand Down
2 changes: 2 additions & 0 deletions be/src/exec/pipeline/sink/memory_scratch_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include "exec/pipeline/sink/memory_scratch_sink_operator.h"

#include "exec/pipeline/pipeline_driver_executor.h"
#include "util/arrow/row_batch.h"
#include "util/arrow/starrocks_column_to_arrow.h"

Expand Down Expand Up @@ -45,6 +46,7 @@ bool MemoryScratchSinkOperator::is_finished() const {

// Marks this sink as finished, then reports the query's audit statistics through the
// workgroup driver executor. Always returns OK.
Status MemoryScratchSinkOperator::set_finishing(RuntimeState* state) {
    _is_finished = true;

    auto query_ctx = state->query_ctx();
    auto fragment_ctx = state->fragment_ctx();
    state->exec_env()->wg_driver_executor()->report_audit_statistics(query_ctx, fragment_ctx);

    return Status::OK();
}

Expand Down
19 changes: 19 additions & 0 deletions be/src/runtime/query_statistics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,25 @@ void QueryStatistics::to_pb(PQueryStatistics* statistics) {
}
}

// Copies this object's counters into the thrift audit report `params`:
// the query-level totals first, then one stats item per entry of _stats_items.
// `params` must not be null.
void QueryStatistics::to_params(TReportAuditStatisticsParams* params) {
    DCHECK(params != nullptr);
    params->__set_scan_rows(scan_rows);
    params->__set_scan_bytes(scan_bytes);
    params->__set_returned_rows(returned_rows);
    params->__set_cpu_cost_ns(cpu_ns);
    params->__set_mem_cost_bytes(mem_cost_bytes);
    params->__set_spill_bytes(spill_bytes);

    // Per-table items are collected locally so that _lock is held only while _stats_items is read.
    decltype(params->stats_items) table_items;
    {
        std::lock_guard l(_lock);
        for (const auto& [table_id, stats_item] : _stats_items) {
            // Must bind by reference: emplace_back() returns a reference to the stored element, and
            // binding it by value would populate a throw-away copy, leaving the stored item empty.
            auto& new_stats_item = table_items.emplace_back();
            new_stats_item.__set_table_id(table_id);
            new_stats_item.__set_scan_rows(stats_item->scan_rows);
            new_stats_item.__set_scan_bytes(stats_item->scan_bytes);
        }
    }

    if (!table_items.empty()) {
        // Use the generated setter like the fields above do. Presumably stats_items is an optional
        // thrift field, in which case the setter also raises its __isset flag so that the list is
        // actually serialized -- TODO confirm against the thrift definition.
        params->__set_stats_items(table_items);
    }
}

void QueryStatistics::clear() {
scan_rows = 0;
scan_bytes = 0;
Expand Down
2 changes: 2 additions & 0 deletions be/src/runtime/query_statistics.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

#include <mutex>

#include "gen_cpp/FrontendService.h"
#include "gen_cpp/data.pb.h"
#include "util/spinlock.h"

Expand All @@ -59,6 +60,7 @@ class QueryStatistics {
void add_spill_bytes(int64_t bytes) { spill_bytes += bytes; }

void to_pb(PQueryStatistics* statistics);
void to_params(TReportAuditStatisticsParams* params);

void merge(int sender_id, QueryStatistics& other);
void merge_pb(const PQueryStatistics& statistics);
Expand Down
33 changes: 33 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/qe/DefaultCoordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@
import com.starrocks.planner.StreamLoadPlanner;
import com.starrocks.privilege.PrivilegeBuiltinConstants;
import com.starrocks.proto.PPlanFragmentCancelReason;
import com.starrocks.proto.PQueryStatistics;
import com.starrocks.proto.QueryStatisticsItemPB;
import com.starrocks.qe.scheduler.Coordinator;
import com.starrocks.qe.scheduler.Deployer;
import com.starrocks.qe.scheduler.QueryRuntimeProfile;
Expand All @@ -73,10 +75,12 @@
import com.starrocks.sql.ast.UserIdentity;
import com.starrocks.sql.plan.ExecPlan;
import com.starrocks.system.ComputeNode;
import com.starrocks.thrift.TAuditStatisticsItem;
import com.starrocks.thrift.TDescriptorTable;
import com.starrocks.thrift.TLoadJobType;
import com.starrocks.thrift.TNetworkAddress;
import com.starrocks.thrift.TQueryType;
import com.starrocks.thrift.TReportAuditStatisticsParams;
import com.starrocks.thrift.TReportExecStatusParams;
import com.starrocks.thrift.TRuntimeFilterDestination;
import com.starrocks.thrift.TRuntimeFilterProberParams;
Expand All @@ -85,6 +89,7 @@
import com.starrocks.thrift.TTabletCommitInfo;
import com.starrocks.thrift.TTabletFailInfo;
import com.starrocks.thrift.TUniqueId;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -125,6 +130,8 @@ public class DefaultCoordinator extends Coordinator {
*/
private Status queryStatus = new Status();

private PQueryStatistics auditStatistics;

private final QueryRuntimeProfile queryProfile;

private ResultReceiver receiver;
Expand Down Expand Up @@ -873,6 +880,27 @@ public void updateFragmentExecStatus(TReportExecStatusParams params) {
updateJobProgress(params);
}

/**
 * Converts the audit statistics reported in {@code params} into a {@link PQueryStatistics}
 * and stores it as this coordinator's audit statistics, replacing any previous value.
 *
 * @param params the reported audit statistics
 */
@Override
public void updateAuditStatistics(TReportAuditStatisticsParams params) {
    // Fill a local instance and publish it with a single assignment at the end, so that
    // getAuditStatistics() is never handed an object that is still being populated.
    PQueryStatistics statistics = new PQueryStatistics();
    statistics.scanRows = params.scan_rows;
    statistics.scanBytes = params.scan_bytes;
    statistics.returnedRows = params.returned_rows;
    statistics.cpuCostNs = params.cpu_cost_ns;
    statistics.memCostBytes = params.mem_cost_bytes;
    statistics.spillBytes = params.spill_bytes;

    // statsItems is only assigned when at least one per-table item was reported.
    if (CollectionUtils.isNotEmpty(params.stats_items)) {
        statistics.statsItems = Lists.newArrayList();
        for (TAuditStatisticsItem item : params.stats_items) {
            QueryStatisticsItemPB itemPB = new QueryStatisticsItemPB();
            itemPB.scanBytes = item.scan_bytes;
            itemPB.scanRows = item.scan_rows;
            itemPB.tableId = item.table_id;
            statistics.statsItems.add(itemPB);
        }
    }

    auditStatistics = statistics;
}

private void updateJobProgress(TReportExecStatusParams params) {
if (params.isSetLoad_type()) {
TLoadJobType loadJobType = params.getLoad_type();
Expand Down Expand Up @@ -1003,6 +1031,11 @@ public List<QueryStatisticsItem.FragmentInstanceInfo> getFragmentInstanceInfos()
return executionDAG.getFragmentInstanceInfos();
}

/**
 * Returns the audit statistics most recently stored by
 * {@code updateAuditStatistics}, or {@code null} if none has been stored.
 */
@Override
public PQueryStatistics getAuditStatistics() {
    return this.auditStatistics;
}

@Override
public boolean isThriftServerHighLoad() {
return this.thriftServerHighLoad;
Expand Down
Loading

0 comments on commit 728573e

Please sign in to comment.