Skip to content

Commit

Permalink
[Metric] Fix crashed when register metric view in multithread (ray-pr…
Browse files Browse the repository at this point in the history
…oject#13485)

* Fix crashed when register metric view in multithread

* fix comments

* fix
  • Loading branch information
ashione authored Jan 25, 2021
1 parent db2c836 commit f9f2bfa
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 11 deletions.
29 changes: 18 additions & 11 deletions src/ray/stats/metric.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ namespace ray {

namespace stats {

absl::Mutex Metric::registration_mutex_;

static void RegisterAsView(opencensus::stats::ViewDescriptor view_descriptor,
const std::vector<opencensus::tags::TagKey> &keys) {
// Register global keys.
Expand Down Expand Up @@ -85,19 +87,24 @@ void Metric::Record(double value, const TagsType &tags) {
return;
}

// NOTE(lingxuan.zlx): Double check for recording performance while
// processing in multithread and avoid race since metrics may invoke
// record in different threads or code pathes.
if (measure_ == nullptr) {
// Measure could be registered before, so we try to get it first.
MeasureDouble registered_measure =
opencensus::stats::MeasureRegistry::GetMeasureDoubleByName(name_);

if (registered_measure.IsValid()) {
measure_.reset(new MeasureDouble(registered_measure));
} else {
measure_.reset(
new MeasureDouble(MeasureDouble::Register(name_, description_, unit_)));
absl::MutexLock lock(&registration_mutex_);
if (measure_ == nullptr) {
// Measure could be registered before, so we try to get it first.
MeasureDouble registered_measure =
opencensus::stats::MeasureRegistry::GetMeasureDoubleByName(name_);

if (registered_measure.IsValid()) {
measure_.reset(new MeasureDouble(registered_measure));
} else {
measure_.reset(
new MeasureDouble(MeasureDouble::Register(name_, description_, unit_)));
}
RegisterView();
}

RegisterView();
}

// Do record.
Expand Down
3 changes: 3 additions & 0 deletions src/ray/stats/metric.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,9 @@ class Metric {
std::vector<opencensus::tags::TagKey> tag_keys_;
std::unique_ptr<opencensus::stats::Measure<double>> measure_;

// For making sure thread-safe to all of metric registrations.
static absl::Mutex registration_mutex_;

}; // class Metric

class Gauge : public Metric {
Expand Down
32 changes: 32 additions & 0 deletions src/ray/stats/stats_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,38 @@ TEST_F(StatsTest, InitializationTest) {
ASSERT_TRUE(new_first_tag.second == test_tag_value_that_shouldnt_be_applied);
}

TEST(Metric, MultiThreadMetricRegisterViewTest) {
ray::stats::Shutdown();
std::shared_ptr<stats::MetricExporterClient> exporter(
new stats::StdoutExporterClient());
ray::stats::Init({}, MetricsAgentPort, exporter);
std::vector<std::thread> threads;
const stats::TagKeyType tag1 = stats::TagKeyType::Register("k1");
const stats::TagKeyType tag2 = stats::TagKeyType::Register("k2");
for (int index = 0; index < 10; ++index) {
threads.emplace_back([tag1, tag2, index]() {
for (int i = 0; i < 100; i++) {
stats::Count random_counter(
"ray.random.counter" + std::to_string(index) + std::to_string(i), "", "",
{tag1, tag2});
random_counter.Record(i);
stats::Gauge random_gauge(
"ray.random.gauge" + std::to_string(index) + std::to_string(i), "", "",
{tag1, tag2});
random_gauge.Record(i);
stats::Sum random_sum(
"ray.random.sum" + std::to_string(index) + std::to_string(i), "", "",
{tag1, tag2});
random_sum.Record(i);
}
});
}
for (auto &thread : threads) {
thread.join();
}
ray::stats::Shutdown();
}

TEST_F(StatsTest, MultiThreadedInitializationTest) {
// Make sure stats module is thread-safe.
// Shutdown the stats module first.
Expand Down

0 comments on commit f9f2bfa

Please sign in to comment.