Skip to content

Commit

Permalink
[GCS] Add StatsInfoAccessor to GCS Client (ray-project#6748)
Browse files (browse the repository at this point in the history)
  • Loading branch information
micafan authored and zhijunfu committed Jan 10, 2020
1 parent 4097d07 commit ce8170d
Show file tree
Hide file tree
Showing 13 changed files with 94 additions and 41 deletions.
33 changes: 21 additions & 12 deletions src/ray/core_worker/profiling.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,31 +18,40 @@ Profiler::Profiler(WorkerContext &worker_context, const std::string &node_ip_add
const std::shared_ptr<gcs::RedisGcsClient> &gcs_client)
: io_service_(io_service),
timer_(io_service_, boost::asio::chrono::seconds(1)),
rpc_profile_data_(new rpc::ProfileTableData()),
gcs_client_(gcs_client) {
rpc_profile_data_.set_component_type(WorkerTypeString(worker_context.GetWorkerType()));
rpc_profile_data_.set_component_id(worker_context.GetWorkerID().Binary());
rpc_profile_data_.set_node_ip_address(node_ip_address);
rpc_profile_data_->set_component_type(WorkerTypeString(worker_context.GetWorkerType()));
rpc_profile_data_->set_component_id(worker_context.GetWorkerID().Binary());
rpc_profile_data_->set_node_ip_address(node_ip_address);
timer_.async_wait(boost::bind(&Profiler::FlushEvents, this));
}

// Queues a copy of `event` in the pending profile batch.
//
// mutex_ is held for the whole call because rpc_profile_data_ is declared
// GUARDED_BY(mutex_) and is also accessed by FlushEvents().
void Profiler::AddEvent(const rpc::ProfileTableData::ProfileEvent &event) {
absl::MutexLock lock(&mutex_);
rpc_profile_data_.add_profile_events()->CopyFrom(event);
rpc_profile_data_->add_profile_events()->CopyFrom(event);
}

void Profiler::FlushEvents() {
absl::MutexLock lock(&mutex_);
if (rpc_profile_data_.profile_events_size() != 0) {
// TODO(edoakes): this should be migrated to use the new GCS client interface
// instead of the raw table interface once it's ready.
if (!gcs_client_->profile_table().AddProfileEventBatch(rpc_profile_data_).ok()) {
auto cur_profile_data = std::make_shared<rpc::ProfileTableData>();
{
absl::MutexLock lock(&mutex_);
if (rpc_profile_data_->profile_events_size() != 0) {
cur_profile_data->set_component_type(rpc_profile_data_->component_type());
cur_profile_data->set_component_id(rpc_profile_data_->component_id());
cur_profile_data->set_node_ip_address(rpc_profile_data_->node_ip_address());
rpc_profile_data_.swap(cur_profile_data);
}
}

if (cur_profile_data->profile_events_size() != 0) {
if (!gcs_client_->Stats().AsyncAddProfileData(cur_profile_data, nullptr).ok()) {
RAY_LOG(WARNING) << "Failed to push profile events to GCS.";
} else {
RAY_LOG(DEBUG) << "Pushed " << rpc_profile_data_.profile_events_size()
<< "events to GCS.";
RAY_LOG(DEBUG) << "Pushed " << cur_profile_data->profile_events_size()
<< " events to GCS.";
}
rpc_profile_data_.clear_profile_events();
}

// Reset the timer to 1 second from the previous expiration time to avoid drift.
timer_.expires_at(timer_.expiry() + boost::asio::chrono::seconds(1));
timer_.async_wait(boost::bind(&Profiler::FlushEvents, this));
Expand Down
2 changes: 1 addition & 1 deletion src/ray/core_worker/profiling.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class Profiler {

// RPC message containing profiling data. Holds the queue of profile events
// until they are flushed.
rpc::ProfileTableData rpc_profile_data_ GUARDED_BY(mutex_);
std::shared_ptr<rpc::ProfileTableData> rpc_profile_data_ GUARDED_BY(mutex_);

// Client to the GCS used to push profile events to it.
std::shared_ptr<gcs::RedisGcsClient> gcs_client_;
Expand Down
16 changes: 16 additions & 0 deletions src/ray/gcs/accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,22 @@ class NodeInfoAccessor {
NodeInfoAccessor() = default;
};

/// \class StatsInfoAccessor
/// `StatsInfoAccessor` is a sub-interface of `GcsClient`.
/// This class includes all the methods that are related to accessing
/// stats in the GCS.
class StatsInfoAccessor {
public:
virtual ~StatsInfoAccessor() = default;

/// Add profile data to GCS asynchronously.
///
/// \param data_ptr The profile data that will be added to GCS.
/// \param callback Callback that will be called when the add is complete.
/// Callers may pass nullptr when no completion notification is needed.
/// \return Status
virtual Status AsyncAddProfileData(
const std::shared_ptr<rpc::ProfileTableData> &data_ptr,
const StatusCallback &callback) = 0;

protected:
// Protected so that only derived accessor implementations can construct it.
StatsInfoAccessor() = default;
};

} // namespace gcs

} // namespace ray
Expand Down
8 changes: 8 additions & 0 deletions src/ray/gcs/gcs_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,13 @@ class GcsClient : public std::enable_shared_from_this<GcsClient> {
return *task_accessor_;
}

/// Get the sub-interface for accessing stats in GCS.
/// This function is thread safe.
///
/// The accessor is held by this client in `stats_accessor_`. The RAY_CHECK
/// below fails if that pointer has not been populated yet, so the concrete
/// client must have installed its accessor before this is called.
///
/// \return Reference to the stats accessor owned by this client.
StatsInfoAccessor &Stats() {
RAY_CHECK(stats_accessor_ != nullptr);
return *stats_accessor_;
}

protected:
/// Constructor of GcsClient.
///
Expand All @@ -111,6 +118,7 @@ class GcsClient : public std::enable_shared_from_this<GcsClient> {
std::unique_ptr<ObjectInfoAccessor> object_accessor_;
std::unique_ptr<NodeInfoAccessor> node_accessor_;
std::unique_ptr<TaskInfoAccessor> task_accessor_;
std::unique_ptr<StatsInfoAccessor> stats_accessor_;
};

} // namespace gcs
Expand Down
15 changes: 15 additions & 0 deletions src/ray/gcs/redis_accessor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,21 @@ Status RedisNodeInfoAccessor::AsyncSubscribeToResources(
return resource_sub_executor_.AsyncSubscribeAll(ClientID::Nil(), subscribe, done);
}

RedisStatsInfoAccessor::RedisStatsInfoAccessor(RedisGcsClient *client_impl)
: client_impl_(client_impl) {}

/// Appends one batch of profile data to the Redis profile table.
///
/// \param data_ptr The profile batch to append.
/// \param callback Optional completion callback. When non-null it is invoked
/// with Status::OK() from the table's write callback; when null no write
/// callback is registered at all.
/// \return The Status returned by the underlying table append.
Status RedisStatsInfoAccessor::AsyncAddProfileData(
    const std::shared_ptr<ProfileTableData> &data_ptr, const StatusCallback &callback) {
  // Adapt the caller's StatusCallback to the table's write-callback signature.
  // The table-side arguments are not needed, so they are left unnamed.
  ProfileTable::WriteCallback write_done = nullptr;
  if (callback != nullptr) {
    write_done = [callback](RedisGcsClient *, const UniqueID &,
                            const ProfileTableData &) { callback(Status::OK()); };
  }

  // Each batch is appended under a nil job id and a freshly generated random id.
  return client_impl_->profile_table().Append(JobID::Nil(), UniqueID::FromRandom(),
                                              data_ptr, write_done);
}

} // namespace gcs

} // namespace ray
16 changes: 16 additions & 0 deletions src/ray/gcs/redis_accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,22 @@ class RedisNodeInfoAccessor : public NodeInfoAccessor {
HeartbeatBatchSubscriptionExecutor heartbeat_batch_sub_executor_;
};

