Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Split object manager into small C++ targets #50885

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
214 changes: 191 additions & 23 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,6 @@ PLASMA_LINKOPTS = [] + select({
ray_cc_library(
name = "plasma_client",
srcs = [
"src/ray/object_manager/common.cc",
"src/ray/object_manager/plasma/client.cc",
"src/ray/object_manager/plasma/connection.cc",
"src/ray/object_manager/plasma/malloc.cc",
Expand All @@ -355,7 +354,6 @@ ray_cc_library(
],
}),
hdrs = [
"src/ray/object_manager/common.h",
"src/ray/object_manager/plasma/client.h",
"src/ray/object_manager/plasma/common.h",
"src/ray/object_manager/plasma/connection.h",
Expand All @@ -380,6 +378,7 @@ ray_cc_library(
deps = [
":plasma_fbs",
":ray_common",
":object_manager_common",
"//src/ray/protobuf:common_cc_proto",
"//src/ray/util",
"//src/ray/util:compat",
Expand Down Expand Up @@ -948,6 +947,7 @@ ray_cc_library(
],
}),
deps = [
":core_worker_lib",
":gcs",
":gcs_client_lib",
":node_manager_fbs",
Expand Down Expand Up @@ -2400,37 +2400,205 @@ ray_cc_test(
ray_cc_library(
name = "object_manager",
srcs = [
"src/ray/object_manager/chunk_object_reader.cc",
"src/ray/object_manager/common.cc",
"src/ray/object_manager/memory_object_reader.cc",
"src/ray/object_manager/object_buffer_pool.cc",
"src/ray/object_manager/object_manager.cc",
"src/ray/object_manager/ownership_based_object_directory.cc",
"src/ray/object_manager/pull_manager.cc",
"src/ray/object_manager/push_manager.cc",
"src/ray/object_manager/spilled_object_reader.cc",
],
hdrs = [
"src/ray/object_manager/chunk_object_reader.h",
"src/ray/object_manager/common.h",
"src/ray/object_manager/memory_object_reader.h",
"src/ray/object_manager/object_buffer_pool.h",
"src/ray/object_manager/object_directory.h",
"src/ray/object_manager/object_manager.h",
"src/ray/object_manager/object_reader.h",
"src/ray/object_manager/ownership_based_object_directory.h",
"src/ray/object_manager/pull_manager.h",
"src/ray/object_manager/push_manager.h",
"src/ray/object_manager/spilled_object_reader.h",
],
deps = [
":core_worker_lib",
":gcs",
":chunk_object_reader",
":object_buffer_pool",
":object_manager_common",
":object_directory",
":ownership_based_object_directory",
":object_manager_rpc",
":plasma_store_server_lib",
":ray_common",
":pull_manager",
":push_manager",
"//src/ray/common:asio",
"//src/ray/common:id",
"//src/ray/common:ray_config",
"//src/ray/common:status",
"//src/ray/protobuf:common_cc_proto",
"//src/ray/protobuf:node_manager_cc_proto",
"@boost//:asio",
"@boost//:bind",
"@com_google_absl//absl/container:flat_hash_map",
"@com_google_absl//absl/container:flat_hash_set",
"@com_google_absl//absl/time",
],
)

ray_cc_library(
name = "push_manager",
srcs = [
"src/ray/object_manager/push_manager.cc",
],
hdrs = [
"src/ray/object_manager/push_manager.h",
],
deps = [
":stats_metric",
"//src/ray/common:id",
"//src/ray/common:ray_config",
"//src/ray/common:status",
"//src/ray/util",
"@com_google_absl//absl/container:flat_hash_map",
"@com_google_absl//absl/container:flat_hash_set",
],
)

ray_cc_library(
name = "pull_manager",
srcs = [
"src/ray/object_manager/pull_manager.cc",
],
hdrs = [
"src/ray/object_manager/pull_manager.h",
],
deps = [
":object_manager_rpc",
":ownership_based_object_directory",
":stats_metric",
"//src/ray/common:id",
"//src/ray/common:ray_config",
"//src/ray/common:ray_object",
"//src/ray/common:status",
"//src/ray/util:counter_map",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here, please add all required dependencies.

"//src/ray/util:container_util",
"@boost//:asio",
"@boost//:bind",
"@com_google_absl//absl/container:flat_hash_map",
"@com_google_absl//absl/container:flat_hash_set",
"@com_google_absl//absl/time",
],
)

ray_cc_library(
name = "ownership_based_object_directory",
srcs = [
"src/ray/object_manager/ownership_based_object_directory.cc",
],
hdrs = [
"src/ray/object_manager/ownership_based_object_directory.h",
],
deps = [
":gcs_client_lib",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A few more deps should be included (include what you use).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also see opportunities to forward declare.

  • instrumented_io_context
  • GcsClient
  • SubscriberInterface

Could be forward declared, in terms of ROI, I would suggest try GcsClient

":object_directory",
":subscriber_lib",
":worker_rpc",
"//src/ray/common:asio",
"//src/ray/common:id",
"//src/ray/common:status",
"//src/ray/util:sequencer",
"@com_google_absl//absl/container:flat_hash_map",
],
)

ray_cc_library(
name = "object_directory",
hdrs = [
"src/ray/object_manager/object_directory.h",
],
deps = [
":gcs_client_lib",
":object_manager_common",
"//src/ray/common:asio",
"//src/ray/common:id",
"//src/ray/common:status",
],
)

ray_cc_library(
name = "object_buffer_pool",
srcs = [
"src/ray/object_manager/object_buffer_pool.cc",
],
hdrs = [
"src/ray/object_manager/object_buffer_pool.h",
],
deps = [
":memory_object_reader",
"//src/ray/common:id",
"//src/ray/common:status",
"//src/ray/util:logging",
"@boost//:asio",
"@boost//:bind",
"@com_google_absl//absl/base",
"@com_google_absl//absl/container:flat_hash_map",
"@com_google_absl//absl/synchronization",
"@com_google_absl//absl/time",
],
)

ray_cc_library(
name = "object_manager_common",
srcs = [
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

srcs = ["src/ray/object_manager/common.cc"],
hdrs = ["src/ray/object_manager/common.h"],

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same below.

"src/ray/object_manager/common.cc",
],
hdrs = [
"src/ray/object_manager/common.h",
],
deps = [
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please cleanup the dependency as well, I don't think we need all common deps.

"//src/ray/common:id",
"//src/ray/common:ray_config",
"//src/ray/common:status",
"@boost//:asio",
"@com_google_absl//absl/strings",
],
)

ray_cc_library(
name = "memory_object_reader",
srcs = [
"src/ray/object_manager/memory_object_reader.cc",
],
hdrs = [
"src/ray/object_manager/memory_object_reader.h",
],
deps = [
":object_reader",
":plasma_client",
],
)

ray_cc_library(
name = "chunk_object_reader",
srcs = [
"src/ray/object_manager/chunk_object_reader.cc",
],
hdrs = [
"src/ray/object_manager/chunk_object_reader.h",
],
deps = [
":spilled_object_reader",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add logging dependency as well.

"//src/ray/util:logging",
],
)

ray_cc_library(
name = "spilled_object_reader",
srcs = [
"src/ray/object_manager/spilled_object_reader.cc",
],
hdrs = [
"src/ray/object_manager/spilled_object_reader.h",
],
deps = [
":object_reader",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More dependencies are expected, including absl optional, common pb, logging.

"//src/ray/util:logging",
"@com_google_absl//absl/types:optional",
"@com_google_googletest//:gtest_prod",
],
)

ray_cc_library(
name = "object_reader",
hdrs = [
"src/ray/object_manager/object_reader.h",
],
deps = [
"//src/ray/protobuf:common_cc_proto",
],
)

Expand Down
1 change: 0 additions & 1 deletion src/ray/object_manager/object_buffer_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
#include <optional>

#include "absl/time/time.h"
#include "ray/common/status.h"
#include "ray/util/logging.h"

namespace ray {
Expand Down
1 change: 0 additions & 1 deletion src/ray/object_manager/object_buffer_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
#include "ray/common/id.h"
#include "ray/common/status.h"
#include "ray/object_manager/memory_object_reader.h"
#include "ray/object_manager/plasma/client.h"

namespace ray {

Expand Down
1 change: 0 additions & 1 deletion src/ray/object_manager/pull_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

#include "ray/common/common_protocol.h"
#include "ray/stats/metric_defs.h"
#include "ray/util/container_util.h"

namespace ray {

Expand Down
2 changes: 0 additions & 2 deletions src/ray/object_manager/pull_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@
#include "ray/common/ray_config.h"
#include "ray/common/ray_object.h"
#include "ray/common/status.h"
#include "ray/object_manager/common.h"
#include "ray/object_manager/object_directory.h"
#include "ray/object_manager/ownership_based_object_directory.h"
#include "ray/rpc/object_manager/object_manager_client.h"
#include "ray/rpc/object_manager/object_manager_server.h"
Expand Down
1 change: 0 additions & 1 deletion src/ray/object_manager/spilled_object_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

#include "absl/types/optional.h"
#include "ray/object_manager/object_reader.h"
#include "src/ray/protobuf/common.pb.h"

namespace ray {
/// Reader for a local object spilled in the object_url.
Expand Down