Skip to content

Commit

Permalink
[core] Change most boost::optional to std::optional (ray-project#46779)
Browse files Browse the repository at this point in the history
In Ray codebase we have `std::optional`, `boost::optional` and
`absl::optional`. This PR removes most of the boost one.

Its existence is mostly due to Ray predates `std::optional`. Now we are
at C++17 there's little reason to continue to use the Boost one. There
are only 2 use cases left:

1. Ray C++ API uses it, keeping backward compatibility.
2. `boost::optional<const rpc::JobConfig &> GetJobConfig(const JobID
&job_id) const`, keeping because the std one does not support
references.

This PR helps because we have std::optional bindings in Cython
(`python/ray/includes/optional.pxd`) but not for boost::optional.

Signed-off-by: Ruiyang Wang <rywang014@gmail.com>
  • Loading branch information
rynewang authored Jul 25, 2024
1 parent 42eb499 commit e853826
Show file tree
Hide file tree
Showing 24 changed files with 57 additions and 60 deletions.
1 change: 0 additions & 1 deletion src/ray/common/test_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

#pragma once

#include <boost/optional.hpp>
#include <functional>
#include <future>
#include <string>
Expand Down
2 changes: 1 addition & 1 deletion src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1505,7 +1505,7 @@ Status CoreWorker::ExperimentalRegisterMutableObjectReaderRemote(
.AsyncGet(reader_actor,
[&addr, &promise](
Status status,
const boost::optional<rpc::ActorTableData> &result) {
const std::optional<rpc::ActorTableData> &result) {
RAY_CHECK(result);
if (result) {
addr.set_ip_address(result->address().ip_address());
Expand Down
4 changes: 2 additions & 2 deletions src/ray/gcs/callback.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

#pragma once

#include <boost/optional/optional.hpp>
#include <optional>
#include <vector>

#include "ray/common/status.h"
Expand All @@ -36,7 +36,7 @@ using StatusCallback = std::function<void(Status status)>;
/// this optional object is empty.
template <typename Data>
using OptionalItemCallback =
std::function<void(Status status, const boost::optional<Data> &result)>;
std::function<void(Status status, const std::optional<Data> &result)>;

/// This callback is used to receive multiple items from GCS when a read completes.
/// \param status Status indicates whether the read was successful.
Expand Down
24 changes: 12 additions & 12 deletions src/ray/gcs/gcs_client/accessor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ Status ActorInfoAccessor::AsyncGet(
if (reply.has_actor_table_data()) {
callback(status, reply.actor_table_data());
} else {
callback(status, boost::none);
callback(status, std::nullopt);
}
RAY_LOG(DEBUG) << "Finished getting actor info, status = " << status
<< ", actor id = " << actor_id
Expand Down Expand Up @@ -210,7 +210,7 @@ Status ActorInfoAccessor::AsyncGetByName(
if (reply.has_actor_table_data()) {
callback(status, reply.actor_table_data());
} else {
callback(status, boost::none);
callback(status, std::nullopt);
}
RAY_LOG(DEBUG) << "Finished getting actor info, status = " << status
<< ", name = " << name;
Expand Down Expand Up @@ -249,7 +249,7 @@ Status ActorInfoAccessor::AsyncListNamedActors(
request,
[callback](const Status &status, const rpc::ListNamedActorsReply &reply) {
if (!status.ok()) {
callback(status, boost::none);
callback(status, std::nullopt);
} else {
callback(status, VectorFromProtobuf(reply.named_actors_list()));
}
Expand Down Expand Up @@ -359,7 +359,7 @@ Status ActorInfoAccessor::AsyncSubscribe(
[this, actor_id, subscribe](const StatusCallback &fetch_done) {
auto callback = [actor_id, subscribe, fetch_done](
const Status &status,
const boost::optional<rpc::ActorTableData> &result) {
const std::optional<rpc::ActorTableData> &result) {
if (result) {
subscribe(actor_id, *result);
}
Expand Down Expand Up @@ -945,7 +945,7 @@ Status WorkerInfoAccessor::AsyncGet(
if (reply.has_worker_table_data()) {
callback(status, reply.worker_table_data());
} else {
callback(status, boost::none);
callback(status, std::nullopt);
}
RAY_LOG(DEBUG) << "Finished getting worker info, worker id = " << worker_id;
});
Expand Down Expand Up @@ -1059,7 +1059,7 @@ Status PlacementGroupInfoAccessor::AsyncGet(
if (reply.has_placement_group_table_data()) {
callback(status, reply.placement_group_table_data());
} else {
callback(status, boost::none);
callback(status, std::nullopt);
}
RAY_LOG(DEBUG) << "Finished getting placement group info, placement group id = "
<< placement_group_id;
Expand All @@ -1083,7 +1083,7 @@ Status PlacementGroupInfoAccessor::AsyncGetByName(
if (reply.has_placement_group_table_data()) {
callback(status, reply.placement_group_table_data());
} else {
callback(status, boost::none);
callback(status, std::nullopt);
}
RAY_LOG(DEBUG) << "Finished getting named placement group info, status = "
<< status << ", name = " << name;
Expand Down Expand Up @@ -1133,7 +1133,7 @@ Status InternalKVAccessor::AsyncInternalKVGet(
req,
[callback](const Status &status, const rpc::InternalKVGetReply &reply) {
if (reply.status().code() == (int)StatusCode::NotFound) {
callback(status, boost::none);
callback(status, std::nullopt);
} else {
callback(status, reply.value());
}
Expand Down Expand Up @@ -1239,7 +1239,7 @@ Status InternalKVAccessor::AsyncInternalKVKeys(
req,
[callback](const Status &status, const rpc::InternalKVKeysReply &reply) {
if (!status.ok()) {
callback(status, boost::none);
callback(status, std::nullopt);
} else {
callback(status, VectorFromProtobuf(reply.results()));
}
Expand All @@ -1261,7 +1261,7 @@ Status InternalKVAccessor::Put(const std::string &ns,
value,
overwrite,
timeout_ms,
[&ret_promise, &added](Status status, boost::optional<int> added_num) {
[&ret_promise, &added](Status status, std::optional<int> added_num) {
added = static_cast<bool>(added_num.value_or(0));
ret_promise.set_value(status);
}));
Expand Down Expand Up @@ -1324,7 +1324,7 @@ Status InternalKVAccessor::Del(const std::string &ns,
key,
del_by_prefix,
timeout_ms,
[&ret_promise, &num_deleted](Status status, const boost::optional<int> &value) {
[&ret_promise, &num_deleted](Status status, const std::optional<int> &value) {
num_deleted = value.value_or(0);
ret_promise.set_value(status);
}));
Expand All @@ -1340,7 +1340,7 @@ Status InternalKVAccessor::Exists(const std::string &ns,
ns,
key,
timeout_ms,
[&ret_promise, &exists](Status status, const boost::optional<bool> &value) {
[&ret_promise, &exists](Status status, const std::optional<bool> &value) {
exists = value.value_or(false);
ret_promise.set_value(status);
}));
Expand Down
4 changes: 2 additions & 2 deletions src/ray/gcs/gcs_client/global_state_accessor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ uint32_t GlobalStateAccessor::GetWorkerDebuggerPort(const WorkerID &worker_id) {
RAY_CHECK_OK(gcs_client_->Workers().AsyncGet(
worker_id,
[&promise](const Status &status,
const boost::optional<rpc::WorkerTableData> &result) {
const std::optional<rpc::WorkerTableData> &result) {
RAY_CHECK_OK(status);
if (result.has_value()) {
promise.set_value(result->debugger_port());
Expand Down Expand Up @@ -378,7 +378,7 @@ std::string GlobalStateAccessor::GetSystemConfig() {
absl::ReaderMutexLock lock(&mutex_);
RAY_CHECK_OK(gcs_client_->Nodes().AsyncGetInternalConfig(
[&promise](const Status &status,
const boost::optional<std::string> &stored_raylet_config) {
const std::optional<std::string> &stored_raylet_config) {
RAY_CHECK_OK(status);
promise.set_value(*stored_raylet_config);
}));
Expand Down
2 changes: 1 addition & 1 deletion src/ray/gcs/gcs_client/global_state_accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ class GlobalStateAccessor {
template <class DATA>
OptionalItemCallback<DATA> TransformForOptionalItemCallback(
std::unique_ptr<std::string> &data, std::promise<bool> &promise) {
return [&data, &promise](const Status &status, const boost::optional<DATA> &result) {
return [&data, &promise](const Status &status, const std::optional<DATA> &result) {
RAY_CHECK_OK(status);
if (result) {
data.reset(new std::string(result->SerializeAsString()));
Expand Down
4 changes: 2 additions & 2 deletions src/ray/gcs/gcs_client/test/gcs_client_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -278,8 +278,8 @@ class GcsClientTest : public ::testing::TestWithParam<bool> {
rpc::ActorTableData actor_table_data;
RAY_CHECK_OK(gcs_client_->Actors().AsyncGet(
actor_id,
[&actor_table_data, &promise](
Status status, const boost::optional<rpc::ActorTableData> &result) {
[&actor_table_data, &promise](Status status,
const std::optional<rpc::ActorTableData> &result) {
assert(result);
actor_table_data.CopyFrom(*result);
promise.set_value(true);
Expand Down
2 changes: 1 addition & 1 deletion src/ray/gcs/gcs_client/usage_stats_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ void UsageStatsClient::RecordExtraUsageTag(usage::TagKey key, const std::string
value,
/*overwrite=*/true,
GetGcsTimeoutMs(),
[](Status status, boost::optional<int> added_num) {
[](Status status, std::optional<int> added_num) {
if (!status.ok()) {
RAY_LOG(DEBUG) << "Failed to put extra usage tag, status = " << status;
}
Expand Down
2 changes: 1 addition & 1 deletion src/ray/gcs/gcs_server/gcs_job_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ void GcsJobManager::HandleMarkJobFinished(rpc::MarkJobFinishedRequest request,
Status status = gcs_table_storage_->JobTable().Get(
job_id,
[this, job_id, send_reply](Status status,
const boost::optional<rpc::JobTableData> &result) {
const std::optional<rpc::JobTableData> &result) {
if (status.ok() && result) {
MarkJobAsFinished(*result, send_reply);
} else {
Expand Down
4 changes: 2 additions & 2 deletions src/ray/gcs/gcs_server/gcs_node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,9 @@ void GcsNodeManager::HandleGetInternalConfig(rpc::GetInternalConfigRequest reque
rpc::SendReplyCallback send_reply_callback) {
auto get_system_config = [reply, send_reply_callback](
const ray::Status &status,
const boost::optional<rpc::StoredConfig> &config) {
const std::optional<rpc::StoredConfig> &config) {
if (config.has_value()) {
reply->set_config(config.get().config());
reply->set_config(config->config());
}
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
};
Expand Down
8 changes: 4 additions & 4 deletions src/ray/gcs/gcs_server/gcs_placement_group_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,7 @@ void GcsPlacementGroupManager::HandleGetPlacementGroup(

auto on_done = [placement_group_id, reply, send_reply_callback](
const Status &status,
const boost::optional<PlacementGroupTableData> &result) {
const std::optional<PlacementGroupTableData> &result) {
if (result) {
reply->mutable_placement_group_table_data()->CopyFrom(*result);
}
Expand All @@ -572,7 +572,7 @@ void GcsPlacementGroupManager::HandleGetPlacementGroup(
Status status =
gcs_table_storage_->PlacementGroupTable().Get(placement_group_id, on_done);
if (!status.ok()) {
on_done(status, boost::none);
on_done(status, std::nullopt);
}
}
++counts_[CountType::GET_PLACEMENT_GROUP_REQUEST];
Expand Down Expand Up @@ -683,7 +683,7 @@ void GcsPlacementGroupManager::WaitPlacementGroup(
// Check whether the placement group does not exist or is removed.
auto on_done = [this, placement_group_id, callback](
const Status &status,
const boost::optional<PlacementGroupTableData> &result) {
const std::optional<PlacementGroupTableData> &result) {
if (result) {
RAY_LOG(DEBUG) << "Placement group is removed, placement group id = "
<< placement_group_id;
Expand All @@ -703,7 +703,7 @@ void GcsPlacementGroupManager::WaitPlacementGroup(
Status status =
gcs_table_storage_->PlacementGroupTable().Get(placement_group_id, on_done);
if (!status.ok()) {
on_done(status, boost::none);
on_done(status, std::nullopt);
}
} else if (iter->second->GetState() == rpc::PlacementGroupTableData::CREATED) {
RAY_LOG(DEBUG) << "Placement group is created, placement group id = "
Expand Down
4 changes: 2 additions & 2 deletions src/ray/gcs/gcs_server/gcs_table_storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ template <typename Key, typename Data>
Status GcsTable<Key, Data>::Get(const Key &key,
const OptionalItemCallback<Data> &callback) {
auto on_done = [callback](const Status &status,
const boost::optional<std::string> &result) {
const std::optional<std::string> &result) {
if (!callback) {
return;
}
boost::optional<Data> value;
std::optional<Data> value;
if (result) {
Data data;
data.ParseFromString(*result);
Expand Down
17 changes: 8 additions & 9 deletions src/ray/gcs/gcs_server/gcs_worker_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ void GcsWorkerManager::HandleReportWorkerFailure(
worker_id = std::move(worker_id),
request = std::move(request),
worker_address =
std::move(worker_address)](const boost::optional<WorkerTableData> &result) {
std::move(worker_address)](const std::optional<WorkerTableData> &result) {
const auto node_id = NodeID::FromBinary(worker_address.raylet_id());
std::string message =
absl::StrCat("Reporting worker exit, worker id = ",
Expand Down Expand Up @@ -136,7 +136,7 @@ void GcsWorkerManager::HandleGetWorkerInfo(rpc::GetWorkerInfoRequest request,

GetWorkerInfo(worker_id,
[reply, send_reply_callback, worker_id = std::move(worker_id)](
const boost::optional<WorkerTableData> &result) {
const std::optional<WorkerTableData> &result) {
if (result) {
reply->mutable_worker_table_data()->CopyFrom(*result);
}
Expand Down Expand Up @@ -221,7 +221,7 @@ void GcsWorkerManager::HandleUpdateWorkerDebuggerPort(

auto on_worker_get_done =
[&, worker_id, reply, debugger_port, on_worker_update_done, send_reply_callback](
const Status &status, const boost::optional<WorkerTableData> &result) {
const Status &status, const std::optional<WorkerTableData> &result) {
if (!status.ok()) {
RAY_LOG(WARNING) << "Failed to get worker info, worker id = " << worker_id
<< ", status = " << status;
Expand Down Expand Up @@ -274,7 +274,7 @@ void GcsWorkerManager::HandleUpdateWorkerNumPausedThreads(
on_worker_update_done,
send_reply_callback](
const Status &status,
const boost::optional<WorkerTableData> &result) {
const std::optional<WorkerTableData> &result) {
if (!status.ok()) {
RAY_LOG(WARNING) << "Failed to get worker info, worker id = " << worker_id
<< ", status = " << status;
Expand Down Expand Up @@ -309,22 +309,21 @@ void GcsWorkerManager::AddWorkerDeadListener(

void GcsWorkerManager::GetWorkerInfo(
const WorkerID &worker_id,
std::function<void(const boost::optional<WorkerTableData> &)> callback) const {
std::function<void(const std::optional<WorkerTableData> &)> callback) const {
auto on_done = [worker_id, callback = std::move(callback)](
const Status &status,
const boost::optional<WorkerTableData> &result) {
const Status &status, const std::optional<WorkerTableData> &result) {
if (!status.ok()) {
RAY_LOG(WARNING) << "Failed to get worker info, worker id = " << worker_id
<< ", status = " << status;
callback(boost::none);
callback(std::nullopt);
} else {
callback(result);
}
};

Status status = gcs_table_storage_->WorkerTable().Get(worker_id, on_done);
if (!status.ok()) {
on_done(status, boost::none);
on_done(status, std::nullopt);
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/ray/gcs/gcs_server/gcs_worker_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class GcsWorkerManager : public rpc::WorkerInfoHandler {
private:
void GetWorkerInfo(
const WorkerID &worker_id,
std::function<void(const boost::optional<WorkerTableData> &)> callback) const;
std::function<void(const std::optional<WorkerTableData> &)> callback) const;

std::shared_ptr<gcs::GcsTableStorage> gcs_table_storage_;
std::shared_ptr<GcsPublisher> gcs_publisher_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
#include <boost/asio.hpp>
#include <boost/chrono.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/optional.hpp>
#include <boost/thread.hpp>
#include <cstdlib>
#include <unordered_map>
Expand Down
14 changes: 7 additions & 7 deletions src/ray/gcs/gcs_server/test/gcs_server_rpc_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,10 @@ class GcsServerTest : public ::testing::Test {
return WaitReady(promise.get_future(), timeout_ms_);
}

boost::optional<rpc::ActorTableData> GetActorInfo(const std::string &actor_id) {
std::optional<rpc::ActorTableData> GetActorInfo(const std::string &actor_id) {
rpc::GetActorInfoRequest request;
request.set_actor_id(actor_id);
boost::optional<rpc::ActorTableData> actor_table_data_opt;
std::optional<rpc::ActorTableData> actor_table_data_opt;
std::promise<bool> promise;
client_->GetActorInfo(request,
[&actor_table_data_opt, &promise](
Expand All @@ -99,7 +99,7 @@ class GcsServerTest : public ::testing::Test {
if (reply.has_actor_table_data()) {
actor_table_data_opt = reply.actor_table_data();
} else {
actor_table_data_opt = boost::none;
actor_table_data_opt = std::nullopt;
}
promise.set_value(true);
});
Expand Down Expand Up @@ -158,10 +158,10 @@ class GcsServerTest : public ::testing::Test {
return WaitReady(promise.get_future(), timeout_ms_);
}

boost::optional<rpc::WorkerTableData> GetWorkerInfo(const std::string &worker_id) {
std::optional<rpc::WorkerTableData> GetWorkerInfo(const std::string &worker_id) {
rpc::GetWorkerInfoRequest request;
request.set_worker_id(worker_id);
boost::optional<rpc::WorkerTableData> worker_table_data_opt;
std::optional<rpc::WorkerTableData> worker_table_data_opt;
std::promise<bool> promise;
client_->GetWorkerInfo(
request,
Expand All @@ -171,7 +171,7 @@ class GcsServerTest : public ::testing::Test {
if (reply.has_worker_table_data()) {
worker_table_data_opt = reply.worker_table_data();
} else {
worker_table_data_opt = boost::none;
worker_table_data_opt = std::nullopt;
}
promise.set_value(true);
});
Expand Down Expand Up @@ -320,7 +320,7 @@ TEST_F(GcsServerTest, TestWorkerInfo) {
ASSERT_TRUE(GetAllWorkerInfo().size() == 2);

// Get worker info
boost::optional<rpc::WorkerTableData> result =
std::optional<rpc::WorkerTableData> result =
GetWorkerInfo(worker_data->worker_address().worker_id());
ASSERT_TRUE(result->worker_address().worker_id() ==
worker_data->worker_address().worker_id());
Expand Down
Loading

0 comments on commit e853826

Please sign in to comment.