/// \class RedisStatsInfoAccessor
/// RedisStatsInfoAccessor is an implementation of `StatsInfoAccessor`
/// that uses Redis as the backend storage.
class RedisStatsInfoAccessor : public StatsInfoAccessor {
public:
explicit RedisStatsInfoAccessor(RedisGcsClient *client_impl);

virtual ~RedisStatsInfoAccessor() = default;

Status AsyncAddProfileData(const std::shared_ptr<ProfileTableData> &data_ptr,
const StatusCallback &callback) override;

private:
RedisGcsClient *client_impl_{nullptr};
};

} // namespace gcs

} // namespace ray
Expand Down
1 change: 1 addition & 0 deletions src/ray/gcs/redis_gcs_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ Status RedisGcsClient::Connect(boost::asio::io_service &io_service) {
object_accessor_.reset(new RedisObjectInfoAccessor(this));
node_accessor_.reset(new RedisNodeInfoAccessor(this));
task_accessor_.reset(new RedisTaskInfoAccessor(this));
stats_accessor_.reset(new RedisStatsInfoAccessor(this));

is_connected_ = true;

Expand Down
6 changes: 4 additions & 2 deletions src/ray/gcs/redis_gcs_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class RAY_EXPORT RedisGcsClient : public GcsClient {
friend class RedisTaskInfoAccessor;
friend class RedisNodeInfoAccessor;
friend class RedisObjectInfoAccessor;
friend class RedisStatsInfoAccessor;
friend class SubscriptionExecutorTest;
friend class LogSubscribeTestHelper;
friend class LogLookupTestHelper;
Expand Down Expand Up @@ -65,7 +66,6 @@ class RAY_EXPORT RedisGcsClient : public GcsClient {

// TODO: Some API for getting the error on the driver
ErrorTable &error_table();
ProfileTable &profile_table();

// We also need something to export generic code to run on workers from the
// driver (to set the PYTHONPATH)
Expand Down Expand Up @@ -94,7 +94,7 @@ class RAY_EXPORT RedisGcsClient : public GcsClient {
ActorCheckpointIdTable &actor_checkpoint_id_table();
/// This method will be deprecated, use method Jobs() instead.
JobTable &job_table();
/// This method will be deprecated, use method Objects() instead
/// This method will be deprecated, use method Objects() instead.
ObjectTable &object_table();
/// The following four methods will be deprecated, use method Nodes() instead.
ClientTable &client_table();
Expand All @@ -105,6 +105,8 @@ class RAY_EXPORT RedisGcsClient : public GcsClient {
raylet::TaskTable &raylet_task_table();
TaskLeaseTable &task_lease_table();
TaskReconstructionLog &task_reconstruction_log();
/// This method will be deprecated, use method Stats() instead.
ProfileTable &profile_table();

// GCS command type. If CommandType::kChain, chain-replicated versions of the tables
// might be used, if available.
Expand Down
8 changes: 0 additions & 8 deletions src/ray/gcs/tables.cc
Original file line number Diff line number Diff line change
Expand Up @@ -529,14 +529,6 @@ std::string ErrorTable::DebugString() const {
return Log<JobID, ErrorTableData>::DebugString();
}

/// Add a batch of profiling events to the profile table.
///
/// \param profile_events The profile events to record.
/// \return Status of the append.
Status ProfileTable::AddProfileEventBatch(const ProfileTableData &profile_events) {
  // TODO(hchen): Change the parameter to shared_ptr to avoid copying data.
  // Append() is handed a shared_ptr payload, so the batch is duplicated into a
  // heap-allocated message first.
  std::shared_ptr<ProfileTableData> owned_batch = std::make_shared<ProfileTableData>();
  owned_batch->CopyFrom(profile_events);

  // Batches are stored under a nil job id and a random id, with no completion
  // callback.
  return Append(JobID::Nil(), UniqueID::FromRandom(), owned_batch,
                /*done_callback=*/nullptr);
}

// Returns the debug string of the underlying Log<UniqueID, ProfileTableData>;
// ProfileTable contributes no additional state of its own here.
std::string ProfileTable::DebugString() const {
return Log<UniqueID, ProfileTableData>::DebugString();
}
Expand Down
8 changes: 1 addition & 7 deletions src/ray/gcs/tables.h
Original file line number Diff line number Diff line change
Expand Up @@ -842,20 +842,14 @@ class ErrorTable : private Log<JobID, ErrorTableData> {
std::string DebugString() const;
};

class ProfileTable : private Log<UniqueID, ProfileTableData> {
class ProfileTable : public Log<UniqueID, ProfileTableData> {
public:
ProfileTable(const std::vector<std::shared_ptr<RedisContext>> &contexts,
RedisGcsClient *client)
: Log(contexts, client) {
prefix_ = TablePrefix::PROFILE;
};

/// Add a batch of profiling events to the profile table.
///
/// \param profile_events The profile events to record.
/// \return Status.
Status AddProfileEventBatch(const ProfileTableData &profile_events);

/// Returns debug string for class.
///
/// \return string.
Expand Down
10 changes: 5 additions & 5 deletions src/ray/object_manager/object_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -804,15 +804,15 @@ std::shared_ptr<rpc::ObjectManagerClient> ObjectManager::GetRpcClient(
return it->second;
}

rpc::ProfileTableData ObjectManager::GetAndResetProfilingInfo() {
rpc::ProfileTableData profile_info;
profile_info.set_component_type("object_manager");
profile_info.set_component_id(self_node_id_.Binary());
std::shared_ptr<rpc::ProfileTableData> ObjectManager::GetAndResetProfilingInfo() {
auto profile_info = std::make_shared<rpc::ProfileTableData>();
profile_info->set_component_type("object_manager");
profile_info->set_component_id(self_node_id_.Binary());

{
std::lock_guard<std::mutex> lock(profile_mutex_);
for (auto const &profile_event : profile_events_) {
profile_info.add_profile_events()->CopyFrom(profile_event);
profile_info->add_profile_events()->CopyFrom(profile_event);
}
profile_events_.clear();
}
Expand Down
2 changes: 1 addition & 1 deletion src/ray/object_manager/object_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ class ObjectManager : public ObjectManagerInterface,
///
/// \return All profiling information that has accumulated since the last call
/// to this method.
rpc::ProfileTableData GetAndResetProfilingInfo();
std::shared_ptr<rpc::ProfileTableData> GetAndResetProfilingInfo();

/// Returns debug string for class.
///
Expand Down
10 changes: 5 additions & 5 deletions src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -402,8 +402,8 @@ void NodeManager::GetObjectManagerProfileInfo() {

auto profile_info = object_manager_.GetAndResetProfilingInfo();

if (profile_info.profile_events_size() > 0) {
RAY_CHECK_OK(gcs_client_->profile_table().AddProfileEventBatch(profile_info));
if (profile_info->profile_events_size() > 0) {
RAY_CHECK_OK(gcs_client_->Stats().AsyncAddProfileData(profile_info, nullptr));
}

// Reset the timer.
Expand Down Expand Up @@ -905,10 +905,10 @@ void NodeManager::ProcessClientMessage(
} break;
case protocol::MessageType::PushProfileEventsRequest: {
auto fbs_message = flatbuffers::GetRoot<flatbuffers::String>(message_data);
rpc::ProfileTableData profile_table_data;
auto profile_table_data = std::make_shared<rpc::ProfileTableData>();
RAY_CHECK(
profile_table_data.ParseFromArray(fbs_message->data(), fbs_message->size()));
RAY_CHECK_OK(gcs_client_->profile_table().AddProfileEventBatch(profile_table_data));
profile_table_data->ParseFromArray(fbs_message->data(), fbs_message->size()));
RAY_CHECK_OK(gcs_client_->Stats().AsyncAddProfileData(profile_table_data, nullptr));
} break;
case protocol::MessageType::FreeObjectsInObjectStoreRequest: {
auto message = flatbuffers::GetRoot<protocol::FreeObjectsRequest>(message_data);
Expand Down

0 comments on commit ce8170d

Please sign in to comment.