Skip to content

Commit

Permalink
[Enhancement] Support audit statistics for insert statement (backport #…
Browse files Browse the repository at this point in the history
…29901) (#30175)

* [Enhancement] Support audit statistics for insert statement (#29901)

---------

Signed-off-by: liuyehcf <1559500551@qq.com>
(cherry picked from commit 728573e)

# Conflicts:
#	be/src/exec/pipeline/olap_table_sink_operator.cpp
#	be/src/exec/pipeline/pipeline_driver_executor.cpp
#	be/src/exec/pipeline/pipeline_driver_executor.h
#	be/src/exec/pipeline/sink/iceberg_table_sink_operator.cpp
#	fe/fe-core/src/main/java/com/starrocks/qe/DefaultCoordinator.java
#	fe/fe-core/src/main/java/com/starrocks/qe/scheduler/Coordinator.java
#	fe/fe-core/src/main/java/com/starrocks/service/FrontendServiceImpl.java

* solve conflict

Signed-off-by: liuyehcf <1559500551@qq.com>

---------

Signed-off-by: liuyehcf <1559500551@qq.com>
Co-authored-by: liuyehcf <1559500551@qq.com>
  • Loading branch information
mergify[bot] and liuyehcf authored Aug 31, 2023
1 parent 87c99dd commit 5419c90
Show file tree
Hide file tree
Showing 21 changed files with 290 additions and 6 deletions.
1 change: 1 addition & 0 deletions be/src/exec/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ set(EXEC_FILES
pipeline/pipeline_driver_queue.cpp
pipeline/pipeline_driver_poller.cpp
pipeline/pipeline_driver.cpp
pipeline/audit_statistics_reporter.cpp
pipeline/exec_state_reporter.cpp
pipeline/driver_limiter.cpp
pipeline/fragment_context.cpp
Expand Down
77 changes: 77 additions & 0 deletions be/src/exec/pipeline/audit_statistics_reporter.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "exec/pipeline/audit_statistics_reporter.h"

#include <thrift/Thrift.h>
#include <thrift/protocol/TDebugProtocol.h>

#include "agent/master_info.h"
#include "gen_cpp/FrontendService_types.h"
#include "runtime/client_cache.h"
#include "runtime/exec_env.h"
#include "service/backend_options.h"

namespace starrocks::pipeline {

using apache::thrift::TException;
using apache::thrift::TProcessor;
using apache::thrift::transport::TTransportException;

// Synchronously sends `params` to the FE at `fe_addr` via the reportAuditStatistics RPC.
//
// A client is taken from `exec_env`'s frontend client cache. If the first call fails with a
// transport error other than TIMED_OUT, the connection is reopened and the call is retried once.
//
// Returns:
//   - the client-acquisition status if no client could be obtained;
//   - the reopen status if reopening the connection fails;
//   - InternalError if the call timed out or any other Thrift exception escaped (including
//     one thrown by the retry);
//   - otherwise the status carried in the FE's response.
Status AuditStatisticsReporter::report_audit_statistics(const TReportAuditStatisticsParams& params, ExecEnv* exec_env,
                                                        const TNetworkAddress& fe_addr) {
    Status fe_status;
    FrontendServiceConnection coord(exec_env->frontend_client_cache(), fe_addr, &fe_status);
    if (!fe_status.ok()) {
        LOG(WARNING) << "Couldn't get a client for " << fe_addr;
        return fe_status;
    }

    TReportAuditStatisticsResult res;

    try {
        try {
            coord->reportAuditStatistics(res, params);
        } catch (const TTransportException& e) {
            if (e.getType() == TTransportException::TTransportExceptionType::TIMED_OUT) {
                // A timed-out request is not retried.
                std::stringstream msg;
                msg << "ReportAuditStatistics() to " << fe_addr << " failed:\n" << e.what();
                LOG(WARNING) << msg.str();
                return Status::InternalError(msg.str());
            }

            // Any other transport failure: reopen the connection and retry once.
            Status reopen_status = coord.reopen();
            if (!reopen_status.ok()) {
                return reopen_status;
            }
            coord->reportAuditStatistics(res, params);
        }
    } catch (const TException& e) {
        // Covers non-transport Thrift errors from the first attempt and anything thrown by the retry.
        std::stringstream msg;
        msg << "ReportAuditStatistics() to " << fe_addr << " failed:\n" << e.what();
        LOG(WARNING) << msg.str();
        return Status::InternalError(msg.str());
    }

    return Status(res.status);
}
} // namespace starrocks::pipeline
37 changes: 37 additions & 0 deletions be/src/exec/pipeline/audit_statistics_reporter.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <memory>

#include "exec/pipeline/fragment_context.h"
#include "exec/pipeline/pipeline_fwd.h"
#include "gen_cpp/FrontendService.h"
#include "gen_cpp/InternalService_types.h"
#include "gen_cpp/Types_types.h"
#include "runtime/exec_env.h"
#include "runtime/runtime_state.h"
#include "service/backend_options.h"
#include "util/threadpool.h"

namespace starrocks::pipeline {
// Helper for sending audit statistics to the FE over the frontend Thrift service.
// The only operation it exposes is the static report_audit_statistics() below.
class AuditStatisticsReporter {
public:
// NOTE(review): no definition of this constructor is visible alongside the declaration;
// since the reporting entry point is static, confirm whether instances are ever created.
AuditStatisticsReporter();

// Sends `params` to the FE at `fe_addr`, using a client obtained from `exec_env`,
// and returns the resulting status.
static Status report_audit_statistics(const TReportAuditStatisticsParams& params, ExecEnv* exec_env,
const TNetworkAddress& fe_addr);
};
} // namespace starrocks::pipeline
2 changes: 2 additions & 0 deletions be/src/exec/pipeline/exec_state_reporter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ Status ExecStateReporter::report_exec_status(const TReportExecStatusParams& para
msg << "ReportExecStatus() to " << fe_addr << " failed:\n" << e.what();
LOG(WARNING) << msg.str();
rpc_status = Status::InternalError(msg.str());
return rpc_status;
}
}

Expand All @@ -148,6 +149,7 @@ Status ExecStateReporter::report_exec_status(const TReportExecStatusParams& para
msg << "ReportExecStatus() to " << fe_addr << " failed:\n" << e.what();
LOG(WARNING) << msg.str();
rpc_status = Status::InternalError(msg.str());
return rpc_status;
}
return rpc_status;
}
Expand Down
1 change: 1 addition & 0 deletions be/src/exec/pipeline/fragment_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ Status FragmentExecutor::_prepare_runtime_state(ExecEnv* exec_env, const Unified
std::make_unique<RuntimeState>(query_id, fragment_instance_id, query_options, query_globals, exec_env));
auto* runtime_state = _fragment_ctx->runtime_state();
runtime_state->set_enable_pipeline_engine(true);
runtime_state->set_fragment_ctx(_fragment_ctx.get());

auto* parent_mem_tracker = wg != nullptr ? wg->mem_tracker() : exec_env->query_pool_mem_tracker();
auto per_instance_mem_limit = query_options.__isset.mem_limit ? query_options.mem_limit : -1;
Expand Down
4 changes: 4 additions & 0 deletions be/src/exec/pipeline/olap_table_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include "exec/pipeline/olap_table_sink_operator.h"

#include "exec/pipeline/pipeline_driver_executor.h"
#include "exec/tablet_sink.h"
#include "exprs/expr.h"
#include "runtime/buffer_control_block.h"
Expand Down Expand Up @@ -76,6 +77,9 @@ Status OlapTableSinkOperator::set_cancelled(RuntimeState* state) {
Status OlapTableSinkOperator::set_finishing(RuntimeState* state) {
_is_finished = true;

auto* executor = state->fragment_ctx()->enable_resource_group() ? state->exec_env()->wg_driver_executor()
: state->exec_env()->driver_executor();
executor->report_audit_statistics(state->query_ctx(), state->fragment_ctx());
if (_is_open_done) {
// sink's open already finish, we can try_close
return _sink->try_close(state);
Expand Down
32 changes: 32 additions & 0 deletions be/src/exec/pipeline/pipeline_driver_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,38 @@ void GlobalDriverExecutor::report_exec_state(QueryContext* query_ctx, FragmentCo
this->_exec_state_reporter->submit(std::move(report_task));
}

// Builds the audit-statistics report for `fragment_ctx` from the query's final statistics and
// sends it to the FE recorded on the fragment context. The RPC is issued on the calling thread
// and its outcome is only logged; nothing is returned to the caller.
void GlobalDriverExecutor::report_audit_statistics(QueryContext* query_ctx, FragmentContext* fragment_ctx) {
    auto final_stats = query_ctx->final_query_statistic();

    TReportAuditStatisticsParams report;
    report.__set_query_id(fragment_ctx->query_id());
    report.__set_fragment_instance_id(fragment_ctx->fragment_instance_id());
    final_stats->to_params(&report);

    auto fe_addr = fragment_ctx->fe_addr();
    if (fe_addr.hostname.empty()) {
        // No FE address is recorded (e.g. queries issued through external connectors such as
        // the spark or flink connector), so there is nowhere to send the report.
        return;
    }

    auto exec_env = fragment_ctx->runtime_state()->exec_env();
    auto instance_id = fragment_ctx->fragment_instance_id();

    auto report_status = AuditStatisticsReporter::report_audit_statistics(report, exec_env, fe_addr);
    if (report_status.ok()) {
        LOG(INFO) << "[Driver] Succeed to report audit statistics: fragment_instance_id=" << print_id(instance_id);
        return;
    }

    if (report_status.is_not_found()) {
        // A not-found status is logged at INFO rather than WARNING.
        LOG(INFO) << "[Driver] Fail to report audit statistics due to query not found: fragment_instance_id="
                  << print_id(instance_id);
        return;
    }

    LOG(WARNING) << "[Driver] Fail to report audit statistics fragment_instance_id=" << print_id(instance_id)
                 << ", status: " << report_status.to_string();
}

void GlobalDriverExecutor::iterate_immutable_blocking_driver(const IterateImmutableDriverFunc& call) const {
_blocked_driver_poller->iterate_immutable_driver(call);
}
Expand Down
4 changes: 4 additions & 0 deletions be/src/exec/pipeline/pipeline_driver_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <memory>
#include <unordered_map>

#include "exec/pipeline/audit_statistics_reporter.h"
#include "exec/pipeline/exec_state_reporter.h"
#include "exec/pipeline/pipeline_driver.h"
#include "exec/pipeline/pipeline_driver_poller.h"
Expand Down Expand Up @@ -39,6 +40,8 @@ class DriverExecutor {
virtual void report_exec_state(QueryContext* query_ctx, FragmentContext* fragment_ctx, const Status& status,
bool done) = 0;

virtual void report_audit_statistics(QueryContext* query_ctx, FragmentContext* fragment_ctx) = 0;

virtual void iterate_immutable_blocking_driver(const IterateImmutableDriverFunc& call) const = 0;

protected:
Expand All @@ -55,6 +58,7 @@ class GlobalDriverExecutor final : public FactoryMethod<DriverExecutor, GlobalDr
void cancel(DriverRawPtr driver) override;
void report_exec_state(QueryContext* query_ctx, FragmentContext* fragment_ctx, const Status& status,
bool done) override;
void report_audit_statistics(QueryContext* query_ctx, FragmentContext* fragment_ctx) override;

void iterate_immutable_blocking_driver(const IterateImmutableDriverFunc& call) const override;

Expand Down
4 changes: 4 additions & 0 deletions be/src/exec/pipeline/sink/export_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "exec/data_sink.h"
#include "exec/file_builder.h"
#include "exec/pipeline/fragment_context.h"
#include "exec/pipeline/pipeline_driver_executor.h"
#include "exec/pipeline/sink/sink_io_buffer.h"
#include "exec/plain_text_builder.h"
#include "formats/csv/converter.h"
Expand Down Expand Up @@ -175,6 +176,9 @@ bool ExportSinkOperator::is_finished() const {
}

// Reports audit statistics through the driver executor that matches the fragment's
// resource-group setting, then marks the export sink buffer as finishing.
Status ExportSinkOperator::set_finishing(RuntimeState* state) {
    auto* fragment_ctx = state->fragment_ctx();
    auto* driver_executor = fragment_ctx->enable_resource_group() ? state->exec_env()->wg_driver_executor()
                                                                  : state->exec_env()->driver_executor();
    driver_executor->report_audit_statistics(state->query_ctx(), fragment_ctx);

    return _export_sink_buffer->set_finishing();
}

Expand Down
6 changes: 5 additions & 1 deletion be/src/exec/pipeline/sink/memory_scratch_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include "exec/pipeline/sink/memory_scratch_sink_operator.h"

#include "exec/pipeline/pipeline_driver_executor.h"
#include "util/arrow/row_batch.h"
#include "util/arrow/starrocks_column_to_arrow.h"

Expand Down Expand Up @@ -33,6 +34,9 @@ bool MemoryScratchSinkOperator::is_finished() const {

// Marks the operator as finished and reports audit statistics through the driver executor
// that matches the fragment's resource-group setting. Always returns OK.
Status MemoryScratchSinkOperator::set_finishing(RuntimeState* state) {
    _is_finished = true;

    auto* fragment_ctx = state->fragment_ctx();
    auto* driver_executor = fragment_ctx->enable_resource_group() ? state->exec_env()->wg_driver_executor()
                                                                  : state->exec_env()->driver_executor();
    driver_executor->report_audit_statistics(state->query_ctx(), fragment_ctx);

    return Status::OK();
}

Expand Down Expand Up @@ -126,4 +130,4 @@ void MemoryScratchSinkOperatorFactory::_prepare_id_to_col_name_map() {
}
}

} // namespace starrocks::pipeline
} // namespace starrocks::pipeline
18 changes: 18 additions & 0 deletions be/src/runtime/query_statistics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,24 @@ void QueryStatistics::to_pb(PQueryStatistics* statistics) {
}
}

// Copies the statistics held by this object into `params`.
//
// The scalar counters (scan rows/bytes, returned rows, CPU cost, memory cost) are written
// through the Thrift setters. One entry per element of `_stats_items` is then appended to
// `params->stats_items` while `_lock` is held.
//
// `params` must not be null.
void QueryStatistics::to_params(TReportAuditStatisticsParams* params) {
    DCHECK(params != nullptr);
    params->__set_scan_rows(scan_rows);
    params->__set_scan_bytes(scan_bytes);
    params->__set_returned_rows(returned_rows);
    params->__set_cpu_cost_ns(cpu_ns);
    params->__set_mem_cost_bytes(mem_cost_bytes);
    {
        std::lock_guard l(_lock);
        for (const auto& [table_id, stats_item] : _stats_items) {
            // Bind by reference: emplace_back() returns a reference to the appended element.
            // Binding with plain `auto` would copy it, and the setters below would then fill
            // in a temporary while the element stored in the vector stayed default-initialised.
            auto& new_stats_item = params->stats_items.emplace_back();
            new_stats_item.__set_table_id(table_id);
            new_stats_item.__set_scan_rows(stats_item->scan_rows);
            new_stats_item.__set_scan_bytes(stats_item->scan_bytes);
        }
        // NOTE(review): stats_items is appended to directly rather than through a __set_*
        // setter; if the Thrift field is optional, confirm it is still marked as set so that
        // it gets serialized.
    }
}

void QueryStatistics::clear() {
scan_rows = 0;
scan_bytes = 0;
Expand Down
2 changes: 2 additions & 0 deletions be/src/runtime/query_statistics.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

#include <mutex>

#include "gen_cpp/FrontendService.h"
#include "gen_cpp/data.pb.h"
#include "util/spinlock.h"

Expand All @@ -45,6 +46,7 @@ class QueryStatistics {
void add_mem_costs(int64_t bytes) { mem_cost_bytes += bytes; }

void to_pb(PQueryStatistics* statistics);
void to_params(TReportAuditStatisticsParams* params);

void merge(int sender_id, QueryStatistics& other);
void merge_pb(const PQueryStatistics& statistics);
Expand Down
3 changes: 3 additions & 0 deletions be/src/runtime/runtime_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ class RuntimeState {
ObjectPool* global_obj_pool() const;
void set_query_ctx(pipeline::QueryContext* ctx) { _query_ctx = ctx; }
pipeline::QueryContext* query_ctx() { return _query_ctx; }
// Returns the fragment context previously stored via set_fragment_ctx(), or nullptr if none was set.
pipeline::FragmentContext* fragment_ctx() { return _fragment_ctx; }
// Stores a raw pointer to the fragment context; the pointer is kept as-is and not copied.
void set_fragment_ctx(pipeline::FragmentContext* fragment_ctx) { _fragment_ctx = fragment_ctx; }
const DescriptorTbl& desc_tbl() const { return *_desc_tbl; }
void set_desc_tbl(DescriptorTbl* desc_tbl) { _desc_tbl = desc_tbl; }
int chunk_size() const { return _query_options.batch_size; }
Expand Down Expand Up @@ -449,6 +451,7 @@ class RuntimeState {
phmap::flat_hash_map<uint32_t, int64_t> _load_dict_versions;

pipeline::QueryContext* _query_ctx = nullptr;
pipeline::FragmentContext* _fragment_ctx = nullptr;

bool _enable_pipeline_engine = false;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ Returns a DOUBLE value.

## Examples

<<<<<<< HEAD
Sample dataset

```plain
Expand Down
3 changes: 0 additions & 3 deletions docs/sql-reference/sql-statements/account-management/GRANT.md
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,6 @@ Example 8: Grant user `jack` the privilege to impersonate user `rose` to perform
```SQL
GRANT IMPERSONATE ON 'rose'@'%' TO 'jack'@'%';
```
<<<<<<< HEAD
=======

## Best practices - Customize roles based on scenarios

Expand Down Expand Up @@ -272,4 +270,3 @@ You can only write data into Iceberg tables (since v3.1).
```

For the best practices of multi-service access control, see [Multi-service access control](../../../administration/User_privilege.md#multi-service-access-control).
>>>>>>> 4cf65a8b45 ([Doc] Add link to bitmap value and update sys variables (#28604))
Loading

0 comments on commit 5419c90

Please sign in to comment.