Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Enhancement] Support audic statistics for insert statement #29901

Merged
merged 3 commits into from
Aug 28, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions be/src/exec/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ set(EXEC_FILES
pipeline/pipeline_driver_queue.cpp
pipeline/pipeline_driver_poller.cpp
pipeline/pipeline_driver.cpp
pipeline/audit_statistics_reporter.cpp
pipeline/exec_state_reporter.cpp
pipeline/driver_limiter.cpp
pipeline/fragment_context.cpp
Expand Down
75 changes: 75 additions & 0 deletions be/src/exec/pipeline/audit_statistics_reporter.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "exec/pipeline/audit_statistics_reporter.h"

#include <thrift/Thrift.h>
#include <thrift/protocol/TDebugProtocol.h>

#include "agent/master_info.h"
#include "gen_cpp/FrontendService_types.h"
#include "runtime/client_cache.h"
#include "runtime/exec_env.h"
#include "service/backend_options.h"

namespace starrocks::pipeline {

using apache::thrift::TException;
using apache::thrift::TProcessor;
using apache::thrift::transport::TTransportException;

// including the final status when execution finishes.
Status AuditStatisticsReporter::report_audit_statistics(const TReportAuditStatisticsParams& params, ExecEnv* exec_env,
const TNetworkAddress& fe_addr) {
Status fe_status;
FrontendServiceConnection coord(exec_env->frontend_client_cache(), fe_addr, &fe_status);
if (!fe_status.ok()) {
LOG(WARNING) << "Couldn't get a client for " << fe_addr;
return fe_status;
}

TReportAuditStatisticsResult res;
Status rpc_status;

try {
try {
coord->reportAuditStatistics(res, params);
} catch (TTransportException& e) {
TTransportException::TTransportExceptionType type = e.getType();
if (type != TTransportException::TTransportExceptionType::TIMED_OUT) {
// if not TIMED_OUT, retry
rpc_status = coord.reopen();

if (!rpc_status.ok()) {
return rpc_status;
}
coord->reportAuditStatistics(res, params);
} else {
std::stringstream msg;
msg << "ReportExecStatus() to " << fe_addr << " failed:\n" << e.what();
LOG(WARNING) << msg.str();
rpc_status = Status::InternalError(msg.str());
}
}

rpc_status = Status(res.status);
} catch (TException& e) {
std::stringstream msg;
msg << "ReportExecStatus() to " << fe_addr << " failed:\n" << e.what();
LOG(WARNING) << msg.str();
rpc_status = Status::InternalError(msg.str());
}
return rpc_status;
}
} // namespace starrocks::pipeline
37 changes: 37 additions & 0 deletions be/src/exec/pipeline/audit_statistics_reporter.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <memory>

#include "exec/pipeline/fragment_context.h"
#include "exec/pipeline/pipeline_fwd.h"
#include "gen_cpp/FrontendService.h"
#include "gen_cpp/InternalService_types.h"
#include "gen_cpp/Types_types.h"
#include "runtime/exec_env.h"
#include "runtime/runtime_state.h"
#include "service/backend_options.h"
#include "util/threadpool.h"

namespace starrocks::pipeline {
class AuditStatisticsReporter {
public:
AuditStatisticsReporter();

static Status report_audit_statistics(const TReportAuditStatisticsParams& params, ExecEnv* exec_env,
const TNetworkAddress& fe_addr);
};
} // namespace starrocks::pipeline
2 changes: 2 additions & 0 deletions be/src/exec/pipeline/olap_table_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include "exec/pipeline/olap_table_sink_operator.h"

#include "exec/pipeline/pipeline_driver_executor.h"
#include "exec/tablet_sink.h"
#include "exprs/expr.h"
#include "runtime/buffer_control_block.h"
Expand Down Expand Up @@ -137,6 +138,7 @@ Status OlapTableSinkOperator::set_cancelled(RuntimeState* state) {
Status OlapTableSinkOperator::set_finishing(RuntimeState* state) {
_is_finished = true;

state->exec_env()->wg_driver_executor()->report_audit_statistics(state->query_ctx(), state->fragment_ctx());
if (_is_open_done && !_automatic_partition_chunk) {
// sink's open already finish, we can try_close
return _sink->try_close(state);
Expand Down
32 changes: 32 additions & 0 deletions be/src/exec/pipeline/pipeline_driver_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,38 @@ void GlobalDriverExecutor::report_exec_state(QueryContext* query_ctx, FragmentCo
this->_exec_state_reporter->submit(std::move(report_task));
}

void GlobalDriverExecutor::report_audit_statistics(QueryContext* query_ctx, FragmentContext* fragment_ctx) {
auto query_statistics = query_ctx->final_query_statistic();

TReportAuditStatisticsParams params;
params.__set_query_id(fragment_ctx->query_id());
params.__set_fragment_instance_id(fragment_ctx->fragment_instance_id());
query_statistics->to_params(&params);

auto fe_addr = fragment_ctx->fe_addr();
if (fe_addr.hostname.empty()) {
// query executed by external connectors, like spark and flink connector,
// does not need to report exec state to FE, so return if fe addr is empty.
return;
}

auto exec_env = fragment_ctx->runtime_state()->exec_env();
auto fragment_id = fragment_ctx->fragment_instance_id();

auto status = AuditStatisticsReporter::report_audit_statistics(params, exec_env, fe_addr);
if (!status.ok()) {
if (status.is_not_found()) {
LOG(INFO) << "[Driver] Fail to report audit statistics due to query not found: fragment_instance_id="
<< print_id(fragment_id);
} else {
LOG(WARNING) << "[Driver] Fail to report audit statistics fragment_instance_id=" << print_id(fragment_id)
<< ", status: " << status.to_string();
}
} else {
LOG(INFO) << "[Driver] Succeed to report audit statistics: fragment_instance_id=" << print_id(fragment_id);
}
}

size_t GlobalDriverExecutor::activate_parked_driver(const ImmutableDriverPredicateFunc& predicate_func) {
return _blocked_driver_poller->activate_parked_driver(predicate_func);
}
Expand Down
4 changes: 4 additions & 0 deletions be/src/exec/pipeline/pipeline_driver_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <memory>
#include <unordered_map>

#include "exec/pipeline/audit_statistics_reporter.h"
#include "exec/pipeline/exec_state_reporter.h"
#include "exec/pipeline/pipeline_driver.h"
#include "exec/pipeline/pipeline_driver_poller.h"
Expand Down Expand Up @@ -51,6 +52,8 @@ class DriverExecutor {
virtual void report_exec_state(QueryContext* query_ctx, FragmentContext* fragment_ctx, const Status& status,
bool done, bool attach_profile) = 0;

virtual void report_audit_statistics(QueryContext* query_ctx, FragmentContext* fragment_ctx) = 0;

virtual void iterate_immutable_blocking_driver(const IterateImmutableDriverFunc& call) const = 0;

virtual size_t activate_parked_driver(const ImmutableDriverPredicateFunc& predicate_func) = 0;
Expand All @@ -75,6 +78,7 @@ class GlobalDriverExecutor final : public FactoryMethod<DriverExecutor, GlobalDr
void close() override;
void report_exec_state(QueryContext* query_ctx, FragmentContext* fragment_ctx, const Status& status, bool done,
bool attach_profile) override;
void report_audit_statistics(QueryContext* query_ctx, FragmentContext* fragment_ctx) override;

void iterate_immutable_blocking_driver(const IterateImmutableDriverFunc& call) const override;

Expand Down
2 changes: 2 additions & 0 deletions be/src/exec/pipeline/sink/export_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "exec/data_sink.h"
#include "exec/file_builder.h"
#include "exec/pipeline/fragment_context.h"
#include "exec/pipeline/pipeline_driver_executor.h"
#include "exec/pipeline/sink/sink_io_buffer.h"
#include "exec/plain_text_builder.h"
#include "formats/csv/converter.h"
Expand Down Expand Up @@ -191,6 +192,7 @@ bool ExportSinkOperator::is_finished() const {
}

Status ExportSinkOperator::set_finishing(RuntimeState* state) {
state->exec_env()->wg_driver_executor()->report_audit_statistics(state->query_ctx(), state->fragment_ctx());
return _export_sink_buffer->set_finishing();
}

Expand Down
3 changes: 3 additions & 0 deletions be/src/exec/pipeline/sink/iceberg_table_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <utility>

#include "exec/parquet_builder.h"
#include "exec/pipeline/pipeline_driver_executor.h"

namespace starrocks::pipeline {

Expand Down Expand Up @@ -60,6 +61,8 @@ bool IcebergTableSinkOperator::is_finished() const {
}

Status IcebergTableSinkOperator::set_finishing(RuntimeState* state) {
state->exec_env()->wg_driver_executor()->report_audit_statistics(state->query_ctx(), state->fragment_ctx());

for (const auto& writer : _partition_writers) {
if (!writer.second->closed()) {
writer.second->close(state);
Expand Down
2 changes: 2 additions & 0 deletions be/src/exec/pipeline/sink/memory_scratch_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include "exec/pipeline/sink/memory_scratch_sink_operator.h"

#include "exec/pipeline/pipeline_driver_executor.h"
#include "util/arrow/row_batch.h"
#include "util/arrow/starrocks_column_to_arrow.h"

Expand Down Expand Up @@ -45,6 +46,7 @@ bool MemoryScratchSinkOperator::is_finished() const {

Status MemoryScratchSinkOperator::set_finishing(RuntimeState* state) {
_is_finished = true;
state->exec_env()->wg_driver_executor()->report_audit_statistics(state->query_ctx(), state->fragment_ctx());
return Status::OK();
}

Expand Down
19 changes: 19 additions & 0 deletions be/src/runtime/query_statistics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,25 @@ void QueryStatistics::to_pb(PQueryStatistics* statistics) {
}
}

void QueryStatistics::to_params(TReportAuditStatisticsParams* params) {
DCHECK(params != nullptr);
params->__set_scan_rows(scan_rows);
params->__set_scan_bytes(scan_bytes);
params->__set_returned_rows(returned_rows);
params->__set_cpu_cost_ns(cpu_ns);
params->__set_mem_cost_bytes(mem_cost_bytes);
params->__set_spill_bytes(spill_bytes);
{
std::lock_guard l(_lock);
for (const auto& [table_id, stats_item] : _stats_items) {
auto new_stats_item = params->stats_items.emplace_back();
new_stats_item.__set_table_id(table_id);
new_stats_item.__set_scan_rows(stats_item->scan_rows);
new_stats_item.__set_scan_bytes(stats_item->scan_bytes);
}
}
}

void QueryStatistics::clear() {
scan_rows = 0;
scan_bytes = 0;
Expand Down
2 changes: 2 additions & 0 deletions be/src/runtime/query_statistics.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

#include <mutex>

#include "gen_cpp/FrontendService.h"
#include "gen_cpp/data.pb.h"
#include "util/spinlock.h"

Expand All @@ -59,6 +60,7 @@ class QueryStatistics {
void add_spill_bytes(int64_t bytes) { spill_bytes += bytes; }

void to_pb(PQueryStatistics* statistics);
void to_params(TReportAuditStatisticsParams* params);

void merge(int sender_id, QueryStatistics& other);
void merge_pb(const PQueryStatistics& statistics);
Expand Down
33 changes: 33 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/qe/DefaultCoordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@
import com.starrocks.planner.StreamLoadPlanner;
import com.starrocks.privilege.PrivilegeBuiltinConstants;
import com.starrocks.proto.PPlanFragmentCancelReason;
import com.starrocks.proto.PQueryStatistics;
import com.starrocks.proto.QueryStatisticsItemPB;
import com.starrocks.qe.scheduler.Coordinator;
import com.starrocks.qe.scheduler.Deployer;
import com.starrocks.qe.scheduler.QueryRuntimeProfile;
Expand All @@ -73,10 +75,12 @@
import com.starrocks.sql.ast.UserIdentity;
import com.starrocks.sql.plan.ExecPlan;
import com.starrocks.system.ComputeNode;
import com.starrocks.thrift.TAuditStatisticsItem;
import com.starrocks.thrift.TDescriptorTable;
import com.starrocks.thrift.TLoadJobType;
import com.starrocks.thrift.TNetworkAddress;
import com.starrocks.thrift.TQueryType;
import com.starrocks.thrift.TReportAuditStatisticsParams;
import com.starrocks.thrift.TReportExecStatusParams;
import com.starrocks.thrift.TRuntimeFilterDestination;
import com.starrocks.thrift.TRuntimeFilterProberParams;
Expand All @@ -85,6 +89,7 @@
import com.starrocks.thrift.TTabletCommitInfo;
import com.starrocks.thrift.TTabletFailInfo;
import com.starrocks.thrift.TUniqueId;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -125,6 +130,8 @@ public class DefaultCoordinator extends Coordinator {
*/
private Status queryStatus = new Status();

private PQueryStatistics auditStatistics;

private final QueryRuntimeProfile queryProfile;

private ResultReceiver receiver;
Expand Down Expand Up @@ -873,6 +880,27 @@ public void updateFragmentExecStatus(TReportExecStatusParams params) {
updateJobProgress(params);
}

@Override
public void updateAuditStatistics(TReportAuditStatisticsParams params) {
auditStatistics = new PQueryStatistics();
auditStatistics.scanRows = params.scan_rows;
auditStatistics.scanBytes = params.scan_bytes;
auditStatistics.returnedRows = params.returned_rows;
auditStatistics.cpuCostNs = params.cpu_cost_ns;
auditStatistics.memCostBytes = params.mem_cost_bytes;
auditStatistics.spillBytes = params.spill_bytes;
if (CollectionUtils.isNotEmpty(params.stats_items)) {
auditStatistics.statsItems = Lists.newArrayList();
for (TAuditStatisticsItem item : params.stats_items) {
QueryStatisticsItemPB itemPB = new QueryStatisticsItemPB();
itemPB.scanBytes = item.scan_bytes;
itemPB.scanRows = item.scan_rows;
itemPB.tableId = item.table_id;
auditStatistics.statsItems.add(itemPB);
}
}
}

private void updateJobProgress(TReportExecStatusParams params) {
if (params.isSetLoad_type()) {
TLoadJobType loadJobType = params.getLoad_type();
Expand Down Expand Up @@ -1003,6 +1031,11 @@ public List<QueryStatisticsItem.FragmentInstanceInfo> getFragmentInstanceInfos()
return executionDAG.getFragmentInstanceInfos();
}

@Override
public PQueryStatistics getAuditStatistics() {
return auditStatistics;
}

@Override
public boolean isThriftServerHighLoad() {
return this.thriftServerHighLoad;
Expand Down
4 changes: 4 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/qe/QeProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import com.starrocks.thrift.TBatchReportExecStatusParams;
import com.starrocks.thrift.TBatchReportExecStatusResult;
import com.starrocks.thrift.TNetworkAddress;
import com.starrocks.thrift.TReportAuditStatisticsParams;
import com.starrocks.thrift.TReportAuditStatisticsResult;
import com.starrocks.thrift.TReportExecStatusParams;
import com.starrocks.thrift.TReportExecStatusResult;
import com.starrocks.thrift.TUniqueId;
Expand All @@ -33,6 +35,8 @@ public interface QeProcessor {

TReportExecStatusResult reportExecStatus(TReportExecStatusParams params, TNetworkAddress beAddr);

TReportAuditStatisticsResult reportAuditStatistics(TReportAuditStatisticsParams params, TNetworkAddress beAddr);

TBatchReportExecStatusResult batchReportExecStatus(TBatchReportExecStatusParams params, TNetworkAddress beAddr);

void registerQuery(TUniqueId queryId, Coordinator coord) throws UserException;
Expand Down
Loading