Skip to content

Commit

Permalink
[Chore](statistic) do not use memory order relaxed on QueryStatistics…
Browse files Browse the repository at this point in the history
… and add sync on te… (apache#38048)

## Proposed changes
1. Do not use `std::memory_order_relaxed` when accessing the `QueryStatistics` counters.
2. Remove some unused code.
3. Add a `sync` statement to the `dry_run` regression test.
  • Loading branch information
BiteTheDDDDt authored Jul 22, 2024
1 parent 84afb29 commit 4411256
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 134 deletions.
75 changes: 18 additions & 57 deletions be/src/runtime/query_statistics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,20 @@
namespace doris {

void QueryStatistics::merge(const QueryStatistics& other) {
scan_rows += other.scan_rows.load(std::memory_order_relaxed);
scan_bytes += other.scan_bytes.load(std::memory_order_relaxed);
cpu_nanos += other.cpu_nanos.load(std::memory_order_relaxed);
shuffle_send_bytes += other.shuffle_send_bytes.load(std::memory_order_relaxed);
shuffle_send_rows += other.shuffle_send_rows.load(std::memory_order_relaxed);
_scan_bytes_from_local_storage +=
other._scan_bytes_from_local_storage.load(std::memory_order_relaxed);
_scan_bytes_from_remote_storage +=
other._scan_bytes_from_remote_storage.load(std::memory_order_relaxed);

int64_t other_peak_mem = other.max_peak_memory_bytes.load(std::memory_order_relaxed);
scan_rows += other.scan_rows;
scan_bytes += other.scan_bytes;
cpu_nanos += other.cpu_nanos;
shuffle_send_bytes += other.shuffle_send_bytes;
shuffle_send_rows += other.shuffle_send_rows;
_scan_bytes_from_local_storage += other._scan_bytes_from_local_storage;
_scan_bytes_from_remote_storage += other._scan_bytes_from_remote_storage;

int64_t other_peak_mem = other.max_peak_memory_bytes;
if (other_peak_mem > this->max_peak_memory_bytes) {
this->max_peak_memory_bytes = other_peak_mem;
}

int64_t other_memory_used = other.current_used_memory_bytes.load(std::memory_order_relaxed);
int64_t other_memory_used = other.current_used_memory_bytes;
if (other_memory_used > 0) {
this->current_used_memory_bytes = other_memory_used;
}
Expand All @@ -61,15 +59,14 @@ void QueryStatistics::to_pb(PQueryStatistics* statistics) {

// Copies the counters held by this object into the Thrift struct `statistics`.
//
// `statistics` must not be null (checked with DCHECK). Every counter is read
// exactly once through the atomic's implicit conversion, i.e. with the default
// memory order rather than an explicit relaxed load. CPU time is stored in
// nanoseconds and reported in milliseconds.
void QueryStatistics::to_thrift(TQueryStatistics* statistics) const {
    DCHECK(statistics != nullptr);
    statistics->__set_scan_bytes(scan_bytes);
    statistics->__set_scan_rows(scan_rows);
    // Integer division: sub-millisecond remainders are dropped.
    statistics->__set_cpu_ms(cpu_nanos / NANOS_PER_MILLIS);
    statistics->__set_returned_rows(returned_rows);
    statistics->__set_max_peak_memory_bytes(max_peak_memory_bytes);
    statistics->__set_current_used_memory_bytes(current_used_memory_bytes);
    statistics->__set_shuffle_send_bytes(shuffle_send_bytes);
    statistics->__set_shuffle_send_rows(shuffle_send_rows);
    statistics->__set_scan_bytes_from_remote_storage(_scan_bytes_from_remote_storage);
    statistics->__set_scan_bytes_from_local_storage(_scan_bytes_from_local_storage);
}
Expand All @@ -82,42 +79,6 @@ void QueryStatistics::from_pb(const PQueryStatistics& statistics) {
_scan_bytes_from_remote_storage = statistics.scan_bytes_from_remote_storage();
}

// Merges every per-sender entry stored in `recvr` into this object.
//
// `recvr` must not be null. The receiver performs the iteration itself (under
// its own lock) and calls merge(const QueryStatistics&) on this object once
// per stored entry.
void QueryStatistics::merge(QueryStatisticsRecvr* recvr) {
    // Same precondition check as the (recvr, sender_id) overload.
    DCHECK(recvr != nullptr);
    recvr->merge(this);
}

// Merges the statistics that `recvr` holds for `sender_id` into this object.
// Does nothing when the receiver has no entry for that sender.
// `recvr` must not be null.
void QueryStatistics::merge(QueryStatisticsRecvr* recvr, int sender_id) {
    DCHECK(recvr != nullptr);
    if (const auto sender_stats = recvr->find(sender_id); sender_stats != nullptr) {
        merge(*sender_stats);
    }
}

// Nothing to release explicitly; members clean up after themselves.
QueryStatistics::~QueryStatistics() = default;

// Records the protobuf statistics received from `sender_id`.
//
// If no entry exists for the sender, a fresh QueryStatistics is created first;
// the entry is then loaded from `statistics` via from_pb(). The whole
// operation runs under `_lock`.
void QueryStatisticsRecvr::insert(const PQueryStatistics& statistics, int sender_id) {
    std::lock_guard<std::mutex> l(_lock);
    // Single lookup: try_emplace inserts an empty pointer only when the key is
    // absent and otherwise returns the existing slot untouched.
    auto [entry, created] = _query_statistics.try_emplace(sender_id);
    if (created) {
        entry->second = std::make_shared<QueryStatistics>();
    }
    entry->second->from_pb(statistics);
}

// Registers an already-built statistics object for `sender_id`.
//
// Statistics that have not been collected are ignored, and an existing entry
// for the same sender is never overwritten.
void QueryStatisticsRecvr::insert(QueryStatisticsPtr statistics, int sender_id) {
    if (!statistics->collected()) {
        return;
    }
    // The map must only be inspected while holding the lock; checking for the
    // key before locking would race with concurrent insert/find/merge calls.
    std::lock_guard<std::mutex> l(_lock);
    // try_emplace leaves the map unchanged when the key is already present.
    _query_statistics.try_emplace(sender_id, statistics);
}

// Looks up the statistics stored for `sender_id`.
// Returns the stored shared pointer, or nullptr when the sender is unknown.
// The lookup is performed under `_lock`.
QueryStatisticsPtr QueryStatisticsRecvr::find(int sender_id) {
    std::lock_guard<std::mutex> l(_lock);
    const auto pos = _query_statistics.find(sender_id);
    if (pos == _query_statistics.end()) {
        return nullptr;
    }
    return pos->second;
}

} // namespace doris
90 changes: 13 additions & 77 deletions be/src/runtime/query_statistics.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@

namespace doris {

class QueryStatisticsRecvr;
class PNodeStatistics;
class PQueryStatistics;

Expand All @@ -53,82 +52,44 @@ class QueryStatistics {

void merge(const QueryStatistics& other);

void add_scan_rows(int64_t delta_scan_rows) {
this->scan_rows.fetch_add(delta_scan_rows, std::memory_order_relaxed);
}
void add_scan_rows(int64_t delta_scan_rows) { scan_rows += delta_scan_rows; }

void add_scan_bytes(int64_t delta_scan_bytes) {
this->scan_bytes.fetch_add(delta_scan_bytes, std::memory_order_relaxed);
}
void add_scan_bytes(int64_t delta_scan_bytes) { scan_bytes += delta_scan_bytes; }

void add_cpu_nanos(int64_t delta_cpu_time) {
this->cpu_nanos.fetch_add(delta_cpu_time, std::memory_order_relaxed);
}
void add_cpu_nanos(int64_t delta_cpu_time) { cpu_nanos += delta_cpu_time; }

void add_shuffle_send_bytes(int64_t delta_bytes) {
this->shuffle_send_bytes.fetch_add(delta_bytes, std::memory_order_relaxed);
}
void add_shuffle_send_bytes(int64_t delta_bytes) { shuffle_send_bytes += delta_bytes; }

void add_shuffle_send_rows(int64_t delta_rows) {
this->shuffle_send_rows.fetch_add(delta_rows, std::memory_order_relaxed);
}
void add_shuffle_send_rows(int64_t delta_rows) { shuffle_send_rows += delta_rows; }

void add_scan_bytes_from_local_storage(int64_t scan_bytes_from_local_storage) {
this->_scan_bytes_from_local_storage += scan_bytes_from_local_storage;
_scan_bytes_from_local_storage += scan_bytes_from_local_storage;
}

void add_scan_bytes_from_remote_storage(int64_t scan_bytes_from_remote_storage) {
this->_scan_bytes_from_remote_storage += scan_bytes_from_remote_storage;
_scan_bytes_from_remote_storage += scan_bytes_from_remote_storage;
}

void add_returned_rows(int64_t num_rows) {
this->returned_rows.fetch_add(num_rows, std::memory_order_relaxed);
}
void add_returned_rows(int64_t num_rows) { returned_rows += num_rows; }

void set_max_peak_memory_bytes(int64_t max_peak_memory_bytes) {
this->max_peak_memory_bytes.store(max_peak_memory_bytes, std::memory_order_relaxed);
this->max_peak_memory_bytes = max_peak_memory_bytes;
}

void set_current_used_memory_bytes(int64_t current_used_memory) {
this->current_used_memory_bytes.store(current_used_memory, std::memory_order_relaxed);
}

void merge(QueryStatisticsRecvr* recvr);

void merge(QueryStatisticsRecvr* recvr, int sender_id);

void clearNodeStatistics();

void clear() {
scan_rows.store(0, std::memory_order_relaxed);
scan_bytes.store(0, std::memory_order_relaxed);
cpu_nanos.store(0, std::memory_order_relaxed);
shuffle_send_bytes.store(0, std::memory_order_relaxed);
shuffle_send_rows.store(0, std::memory_order_relaxed);
_scan_bytes_from_local_storage.store(0);
_scan_bytes_from_remote_storage.store(0);

returned_rows.store(0, std::memory_order_relaxed);
max_peak_memory_bytes.store(0, std::memory_order_relaxed);
clearNodeStatistics();
//clear() is used before collection, so calling "clear" is equivalent to being collected.
set_collected();
current_used_memory_bytes = current_used_memory;
}

void to_pb(PQueryStatistics* statistics);
void to_thrift(TQueryStatistics* statistics) const;
void from_pb(const PQueryStatistics& statistics);
bool collected() const { return _collected; }
void set_collected() { _collected = true; }

int64_t get_scan_rows() { return scan_rows.load(std::memory_order_relaxed); }
int64_t get_scan_bytes() { return scan_bytes.load(std::memory_order_relaxed); }
int64_t get_current_used_memory_bytes() {
return current_used_memory_bytes.load(std::memory_order_relaxed);
}
int64_t get_scan_rows() { return scan_rows; }
int64_t get_scan_bytes() { return scan_bytes; }
int64_t get_current_used_memory_bytes() { return current_used_memory_bytes; }

private:
friend class QueryStatisticsRecvr;
std::atomic<int64_t> scan_rows;
std::atomic<int64_t> scan_bytes;
std::atomic<int64_t> cpu_nanos;
Expand All @@ -148,30 +109,5 @@ class QueryStatistics {
};
using QueryStatisticsPtr = std::shared_ptr<QueryStatistics>;
// Collects the query statistics of sub plans in DataStreamRecvr, keeping one
// QueryStatistics entry per sender id.
class QueryStatisticsRecvr {
public:
    ~QueryStatisticsRecvr() = default;

    // For statistics that arrive via RPC (and therefore pay a serialization
    // cost): the entry for `sender_id` is filled from the protobuf message.
    void insert(const PQueryStatistics& statistics, int sender_id);

    // For statistics passed through local_exchange: only the shared pointer
    // needs to be kept.
    void insert(QueryStatisticsPtr statistics, int sender_id);

    // Returns the entry stored for `sender_id`.
    QueryStatisticsPtr find(int sender_id);

private:
    friend class QueryStatistics;

    // Folds every stored entry into `statistics` while holding `_lock`.
    void merge(QueryStatistics* statistics) {
        std::lock_guard<std::mutex> l(_lock);
        for (const auto& [sender_id, sender_stats] : _query_statistics) {
            statistics->merge(*sender_stats);
        }
    }

    // sender id -> statistics reported by that sender.
    std::map<int, QueryStatisticsPtr> _query_statistics;
    std::mutex _lock;
};

} // namespace doris
2 changes: 2 additions & 0 deletions regression-test/suites/query_p0/dry_run/dry_run.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ suite ("dry_run") {
sql "insert into d_table select -4,-4,-4,'d';"
sql "insert into d_table(k4,k2) values('d',4);"

sql "sync"

sql "set dry_run_query=true;"
qt_select_star "select * from d_table;"
}

0 comments on commit 4411256

Please sign in to comment.