Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GCS]Support gcs client job&actor&node table subscribe idempotent #9424

159 changes: 113 additions & 46 deletions src/ray/gcs/gcs_client/service_based_accessor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,20 @@
namespace ray {
namespace gcs {

bool IdempotentFilter::Filter(const std::string &id, int64_t timestamp) {
auto it = cache_.find(id);
if (it == cache_.end()) {
cache_[id] = timestamp;
} else if (it->second < timestamp) {
it->second = timestamp;
} else {
return false;
}
return true;
}

void IdempotentFilter::Remove(const std::string &id) { cache_.erase(id); }

ServiceBasedJobInfoAccessor::ServiceBasedJobInfoAccessor(
ServiceBasedGcsClient *client_impl)
: client_impl_(client_impl) {}
Expand Down Expand Up @@ -62,36 +76,58 @@ Status ServiceBasedJobInfoAccessor::AsyncMarkFinished(const JobID &job_id,
Status ServiceBasedJobInfoAccessor::AsyncSubscribeAll(
const SubscribeCallback<JobID, JobTableData> &subscribe, const StatusCallback &done) {
RAY_CHECK(subscribe != nullptr);
fetch_all_data_operation_ = [this, subscribe](const StatusCallback &done) {
auto callback = [subscribe, done](

auto filtered_subscribe = [this, subscribe](const JobTableData &data) {
if (subscribe_filter_.Filter(data.job_id(), data.timestamp())) {
subscribe(JobID::FromBinary(data.job_id()), data);
}
};

fetch_all_data_operation_ = [this,
filtered_subscribe](const StatusCallback &fetch_done) {
auto callback = [filtered_subscribe, fetch_done](
const Status &status,
const std::vector<rpc::JobTableData> &job_info_list) {
for (auto &job_info : job_info_list) {
subscribe(JobID::FromBinary(job_info.job_id()), job_info);
filtered_subscribe(job_info);
}
if (done) {
done(status);
if (fetch_done) {
fetch_done(status);
}
};
RAY_CHECK_OK(AsyncGetAll(callback));
};
subscribe_operation_ = [this, subscribe](const StatusCallback &done) {
auto on_subscribe = [subscribe](const std::string &id, const std::string &data) {

subscribe_operation_ = [this,
filtered_subscribe](const StatusCallback &subscribe_done) {
auto on_subscribe = [filtered_subscribe](const std::string &id,
const std::string &data) {
JobTableData job_data;
job_data.ParseFromString(data);
subscribe(JobID::FromBinary(id), job_data);
filtered_subscribe(job_data);
};
return client_impl_->GetGcsPubSub().SubscribeAll(JOB_CHANNEL, on_subscribe, done);
return client_impl_->GetGcsPubSub().SubscribeAll(JOB_CHANNEL, on_subscribe,
subscribe_done);
};

return subscribe_operation_(
[this, done](const Status &status) { fetch_all_data_operation_(done); });
}

void ServiceBasedJobInfoAccessor::AsyncResubscribe(bool is_pubsub_server_restarted) {
RAY_LOG(INFO) << "Reestablishing subscription for job info.";
// If the pub-sub server has restarted, we need to resubscribe to the pub-sub server.
if (subscribe_operation_ != nullptr && is_pubsub_server_restarted) {
RAY_CHECK_OK(subscribe_operation_(nullptr));
// If only the GCS sever has restarted, we only need to fetch data from the GCS server.
// If the pub-sub server has also restarted, we need to resubscribe to the pub-sub
// server first, then fetch data from the GCS server.
if (is_pubsub_server_restarted) {
if (subscribe_operation_ != nullptr) {
RAY_CHECK_OK(subscribe_operation_(
[this](const Status &status) { fetch_all_data_operation_(nullptr); }));
}
} else {
if (fetch_all_data_operation_ != nullptr) {
fetch_all_data_operation_(nullptr);
}
}
}

Expand Down Expand Up @@ -241,27 +277,38 @@ Status ServiceBasedActorInfoAccessor::AsyncSubscribeAll(
const SubscribeCallback<ActorID, rpc::ActorTableData> &subscribe,
const StatusCallback &done) {
RAY_CHECK(subscribe != nullptr);
fetch_all_data_operation_ = [this, subscribe](const StatusCallback &done) {
auto callback = [subscribe, done](

auto filtered_subscribe = [this, subscribe](const ActorTableData &data) {
if (subscribe_all_filter_.Filter(data.actor_id(), data.timestamp())) {
subscribe(ActorID::FromBinary(data.actor_id()), data);
}
};

fetch_all_data_operation_ = [this,
filtered_subscribe](const StatusCallback &fetch_done) {
auto callback = [filtered_subscribe, fetch_done](
const Status &status,
const std::vector<rpc::ActorTableData> &actor_info_list) {
for (auto &actor_info : actor_info_list) {
subscribe(ActorID::FromBinary(actor_info.actor_id()), actor_info);
filtered_subscribe(actor_info);
}
if (done) {
done(status);
if (fetch_done) {
fetch_done(status);
}
};
RAY_CHECK_OK(AsyncGetAll(callback));
};

subscribe_all_operation_ = [this, subscribe](const StatusCallback &done) {
auto on_subscribe = [subscribe](const std::string &id, const std::string &data) {
subscribe_all_operation_ = [this,
filtered_subscribe](const StatusCallback &subscribe_done) {
auto on_subscribe = [filtered_subscribe](const std::string &id,
const std::string &data) {
ActorTableData actor_data;
actor_data.ParseFromString(data);
subscribe(ActorID::FromBinary(actor_data.actor_id()), actor_data);
filtered_subscribe(actor_data);
};
return client_impl_->GetGcsPubSub().SubscribeAll(ACTOR_CHANNEL, on_subscribe, done);
return client_impl_->GetGcsPubSub().SubscribeAll(ACTOR_CHANNEL, on_subscribe,
subscribe_done);
};

return subscribe_all_operation_(
Expand All @@ -275,13 +322,19 @@ Status ServiceBasedActorInfoAccessor::AsyncSubscribe(
RAY_LOG(DEBUG) << "Subscribing update operations of actor, actor id = " << actor_id;
RAY_CHECK(subscribe != nullptr) << "Failed to subscribe actor, actor id = " << actor_id;

auto filtered_subscribe = [this, subscribe](const ActorTableData &data) {
if (subscribe_filter_.Filter(data.actor_id(), data.timestamp())) {
subscribe(ActorID::FromBinary(data.actor_id()), data);
}
};

auto fetch_data_operation = [this, actor_id,
subscribe](const StatusCallback &fetch_done) {
auto callback = [actor_id, subscribe, fetch_done](
filtered_subscribe](const StatusCallback &fetch_done) {
auto callback = [filtered_subscribe, fetch_done](
const Status &status,
const boost::optional<rpc::ActorTableData> &result) {
if (result) {
subscribe(actor_id, *result);
filtered_subscribe(*result);
}
if (fetch_done) {
fetch_done(status);
Expand All @@ -291,11 +344,12 @@ Status ServiceBasedActorInfoAccessor::AsyncSubscribe(
};

auto subscribe_operation = [this, actor_id,
subscribe](const StatusCallback &subscribe_done) {
auto on_subscribe = [subscribe](const std::string &id, const std::string &data) {
filtered_subscribe](const StatusCallback &subscribe_done) {
auto on_subscribe = [filtered_subscribe](const std::string &id,
const std::string &data) {
ActorTableData actor_data;
actor_data.ParseFromString(data);
subscribe(ActorID::FromBinary(actor_data.actor_id()), actor_data);
filtered_subscribe(actor_data);
};
return client_impl_->GetGcsPubSub().Subscribe(ACTOR_CHANNEL, actor_id.Hex(),
on_subscribe, subscribe_done);
Expand All @@ -312,6 +366,7 @@ Status ServiceBasedActorInfoAccessor::AsyncUnsubscribe(const ActorID &actor_id)
auto status = client_impl_->GetGcsPubSub().Unsubscribe(ACTOR_CHANNEL, actor_id.Hex());
subscribe_operations_.erase(actor_id);
fetch_data_operations_.erase(actor_id);
subscribe_filter_.Remove(actor_id.Binary());
RAY_LOG(DEBUG) << "Finished cancelling subscription to an actor, actor id = "
<< actor_id;
return status;
Expand Down Expand Up @@ -531,26 +586,37 @@ Status ServiceBasedNodeInfoAccessor::AsyncSubscribeToNodeChange(
RAY_CHECK(node_change_callback_ == nullptr);
node_change_callback_ = subscribe;

fetch_node_data_operation_ = [this](const StatusCallback &done) {
auto callback = [this, done](const Status &status,
const std::vector<GcsNodeInfo> &node_info_list) {
auto filtered_subscribe = [this](const GcsNodeInfo &data) {
if (subscribe_node_filter_.Filter(data.node_id(), data.timestamp())) {
HandleNotification(data);
}
};

fetch_node_data_operation_ = [this,
filtered_subscribe](const StatusCallback &fetch_done) {
auto callback = [filtered_subscribe, fetch_done](
const Status &status,
const std::vector<GcsNodeInfo> &node_info_list) {
for (auto &node_info : node_info_list) {
HandleNotification(node_info);
filtered_subscribe(node_info);
}
if (done) {
done(status);
if (fetch_done) {
fetch_done(status);
}
};
RAY_CHECK_OK(AsyncGetAll(callback));
};

subscribe_node_operation_ = [this](const StatusCallback &done) {
auto on_subscribe = [this](const std::string &id, const std::string &data) {
subscribe_node_operation_ = [this,
filtered_subscribe](const StatusCallback &subscribe_done) {
auto on_subscribe = [filtered_subscribe](const std::string &id,
const std::string &data) {
GcsNodeInfo node_info;
node_info.ParseFromString(data);
HandleNotification(node_info);
filtered_subscribe(node_info);
};
return client_impl_->GetGcsPubSub().SubscribeAll(NODE_CHANNEL, on_subscribe, done);
return client_impl_->GetGcsPubSub().SubscribeAll(NODE_CHANNEL, on_subscribe,
subscribe_done);
};

return subscribe_node_operation_([this, subscribe, done](const Status &status) {
Expand Down Expand Up @@ -704,15 +770,16 @@ Status ServiceBasedNodeInfoAccessor::AsyncSubscribeBatchHeartbeat(
const ItemCallback<rpc::HeartbeatBatchTableData> &subscribe,
const StatusCallback &done) {
RAY_CHECK(subscribe != nullptr);
subscribe_batch_heartbeat_operation_ = [this, subscribe](const StatusCallback &done) {
auto on_subscribe = [subscribe](const std::string &id, const std::string &data) {
rpc::HeartbeatBatchTableData heartbeat_batch_table_data;
heartbeat_batch_table_data.ParseFromString(data);
subscribe(heartbeat_batch_table_data);
};
return client_impl_->GetGcsPubSub().Subscribe(HEARTBEAT_BATCH_CHANNEL, "",
on_subscribe, done);
};
subscribe_batch_heartbeat_operation_ =
[this, subscribe](const StatusCallback &subscribe_done) {
auto on_subscribe = [subscribe](const std::string &id, const std::string &data) {
rpc::HeartbeatBatchTableData heartbeat_batch_table_data;
heartbeat_batch_table_data.ParseFromString(data);
subscribe(heartbeat_batch_table_data);
};
return client_impl_->GetGcsPubSub().Subscribe(HEARTBEAT_BATCH_CHANNEL, "",
on_subscribe, subscribe_done);
};
return subscribe_batch_heartbeat_operation_(done);
}

Expand Down
34 changes: 34 additions & 0 deletions src/ray/gcs/gcs_client/service_based_accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,29 @@ using FetchDataOperation = std::function<void(const StatusCallback &done)>;

class ServiceBasedGcsClient;

/// \class IdempotentFilter
/// `IdempotentFilter` is used to filter duplicate or obsolete message.
/// It saves the timestamp of the latest message of each key, and compares the timestamp
/// to identify duplicate or obsolete messages.
class IdempotentFilter {
public:
/// Check whether the message is duplicate or obsolete. Non-thread safe.
///
/// \param id The id of message.
/// \param timestamp The timestamp of message.
/// \return If message is duplicate or obsolete, return false, otherwise return true.
bool Filter(const std::string &id, int64_t timestamp);

/// Remove the specified id of message. Non-thread safe.
///
/// \param id The id of message.
void Remove(const std::string &id);

private:
/// A cache of the current latest message timestamp for each ID.
std::unordered_map<std::string, int64_t> cache_;
};

/// \class ServiceBasedJobInfoAccessor
/// ServiceBasedJobInfoAccessor is an implementation of `JobInfoAccessor`
/// that uses GCS Service as the backend.
Expand Down Expand Up @@ -59,7 +82,13 @@ class ServiceBasedJobInfoAccessor : public JobInfoAccessor {
/// server restarts from a failure.
SubscribeOperation subscribe_operation_;

/// Save the fetch data operation in this function, so we can call it again when GCS
/// server restarts from a failure.
FetchDataOperation fetch_data_operation_;

ServiceBasedGcsClient *client_impl_;

IdempotentFilter subscribe_filter_;
};

/// \class ServiceBasedActorInfoAccessor
Expand Down Expand Up @@ -133,6 +162,9 @@ class ServiceBasedActorInfoAccessor : public ActorInfoAccessor {
ServiceBasedGcsClient *client_impl_;

Sequencer<ActorID> sequencer_;

IdempotentFilter subscribe_all_filter_;
IdempotentFilter subscribe_filter_;
};

/// \class ServiceBasedNodeInfoAccessor
Expand Down Expand Up @@ -218,6 +250,8 @@ class ServiceBasedNodeInfoAccessor : public NodeInfoAccessor {
/// server restarts from a failure.
FetchDataOperation fetch_node_data_operation_;

IdempotentFilter subscribe_node_filter_;

void HandleNotification(const GcsNodeInfo &node_info);

ServiceBasedGcsClient *client_impl_;
Expand Down
Loading