Skip to content

Commit

Permalink
[Enhancement] Support audit statistics for insert statement (backport #…
Browse files Browse the repository at this point in the history
…29901) (#30174)

* [Enhancement] Support audit statistics for insert statement (#29901)

---------

Signed-off-by: liuyehcf <1559500551@qq.com>
(cherry picked from commit 728573e)

# Conflicts:
#	be/src/exec/pipeline/pipeline_driver_executor.h
#	be/src/exec/pipeline/sink/iceberg_table_sink_operator.cpp
#	fe/fe-core/src/main/java/com/starrocks/qe/DefaultCoordinator.java
#	fe/fe-core/src/main/java/com/starrocks/qe/scheduler/Coordinator.java
#	fe/fe-core/src/main/java/com/starrocks/service/FrontendServiceImpl.java

* solve conflict

Signed-off-by: liuyehcf <1559500551@qq.com>

---------

Signed-off-by: liuyehcf <1559500551@qq.com>
Co-authored-by: liuyehcf <1559500551@qq.com>
  • Loading branch information
mergify[bot] and liuyehcf authored Aug 31, 2023
1 parent 87dd6ea commit c9e042a
Show file tree
Hide file tree
Showing 21 changed files with 297 additions and 18 deletions.
1 change: 1 addition & 0 deletions be/src/exec/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ set(EXEC_FILES
pipeline/pipeline_driver_queue.cpp
pipeline/pipeline_driver_poller.cpp
pipeline/pipeline_driver.cpp
pipeline/audit_statistics_reporter.cpp
pipeline/exec_state_reporter.cpp
pipeline/driver_limiter.cpp
pipeline/fragment_context.cpp
Expand Down
77 changes: 77 additions & 0 deletions be/src/exec/pipeline/audit_statistics_reporter.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "exec/pipeline/audit_statistics_reporter.h"

#include <thrift/Thrift.h>
#include <thrift/protocol/TDebugProtocol.h>

#include "agent/master_info.h"
#include "gen_cpp/FrontendService_types.h"
#include "runtime/client_cache.h"
#include "runtime/exec_env.h"
#include "service/backend_options.h"

namespace starrocks::pipeline {

using apache::thrift::TException;
using apache::thrift::TProcessor;
using apache::thrift::transport::TTransportException;

// Sends the audit statistics carried by `params` to the frontend at `fe_addr` via the
// reportAuditStatistics thrift RPC.
//
// Behavior:
//  - A client connection is obtained through exec_env->frontend_client_cache(); if that fails,
//    the failure status is returned without issuing the RPC.
//  - If the first attempt throws a TTransportException whose type is not TIMED_OUT, the
//    connection is reopened and the RPC is retried once.
//  - A TIMED_OUT transport error, or any other thrift exception (including one thrown by the
//    retry), is logged and returned as an InternalError.
//  - Otherwise the returned Status is built from the status field of the frontend's reply.
Status AuditStatisticsReporter::report_audit_statistics(const TReportAuditStatisticsParams& params, ExecEnv* exec_env,
                                                        const TNetworkAddress& fe_addr) {
    Status fe_status;
    FrontendServiceConnection coord(exec_env->frontend_client_cache(), fe_addr, &fe_status);
    if (!fe_status.ok()) {
        LOG(WARNING) << "Couldn't get a client for " << fe_addr;
        return fe_status;
    }

    // Builds, logs and returns the error status for a failed RPC. The message names the RPC
    // actually issued here so that logs are not confused with exec-status reporting.
    const auto rpc_failure = [&fe_addr](const TException& e) {
        std::stringstream msg;
        msg << "ReportAuditStatistics() to " << fe_addr << " failed:\n" << e.what();
        LOG(WARNING) << msg.str();
        return Status::InternalError(msg.str());
    };

    TReportAuditStatisticsResult res;

    try {
        try {
            coord->reportAuditStatistics(res, params);
        } catch (const TTransportException& e) {
            if (e.getType() == TTransportException::TTransportExceptionType::TIMED_OUT) {
                // A timed-out request is not retried.
                return rpc_failure(e);
            }

            // Any other transport error: reopen the connection and retry once.
            Status reopen_status = coord.reopen();
            if (!reopen_status.ok()) {
                return reopen_status;
            }
            coord->reportAuditStatistics(res, params);
        }
    } catch (const TException& e) {
        return rpc_failure(e);
    }

    return Status(res.status);
}
} // namespace starrocks::pipeline
37 changes: 37 additions & 0 deletions be/src/exec/pipeline/audit_statistics_reporter.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <memory>

#include "exec/pipeline/fragment_context.h"
#include "exec/pipeline/pipeline_fwd.h"
#include "gen_cpp/FrontendService.h"
#include "gen_cpp/InternalService_types.h"
#include "gen_cpp/Types_types.h"
#include "runtime/exec_env.h"
#include "runtime/runtime_state.h"
#include "service/backend_options.h"
#include "util/threadpool.h"

namespace starrocks::pipeline {
// Helper for sending audit statistics to the frontend. The only functionality is the static
// report_audit_statistics() call, so no instance is required to use it.
class AuditStatisticsReporter {
public:
    // Defaulted inline so that the declared constructor always has a definition; the class
    // declares no data members.
    AuditStatisticsReporter() = default;

    // Reports `params` to the frontend at `fe_addr`, using `exec_env` to obtain the frontend
    // client connection. Returns the status of the report.
    static Status report_audit_statistics(const TReportAuditStatisticsParams& params, ExecEnv* exec_env,
                                          const TNetworkAddress& fe_addr);
};
} // namespace starrocks::pipeline
2 changes: 2 additions & 0 deletions be/src/exec/pipeline/exec_state_reporter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ Status ExecStateReporter::report_exec_status(const TReportExecStatusParams& para
msg << "ReportExecStatus() to " << fe_addr << " failed:\n" << e.what();
LOG(WARNING) << msg.str();
rpc_status = Status::InternalError(msg.str());
return rpc_status;
}
}

Expand All @@ -161,6 +162,7 @@ Status ExecStateReporter::report_exec_status(const TReportExecStatusParams& para
msg << "ReportExecStatus() to " << fe_addr << " failed:\n" << e.what();
LOG(WARNING) << msg.str();
rpc_status = Status::InternalError(msg.str());
return rpc_status;
}
return rpc_status;
}
Expand Down
4 changes: 4 additions & 0 deletions be/src/exec/pipeline/olap_table_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include "exec/pipeline/olap_table_sink_operator.h"

#include "exec/pipeline/pipeline_driver_executor.h"
#include "exec/tablet_sink.h"
#include "exprs/expr.h"
#include "runtime/buffer_control_block.h"
Expand Down Expand Up @@ -137,6 +138,9 @@ Status OlapTableSinkOperator::set_cancelled(RuntimeState* state) {
Status OlapTableSinkOperator::set_finishing(RuntimeState* state) {
_is_finished = true;

auto* executor = state->fragment_ctx()->enable_resource_group() ? state->exec_env()->wg_driver_executor()
: state->exec_env()->driver_executor();
executor->report_audit_statistics(state->query_ctx(), state->fragment_ctx());
if (_is_open_done && !_automatic_partition_chunk) {
// sink's open already finish, we can try_close
return _sink->try_close(state);
Expand Down
32 changes: 32 additions & 0 deletions be/src/exec/pipeline/pipeline_driver_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,38 @@ void GlobalDriverExecutor::report_exec_state(QueryContext* query_ctx, FragmentCo
this->_exec_state_reporter->submit(std::move(report_task));
}

// Packs the query's final statistics into a TReportAuditStatisticsParams request and sends it
// to the frontend recorded in `fragment_ctx`. The outcome is only logged; nothing is returned
// to the caller.
void GlobalDriverExecutor::report_audit_statistics(QueryContext* query_ctx, FragmentContext* fragment_ctx) {
    auto final_stats = query_ctx->final_query_statistic();

    TReportAuditStatisticsParams request;
    request.__set_query_id(fragment_ctx->query_id());
    request.__set_fragment_instance_id(fragment_ctx->fragment_instance_id());
    final_stats->to_params(&request);

    const auto fe_addr = fragment_ctx->fe_addr();
    if (fe_addr.hostname.empty()) {
        // Queries executed by external connectors (e.g. the spark and flink connectors) carry
        // no FE address, so there is nowhere to report audit statistics to.
        return;
    }

    ExecEnv* env = fragment_ctx->runtime_state()->exec_env();
    const auto instance_id = fragment_ctx->fragment_instance_id();

    const auto report_status = AuditStatisticsReporter::report_audit_statistics(request, env, fe_addr);
    if (report_status.ok()) {
        LOG(INFO) << "[Driver] Succeed to report audit statistics: fragment_instance_id=" << print_id(instance_id);
        return;
    }

    if (report_status.is_not_found()) {
        // A not-found status is logged at INFO rather than WARNING.
        LOG(INFO) << "[Driver] Fail to report audit statistics due to query not found: fragment_instance_id="
                  << print_id(instance_id);
        return;
    }

    LOG(WARNING) << "[Driver] Fail to report audit statistics fragment_instance_id=" << print_id(instance_id)
                 << ", status: " << report_status.to_string();
}

size_t GlobalDriverExecutor::activate_parked_driver(const ImmutableDriverPredicateFunc& predicate_func) {
return _blocked_driver_poller->activate_parked_driver(predicate_func);
}
Expand Down
4 changes: 4 additions & 0 deletions be/src/exec/pipeline/pipeline_driver_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <memory>
#include <unordered_map>

#include "exec/pipeline/audit_statistics_reporter.h"
#include "exec/pipeline/exec_state_reporter.h"
#include "exec/pipeline/pipeline_driver.h"
#include "exec/pipeline/pipeline_driver_poller.h"
Expand Down Expand Up @@ -50,6 +51,8 @@ class DriverExecutor {
virtual void report_exec_state(QueryContext* query_ctx, FragmentContext* fragment_ctx, const Status& status,
bool done) = 0;

virtual void report_audit_statistics(QueryContext* query_ctx, FragmentContext* fragment_ctx) = 0;

virtual void iterate_immutable_blocking_driver(const IterateImmutableDriverFunc& call) const = 0;

virtual size_t activate_parked_driver(const ImmutableDriverPredicateFunc& predicate_func) = 0;
Expand All @@ -73,6 +76,7 @@ class GlobalDriverExecutor final : public FactoryMethod<DriverExecutor, GlobalDr
void cancel(DriverRawPtr driver) override;
void report_exec_state(QueryContext* query_ctx, FragmentContext* fragment_ctx, const Status& status,
bool done) override;
void report_audit_statistics(QueryContext* query_ctx, FragmentContext* fragment_ctx) override;

void iterate_immutable_blocking_driver(const IterateImmutableDriverFunc& call) const override;

Expand Down
4 changes: 4 additions & 0 deletions be/src/exec/pipeline/sink/export_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "exec/data_sink.h"
#include "exec/file_builder.h"
#include "exec/pipeline/fragment_context.h"
#include "exec/pipeline/pipeline_driver_executor.h"
#include "exec/pipeline/sink/sink_io_buffer.h"
#include "exec/plain_text_builder.h"
#include "formats/csv/converter.h"
Expand Down Expand Up @@ -187,6 +188,9 @@ bool ExportSinkOperator::is_finished() const {
}

// Reports audit statistics through the driver executor, then asks the export sink buffer to
// enter its finishing state and returns that call's status.
Status ExportSinkOperator::set_finishing(RuntimeState* state) {
    auto* fragment_ctx = state->fragment_ctx();
    auto* exec_env = state->exec_env();

    // Use the workgroup executor when the fragment has resource groups enabled, otherwise the
    // regular driver executor.
    auto* executor =
            fragment_ctx->enable_resource_group() ? exec_env->wg_driver_executor() : exec_env->driver_executor();
    executor->report_audit_statistics(state->query_ctx(), fragment_ctx);

    return _export_sink_buffer->set_finishing();
}

Expand Down
4 changes: 4 additions & 0 deletions be/src/exec/pipeline/sink/memory_scratch_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include "exec/pipeline/sink/memory_scratch_sink_operator.h"

#include "exec/pipeline/pipeline_driver_executor.h"
#include "util/arrow/row_batch.h"
#include "util/arrow/starrocks_column_to_arrow.h"

Expand Down Expand Up @@ -45,6 +46,9 @@ bool MemoryScratchSinkOperator::is_finished() const {

// Marks this sink as finished, reports audit statistics through the driver executor, and
// returns OK.
Status MemoryScratchSinkOperator::set_finishing(RuntimeState* state) {
    _is_finished = true;

    auto* fragment_ctx = state->fragment_ctx();
    auto* exec_env = state->exec_env();

    // Use the workgroup executor when the fragment has resource groups enabled, otherwise the
    // regular driver executor.
    auto* executor =
            fragment_ctx->enable_resource_group() ? exec_env->wg_driver_executor() : exec_env->driver_executor();
    executor->report_audit_statistics(state->query_ctx(), fragment_ctx);

    return Status::OK();
}

Expand Down
18 changes: 18 additions & 0 deletions be/src/runtime/query_statistics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,24 @@ void QueryStatistics::to_pb(PQueryStatistics* statistics) {
}
}

// Copies this object's counters into the thrift request `params`: the query-level totals
// (scan rows/bytes, returned rows, cpu cost, memory cost) followed by one entry in
// params->stats_items per table recorded in _stats_items.
//
// `params` must not be null (checked with DCHECK).
void QueryStatistics::to_params(TReportAuditStatisticsParams* params) {
    DCHECK(params != nullptr);
    params->__set_scan_rows(scan_rows);
    params->__set_scan_bytes(scan_bytes);
    params->__set_returned_rows(returned_rows);
    params->__set_cpu_cost_ns(cpu_ns);
    params->__set_mem_cost_bytes(mem_cost_bytes);
    {
        // _stats_items is read under _lock.
        std::lock_guard l(_lock);
        for (const auto& [table_id, stats_item] : _stats_items) {
            // Bind by reference: a plain `auto` would copy the freshly appended element, so the
            // setters below would fill a temporary and leave the stored element empty.
            auto& new_stats_item = params->stats_items.emplace_back();
            new_stats_item.__set_table_id(table_id);
            new_stats_item.__set_scan_rows(stats_item->scan_rows);
            new_stats_item.__set_scan_bytes(stats_item->scan_bytes);
        }
        // NOTE(review): stats_items is appended to directly rather than through a __set_ call;
        // if the thrift field is optional, confirm whether its __isset flag must be set for the
        // list to be serialized.
    }
}

void QueryStatistics::clear() {
scan_rows = 0;
scan_bytes = 0;
Expand Down
2 changes: 2 additions & 0 deletions be/src/runtime/query_statistics.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

#include <mutex>

#include "gen_cpp/FrontendService.h"
#include "gen_cpp/data.pb.h"
#include "util/spinlock.h"

Expand All @@ -58,6 +59,7 @@ class QueryStatistics {
void add_mem_costs(int64_t bytes) { mem_cost_bytes += bytes; }

void to_pb(PQueryStatistics* statistics);
void to_params(TReportAuditStatisticsParams* params);

void merge(int sender_id, QueryStatistics& other);
void merge_pb(const PQueryStatistics& statistics);
Expand Down
2 changes: 1 addition & 1 deletion be/test/exec/stream/stream_operators_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ StatusOr<ChunkPtr> GeneratorStreamSourceOperator::pull_chunk(starrocks::RuntimeS
}

Status PrinterStreamSinkOperator::push_chunk(RuntimeState* state, const ChunkPtr& chunk) {
std::cout << "<<<<<<<<< Sink Result: " << chunk->debug_columns() << std::endl;
std::cout << "========= Sink Result: " << chunk->debug_columns() << std::endl;
for (auto& col : chunk->columns()) {
std::cout << col->debug_string() << std::endl;
}
Expand Down
4 changes: 0 additions & 4 deletions docs/sql-reference/sql-functions/string-functions/rpad.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,7 @@

## Description

<<<<<<< HEAD
This function returns strings with a length of len (starting counting from the first syllable) in str. If len is longer than str, the return value is lengthened to len characters by adding pad characters in front of str. If str is longer than len, the return value is shortened to len characters. Len means the length of characters, not bytes.
=======
This function returns strings with a length of `len` (starting counting from the first syllable) in `str`. If `len` is longer than `str`, the return value is lengthened to `len` characters by adding pad characters behind `str`. If `str` is longer than `len`, the return value is shortened to `len` characters. `len` means the length of characters, not bytes.
>>>>>>> 73797fe93e ([Doc] add func/datatype overview (#27428))

## Syntax

Expand Down
29 changes: 29 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/qe/Coordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import com.starrocks.proto.PExecPlanFragmentResult;
import com.starrocks.proto.PPlanFragmentCancelReason;
import com.starrocks.proto.PQueryStatistics;
import com.starrocks.proto.QueryStatisticsItemPB;
import com.starrocks.proto.StatusPB;
import com.starrocks.qe.QueryStatisticsItem.FragmentInstanceInfo;
import com.starrocks.rpc.BackendServiceClient;
Expand All @@ -79,6 +80,7 @@
import com.starrocks.system.ComputeNode;
import com.starrocks.system.SystemInfoService;
import com.starrocks.task.LoadEtlTask;
import com.starrocks.thrift.TAuditStatisticsItem;
import com.starrocks.thrift.TCompressionType;
import com.starrocks.thrift.TDescriptorTable;
import com.starrocks.thrift.TExecBatchPlanFragmentsParams;
Expand All @@ -89,6 +91,7 @@
import com.starrocks.thrift.TQueryGlobals;
import com.starrocks.thrift.TQueryOptions;
import com.starrocks.thrift.TQueryType;
import com.starrocks.thrift.TReportAuditStatisticsParams;
import com.starrocks.thrift.TReportExecStatusParams;
import com.starrocks.thrift.TRuntimeFilterDestination;
import com.starrocks.thrift.TRuntimeFilterProberParams;
Expand All @@ -97,6 +100,7 @@
import com.starrocks.thrift.TTabletFailInfo;
import com.starrocks.thrift.TUniqueId;
import com.starrocks.thrift.TUnit;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -182,6 +186,8 @@ public class Coordinator {

private boolean thriftServerHighLoad;

private PQueryStatistics auditStatistics;

// Used for new planner
public Coordinator(ConnectContext context, List<PlanFragment> fragments, List<ScanNode> scanNodes,
TDescriptorTable descTable) {
Expand Down Expand Up @@ -1503,6 +1509,25 @@ public void updateFragmentExecStatus(TReportExecStatusParams params) {
}
}

/**
 * Replaces the stored audit statistics with the values carried by {@code params}.
 *
 * <p>Each call builds a new {@link PQueryStatistics}; values from earlier calls are discarded,
 * not merged. Per-table items are copied only when {@code params.stats_items} is non-empty,
 * otherwise {@code statsItems} is left unset.
 *
 * @param params audit statistics reported by a backend
 */
public void updateAuditStatistics(TReportAuditStatisticsParams params) {
    // Populate a local object and assign the field last, so getAuditStatistics() does not hand
    // out an object that is still being filled in.
    // NOTE(review): the field is not volatile; confirm whether readers on other threads need a
    // stronger visibility guarantee.
    PQueryStatistics statistics = new PQueryStatistics();
    statistics.scanRows = params.scan_rows;
    statistics.scanBytes = params.scan_bytes;
    statistics.returnedRows = params.returned_rows;
    statistics.cpuCostNs = params.cpu_cost_ns;
    statistics.memCostBytes = params.mem_cost_bytes;
    if (CollectionUtils.isNotEmpty(params.stats_items)) {
        statistics.statsItems = Lists.newArrayList();
        for (TAuditStatisticsItem item : params.stats_items) {
            QueryStatisticsItemPB itemPB = new QueryStatisticsItemPB();
            itemPB.scanBytes = item.scan_bytes;
            itemPB.scanRows = item.scan_rows;
            itemPB.tableId = item.table_id;
            statistics.statsItems.add(itemPB);
        }
    }
    auditStatistics = statistics;
}

public void endProfile() {
if (backendExecStates.isEmpty()) {
return;
Expand Down Expand Up @@ -1797,6 +1822,10 @@ private void attachInstanceProfileToFragmentProfile() {
}
}

/**
 * Returns the audit statistics most recently stored by
 * {@code updateAuditStatistics}, or {@code null} if none have been stored yet.
 */
public PQueryStatistics getAuditStatistics() {
    return this.auditStatistics;
}

public boolean isThriftServerHighLoad() {
return this.thriftServerHighLoad;
}
Expand Down
Loading

0 comments on commit c9e042a

Please sign in to comment.