Skip to content

Commit

Permalink
[Enhancement] Support audic statistics for insert statement (#29901)
Browse files Browse the repository at this point in the history
* [Enhancement] Support audic statistics for insert statement

Signed-off-by: liuyehcf <1559500551@qq.com>

* update test

Signed-off-by: liuyehcf <1559500551@qq.com>

* update 2

Signed-off-by: liuyehcf <1559500551@qq.com>

---------

Signed-off-by: liuyehcf <1559500551@qq.com>
(cherry picked from commit 728573e)

# Conflicts:
#	be/src/exec/pipeline/olap_table_sink_operator.cpp
#	be/src/exec/pipeline/pipeline_driver_executor.cpp
#	be/src/exec/pipeline/pipeline_driver_executor.h
#	be/src/exec/pipeline/sink/iceberg_table_sink_operator.cpp
#	fe/fe-core/src/main/java/com/starrocks/qe/DefaultCoordinator.java
#	fe/fe-core/src/main/java/com/starrocks/qe/scheduler/Coordinator.java
#	fe/fe-core/src/main/java/com/starrocks/service/FrontendServiceImpl.java
  • Loading branch information
liuyehcf authored and mergify[bot] committed Aug 30, 2023
1 parent a352db6 commit 81071cc
Show file tree
Hide file tree
Showing 22 changed files with 2,024 additions and 1 deletion.
1 change: 1 addition & 0 deletions be/src/exec/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ set(EXEC_FILES
pipeline/pipeline_driver_queue.cpp
pipeline/pipeline_driver_poller.cpp
pipeline/pipeline_driver.cpp
pipeline/audit_statistics_reporter.cpp
pipeline/exec_state_reporter.cpp
pipeline/driver_limiter.cpp
pipeline/fragment_context.cpp
Expand Down
77 changes: 77 additions & 0 deletions be/src/exec/pipeline/audit_statistics_reporter.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "exec/pipeline/audit_statistics_reporter.h"

#include <thrift/Thrift.h>
#include <thrift/protocol/TDebugProtocol.h>

#include "agent/master_info.h"
#include "gen_cpp/FrontendService_types.h"
#include "runtime/client_cache.h"
#include "runtime/exec_env.h"
#include "service/backend_options.h"

namespace starrocks::pipeline {

using apache::thrift::TException;
using apache::thrift::TProcessor;
using apache::thrift::transport::TTransportException;

// including the final status when execution finishes.
Status AuditStatisticsReporter::report_audit_statistics(const TReportAuditStatisticsParams& params, ExecEnv* exec_env,
const TNetworkAddress& fe_addr) {
Status fe_status;
FrontendServiceConnection coord(exec_env->frontend_client_cache(), fe_addr, &fe_status);
if (!fe_status.ok()) {
LOG(WARNING) << "Couldn't get a client for " << fe_addr;
return fe_status;
}

TReportAuditStatisticsResult res;
Status rpc_status;

try {
try {
coord->reportAuditStatistics(res, params);
} catch (TTransportException& e) {
TTransportException::TTransportExceptionType type = e.getType();
if (type != TTransportException::TTransportExceptionType::TIMED_OUT) {
// if not TIMED_OUT, retry
rpc_status = coord.reopen();

if (!rpc_status.ok()) {
return rpc_status;
}
coord->reportAuditStatistics(res, params);
} else {
std::stringstream msg;
msg << "ReportExecStatus() to " << fe_addr << " failed:\n" << e.what();
LOG(WARNING) << msg.str();
rpc_status = Status::InternalError(msg.str());
return rpc_status;
}
}

rpc_status = Status(res.status);
} catch (TException& e) {
std::stringstream msg;
msg << "ReportExecStatus() to " << fe_addr << " failed:\n" << e.what();
LOG(WARNING) << msg.str();
rpc_status = Status::InternalError(msg.str());
return rpc_status;
}
return rpc_status;
}
} // namespace starrocks::pipeline
37 changes: 37 additions & 0 deletions be/src/exec/pipeline/audit_statistics_reporter.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <memory>

#include "exec/pipeline/fragment_context.h"
#include "exec/pipeline/pipeline_fwd.h"
#include "gen_cpp/FrontendService.h"
#include "gen_cpp/InternalService_types.h"
#include "gen_cpp/Types_types.h"
#include "runtime/exec_env.h"
#include "runtime/runtime_state.h"
#include "service/backend_options.h"
#include "util/threadpool.h"

namespace starrocks::pipeline {
class AuditStatisticsReporter {
public:
AuditStatisticsReporter();

static Status report_audit_statistics(const TReportAuditStatisticsParams& params, ExecEnv* exec_env,
const TNetworkAddress& fe_addr);
};
} // namespace starrocks::pipeline
2 changes: 2 additions & 0 deletions be/src/exec/pipeline/exec_state_reporter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ Status ExecStateReporter::report_exec_status(const TReportExecStatusParams& para
msg << "ReportExecStatus() to " << fe_addr << " failed:\n" << e.what();
LOG(WARNING) << msg.str();
rpc_status = Status::InternalError(msg.str());
return rpc_status;
}
}

Expand All @@ -148,6 +149,7 @@ Status ExecStateReporter::report_exec_status(const TReportExecStatusParams& para
msg << "ReportExecStatus() to " << fe_addr << " failed:\n" << e.what();
LOG(WARNING) << msg.str();
rpc_status = Status::InternalError(msg.str());
return rpc_status;
}
return rpc_status;
}
Expand Down
6 changes: 6 additions & 0 deletions be/src/exec/pipeline/olap_table_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include "exec/pipeline/olap_table_sink_operator.h"

