[Feature](profile) add shuffle send rows/bytes #30456

Merged (1 commit) on Jan 27, 2024

16 changes: 16 additions & 0 deletions be/src/exec/data_sink.cpp
@@ -30,6 +30,8 @@
#include <string>

#include "common/config.h"
#include "runtime/query_context.h"
#include "runtime/query_statistics.h"
#include "vec/sink/async_writer_sink.h"
#include "vec/sink/group_commit_block_sink.h"
#include "vec/sink/multi_cast_data_stream_sink.h"
@@ -44,6 +46,14 @@ namespace doris {
class DescriptorTbl;
class TExpr;

DataSink::DataSink(const RowDescriptor& desc) : _row_desc(desc) {
_query_statistics = std::make_shared<QueryStatistics>();
}

std::shared_ptr<QueryStatistics> DataSink::get_query_statistics_ptr() {
return _query_statistics;
}

Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink,
const std::vector<TExpr>& output_exprs,
const TPlanFragmentExecParams& params,
@@ -175,6 +185,9 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink

if (*sink != nullptr) {
RETURN_IF_ERROR((*sink)->init(thrift_sink));
if (state->get_query_ctx()) {
state->get_query_ctx()->register_query_statistics((*sink)->get_query_statistics_ptr());
}
}

return Status::OK();
@@ -318,6 +331,9 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
if (*sink != nullptr) {
RETURN_IF_ERROR((*sink)->init(thrift_sink));
RETURN_IF_ERROR((*sink)->prepare(state));
if (state->get_query_ctx()) {
state->get_query_ctx()->register_query_statistics((*sink)->get_query_statistics_ptr());
}
}

return Status::OK();
7 changes: 6 additions & 1 deletion be/src/exec/data_sink.h
@@ -40,6 +40,7 @@ class TDataSink;
class TExpr;
class TPipelineFragmentParams;
class TOlapTableSink;
class QueryStatistics;

namespace vectorized {
class Block;
@@ -48,7 +49,7 @@ class Block;
// Superclass of all data sinks.
class DataSink {
public:
- DataSink(const RowDescriptor& desc) : _row_desc(desc) {}
+ DataSink(const RowDescriptor& desc);
virtual ~DataSink() {}

virtual Status init(const TDataSink& thrift_sink);
@@ -103,6 +104,8 @@ class DataSink {

virtual bool can_write() { return true; }

std::shared_ptr<QueryStatistics> get_query_statistics_ptr();

private:
static bool _has_inverted_index_or_partial_update(TOlapTableSink sink);

@@ -124,6 +127,8 @@
_output_rows_counter = ADD_COUNTER_WITH_LEVEL(_profile, "RowsProduced", TUnit::UNIT, 1);
_blocks_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "BlocksProduced", TUnit::UNIT, 1);
}

std::shared_ptr<QueryStatistics> _query_statistics = nullptr;
};

} // namespace doris
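The core of the backend change: every DataSink now constructs a shared QueryStatistics and hands it to the QueryContext, which merges all registered instances into the query-level report. Below is a minimal, self-contained C++ sketch of that ownership/registration pattern, using simplified stand-in types and names rather than the actual Doris classes.

// Sketch of the pattern introduced here (stand-in types, not the Doris classes):
// each sink owns a shared QueryStatistics that it also hands to the query context,
// which later aggregates every registered instance.
#include <atomic>
#include <cstdint>
#include <iostream>
#include <memory>
#include <vector>

struct QueryStatistics {
    std::atomic<int64_t> shuffle_send_bytes{0};
    std::atomic<int64_t> shuffle_send_rows{0};

    void add_shuffle_send_bytes(int64_t delta) {
        shuffle_send_bytes.fetch_add(delta, std::memory_order_relaxed);
    }
    void add_shuffle_send_rows(int64_t delta) {
        shuffle_send_rows.fetch_add(delta, std::memory_order_relaxed);
    }
};

// Stand-in for the query context's register_query_statistics().
struct QueryContext {
    std::vector<std::shared_ptr<QueryStatistics>> registered;

    void register_query_statistics(std::shared_ptr<QueryStatistics> qs) {
        registered.push_back(std::move(qs));
    }
    // Aggregate everything the registered sinks have reported so far.
    int64_t total_shuffle_send_bytes() const {
        int64_t total = 0;
        for (const auto& qs : registered) {
            total += qs->shuffle_send_bytes.load(std::memory_order_relaxed);
        }
        return total;
    }
};

// Stand-in for DataSink: creates its statistics in the constructor and exposes them.
struct DataSink {
    DataSink() : _query_statistics(std::make_shared<QueryStatistics>()) {}
    std::shared_ptr<QueryStatistics> get_query_statistics_ptr() { return _query_statistics; }
    std::shared_ptr<QueryStatistics> _query_statistics;
};

int main() {
    QueryContext ctx;
    DataSink sink;
    ctx.register_query_statistics(sink.get_query_statistics_ptr());

    sink.get_query_statistics_ptr()->add_shuffle_send_bytes(1024);
    sink.get_query_statistics_ptr()->add_shuffle_send_rows(8);

    std::cout << ctx.total_shuffle_send_bytes() << "\n";  // prints 1024
    return 0;
}

The pipeline-X path below follows the same idea: PipelineXSinkLocalStateBase creates the statistics in its constructor, and PipelineXTask::prepare registers them with the query context.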
2 changes: 1 addition & 1 deletion be/src/exec/exec_node.cpp
@@ -255,7 +255,7 @@ Status ExecNode::create_tree_helper(RuntimeState* state, ObjectPool* pool,
// Step 1 Create current ExecNode according to current thrift plan node.
ExecNode* cur_exec_node = nullptr;
RETURN_IF_ERROR(create_node(state, pool, cur_plan_node, descs, &cur_exec_node));
- if (cur_exec_node != nullptr) {
+ if (cur_exec_node != nullptr && state->get_query_ctx()) {
state->get_query_ctx()->register_query_statistics(cur_exec_node->get_query_statistics());
}

1 change: 0 additions & 1 deletion be/src/pipeline/exec/exchange_sink_buffer.h
@@ -273,7 +273,6 @@ class ExchangeSinkBuffer : public HasTaskExecutionCtx {
static constexpr int QUEUE_CAPACITY_FACTOR = 64;
std::shared_ptr<ExchangeSinkQueueDependency> _queue_dependency;
std::shared_ptr<Dependency> _finish_dependency;
- QueryStatistics* _statistics = nullptr;
std::atomic<bool> _should_stop {false};
};

4 changes: 3 additions & 1 deletion be/src/pipeline/pipeline_x/operator.cpp
@@ -342,7 +342,9 @@ Status OperatorX<LocalStateType>::setup_local_state(RuntimeState* state, LocalSt

PipelineXSinkLocalStateBase::PipelineXSinkLocalStateBase(DataSinkOperatorXBase* parent,
RuntimeState* state)
- : _parent(parent), _state(state) {}
+ : _parent(parent), _state(state) {
+     _query_statistics = std::make_shared<QueryStatistics>();
+ }

PipelineXLocalStateBase::PipelineXLocalStateBase(RuntimeState* state, OperatorXBase* parent)
: _num_rows_returned(0),
6 changes: 5 additions & 1 deletion be/src/pipeline/pipeline_x/operator.h
@@ -111,7 +111,7 @@ class PipelineXLocalStateBase {
// override in Scan MultiCastSink
virtual RuntimeFilterDependency* filterdependency() { return nullptr; }

- std::shared_ptr<QueryStatistics> query_statistics_ptr() { return _query_statistics; }
+ std::shared_ptr<QueryStatistics> get_query_statistics_ptr() { return _query_statistics; }

protected:
friend class OperatorXBase;
@@ -424,6 +424,8 @@ class PipelineXSinkLocalStateBase {
// override in exchange sink , AsyncWriterSink
virtual Dependency* finishdependency() { return nullptr; }

std::shared_ptr<QueryStatistics> get_query_statistics_ptr() { return _query_statistics; }

protected:
DataSinkOperatorXBase* _parent = nullptr;
RuntimeState* _state = nullptr;
@@ -447,6 +449,8 @@
RuntimeProfile::Counter* _exec_timer = nullptr;
RuntimeProfile::Counter* _memory_used_counter = nullptr;
RuntimeProfile::Counter* _peak_memory_usage_counter = nullptr;

std::shared_ptr<QueryStatistics> _query_statistics = nullptr;
};

class DataSinkOperatorXBase : public OperatorBase {
5 changes: 4 additions & 1 deletion be/src/pipeline/pipeline_x/pipeline_x_task.cpp
@@ -92,6 +92,9 @@ Status PipelineXTask::prepare(const TPipelineInstanceParams& local_params, const
auto scan_ranges = find_with_default(local_params.per_node_scan_ranges,
_operators.front()->node_id(), no_scan_ranges);
auto* parent_profile = _state->get_sink_local_state(_sink->operator_id())->profile();
query_ctx->register_query_statistics(
_state->get_sink_local_state(_sink->operator_id())->get_query_statistics_ptr());

for (int op_idx = _operators.size() - 1; op_idx >= 0; op_idx--) {
auto& op = _operators[op_idx];
auto& deps = get_upstream_dependency(op->operator_id());
@@ -105,7 +108,7 @@
RETURN_IF_ERROR(op->setup_local_state(_state, info));
parent_profile = _state->get_local_state(op->operator_id())->profile();
query_ctx->register_query_statistics(
- _state->get_local_state(op->operator_id())->query_statistics_ptr());
+ _state->get_local_state(op->operator_id())->get_query_statistics_ptr());
}

_block = doris::vectorized::Block::create_unique();
11 changes: 7 additions & 4 deletions be/src/runtime/query_statistics.cpp
@@ -40,10 +40,11 @@ void NodeStatistics::from_pb(const PNodeStatistics& node_statistics) {
}

void QueryStatistics::merge(const QueryStatistics& other) {
- scan_rows += other.scan_rows;
- scan_bytes += other.scan_bytes;
- int64_t other_cpu_time = other.cpu_nanos.load(std::memory_order_relaxed);
- cpu_nanos += other_cpu_time;
+ scan_rows += other.scan_rows.load(std::memory_order_relaxed);
+ scan_bytes += other.scan_bytes.load(std::memory_order_relaxed);
+ cpu_nanos += other.cpu_nanos.load(std::memory_order_relaxed);
+ shuffle_send_bytes += other.shuffle_send_bytes.load(std::memory_order_relaxed);
+ shuffle_send_rows += other.shuffle_send_rows.load(std::memory_order_relaxed);

int64_t other_peak_mem = other.max_peak_memory_bytes.load(std::memory_order_relaxed);
if (other_peak_mem > this->max_peak_memory_bytes) {
@@ -85,6 +86,8 @@ void QueryStatistics::to_thrift(TQueryStatistics* statistics) const {
statistics->__set_max_peak_memory_bytes(max_peak_memory_bytes.load(std::memory_order_relaxed));
statistics->__set_current_used_memory_bytes(
current_used_memory_bytes.load(std::memory_order_relaxed));
statistics->__set_shuffle_send_bytes(shuffle_send_bytes.load(std::memory_order_relaxed));
statistics->__set_shuffle_send_rows(shuffle_send_rows.load(std::memory_order_relaxed));
}

void QueryStatistics::from_pb(const PQueryStatistics& statistics) {
19 changes: 17 additions & 2 deletions be/src/runtime/query_statistics.h
@@ -65,7 +65,9 @@ class QueryStatistics {
cpu_nanos(0),
returned_rows(0),
max_peak_memory_bytes(0),
- current_used_memory_bytes(0) {}
+ current_used_memory_bytes(0),
+ shuffle_send_bytes(0),
+ shuffle_send_rows(0) {}
virtual ~QueryStatistics();

void merge(const QueryStatistics& other);
@@ -82,6 +84,14 @@
this->cpu_nanos.fetch_add(delta_cpu_time, std::memory_order_relaxed);
}

void add_shuffle_send_bytes(int64_t delta_bytes) {
this->shuffle_send_bytes.fetch_add(delta_bytes, std::memory_order_relaxed);
}

void add_shuffle_send_rows(int64_t delta_rows) {
this->shuffle_send_rows.fetch_add(delta_rows, std::memory_order_relaxed);
}

NodeStatistics* add_nodes_statistics(int64_t node_id) {
NodeStatistics* nodeStatistics = nullptr;
auto iter = _nodes_statistics_map.find(node_id);
@@ -115,8 +125,10 @@
void clear() {
scan_rows.store(0, std::memory_order_relaxed);
scan_bytes.store(0, std::memory_order_relaxed);

cpu_nanos.store(0, std::memory_order_relaxed);
shuffle_send_bytes.store(0, std::memory_order_relaxed);
shuffle_send_rows.store(0, std::memory_order_relaxed);

returned_rows = 0;
max_peak_memory_bytes.store(0, std::memory_order_relaxed);
clearNodeStatistics();
@@ -152,6 +164,9 @@ class QueryStatistics {
NodeStatisticsMap _nodes_statistics_map;
bool _collected = false;
std::atomic<int64_t> current_used_memory_bytes;

std::atomic<int64_t> shuffle_send_bytes;
std::atomic<int64_t> shuffle_send_rows;
};
using QueryStatisticsPtr = std::shared_ptr<QueryStatistics>;
// It is used for collecting sub plan query statistics in DataStreamRecvr.
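The new shuffle_send_bytes/shuffle_send_rows counters follow the same convention as the existing ones: std::atomic<int64_t> members bumped with fetch_add(std::memory_order_relaxed) on the send path and folded in with relaxed loads inside merge(). A small self-contained sketch (generic counters, not Doris code) of why relaxed ordering is enough for independent tallies updated from several threads:

// Sketch: independent tally counters only need relaxed atomics, because each
// increment is self-contained and readers only want an eventually-consistent sum.
#include <atomic>
#include <cstdint>
#include <iostream>
#include <thread>
#include <vector>

struct ShuffleCounters {
    std::atomic<int64_t> send_bytes{0};
    std::atomic<int64_t> send_rows{0};

    void add(int64_t bytes, int64_t rows) {
        send_bytes.fetch_add(bytes, std::memory_order_relaxed);
        send_rows.fetch_add(rows, std::memory_order_relaxed);
    }
    // Mirrors the merge() style above: read the other side with relaxed loads
    // and accumulate into this instance.
    void merge(const ShuffleCounters& other) {
        send_bytes += other.send_bytes.load(std::memory_order_relaxed);
        send_rows += other.send_rows.load(std::memory_order_relaxed);
    }
};

int main() {
    ShuffleCounters per_instance[2];
    std::vector<std::thread> senders;
    for (int i = 0; i < 2; ++i) {
        senders.emplace_back([&c = per_instance[i]] {
            for (int n = 0; n < 1000; ++n) c.add(/*bytes=*/512, /*rows=*/4);
        });
    }
    for (auto& t : senders) t.join();

    ShuffleCounters total;
    for (const auto& c : per_instance) total.merge(c);
    // 2 threads * 1000 iterations * 512 bytes = 1,024,000 bytes; 8,000 rows.
    std::cout << total.send_bytes << " " << total.send_rows << "\n";
    return 0;
}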
3 changes: 3 additions & 0 deletions be/src/vec/sink/vdata_stream_sender.cpp
@@ -765,6 +765,9 @@ Status BlockSerializer<Parent>::serialize_block(const Block* src, PBlock* dest,
COUNTER_UPDATE(_parent->_bytes_sent_counter, compressed_bytes * num_receivers);
COUNTER_UPDATE(_parent->_uncompressed_bytes_counter, uncompressed_bytes * num_receivers);
COUNTER_UPDATE(_parent->_compress_timer, src->get_compress_time());
_parent->get_query_statistics_ptr()->add_shuffle_send_bytes(compressed_bytes *
num_receivers);
_parent->get_query_statistics_ptr()->add_shuffle_send_rows(src->rows() * num_receivers);
}

return Status::OK();
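The sender records the deltas alongside the existing profile counters: compressed bytes and row count are each multiplied by num_receivers, since the same serialized block can be sent to more than one receiver. A tiny worked example with assumed numbers (4096 rows, 1 MiB compressed, 3 receivers):

// Worked example of the deltas recorded in serialize_block() above, using
// made-up numbers: one block of 4096 rows compressed to 1 MiB, sent to 3 receivers.
#include <cstdint>
#include <iostream>

int main() {
    const int64_t rows = 4096;
    const int64_t compressed_bytes = 1 << 20;  // 1 MiB after compression
    const int64_t num_receivers = 3;

    // Same arithmetic as the add_shuffle_send_bytes/add_shuffle_send_rows calls:
    const int64_t delta_bytes = compressed_bytes * num_receivers;  // 3,145,728
    const int64_t delta_rows = rows * num_receivers;               // 12,288

    std::cout << delta_bytes << " bytes, " << delta_rows << " rows\n";
    return 0;
}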
@@ -88,6 +88,10 @@ public enum EventType {
public String stmt = "";
@AuditField(value = "CpuTimeMS")
public long cpuTimeMs = -1;
@AuditField(value = "ShuffleSendBytes")
public long shuffleSendBytes = -1;
@AuditField(value = "ShuffleSendRows")
public long shuffleSendRows = -1;
@AuditField(value = "SqlHash")
public String sqlHash = "";
@AuditField(value = "peakMemoryBytes")
@@ -70,6 +70,8 @@ protected void runOneCycle() {
auditEvent.scanBytes = queryStats.scan_bytes;
auditEvent.peakMemoryBytes = queryStats.max_peak_memory_bytes;
auditEvent.cpuTimeMs = queryStats.cpu_ms;
auditEvent.shuffleSendBytes = queryStats.shuffle_send_bytes;
auditEvent.shuffleSendRows = queryStats.shuffle_send_rows;
}
Env.getCurrentAuditEventProcessor().handleAuditEvent(auditEvent);
}
@@ -139,6 +141,7 @@ public void updateBeQueryStats(TReportWorkloadRuntimeStatusParams params) {
} else {
long currentTime = System.currentTimeMillis();
for (Map.Entry<String, TQueryStatistics> entry : params.query_statistics_map.entrySet()) {
LOG.info("log2109 queryid={}, shuffle={}", entry.getKey(), entry.getValue().shuffle_send_bytes);
queryIdMap.put(entry.getKey(), entry.getValue());
queryLastReportTime.put(entry.getKey(), currentTime);
}
@@ -175,6 +178,8 @@ private void mergeQueryStatistics(TQueryStatistics dst, TQueryStatistics src) {
dst.scan_rows += src.scan_rows;
dst.scan_bytes += src.scan_bytes;
dst.cpu_ms += src.cpu_ms;
dst.shuffle_send_bytes += src.shuffle_send_bytes;
dst.shuffle_send_rows += src.shuffle_send_rows;
if (dst.max_peak_memory_bytes < src.max_peak_memory_bytes) {
dst.max_peak_memory_bytes = src.max_peak_memory_bytes;
}
@@ -46,6 +46,8 @@ public class ActiveQueriesTableValuedFunction extends MetadataTableValuedFunctio
new Column("ScanBytes", PrimitiveType.BIGINT),
new Column("BePeakMemoryBytes", PrimitiveType.BIGINT),
new Column("CurrentUsedMemoryBytes", PrimitiveType.BIGINT),
new Column("ShuffleSendBytes", PrimitiveType.BIGINT),
new Column("ShuffleSendRows", PrimitiveType.BIGINT),
new Column("Database", ScalarType.createStringType()),
new Column("FrontendInstance", ScalarType.createStringType()),
new Column("Sql", ScalarType.createStringType()));
@@ -441,13 +441,17 @@ private static TRow makeQueryStatisticsTRow(SimpleDateFormat sdf, String queryId
trow.addToColumnValue(new TCell().setLongVal(qs.scan_bytes));
trow.addToColumnValue(new TCell().setLongVal(qs.max_peak_memory_bytes));
trow.addToColumnValue(new TCell().setLongVal(qs.current_used_memory_bytes));
trow.addToColumnValue(new TCell().setLongVal(qs.shuffle_send_bytes));
trow.addToColumnValue(new TCell().setLongVal(qs.shuffle_send_rows));
} else {
trow.addToColumnValue(new TCell().setLongVal(0L));
trow.addToColumnValue(new TCell().setLongVal(0L));
trow.addToColumnValue(new TCell().setLongVal(0L));
trow.addToColumnValue(new TCell().setLongVal(0L));
trow.addToColumnValue(new TCell().setLongVal(0L));
trow.addToColumnValue(new TCell().setLongVal(0L));
trow.addToColumnValue(new TCell().setLongVal(0L));
trow.addToColumnValue(new TCell().setLongVal(0L));
}

if (queryInfo.getConnectContext() != null) {
2 changes: 2 additions & 0 deletions gensrc/thrift/FrontendService.thrift
@@ -406,6 +406,8 @@ struct TQueryStatistics {
5: optional i64 max_peak_memory_bytes
6: optional i64 current_used_memory_bytes
7: optional i64 workload_group_id
8: optional i64 shuffle_send_bytes
9: optional i64 shuffle_send_rows
}

struct TReportWorkloadRuntimeStatusParams {