Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GCS] Delete redis gcs client and redis_xxx_accessor #12996

Merged
merged 19 commits into from
Dec 23, 2020
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 0 additions & 106 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -1020,7 +1020,6 @@ cc_test(
cc_library(
name = "gcs_test_util_lib",
hdrs = [
"src/ray/gcs/test/accessor_test_base.h",
"src/ray/gcs/test/gcs_test_util.h",
],
copts = COPTS,
Expand Down Expand Up @@ -1621,111 +1620,6 @@ cc_library(
],
)

# TODO(micafan) Support test group in future. Use test group we can run all gcs test once.
cc_test(
name = "redis_gcs_client_test",
srcs = ["src/ray/gcs/test/redis_gcs_client_test.cc"],
args = [
"$(location redis-server)",
"$(location redis-cli)",
"$(location libray_redis_module.so)",
],
copts = COPTS,
data = [
"//:libray_redis_module.so",
"//:redis-cli",
"//:redis-server",
],
deps = [
":gcs",
"@com_google_googletest//:gtest_main",
],
)

cc_test(
name = "redis_actor_info_accessor_test",
srcs = ["src/ray/gcs/test/redis_actor_info_accessor_test.cc"],
args = [
"$(location redis-server)",
"$(location redis-cli)",
"$(location libray_redis_module.so)",
],
copts = COPTS,
data = [
"//:libray_redis_module.so",
"//:redis-cli",
"//:redis-server",
],
deps = [
":gcs",
":gcs_test_util_lib",
"@com_google_googletest//:gtest_main",
],
)

cc_test(
name = "redis_object_info_accessor_test",
srcs = ["src/ray/gcs/test/redis_object_info_accessor_test.cc"],
args = [
"$(location redis-server)",
"$(location redis-cli)",
"$(location libray_redis_module.so)",
],
copts = COPTS,
data = [
"//:libray_redis_module.so",
"//:redis-cli",
"//:redis-server",
],
deps = [
":gcs",
":gcs_test_util_lib",
"@com_google_googletest//:gtest_main",
],
)

cc_test(
name = "redis_job_info_accessor_test",
srcs = ["src/ray/gcs/test/redis_job_info_accessor_test.cc"],
args = [
"$(location redis-server)",
"$(location redis-cli)",
"$(location libray_redis_module.so)",
],
copts = COPTS,
data = [
"//:libray_redis_module.so",
"//:redis-cli",
"//:redis-server",
],
deps = [
":gcs",
":gcs_test_util_lib",
"@com_google_googletest//:gtest_main",
],
)

cc_test(
name = "redis_node_info_accessor_test",
srcs = ["src/ray/gcs/test/redis_node_info_accessor_test.cc"],
args = [
"$(location redis-server)",
"$(location redis-cli)",
"$(location libray_redis_module.so)",
],
copts = COPTS,
data = [
"//:libray_redis_module.so",
"//:redis-cli",
"//:redis-server",
],
deps = [
":gcs",
":gcs_test_util_lib",
"@com_google_googletest//:gtest_main",
],
)

cc_test(
name = "asio_test",
srcs = ["src/ray/gcs/test/asio_test.cc"],
Expand Down
1 change: 0 additions & 1 deletion ci/travis/ci.sh
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@ test_core() {
case "${OSTYPE}" in
msys)
args+=(
-//:redis_gcs_client_test
-//:core_worker_test
-//:event_test
-//:gcs_pub_sub_test
Expand Down
4 changes: 2 additions & 2 deletions src/ray/core_worker/actor_handle.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ ray::rpc::ActorHandle CreateInnerActorHandleFromString(const std::string &serial
}

ray::rpc::ActorHandle CreateInnerActorHandleFromActorTableData(
const ray::gcs::ActorTableData &actor_table_data) {
const ray::rpc::ActorTableData &actor_table_data) {
clay4megtr marked this conversation as resolved.
Show resolved Hide resolved
ray::rpc::ActorHandle inner;
inner.set_actor_id(actor_table_data.actor_id());
inner.set_owner_id(actor_table_data.parent_id());
Expand Down Expand Up @@ -80,7 +80,7 @@ ActorHandle::ActorHandle(
ActorHandle::ActorHandle(const std::string &serialized)
: ActorHandle(CreateInnerActorHandleFromString(serialized)) {}

ActorHandle::ActorHandle(const gcs::ActorTableData &actor_table_data)
ActorHandle::ActorHandle(const rpc::ActorTableData &actor_table_data)
: ActorHandle(CreateInnerActorHandleFromActorTableData(actor_table_data)) {}

void ActorHandle::SetActorTaskSpec(TaskSpecBuilder &builder, const ObjectID new_cursor) {
Expand Down
4 changes: 2 additions & 2 deletions src/ray/core_worker/actor_handle.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
#include "ray/common/task/task_util.h"
#include "ray/core_worker/common.h"
#include "ray/core_worker/context.h"
#include "ray/gcs/redis_gcs_client.h"
#include "ray/gcs/gcs_client/service_based_gcs_client.h"
clay4megtr marked this conversation as resolved.
Show resolved Hide resolved
#include "src/ray/protobuf/core_worker.pb.h"
#include "src/ray/protobuf/gcs.pb.h"

Expand All @@ -42,7 +42,7 @@ class ActorHandle {
ActorHandle(const std::string &serialized);

/// Constructs an ActorHandle from a gcs::ActorTableData message.
ActorHandle(const gcs::ActorTableData &actor_table_data);
ActorHandle(const rpc::ActorTableData &actor_table_data);

ActorID GetActorID() const { return ActorID::FromBinary(inner_.actor_id()); };

Expand Down
11 changes: 5 additions & 6 deletions src/ray/core_worker/actor_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
#include "ray/core_worker/actor_manager.h"

#include "ray/gcs/pb_util.h"
#include "ray/gcs/redis_accessor.h"

namespace ray {

Expand Down Expand Up @@ -122,23 +121,23 @@ void ActorManager::WaitForActorOutOfScope(
}

void ActorManager::HandleActorStateNotification(const ActorID &actor_id,
const gcs::ActorTableData &actor_data) {
const auto &actor_state = gcs::ActorTableData::ActorState_Name(actor_data.state());
const rpc::ActorTableData &actor_data) {
const auto &actor_state = rpc::ActorTableData::ActorState_Name(actor_data.state());
RAY_LOG(INFO) << "received notification on actor, state: " << actor_state
<< ", actor_id: " << actor_id
<< ", ip address: " << actor_data.address().ip_address()
<< ", port: " << actor_data.address().port() << ", worker_id: "
<< WorkerID::FromBinary(actor_data.address().worker_id())
<< ", raylet_id: " << NodeID::FromBinary(actor_data.address().raylet_id())
<< ", num_restarts: " << actor_data.num_restarts();
if (actor_data.state() == gcs::ActorTableData::RESTARTING) {
if (actor_data.state() == rpc::ActorTableData::RESTARTING) {
direct_actor_submitter_->DisconnectActor(actor_id, actor_data.num_restarts(), false);
} else if (actor_data.state() == gcs::ActorTableData::DEAD) {
} else if (actor_data.state() == rpc::ActorTableData::DEAD) {
direct_actor_submitter_->DisconnectActor(actor_id, actor_data.num_restarts(), true);
// We cannot erase the actor handle here because clients can still
// submit tasks to dead actors. This also means we defer unsubscription,
// otherwise we crash when bulk unsubscribing all actor handles.
} else if (actor_data.state() == gcs::ActorTableData::ALIVE) {
} else if (actor_data.state() == rpc::ActorTableData::ALIVE) {
direct_actor_submitter_->ConnectActor(actor_id, actor_data.address(),
actor_data.num_restarts());
} else {
Expand Down
4 changes: 2 additions & 2 deletions src/ray/core_worker/actor_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
#include "ray/core_worker/actor_handle.h"
#include "ray/core_worker/reference_count.h"
#include "ray/core_worker/transport/direct_actor_transport.h"
#include "ray/gcs/redis_gcs_client.h"
#include "ray/gcs/gcs_client/service_based_gcs_client.h"
clay4megtr marked this conversation as resolved.
Show resolved Hide resolved

namespace ray {

Expand Down Expand Up @@ -177,7 +177,7 @@ class ActorManager {
/// \param[in] actor_id The actor id of this notification.
/// \param[in] actor_data The GCS actor data.
void HandleActorStateNotification(const ActorID &actor_id,
const gcs::ActorTableData &actor_data);
const rpc::ActorTableData &actor_data);

/// GCS client.
std::shared_ptr<gcs::GcsClient> gcs_client_;
Expand Down
4 changes: 2 additions & 2 deletions src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,7 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
TaskID::ComputeDriverTaskId(worker_context_.GetWorkerID()),
GetCallerId(), rpc_address_);

std::shared_ptr<gcs::TaskTableData> data = std::make_shared<gcs::TaskTableData>();
std::shared_ptr<rpc::TaskTableData> data = std::make_shared<rpc::TaskTableData>();
data->mutable_task()->mutable_task_spec()->CopyFrom(builder.Build().GetMessage());
if (!options_.is_local_mode) {
RAY_CHECK_OK(gcs_client_->Tasks().AsyncAdd(data, nullptr));
Expand Down Expand Up @@ -1640,7 +1640,7 @@ std::pair<const ActorHandle *, Status> CoreWorker::GetNamedActorHandle(
std::make_shared<std::promise<void>>(std::promise<void>());
RAY_CHECK_OK(gcs_client_->Actors().AsyncGetByName(
name, [this, &actor_id, name, ready_promise](
Status status, const boost::optional<gcs::ActorTableData> &result) {
Status status, const boost::optional<rpc::ActorTableData> &result) {
if (status.ok() && result) {
auto actor_handle = std::unique_ptr<ActorHandle>(new ActorHandle(*result));
actor_id = actor_handle->GetActorID();
Expand Down
3 changes: 1 addition & 2 deletions src/ray/core_worker/core_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@
#include "ray/core_worker/store_provider/plasma_store_provider.h"
#include "ray/core_worker/transport/direct_actor_transport.h"
#include "ray/core_worker/transport/direct_task_transport.h"
#include "ray/gcs/redis_gcs_client.h"
#include "ray/gcs/subscription_executor.h"
#include "ray/gcs/gcs_client/service_based_gcs_client.h"
clay4megtr marked this conversation as resolved.
Show resolved Hide resolved
#include "ray/raylet_client/raylet_client.h"
#include "ray/rpc/node_manager/node_manager_client.h"
#include "ray/rpc/worker/core_worker_client.h"
Expand Down
2 changes: 1 addition & 1 deletion src/ray/core_worker/profiling.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
#include "absl/synchronization/mutex.h"
#include "absl/time/clock.h"
#include "ray/core_worker/context.h"
#include "ray/gcs/redis_gcs_client.h"
#include "ray/gcs/gcs_client/service_based_gcs_client.h"
clay4megtr marked this conversation as resolved.
Show resolved Hide resolved

namespace ray {

Expand Down
22 changes: 10 additions & 12 deletions src/ray/core_worker/test/actor_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,17 @@
#include "ray/common/test_util.h"
#include "ray/core_worker/reference_count.h"
#include "ray/core_worker/transport/direct_actor_transport.h"
#include "ray/gcs/redis_accessor.h"
#include "ray/gcs/redis_gcs_client.h"
#include "ray/gcs/gcs_client/service_based_accessor.h"
#include "ray/gcs/gcs_client/service_based_gcs_client.h"

namespace ray {

using ::testing::_;

class MockActorInfoAccessor : public gcs::RedisActorInfoAccessor {
class MockActorInfoAccessor : public gcs::ServiceBasedActorInfoAccessor {
clay4megtr marked this conversation as resolved.
Show resolved Hide resolved
public:
MockActorInfoAccessor(gcs::RedisGcsClient *client)
: gcs::RedisActorInfoAccessor(client) {}
MockActorInfoAccessor(gcs::ServiceBasedGcsClient *client)
: gcs::ServiceBasedActorInfoAccessor(client) {}

~MockActorInfoAccessor() {}

Expand All @@ -44,7 +44,7 @@ class MockActorInfoAccessor : public gcs::RedisActorInfoAccessor {
}

bool ActorStateNotificationPublished(const ActorID &actor_id,
const gcs::ActorTableData &actor_data) {
const rpc::ActorTableData &actor_data) {
auto it = callback_map_.find(actor_id);
if (it == callback_map_.end()) return false;
auto actor_state_notification_callback = it->second;
Expand All @@ -60,15 +60,13 @@ class MockActorInfoAccessor : public gcs::RedisActorInfoAccessor {
callback_map_;
};

class MockGcsClient : public gcs::RedisGcsClient {
class MockGcsClient : public gcs::ServiceBasedGcsClient {
public:
MockGcsClient(const gcs::GcsClientOptions &options) : gcs::RedisGcsClient(options) {}
MockGcsClient(gcs::GcsClientOptions options) : gcs::ServiceBasedGcsClient(options) {}

void Init(MockActorInfoAccessor *actor_accesor_mock) {
actor_accessor_.reset(actor_accesor_mock);
void Init(MockActorInfoAccessor *actor_info_accessor) {
actor_accessor_.reset(actor_info_accessor);
}

~MockGcsClient() {}
};

class MockDirectActorSubmitter : public CoreWorkerDirectActorTaskSubmitterInterface {
Expand Down
5 changes: 0 additions & 5 deletions src/ray/core_worker/test/direct_actor_transport_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,6 @@ TEST_F(DirectActorSubmitterTest, TestActorDead) {
addr.set_worker_id(worker_id.Binary());
ActorID actor_id = ActorID::Of(JobID::FromInt(0), TaskID::Nil(), 0);
submitter_.AddActorQueueIfNotExists(actor_id);
gcs::ActorTableData actor_data;
submitter_.ConnectActor(actor_id, addr, 0);
ASSERT_EQ(worker_client_->callbacks.size(), 0);

Expand Down Expand Up @@ -256,7 +255,6 @@ TEST_F(DirectActorSubmitterTest, TestActorRestartNoRetry) {
addr.set_worker_id(worker_id.Binary());
ActorID actor_id = ActorID::Of(JobID::FromInt(0), TaskID::Nil(), 0);
submitter_.AddActorQueueIfNotExists(actor_id);
gcs::ActorTableData actor_data;
addr.set_port(0);
submitter_.ConnectActor(actor_id, addr, 0);
ASSERT_EQ(worker_client_->callbacks.size(), 0);
Expand Down Expand Up @@ -299,7 +297,6 @@ TEST_F(DirectActorSubmitterTest, TestActorRestartRetry) {
addr.set_worker_id(worker_id.Binary());
ActorID actor_id = ActorID::Of(JobID::FromInt(0), TaskID::Nil(), 0);
submitter_.AddActorQueueIfNotExists(actor_id);
gcs::ActorTableData actor_data;
addr.set_port(0);
submitter_.ConnectActor(actor_id, addr, 0);
ASSERT_EQ(worker_client_->callbacks.size(), 0);
Expand Down Expand Up @@ -351,7 +348,6 @@ TEST_F(DirectActorSubmitterTest, TestActorRestartOutOfOrderRetry) {
addr.set_worker_id(worker_id.Binary());
ActorID actor_id = ActorID::Of(JobID::FromInt(0), TaskID::Nil(), 0);
submitter_.AddActorQueueIfNotExists(actor_id);
gcs::ActorTableData actor_data;
addr.set_port(0);
submitter_.ConnectActor(actor_id, addr, 0);
ASSERT_EQ(worker_client_->callbacks.size(), 0);
Expand Down Expand Up @@ -401,7 +397,6 @@ TEST_F(DirectActorSubmitterTest, TestActorRestartOutOfOrderGcs) {
addr.set_worker_id(worker_id.Binary());
ActorID actor_id = ActorID::Of(JobID::FromInt(0), TaskID::Nil(), 0);
submitter_.AddActorQueueIfNotExists(actor_id);
gcs::ActorTableData actor_data;
addr.set_port(0);
submitter_.ConnectActor(actor_id, addr, 0);
ASSERT_EQ(worker_client_->callbacks.size(), 0);
Expand Down
2 changes: 1 addition & 1 deletion src/ray/core_worker/transport/direct_actor_transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
#include "ray/core_worker/store_provider/memory_store/memory_store.h"
#include "ray/core_worker/task_manager.h"
#include "ray/core_worker/transport/dependency_resolver.h"
#include "ray/gcs/redis_gcs_client.h"
#include "ray/gcs/gcs_client/service_based_gcs_client.h"
clay4megtr marked this conversation as resolved.
Show resolved Hide resolved
#include "ray/rpc/grpc_server.h"
#include "ray/rpc/worker/core_worker_client.h"

Expand Down
2 changes: 1 addition & 1 deletion src/ray/gcs/gcs_client/global_state_accessor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ std::vector<std::string> GlobalStateAccessor::GetAllWorkerInfo() {
}

bool GlobalStateAccessor::AddWorkerInfo(const std::string &serialized_string) {
auto data_ptr = std::make_shared<WorkerTableData>();
auto data_ptr = std::make_shared<rpc::WorkerTableData>();
data_ptr->ParseFromString(serialized_string);
std::promise<bool> promise;
RAY_CHECK_OK(
Expand Down
Loading