#include "exec/pipeline/pipeline_driver_executor.h"
#include "exec/tablet_sink.h"
#include "exprs/expr.h"
#include "runtime/buffer_control_block.h"
Expand Down Expand Up @@ -76,7 +77,12 @@ Status OlapTableSinkOperator::set_cancelled(RuntimeState* state) {
Status OlapTableSinkOperator::set_finishing(RuntimeState* state) {
_is_finished = true;

<<<<<<< HEAD
if (_is_open_done) {
=======
state->exec_env()->wg_driver_executor()->report_audit_statistics(state->query_ctx(), state->fragment_ctx());
if (_is_open_done && !_automatic_partition_chunk) {
>>>>>>> 728573e241 ([Enhancement] Support audic statistics for insert statement (#29901))
// sink's open already finish, we can try_close
return _sink->try_close(state);
} else {
Expand Down
75 changes: 75 additions & 0 deletions be/src/exec/pipeline/pipeline_driver_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,81 @@ void GlobalDriverExecutor::report_exec_state(QueryContext* query_ctx, FragmentCo
this->_exec_state_reporter->submit(std::move(report_task));
}

<<<<<<< HEAD
=======
void GlobalDriverExecutor::report_audit_statistics(QueryContext* query_ctx, FragmentContext* fragment_ctx) {
auto query_statistics = query_ctx->final_query_statistic();

TReportAuditStatisticsParams params;
params.__set_query_id(fragment_ctx->query_id());
params.__set_fragment_instance_id(fragment_ctx->fragment_instance_id());
query_statistics->to_params(&params);

auto fe_addr = fragment_ctx->fe_addr();
if (fe_addr.hostname.empty()) {
// query executed by external connectors, like spark and flink connector,
// does not need to report exec state to FE, so return if fe addr is empty.
return;
}

auto exec_env = fragment_ctx->runtime_state()->exec_env();
auto fragment_id = fragment_ctx->fragment_instance_id();

auto status = AuditStatisticsReporter::report_audit_statistics(params, exec_env, fe_addr);
if (!status.ok()) {
if (status.is_not_found()) {
LOG(INFO) << "[Driver] Fail to report audit statistics due to query not found: fragment_instance_id="
<< print_id(fragment_id);
} else {
LOG(WARNING) << "[Driver] Fail to report audit statistics fragment_instance_id=" << print_id(fragment_id)
<< ", status: " << status.to_string();
}
} else {
LOG(INFO) << "[Driver] Succeed to report audit statistics: fragment_instance_id=" << print_id(fragment_id);
}
}

size_t GlobalDriverExecutor::activate_parked_driver(const ImmutableDriverPredicateFunc& predicate_func) {
return _blocked_driver_poller->activate_parked_driver(predicate_func);
}

size_t GlobalDriverExecutor::calculate_parked_driver(const ImmutableDriverPredicateFunc& predicate_func) const {
return _blocked_driver_poller->calculate_parked_driver(predicate_func);
}

void GlobalDriverExecutor::_finalize_epoch(DriverRawPtr driver, RuntimeState* runtime_state, DriverState state) {
DCHECK(driver);
DCHECK(down_cast<StreamPipelineDriver*>(driver));
auto* stream_driver = down_cast<StreamPipelineDriver*>(driver);
stream_driver->epoch_finalize(runtime_state, state);
}

void GlobalDriverExecutor::report_epoch(ExecEnv* exec_env, QueryContext* query_ctx,
std::vector<FragmentContext*> fragment_ctxs) {
DCHECK_LT(0, fragment_ctxs.size());
auto params = ExecStateReporter::create_report_epoch_params(query_ctx, fragment_ctxs);
// TODO(lism): Check all fragment_ctx's fe_addr are the same.
auto fe_addr = fragment_ctxs[0]->fe_addr();
auto query_id = query_ctx->query_id();
auto report_task = [=]() {
auto status = ExecStateReporter::report_epoch(params, exec_env, fe_addr);
if (!status.ok()) {
if (status.is_not_found()) {
LOG(INFO) << "[Driver] Fail to report epoch exec state due to query not found: query_id="
<< print_id(query_id);
} else {
LOG(WARNING) << "[Driver] Fail to report epoch exec state: query_id=" << print_id(query_id)
<< ", status: " << status.to_string();
}
} else {
LOG(INFO) << "[Driver] Succeed to report epoch exec state: query_id=" << print_id(query_id);
}
};

this->_exec_state_reporter->submit(std::move(report_task));
}

>>>>>>> 728573e241 ([Enhancement] Support audic statistics for insert statement (#29901))
void GlobalDriverExecutor::iterate_immutable_blocking_driver(const IterateImmutableDriverFunc& call) const {
_blocked_driver_poller->iterate_immutable_driver(call);
}
Expand Down
10 changes: 10 additions & 0 deletions be/src/exec/pipeline/pipeline_driver_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <memory>
#include <unordered_map>

#include "exec/pipeline/audit_statistics_reporter.h"
#include "exec/pipeline/exec_state_reporter.h"
#include "exec/pipeline/pipeline_driver.h"
#include "exec/pipeline/pipeline_driver_poller.h"
Expand Down Expand Up @@ -39,6 +40,8 @@ class DriverExecutor {
virtual void report_exec_state(QueryContext* query_ctx, FragmentContext* fragment_ctx, const Status& status,
bool done) = 0;

virtual void report_audit_statistics(QueryContext* query_ctx, FragmentContext* fragment_ctx) = 0;

virtual void iterate_immutable_blocking_driver(const IterateImmutableDriverFunc& call) const = 0;

protected:
Expand All @@ -53,8 +56,15 @@ class GlobalDriverExecutor final : public FactoryMethod<DriverExecutor, GlobalDr
void change_num_threads(int32_t num_threads) override;
void submit(DriverRawPtr driver) override;
void cancel(DriverRawPtr driver) override;
<<<<<<< HEAD
void report_exec_state(QueryContext* query_ctx, FragmentContext* fragment_ctx, const Status& status,
bool done) override;
=======
void close() override;
void report_exec_state(QueryContext* query_ctx, FragmentContext* fragment_ctx, const Status& status, bool done,
bool attach_profile) override;
void report_audit_statistics(QueryContext* query_ctx, FragmentContext* fragment_ctx) override;
>>>>>>> 728573e241 ([Enhancement] Support audic statistics for insert statement (#29901))

void iterate_immutable_blocking_driver(const IterateImmutableDriverFunc& call) const override;

Expand Down
2 changes: 2 additions & 0 deletions be/src/exec/pipeline/sink/export_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "exec/data_sink.h"
#include "exec/file_builder.h"
#include "exec/pipeline/fragment_context.h"
#include "exec/pipeline/pipeline_driver_executor.h"
#include "exec/pipeline/sink/sink_io_buffer.h"
#include "exec/plain_text_builder.h"
#include "formats/csv/converter.h"
Expand Down Expand Up @@ -175,6 +176,7 @@ bool ExportSinkOperator::is_finished() const {
}

Status ExportSinkOperator::set_finishing(RuntimeState* state) {
state->exec_env()->wg_driver_executor()->report_audit_statistics(state->query_ctx(), state->fragment_ctx());
return _export_sink_buffer->set_finishing();
}

Expand Down
Loading

0 comments on commit 81071cc

Please sign in to comment.