Skip to content

Commit

Permalink
[Core] Remove the unused MetricPointExporter (#43580)
Browse files Browse the repository at this point in the history
MetricPointExporter internally uses StdoutExporterClient which is a no-op so it doesn't export anything. We are effectively only using OpenCensusProtoExporter to export metrics to metrics agent.

Signed-off-by: Jiajun Yao <jeromeyjj@gmail.com>
  • Loading branch information
jjyao authored Mar 6, 2024
1 parent f92928c commit ba5f041
Show file tree
Hide file tree
Showing 13 changed files with 48 additions and 693 deletions.
17 changes: 0 additions & 17 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -582,12 +582,10 @@ ray_cc_library(
name = "stats_lib",
srcs = [
"src/ray/stats/metric_exporter.cc",
"src/ray/stats/metric_exporter_client.cc",
],
hdrs = [
"src/ray/stats/metric.h",
"src/ray/stats/metric_exporter.h",
"src/ray/stats/metric_exporter_client.h",
"src/ray/stats/stats.h",
"src/ray/stats/tag_defs.h",
],
Expand Down Expand Up @@ -1448,21 +1446,6 @@ ray_cc_test(
],
)

ray_cc_test(
name = "metric_exporter_client_test",
size = "small",
srcs = ["src/ray/stats/metric_exporter_client_test.cc"],
tags = [
"stats",
"team:core",
"no_tsan",
],
deps = [
":stats_lib",
"@com_google_googletest//:gtest_main",
],
)

ray_cc_test(
name = "metric_exporter_grpc_test",
size = "small",
Expand Down
15 changes: 0 additions & 15 deletions src/ray/protobuf/common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -879,21 +879,6 @@ message NodeResourceUsage {
string json = 1;
}

message MetricPoint {
// Name of the metric.
string metric_name = 1;
// Timestamp when metric is exported.
int64 timestamp = 2;
// Value of the metric point.
double value = 3;
// Tags of the metric.
map<string, string> tags = 4;
// [Optional] Description of the metric.
string description = 5;
// [Optional] Unit of the metric.
string units = 6;
}

// Type of a worker exit.
enum WorkerExitType {
// Worker exits due to system level failures (i.e. worker crash).
Expand Down
10 changes: 0 additions & 10 deletions src/ray/protobuf/reporter.proto
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,6 @@ message MemoryProfilingReply {
optional string warning = 3;
}

message ReportMetricsRequest {
repeated MetricPoint metrics_points = 1;
}

message ReportMetricsReply {
bool metrcs_description_required = 1;
}

message ReportOCMetricsRequest {
repeated opencensus.proto.metrics.v1.Metric metrics = 1;
bytes worker_id = 2;
Expand All @@ -104,8 +96,6 @@ message ReportOCMetricsReply {}
service ReporterService {
// Get the profiling stats.
rpc GetProfilingStats(GetProfilingStatsRequest) returns (GetProfilingStatsReply);
// Report metrics to the local metrics agents.
rpc ReportMetrics(ReportMetricsRequest) returns (ReportMetricsReply);
// Report OpenCensus metrics to the local metrics agent.
rpc ReportOCMetrics(ReportOCMetricsRequest) returns (ReportOCMetricsReply);
rpc GetTraceback(GetTracebackRequest) returns (GetTracebackReply);
Expand Down
12 changes: 0 additions & 12 deletions src/ray/rpc/metrics_agent_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,6 @@ class MetricsAgentClient {
public:
virtual ~MetricsAgentClient() = default;

/// Report metrics to metrics agent.
///
/// \param[in] request The request message.
/// \param[in] callback The callback function that handles reply.
VOID_RPC_CLIENT_VIRTUAL_METHOD_DECL(ReporterService, ReportMetrics)

/// Report open census protobuf metrics to metrics agent.
///
/// \param[in] request The request message.
Expand All @@ -62,12 +56,6 @@ class MetricsAgentClientImpl : public MetricsAgentClient {
address, port, client_call_manager_);
};

VOID_RPC_CLIENT_METHOD(ReporterService,
ReportMetrics,
grpc_client_,
/*method_timeout_ms*/ -1,
override)

VOID_RPC_CLIENT_METHOD(ReporterService,
ReportOCMetrics,
grpc_client_,
Expand Down
9 changes: 0 additions & 9 deletions src/ray/stats/metric.h
Original file line number Diff line number Diff line change
Expand Up @@ -200,15 +200,6 @@ class Sum : public Metric {

}; // class Sum

/// Raw metric view point for exporter.
struct MetricPoint {
std::string metric_name;
int64_t timestamp;
double value;
std::unordered_map<std::string, std::string> tags;
const opencensus::stats::MeasureDescriptor &measure_descriptor;
};

enum StatsType : int { COUNT, SUM, GAUGE, HISTOGRAM };

namespace internal {
Expand Down
102 changes: 0 additions & 102 deletions src/ray/stats/metric_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,108 +24,6 @@ namespace {
inline constexpr std::string_view kGrpcIoMetricsNamePrefix = "grpc.io/";
}

template <>
void MetricPointExporter::ExportToPoints(
const opencensus::stats::ViewData::DataMap<opencensus::stats::Distribution>
&view_data,
const opencensus::stats::MeasureDescriptor &measure_descriptor,
std::vector<std::string> &keys,
std::vector<MetricPoint> &points) {
// Return if no raw data found in view map.
if (view_data.size() == 0) {
return;
}

const auto &metric_name = measure_descriptor.name();

// NOTE(lingxuan.zlx): No sampling in histogram data, so all points all be filled in.
std::unordered_map<std::string, std::string> tags;
for (size_t i = 0; i < view_data.begin()->first.size(); ++i) {
tags[keys[i]] = view_data.begin()->first[i];
}
// Histogram metric will be append suffix with mean/max/min.
double hist_mean = 0.0;
double hist_max = 0.0;
double hist_min = 0.0;
bool in_first_hist_data = true;
for (const auto &row : view_data) {
if (in_first_hist_data) {
hist_mean = static_cast<double>(row.second.mean());
hist_max = static_cast<double>(row.second.max());
hist_min = static_cast<double>(row.second.min());
in_first_hist_data = false;
} else {
hist_mean += static_cast<double>(row.second.mean());
hist_max = std::max(hist_max, static_cast<double>(row.second.max()));
hist_min = std::min(hist_min, static_cast<double>(row.second.min()));
}
}
hist_mean /= view_data.size();
MetricPoint mean_point = {
metric_name + ".mean", current_sys_time_ms(), hist_mean, tags, measure_descriptor};
MetricPoint max_point = {
metric_name + ".max", current_sys_time_ms(), hist_max, tags, measure_descriptor};
MetricPoint min_point = {
metric_name + ".min", current_sys_time_ms(), hist_min, tags, measure_descriptor};
points.push_back(std::move(mean_point));
points.push_back(std::move(max_point));
points.push_back(std::move(min_point));

if (points.size() >= report_batch_size_) {
metric_exporter_client_->ReportMetrics(points);
points.clear();
}
}

void MetricPointExporter::ExportViewData(
const std::vector<std::pair<opencensus::stats::ViewDescriptor,
opencensus::stats::ViewData>> &data) {
std::vector<MetricPoint> points;
// NOTE(lingxuan.zlx): There is no sampling in view data, so all raw metric
// data will be processed.
for (const auto &datum : data) {
auto &descriptor = datum.first;
auto &view_data = datum.second;

std::vector<std::string> keys;
for (size_t i = 0; i < descriptor.columns().size(); ++i) {
keys.push_back(descriptor.columns()[i].name());
}
const auto &measure_descriptor = descriptor.measure_descriptor();
switch (view_data.type()) {
case opencensus::stats::ViewData::Type::kDouble:
ExportToPoints<double>(view_data.double_data(), measure_descriptor, keys, points);
break;
case opencensus::stats::ViewData::Type::kInt64:
ExportToPoints<int64_t>(view_data.int_data(), measure_descriptor, keys, points);
break;
case opencensus::stats::ViewData::Type::kDistribution:
ExportToPoints<opencensus::stats::Distribution>(
view_data.distribution_data(), measure_descriptor, keys, points);
break;
default:
RAY_LOG(FATAL) << "Unknown view data type.";
break;
}
}
for (auto &point : points) {
addGlobalTagsToGrpcMetric(point);
}
metric_exporter_client_->ReportMetrics(points);
}

/// Hack. We want to add GlobalTags to all our metrics, but gRPC OpenCencus plugin is not
/// configurable at all so we don't have chance to add our own tags. We use this hack to
/// append the tags in export time.
void MetricPointExporter::addGlobalTagsToGrpcMetric(MetricPoint &metric) {
if (std::string_view(metric.metric_name).substr(0, kGrpcIoMetricsNamePrefix.size()) ==
kGrpcIoMetricsNamePrefix) {
for (const auto &[key, value] : ray::stats::StatsConfig::instance().GetGlobalTags()) {
metric.tags[key.name()] = value;
}
}
}

OpenCensusProtoExporter::OpenCensusProtoExporter(const int port,
instrumented_io_context &io_service,
const std::string address,
Expand Down
65 changes: 1 addition & 64 deletions src/ray/stats/metric_exporter.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
#include "ray/common/asio/instrumented_io_context.h"
#include "ray/common/id.h"
#include "ray/rpc/client_call.h"
#include "ray/rpc/metrics_agent_client.h"
#include "ray/stats/metric.h"
#include "ray/stats/metric_exporter_client.h"
#include "ray/util/logging.h"
#include "ray/util/util.h"

Expand All @@ -33,69 +33,6 @@ namespace stats {
/// opencensus data view, and sends it to the remote (for example
/// sends metrics to dashboard agents through RPC). How to use it? Register metrics
/// exporter after a main thread launched.
class MetricPointExporter final : public opencensus::stats::StatsExporter::Handler {
public:
explicit MetricPointExporter(
std::shared_ptr<MetricExporterClient> metric_exporter_client,
size_t report_batch_size = kDefaultBatchSize)
: metric_exporter_client_(metric_exporter_client),
report_batch_size_(report_batch_size) {}

~MetricPointExporter() = default;

static void Register(std::shared_ptr<MetricExporterClient> metric_exporter_client,
size_t report_batch_size) {
opencensus::stats::StatsExporter::RegisterPushHandler(
absl::make_unique<MetricPointExporter>(metric_exporter_client,
report_batch_size));
}

void ExportViewData(
const std::vector<std::pair<opencensus::stats::ViewDescriptor,
opencensus::stats::ViewData>> &data) override;

protected:
void addGlobalTagsToGrpcMetric(MetricPoint &metric);

private:
template <class DTYPE>
/// Extract raw data from view data, then metric exporter clients can use them
/// in points schema.
/// \param view_data, raw data in map
/// \param metric_name, metric name of view data
/// \param keys, metric tags map
/// \param points, memory metric vector instance
void ExportToPoints(const opencensus::stats::ViewData::DataMap<DTYPE> &view_data,
const opencensus::stats::MeasureDescriptor &measure_descriptor,
std::vector<std::string> &keys,
std::vector<MetricPoint> &points) {
const auto &metric_name = measure_descriptor.name();
for (const auto &row : view_data) {
std::unordered_map<std::string, std::string> tags;
for (size_t i = 0; i < keys.size(); ++i) {
tags[keys[i]] = row.first[i];
}
// Current timestamp is used for point not view data time.
MetricPoint point{metric_name,
current_sys_time_ms(),
static_cast<double>(row.second),
tags,
measure_descriptor};
points.push_back(std::move(point));
if (points.size() >= report_batch_size_) {
metric_exporter_client_->ReportMetrics(points);
points.clear();
}
}
}

private:
std::shared_ptr<MetricExporterClient> metric_exporter_client_;
/// Auto max minbatch size for reporting metrics to external components.
static constexpr size_t kDefaultBatchSize = 100;
size_t report_batch_size_;
};

class OpenCensusProtoExporter final : public opencensus::stats::StatsExporter::Handler {
public:
OpenCensusProtoExporter(const int port,
Expand Down
51 changes: 0 additions & 51 deletions src/ray/stats/metric_exporter_client.cc

This file was deleted.

Loading

0 comments on commit ba5f041

Please sign in to comment.