Skip to content

Commit

Permalink
[Enhancement] Convert metric to Json format (apache#3635)
Browse files Browse the repository at this point in the history
Add a JSON format for existing metrics like this.
```
{
    "tags":
    {
        "metric":"thread_pool",
        "name":"thrift-server-pool",
        "type":"active_thread_num"
    },
    "unit":"number",
    "value":3
}
```
I add a new JsonMetricVisitor to handle the transformation.
It's not to modify existing PrometheusMetricVisitor and SimpleCoreMetricVisitor.
Also I add
1.  A unit item to indicate the metric better 
2. Cloning tablet statistics divided by database.
3. Use white space to replace newline in audit.log
  • Loading branch information
chaoyli authored May 27, 2020
1 parent 12c59ba commit 1cc78fe
Show file tree
Hide file tree
Showing 43 changed files with 711 additions and 338 deletions.
3 changes: 3 additions & 0 deletions be/src/exec/tablet_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,9 @@ Status OlapTableSink::send(RuntimeState* state, RowBatch* input_batch) {
// update incrementally so that FE can get the progress.
// the real 'num_rows_load_total' will be set when sink being closed.
state->update_num_rows_load_total(input_batch->num_rows());
state->update_num_bytes_load_total(input_batch->total_byte_size());
DorisMetrics::instance()->load_rows_total.increment(input_batch->num_rows());
DorisMetrics::instance()->load_bytes_total.increment(input_batch->total_byte_size());
RowBatch* batch = input_batch;
if (!_output_expr_ctxs.empty()) {
SCOPED_RAW_TIMER(&_convert_batch_ns);
Expand Down
80 changes: 72 additions & 8 deletions be/src/http/action/metrics_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@

#include "http/action/metrics_action.h"

#include <rapidjson/rapidjson.h>
#include <rapidjson/stringbuffer.h>
#include <rapidjson/document.h>
#include <rapidjson/writer.h>
#include <string>

#include "http/http_request.h"
Expand All @@ -36,7 +40,7 @@ class PrometheusMetricsVisitor : public MetricsVisitor {
std::string to_string() const { return _ss.str(); }
private:
void _visit_simple_metric(
const std::string& name, const MetricLabels& labels, SimpleMetric* metric);
const std::string& name, const MetricLabels& labels, Metric* metric);
private:
std::stringstream _ss;
};
Expand Down Expand Up @@ -88,7 +92,7 @@ void PrometheusMetricsVisitor::visit(const std::string& prefix,
case MetricType::COUNTER:
case MetricType::GAUGE:
for (auto& it : collector->metrics()) {
_visit_simple_metric(metric_name, it.first, (SimpleMetric*) it.second);
_visit_simple_metric(metric_name, it.first, (Metric*) it.second);
}
break;
default:
Expand All @@ -97,7 +101,7 @@ void PrometheusMetricsVisitor::visit(const std::string& prefix,
}

void PrometheusMetricsVisitor::_visit_simple_metric(
const std::string& name, const MetricLabels& labels, SimpleMetric* metric) {
const std::string& name, const MetricLabels& labels, Metric* metric) {
_ss << name;
// labels
if (!labels.empty()) {
Expand Down Expand Up @@ -138,20 +142,80 @@ void SimpleCoreMetricsVisitor::visit(const std::string& prefix,
}

for (auto& it : collector->metrics()) {
_ss << metric_name << " LONG " << ((SimpleMetric*) it.second)->to_string()
_ss << metric_name << " LONG " << ((Metric*) it.second)->to_string()
<< "\n";
}
}

class JsonMetricsVisitor : public MetricsVisitor {
public:
JsonMetricsVisitor() {
}
virtual ~JsonMetricsVisitor() {}
void visit(const std::string& prefix, const std::string& name,
MetricCollector* collector) override;
std::string to_string() {
rapidjson::StringBuffer strBuf;
rapidjson::Writer<rapidjson::StringBuffer> writer(strBuf);
doc.Accept(writer);
return strBuf.GetString();
}

private:
rapidjson::Document doc{rapidjson::kArrayType};
};

void JsonMetricsVisitor::visit(const std::string& prefix,
const std::string& name,
MetricCollector* collector) {
if (collector->empty() || name.empty()) {
return;
}

rapidjson::Document::AllocatorType& allocator = doc.GetAllocator();
switch (collector->type()) {
case MetricType::COUNTER:
case MetricType::GAUGE:
for (auto& it : collector->metrics()) {
const MetricLabels& labels = it.first;
Metric* metric = reinterpret_cast<Metric*>(it.second);
rapidjson::Value metric_obj(rapidjson::kObjectType);
rapidjson::Value tag_obj(rapidjson::kObjectType);
tag_obj.AddMember("metric", rapidjson::Value(name.c_str(), allocator), allocator);
// labels
if (!labels.empty()) {
for (auto& label : labels.labels) {
tag_obj.AddMember(
rapidjson::Value(label.name.c_str(), allocator),
rapidjson::Value(label.value.c_str(), allocator),
allocator);
}
}
metric_obj.AddMember("tags", tag_obj, allocator);
rapidjson::Value unit_val(unit_name(metric->unit()), allocator);
metric_obj.AddMember("unit", unit_val, allocator);
metric->write_value(metric_obj, allocator);
doc.PushBack(metric_obj, allocator);
}
break;
default:
break;
}
}

void MetricsAction::handle(HttpRequest* req) {
const std::string& type = req->param("type");
std::string str;
if (type != "core") {
PrometheusMetricsVisitor visitor;
if (type == "core") {
SimpleCoreMetricsVisitor visitor;
_metrics->collect(&visitor);
str.assign(visitor.to_string());
} else if (type == "agent") {
JsonMetricsVisitor visitor;
_metrics->collect(&visitor);
str.assign(visitor.to_string());
} else {
SimpleCoreMetricsVisitor visitor;
PrometheusMetricsVisitor visitor;
_metrics->collect(&visitor);
str.assign(visitor.to_string());
}
Expand All @@ -160,4 +224,4 @@ void MetricsAction::handle(HttpRequest* req) {
HttpChannel::send_reply(req, str);
}

}
} // namespace doris
28 changes: 14 additions & 14 deletions be/src/http/action/stream_load.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,10 @@

namespace doris {

IntCounter k_streaming_load_requests_total;
IntCounter k_streaming_load_bytes;
IntCounter k_streaming_load_duration_ms;
static IntGauge k_streaming_load_current_processing;
METRIC_DEFINE_INT_COUNTER(streaming_load_requests_total, MetricUnit::NUMBER);
METRIC_DEFINE_INT_COUNTER(streaming_load_bytes, MetricUnit::BYTES);
METRIC_DEFINE_INT_COUNTER(streaming_load_duration_ms, MetricUnit::MILLISECONDS);
METRIC_DEFINE_INT_GAUGE(streaming_load_current_processing, MetricUnit::NUMBER);

#ifdef BE_TEST
TStreamLoadPutResult k_stream_load_put_result;
Expand All @@ -89,13 +89,13 @@ static bool is_format_support_streaming(TFileFormatType::type format) {

StreamLoadAction::StreamLoadAction(ExecEnv* exec_env) : _exec_env(exec_env) {
DorisMetrics::instance()->metrics()->register_metric("streaming_load_requests_total",
&k_streaming_load_requests_total);
&streaming_load_requests_total);
DorisMetrics::instance()->metrics()->register_metric("streaming_load_bytes",
&k_streaming_load_bytes);
&streaming_load_bytes);
DorisMetrics::instance()->metrics()->register_metric("streaming_load_duration_ms",
&k_streaming_load_duration_ms);
&streaming_load_duration_ms);
DorisMetrics::instance()->metrics()->register_metric("streaming_load_current_processing",
&k_streaming_load_current_processing);
&streaming_load_current_processing);
}

StreamLoadAction::~StreamLoadAction() {
Expand Down Expand Up @@ -131,10 +131,10 @@ void StreamLoadAction::handle(HttpRequest* req) {
HttpChannel::send_reply(req, str);

// update statstics
k_streaming_load_requests_total.increment(1);
k_streaming_load_duration_ms.increment(ctx->load_cost_nanos / 1000000);
k_streaming_load_bytes.increment(ctx->receive_bytes);
k_streaming_load_current_processing.increment(-1);
streaming_load_requests_total.increment(1);
streaming_load_duration_ms.increment(ctx->load_cost_nanos / 1000000);
streaming_load_bytes.increment(ctx->receive_bytes);
streaming_load_current_processing.increment(-1);
}

Status StreamLoadAction::_handle(StreamLoadContext* ctx) {
Expand Down Expand Up @@ -164,7 +164,7 @@ Status StreamLoadAction::_handle(StreamLoadContext* ctx) {
}

int StreamLoadAction::on_header(HttpRequest* req) {
k_streaming_load_current_processing.increment(1);
streaming_load_current_processing.increment(1);

StreamLoadContext* ctx = new StreamLoadContext(_exec_env);
ctx->ref();
Expand Down Expand Up @@ -195,7 +195,7 @@ int StreamLoadAction::on_header(HttpRequest* req) {
}
auto str = ctx->to_json();
HttpChannel::send_reply(req, str);
k_streaming_load_current_processing.increment(-1);
streaming_load_current_processing.increment(-1);
return -1;
}
return 0;
Expand Down
4 changes: 2 additions & 2 deletions be/src/runtime/client_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -216,12 +216,12 @@ void ClientCacheHelper::init_metrics(MetricRegistry* metrics, const std::string&
// usage, but ensures that _metrics_enabled is published.
boost::lock_guard<boost::mutex> lock(_lock);

_used_clients.reset(new IntGauge());
_used_clients.reset(new IntGauge(MetricUnit::NUMBER));
metrics->register_metric("thrift_used_clients",
MetricLabels().add("name", key_prefix),
_used_clients.get());

_opened_clients.reset(new IntGauge());
_opened_clients.reset(new IntGauge(MetricUnit::NUMBER));
metrics->register_metric("thrift_opened_clients",
MetricLabels().add("name", key_prefix),
_opened_clients.get());
Expand Down
12 changes: 6 additions & 6 deletions be/src/runtime/memory/chunk_allocator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,12 @@ namespace doris {

ChunkAllocator* ChunkAllocator::_s_instance = nullptr;

static IntCounter local_core_alloc_count;
static IntCounter other_core_alloc_count;
static IntCounter system_alloc_count;
static IntCounter system_free_count;
static IntCounter system_alloc_cost_ns;
static IntCounter system_free_cost_ns;
static IntCounter local_core_alloc_count(MetricUnit::NUMBER);
static IntCounter other_core_alloc_count(MetricUnit::NUMBER);
static IntCounter system_alloc_count(MetricUnit::NUMBER);
static IntCounter system_free_count(MetricUnit::NUMBER);
static IntCounter system_alloc_cost_ns(MetricUnit::NANOSECONDS);
static IntCounter system_free_cost_ns(MetricUnit::NANOSECONDS);

#ifdef BE_TEST
static std::mutex s_mutex;
Expand Down
14 changes: 14 additions & 0 deletions be/src/runtime/runtime_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,10 @@ class RuntimeState {
void append_error_msg_to_file(const std::string& line, const std::string& error_msg,
bool is_summary = false);

int64_t num_bytes_load_total() {
return _num_bytes_load_total.load();
}

int64_t num_rows_load_total() {
return _num_rows_load_total.load();
}
Expand All @@ -413,6 +417,14 @@ class RuntimeState {
_num_rows_load_total.store(num_rows);
}

void update_num_bytes_load_total(int64_t bytes_load) {
_num_bytes_load_total.fetch_add(bytes_load);
}

void set_update_num_bytes_load_total(int64_t bytes_load) {
_num_bytes_load_total.store(bytes_load);
}

void update_num_rows_load_filtered(int64_t num_rows) {
_num_rows_load_filtered.fetch_add(num_rows);
}
Expand Down Expand Up @@ -587,6 +599,8 @@ class RuntimeState {
std::atomic<int64_t> _num_rows_load_unselected; // rows filtered by predicates
std::atomic<int64_t> _num_print_error_rows;

std::atomic<int64_t> _num_bytes_load_total; // total bytes read from source

std::vector<std::string> _export_output_files;

std::string _import_label;
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/tmp_file_mgr.cc
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ Status TmpFileMgr::init_custom(
}

DCHECK(metrics != NULL);
_num_active_scratch_dirs_metric.reset(new IntGauge());
_num_active_scratch_dirs_metric.reset(new IntGauge(MetricUnit::NUMBER));
metrics->register_metric("active_scratch_dirs", _num_active_scratch_dirs_metric.get());
//_active_scratch_dirs_metric = metrics->register_metric(new SetMetric<std::string>(
// TMP_FILE_MGR_ACTIVE_SCRATCH_DIRS_LIST,
Expand Down
13 changes: 9 additions & 4 deletions be/src/util/doris_metrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,11 @@ DorisMetrics::DorisMetrics() : _name("doris_be"), _hook_name("doris_metrics"), _
_metrics.register_metric(
"stream_load", MetricLabels().add("type", "load_rows"),
&stream_load_rows_total);
_metrics.register_metric(
"load", MetricLabels().add("type", "receive_bytes"),
&stream_receive_bytes_total);
_metrics.register_metric("load_rows", &load_rows_total);
_metrics.register_metric("load_bytes", &load_bytes_total);

// Gauge
REGISTER_DORIS_METRIC(memory_pool_bytes_total);
Expand Down Expand Up @@ -188,13 +193,13 @@ void DorisMetrics::initialize(
const std::vector<std::string>& network_interfaces) {
// disk usage
for (auto& path : paths) {
IntGauge* gauge = disks_total_capacity.set_key(path);
IntGauge* gauge = disks_total_capacity.set_key(path, MetricUnit::BYTES);
_metrics.register_metric("disks_total_capacity", MetricLabels().add("path", path), gauge);
gauge = disks_avail_capacity.set_key(path);
gauge = disks_avail_capacity.set_key(path, MetricUnit::BYTES);
_metrics.register_metric("disks_avail_capacity", MetricLabels().add("path", path), gauge);
gauge = disks_data_used_capacity.set_key(path);
gauge = disks_data_used_capacity.set_key(path, MetricUnit::BYTES);
_metrics.register_metric("disks_data_used_capacity", MetricLabels().add("path", path), gauge);
gauge = disks_state.set_key(path);
gauge = disks_state.set_key(path, MetricUnit::BYTES);
_metrics.register_metric("disks_state", MetricLabels().add("path", path), gauge);
}

Expand Down
Loading

0 comments on commit 1cc78fe

Please sign in to comment.