[BugFix] Support audit for insert into statement executing from follower (StarRocks#30663)

Signed-off-by: liuyehcf <1559500551@qq.com>
liuyehcf authored Sep 14, 2023
1 parent 7c296ec commit 4a6b0c9
Showing 20 changed files with 280 additions and 65 deletions.
9 changes: 6 additions & 3 deletions be/src/exec/pipeline/olap_table_sink_operator.cpp
@@ -138,7 +138,9 @@ Status OlapTableSinkOperator::set_cancelled(RuntimeState* state) {
 Status OlapTableSinkOperator::set_finishing(RuntimeState* state) {
     _is_finished = true;

-    state->exec_env()->wg_driver_executor()->report_audit_statistics(state->query_ctx(), state->fragment_ctx());
+    if (_num_sinkers.fetch_sub(1, std::memory_order_acq_rel) == 1) {
+        state->exec_env()->wg_driver_executor()->report_audit_statistics(state->query_ctx(), state->fragment_ctx());
+    }
     if (_is_open_done && !_automatic_partition_chunk) {
         // sink's open already finish, we can try_close
         return _sink->try_close(state);
@@ -195,12 +197,13 @@ Status OlapTableSinkOperator::push_chunk(RuntimeState* state, const ChunkPtr& chunk) {
 }

 OperatorPtr OlapTableSinkOperatorFactory::create(int32_t degree_of_parallelism, int32_t driver_sequence) {
+    _increment_num_sinkers_no_barrier();
     if (driver_sequence == 0) {
         return std::make_shared<OlapTableSinkOperator>(this, _id, _plan_node_id, driver_sequence, _cur_sender_id++,
-                                                       _sink0, _fragment_ctx);
+                                                       _sink0, _fragment_ctx, _num_sinkers);
     } else {
         return std::make_shared<OlapTableSinkOperator>(this, _id, _plan_node_id, driver_sequence, _cur_sender_id++,
-                                                       _sinks[driver_sequence - 1].get(), _fragment_ctx);
+                                                       _sinks[driver_sequence - 1].get(), _fragment_ctx, _num_sinkers);
     }
 }
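A note on the change above: before this patch, every OlapTableSinkOperator instance called report_audit_statistics() from set_finishing(), so a fragment running N sink drivers reported N times. Now the factory counts the operators it creates, and only the last operator to finish sends the report. Below is a minimal, self-contained sketch of this last-one-out idiom; all names are illustrative stand-ins, not the real StarRocks classes.

#include <atomic>
#include <cstdint>
#include <cstdio>
#include <thread>
#include <vector>

class SinkOperator {
public:
    explicit SinkOperator(std::atomic<int32_t>& num_sinkers) : _num_sinkers(num_sinkers) {}

    void set_finishing() {
        // fetch_sub returns the value held *before* the decrement, so exactly one
        // caller, the one that observes 1, runs the report, regardless of how the
        // threads interleave.
        if (_num_sinkers.fetch_sub(1, std::memory_order_acq_rel) == 1) {
            std::puts("report_audit_statistics: sent exactly once");
        }
    }

private:
    std::atomic<int32_t>& _num_sinkers; // shared counter owned by the factory
};

int main() {
    std::atomic<int32_t> num_sinkers{0};
    std::vector<SinkOperator> ops;
    for (int i = 0; i < 4; ++i) {
        // Factory-side increment; relaxed suffices because operator creation
        // happens-before any set_finishing() call (see the retained comment in
        // result_sink_operator.h below).
        num_sinkers.fetch_add(1, std::memory_order_relaxed);
        ops.emplace_back(num_sinkers);
    }
    std::vector<std::thread> workers;
    workers.reserve(ops.size());
    for (auto& op : ops) {
        workers.emplace_back([&op] { op.set_finishing(); });
    }
    for (auto& w : workers) {
        w.join();
    }
    return 0;
}

The same wrapping is applied to the export, Iceberg, and memory-scratch sinks below.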
7 changes: 6 additions & 1 deletion be/src/exec/pipeline/olap_table_sink_operator.h
@@ -31,10 +31,11 @@ class OlapTableSinkOperator final : public Operator {
 public:
     OlapTableSinkOperator(OperatorFactory* factory, int32_t id, int32_t plan_node_id, int32_t driver_sequence,
                           int32_t sender_id, starrocks::stream_load::OlapTableSink* sink,
-                          FragmentContext* const fragment_ctx)
+                          FragmentContext* const fragment_ctx, std::atomic<int32_t>& num_sinkers)
             : Operator(factory, id, "olap_table_sink", plan_node_id, false, driver_sequence),
               _sink(sink),
               _fragment_ctx(fragment_ctx),
+              _num_sinkers(num_sinkers),
               _sender_id(sender_id) {}

     ~OlapTableSinkOperator() override = default;
@@ -67,6 +68,7 @@ class OlapTableSinkOperator final : public Operator {
 private:
     starrocks::stream_load::OlapTableSink* _sink;
     FragmentContext* const _fragment_ctx;
+    std::atomic<int32_t>& _num_sinkers;

     bool _is_finished = false;
     mutable bool _is_open_done = false;
@@ -101,9 +103,12 @@ class OlapTableSinkOperatorFactory final : public OperatorFactory {
     void close(RuntimeState* state) override;

 private:
+    void _increment_num_sinkers_no_barrier() { _num_sinkers.fetch_add(1, std::memory_order_relaxed); }
+
     std::unique_ptr<starrocks::DataSink> _data_sink;
     starrocks::stream_load::OlapTableSink* _sink0;
     FragmentContext* const _fragment_ctx;
+    std::atomic<int32_t> _num_sinkers = 0;
     int32_t _cur_sender_id;
     std::vector<std::unique_ptr<starrocks::stream_load::OlapTableSink>> _sinks;
 };
3 changes: 2 additions & 1 deletion be/src/exec/pipeline/pipeline_driver_executor.cpp
@@ -345,7 +345,8 @@ void GlobalDriverExecutor::report_audit_statistics(QueryContext* query_ctx, FragmentContext* fragment_ctx) {
     TReportAuditStatisticsParams params;
     params.__set_query_id(fragment_ctx->query_id());
     params.__set_fragment_instance_id(fragment_ctx->fragment_instance_id());
-    query_statistics->to_params(&params);
+    params.__set_audit_statistics({});
+    query_statistics->to_params(&params.audit_statistics);

     auto fe_addr = fragment_ctx->fe_addr();
     if (fe_addr.hostname.empty()) {
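The two-step pattern above, params.__set_audit_statistics({}) followed by writing through params.audit_statistics, reflects how Thrift-generated C++ structs handle optional fields: the __set_ helper assigns the value and flips the field's __isset flag, and a field whose flag is unset is skipped during serialization. The sketch below imitates that shape with simplified hand-written structs; it illustrates the convention and is not the actual generated code.

#include <cstdint>

// Simplified stand-ins for the Thrift-generated structs; the real generated
// code has the same shape (an __isset record plus __set_* helpers per field).
struct TAuditStatistics {
    int64_t scan_rows = 0;
    int64_t scan_bytes = 0;
    void __set_scan_rows(int64_t v) { scan_rows = v; }
    void __set_scan_bytes(int64_t v) { scan_bytes = v; }
};

struct TReportAuditStatisticsParams {
    TAuditStatistics audit_statistics;
    struct Isset {
        bool audit_statistics = false;
    } __isset;

    void __set_audit_statistics(const TAuditStatistics& v) {
        audit_statistics = v;
        // Without this flag the optional field would be skipped during
        // serialization, and the receiver would never see the statistics.
        __isset.audit_statistics = true;
    }
};

int main() {
    TReportAuditStatisticsParams params;
    params.__set_audit_statistics({});            // mark the optional field present
    params.audit_statistics.__set_scan_rows(42);  // then fill it in place, as to_params() does
    return 0;
}

With to_params() now targeting the nested TAuditStatistics (see query_statistics.cpp below), the same filling code can serve any message that embeds the struct.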
4 changes: 2 additions & 2 deletions be/src/exec/pipeline/result_sink_operator.cpp
@@ -65,10 +65,10 @@ void ResultSinkOperator::close(RuntimeState* state) {
     }

     // Close the shared sender when the last result sink operator is closing.
-    if (_num_result_sinkers.fetch_sub(1, std::memory_order_acq_rel) == 1) {
+    if (_num_sinkers.fetch_sub(1, std::memory_order_acq_rel) == 1) {
         if (_sender != nullptr) {
             // Incrementing and reading _num_written_rows needn't memory barrier, because
-            // the visibility of _num_written_rows is guaranteed by _num_result_sinkers.fetch_sub().
+            // the visibility of _num_written_rows is guaranteed by _num_sinkers.fetch_sub().
             _sender->update_num_written_rows(_num_written_rows.load(std::memory_order_relaxed));

             QueryContext* query_ctx = state->query_ctx();
18 changes: 9 additions & 9 deletions be/src/exec/pipeline/result_sink_operator.h
@@ -32,15 +32,15 @@ class ResultSinkOperator final : public Operator {
     ResultSinkOperator(OperatorFactory* factory, int32_t id, int32_t plan_node_id, int32_t driver_sequence,
                        TResultSinkType::type sink_type, bool is_binary_format, TResultSinkFormatType::type format_type,
                        std::vector<ExprContext*> output_expr_ctxs, const std::shared_ptr<BufferControlBlock>& sender,
-                       std::atomic<int32_t>& num_result_sinks, std::atomic<int64_t>& num_written_rows,
+                       std::atomic<int32_t>& num_sinks, std::atomic<int64_t>& num_written_rows,
                        FragmentContext* const fragment_ctx)
             : Operator(factory, id, "result_sink", plan_node_id, false, driver_sequence),
               _sink_type(sink_type),
               _is_binary_format(is_binary_format),
               _format_type(format_type),
               _output_expr_ctxs(std::move(output_expr_ctxs)),
               _sender(sender),
-              _num_result_sinkers(num_result_sinks),
+              _num_sinkers(num_sinks),
               _num_written_rows(num_written_rows),
               _fragment_ctx(fragment_ctx) {}
@@ -78,7 +78,7 @@ class ResultSinkOperator final : public Operator {
     /// The following three fields are shared by all the ResultSinkOperators
     /// created by the same ResultSinkOperatorFactory.
     const std::shared_ptr<BufferControlBlock>& _sender;
-    std::atomic<int32_t>& _num_result_sinkers;
+    std::atomic<int32_t>& _num_sinkers;
     std::atomic<int64_t>& _num_written_rows;

     std::shared_ptr<ResultWriter> _writer;
@@ -107,22 +107,22 @@ class ResultSinkOperatorFactory final : public OperatorFactory {
     ~ResultSinkOperatorFactory() override = default;

     OperatorPtr create(int32_t degree_of_parallelism, int32_t driver_sequence) override {
-        // _num_result_sinkers is incremented when creating a ResultSinkOperator instance here at the preparing
+        // _num_sinkers is incremented when creating a ResultSinkOperator instance here at the preparing
         // phase of FragmentExecutor, and decremented and read when closing ResultSinkOperator. The visibility
-        // of increasing _num_result_sinkers to ResultSinkOperator::close is guaranteed by pipeline driver queue,
+        // of increasing _num_sinkers to ResultSinkOperator::close is guaranteed by pipeline driver queue,
         // so it doesn't need memory barrier here.
-        _increment_num_result_sinkers_no_barrier();
+        _increment_num_sinkers_no_barrier();
         return std::make_shared<ResultSinkOperator>(this, _id, _plan_node_id, driver_sequence, _sink_type,
                                                     _is_binary_format, _format_type, _output_expr_ctxs, _sender,
-                                                    _num_result_sinkers, _num_written_rows, _fragment_ctx);
+                                                    _num_sinkers, _num_written_rows, _fragment_ctx);
     }

     Status prepare(RuntimeState* state) override;

     void close(RuntimeState* state) override;

 private:
-    void _increment_num_result_sinkers_no_barrier() { _num_result_sinkers.fetch_add(1, std::memory_order_relaxed); }
+    void _increment_num_sinkers_no_barrier() { _num_sinkers.fetch_add(1, std::memory_order_relaxed); }

     TResultSinkType::type _sink_type;
     bool _is_binary_format;
@@ -134,7 +134,7 @@ class ResultSinkOperatorFactory final : public OperatorFactory {
     // A fragment_instance_id can only have ONE sender, because result_mgr saves the mapping from fragment_instance_id
     // to sender. Therefore, sender is created in this factory and shared by all the ResultSinkOperator instances.
     std::shared_ptr<BufferControlBlock> _sender;
-    std::atomic<int32_t> _num_result_sinkers = 0;
+    std::atomic<int32_t> _num_sinkers = 0;
     std::atomic<int64_t> _num_written_rows = 0;

     FragmentContext* const _fragment_ctx;
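The comments retained in this header also explain the mixed memory orders used throughout the patch: the factory-side increment can be relaxed because operator creation happens-before the operator's close via the pipeline driver queue, while the acq_rel fetch_sub is what publishes each sinker's earlier writes (such as _num_written_rows) to whichever operator turns out to be the last closer. A compact sketch of that pairing, with stand-in globals in place of the real operator plumbing:

#include <atomic>
#include <cstdint>
#include <cstdio>
#include <thread>
#include <vector>

std::atomic<int32_t> num_sinkers{0};
std::atomic<int64_t> num_written_rows{0};

void sink_thread(int64_t rows) {
    // Relaxed is fine here: the store below is made visible to the last closer
    // by the acq_rel fetch_sub, which orders it before the final read.
    num_written_rows.fetch_add(rows, std::memory_order_relaxed);
    if (num_sinkers.fetch_sub(1, std::memory_order_acq_rel) == 1) {
        // Every other thread's fetch_sub released its prior writes; the final
        // decrement's acquire side synchronizes with them, so this load sees all rows.
        std::printf("total rows: %lld\n",
                    static_cast<long long>(num_written_rows.load(std::memory_order_relaxed)));
    }
}

int main() {
    constexpr int kSinkers = 4;
    num_sinkers.store(kSinkers, std::memory_order_relaxed); // set before threads start
    std::vector<std::thread> threads;
    for (int i = 0; i < kSinkers; ++i) {
        threads.emplace_back(sink_thread, 100);
    }
    for (auto& t : threads) {
        t.join(); // prints "total rows: 400" exactly once
    }
    return 0;
}

This is the same release/acquire pairing that reference-counted deleters rely on: every decrement releases that thread's prior writes, and the final decrement acquires them all.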
6 changes: 4 additions & 2 deletions be/src/exec/pipeline/sink/export_sink_operator.cpp
@@ -192,7 +192,9 @@ bool ExportSinkOperator::is_finished() const {
 }

 Status ExportSinkOperator::set_finishing(RuntimeState* state) {
-    state->exec_env()->wg_driver_executor()->report_audit_statistics(state->query_ctx(), state->fragment_ctx());
+    if (_num_sinkers.fetch_sub(1, std::memory_order_acq_rel) == 1) {
+        state->exec_env()->wg_driver_executor()->report_audit_statistics(state->query_ctx(), state->fragment_ctx());
+    }
     return _export_sink_buffer->set_finishing();
 }

@@ -220,7 +222,7 @@ Status ExportSinkOperatorFactory::prepare(RuntimeState* state) {
     RETURN_IF_ERROR(Expr::open(_output_expr_ctxs, state));

     _export_sink_buffer =
-            std::make_shared<ExportSinkIOBuffer>(_t_export_sink, _output_expr_ctxs, _num_sinkers, _fragment_ctx);
+            std::make_shared<ExportSinkIOBuffer>(_t_export_sink, _output_expr_ctxs, _total_num_sinkers, _fragment_ctx);
     return Status::OK();
 }

17 changes: 12 additions & 5 deletions be/src/exec/pipeline/sink/export_sink_operator.h
@@ -34,9 +34,10 @@ class FragmentContext;
 class ExportSinkOperator final : public Operator {
 public:
     ExportSinkOperator(OperatorFactory* factory, int32_t id, int32_t plan_node_id, int32_t driver_sequence,
-                       std::shared_ptr<ExportSinkIOBuffer> export_sink_buffer)
+                       std::shared_ptr<ExportSinkIOBuffer> export_sink_buffer, std::atomic<int32_t>& num_sinkers)
             : Operator(factory, id, "export_sink", plan_node_id, false, driver_sequence),
-              _export_sink_buffer(std::move(std::move(export_sink_buffer))) {}
+              _export_sink_buffer(std::move(std::move(export_sink_buffer))),
+              _num_sinkers(num_sinkers) {}

     ~ExportSinkOperator() override = default;
@@ -62,6 +63,7 @@

 private:
     std::shared_ptr<ExportSinkIOBuffer> _export_sink_buffer;
+    std::atomic<int32_t>& _num_sinkers;
 };

 class ExportSinkOperatorFactory final : public OperatorFactory {
@@ -71,26 +73,31 @@ class ExportSinkOperatorFactory final : public OperatorFactory {
             : OperatorFactory(id, "export_sink", Operator::s_pseudo_plan_node_id_for_final_sink),
               _t_export_sink(t_export_sink),
               _t_output_expr(std::move(std::move(t_output_expr))),
-              _num_sinkers(num_sinkers),
+              _total_num_sinkers(num_sinkers),
               _fragment_ctx(fragment_ctx) {}

     ~ExportSinkOperatorFactory() override = default;

     OperatorPtr create(int32_t degree_of_parallelism, int32_t driver_sequence) override {
-        return std::make_shared<ExportSinkOperator>(this, _id, _plan_node_id, driver_sequence, _export_sink_buffer);
+        _increment_num_sinkers_no_barrier();
+        return std::make_shared<ExportSinkOperator>(this, _id, _plan_node_id, driver_sequence, _export_sink_buffer,
+                                                    _num_sinkers);
     }

     Status prepare(RuntimeState* state) override;

     void close(RuntimeState* state) override;

 private:
+    void _increment_num_sinkers_no_barrier() { _num_sinkers.fetch_add(1, std::memory_order_relaxed); }
+
     TExportSink _t_export_sink;

     const std::vector<TExpr> _t_output_expr;
     std::vector<ExprContext*> _output_expr_ctxs;

-    int32_t _num_sinkers;
+    const int32_t _total_num_sinkers;
+    std::atomic<int32_t> _num_sinkers = 0;
     std::shared_ptr<ExportSinkIOBuffer> _export_sink_buffer;

     FragmentContext* _fragment_ctx = nullptr;
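A detail specific to this file: the factory now keeps two counters rather than overloading one. Roughly, as declared in the diff above (the comments are our gloss, not from the source):

const int32_t _total_num_sinkers;       // planned parallelism, fixed at construction, handed to ExportSinkIOBuffer in prepare()
std::atomic<int32_t> _num_sinkers = 0;  // counts operators actually created; counted down in set_finishing() to elect the reporter

Splitting them keeps the IO buffer's expected-sinker count stable even though the atomic counter is consumed by the countdown.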
4 changes: 3 additions & 1 deletion be/src/exec/pipeline/sink/iceberg_table_sink_operator.cpp
@@ -61,7 +61,9 @@ bool IcebergTableSinkOperator::is_finished() const {
 }

 Status IcebergTableSinkOperator::set_finishing(RuntimeState* state) {
-    state->exec_env()->wg_driver_executor()->report_audit_statistics(state->query_ctx(), state->fragment_ctx());
+    if (_num_sinkers.fetch_sub(1, std::memory_order_acq_rel) == 1) {
+        state->exec_env()->wg_driver_executor()->report_audit_statistics(state->query_ctx(), state->fragment_ctx());
+    }

     for (const auto& writer : _partition_writers) {
         if (!writer.second->closed()) {
18 changes: 12 additions & 6 deletions be/src/exec/pipeline/sink/iceberg_table_sink_operator.h
@@ -35,7 +35,8 @@ class IcebergTableSinkOperator final : public Operator {
                           const TCloudConfiguration& cloud_conf, IcebergTableDescriptor* iceberg_table,
                           FragmentContext* fragment_ctx, const std::shared_ptr<::parquet::schema::GroupNode>& schema,
                           const std::vector<ExprContext*>& output_expr_ctxs,
-                          const vector<ExprContext*>& partition_output_expr, bool is_static_partition_insert)
+                          const vector<ExprContext*>& partition_output_expr, bool is_static_partition_insert,
+                          std::atomic<int32_t>& num_sinkers)
             : Operator(factory, id, "iceberg_table_sink", plan_node_id, false, driver_sequence),
               _location(std::move(location)),
               _iceberg_table_data_location(_location + "/data/"),
@@ -46,7 +47,8 @@
               _parquet_file_schema(std::move(schema)),
               _output_expr(output_expr_ctxs),
               _partition_expr(partition_output_expr),
-              _is_static_partition_insert(is_static_partition_insert) {}
+              _is_static_partition_insert(is_static_partition_insert),
+              _num_sinkers(num_sinkers) {}

     ~IcebergTableSinkOperator() override = default;
@@ -90,6 +92,7 @@
     std::unordered_map<std::string, std::unique_ptr<starrocks::RollingAsyncParquetWriter>> _partition_writers;
     std::atomic<bool> _is_finished = false;
     bool _is_static_partition_insert = false;
+    std::atomic<int32_t>& _num_sinkers;
 };

 class IcebergTableSinkOperatorFactory final : public OperatorFactory {
@@ -102,17 +105,19 @@
     ~IcebergTableSinkOperatorFactory() override = default;

     OperatorPtr create(int32_t degree_of_parallelism, int32_t driver_sequence) override {
-        return std::make_shared<IcebergTableSinkOperator>(this, _id, _plan_node_id, driver_sequence, _location,
-                                                          _file_format, _compression_codec, _cloud_conf, _iceberg_table,
-                                                          _fragment_ctx, _parquet_file_schema, _output_expr_ctxs,
-                                                          _partition_expr_ctxs, is_static_partition_insert);
+        _increment_num_sinkers_no_barrier();
+        return std::make_shared<IcebergTableSinkOperator>(
+                this, _id, _plan_node_id, driver_sequence, _location, _file_format, _compression_codec, _cloud_conf,
+                _iceberg_table, _fragment_ctx, _parquet_file_schema, _output_expr_ctxs, _partition_expr_ctxs,
+                is_static_partition_insert, _num_sinkers);
     }

     Status prepare(RuntimeState* state) override;

     void close(RuntimeState* state) override;

 private:
+    void _increment_num_sinkers_no_barrier() { _num_sinkers.fetch_add(1, std::memory_order_relaxed); }
     std::vector<parquet::FileColumnId> generate_parquet_field_ids(const std::vector<TIcebergSchemaField>& fields);

 private:
@@ -129,6 +134,7 @@
     std::shared_ptr<::parquet::schema::GroupNode> _parquet_file_schema;
     std::vector<ExprContext*> _partition_expr_ctxs;
     bool is_static_partition_insert = false;
+    std::atomic<int32_t> _num_sinkers = 0;
 };

 } // namespace pipeline
4 changes: 3 additions & 1 deletion be/src/exec/pipeline/sink/memory_scratch_sink_operator.cpp
@@ -46,7 +46,9 @@ bool MemoryScratchSinkOperator::is_finished() const {

 Status MemoryScratchSinkOperator::set_finishing(RuntimeState* state) {
     _is_finished = true;
-    state->exec_env()->wg_driver_executor()->report_audit_statistics(state->query_ctx(), state->fragment_ctx());
+    if (_num_sinkers.fetch_sub(1, std::memory_order_acq_rel) == 1) {
+        state->exec_env()->wg_driver_executor()->report_audit_statistics(state->query_ctx(), state->fragment_ctx());
+    }
     return Status::OK();
 }

11 changes: 8 additions & 3 deletions be/src/exec/pipeline/sink/memory_scratch_sink_operator.h
@@ -37,11 +37,12 @@ class MemoryScratchSinkOperator final : public Operator {
 public:
     MemoryScratchSinkOperator(OperatorFactory* factory, int32_t id, int32_t plan_node_id, int32_t driver_sequence,
                               std::vector<ExprContext*> output_expr_ctxs, std::shared_ptr<arrow::Schema> arrow_schema,
-                              BlockQueueSharedPtr queue)
+                              BlockQueueSharedPtr queue, std::atomic<int32_t>& num_sinkers)
             : Operator(factory, id, "memory_scratch_sink", plan_node_id, false, driver_sequence),
               _output_expr_ctxs(std::move(output_expr_ctxs)),
               _arrow_schema(std::move(arrow_schema)),
-              _queue(std::move(queue)) {}
+              _queue(std::move(queue)),
+              _num_sinkers(num_sinkers) {}

     ~MemoryScratchSinkOperator() override = default;

@@ -71,6 +72,7 @@
     std::vector<ExprContext*> _output_expr_ctxs;
     std::shared_ptr<arrow::Schema> _arrow_schema;
     BlockQueueSharedPtr _queue;
+    std::atomic<int32_t>& _num_sinkers;
     mutable std::shared_ptr<arrow::RecordBatch> _pending_result;
     bool _is_finished = false;
     bool _has_put_sentinel = false;
@@ -84,15 +86,17 @@
     ~MemoryScratchSinkOperatorFactory() override = default;

     OperatorPtr create(int32_t degree_of_parallelism, int32_t driver_sequence) override {
+        _increment_num_sinkers_no_barrier();
         return std::make_shared<MemoryScratchSinkOperator>(this, _id, _plan_node_id, driver_sequence, _output_expr_ctxs,
-                                                           _arrow_schema, _queue);
+                                                           _arrow_schema, _queue, _num_sinkers);
     }

     Status prepare(RuntimeState* state) override;

     void close(RuntimeState* state) override;

 private:
+    void _increment_num_sinkers_no_barrier() { _num_sinkers.fetch_add(1, std::memory_order_relaxed); }
     void _prepare_id_to_col_name_map();

     const RowDescriptor _row_desc;
@@ -101,6 +105,7 @@
     std::vector<ExprContext*> _output_expr_ctxs;

     BlockQueueSharedPtr _queue;
+    std::atomic<int32_t> _num_sinkers = 0;
     std::unordered_map<int64_t, std::string> _id_to_col_name;
 };

2 changes: 1 addition & 1 deletion be/src/runtime/query_statistics.cpp
@@ -55,7 +55,7 @@ void QueryStatistics::to_pb(PQueryStatistics* statistics) {
     }
 }

-void QueryStatistics::to_params(TReportAuditStatisticsParams* params) {
+void QueryStatistics::to_params(TAuditStatistics* params) {
     DCHECK(params != nullptr);
     params->__set_scan_rows(scan_rows);
     params->__set_scan_bytes(scan_bytes);
2 changes: 1 addition & 1 deletion be/src/runtime/query_statistics.h
@@ -60,7 +60,7 @@ class QueryStatistics {
     void add_spill_bytes(int64_t bytes) { spill_bytes += bytes; }

     void to_pb(PQueryStatistics* statistics);
-    void to_params(TReportAuditStatisticsParams* params);
+    void to_params(TAuditStatistics* params);

     void merge(int sender_id, QueryStatistics& other);
     void merge_pb(const PQueryStatistics& statistics);
(The remaining 7 of the 20 changed files are not shown in this view.)
