Skip to content
This repository has been archived by the owner on Nov 29, 2023. It is now read-only.

Commit

Permalink
Addresses missed comments from multichunk object transfer PR. (ray-project#1908)
Browse files Browse the repository at this point in the history

* Move object manager parameters to ray config; fix an object manager config bug;
address other comments from ray-project#1827.

* linting and uint?

* typos

* remove uint.
  • Loading branch information
elibol authored and pcmoritz committed Apr 16, 2018
1 parent 6ca2c2a commit cff3776
Show file tree
Hide file tree
Showing 9 changed files with 105 additions and 37 deletions.
38 changes: 37 additions & 1 deletion src/common/state/ray_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,20 @@ class RayConfig {
return actor_creation_num_spillbacks_warning_;
}

/// Returns the timeout, in milliseconds, to wait before retrying a failed pull
/// in the ObjectManager.
int object_manager_pull_timeout_ms() const {
return object_manager_pull_timeout_ms_;
}

/// Returns the maximum number of concurrent sends allowed by the object
/// manager.
int object_manager_max_sends() const { return object_manager_max_sends_; }

/// Returns the maximum number of concurrent receives allowed by the object
/// manager.
int object_manager_max_receives() const {
return object_manager_max_receives_;
}

/// Returns the default chunk size, in bytes, used by the object manager for
/// multi-chunk transfers.
uint64_t object_manager_default_chunk_size() const {
return object_manager_default_chunk_size_;
}

private:
RayConfig()
: ray_protocol_version_(0x0000000000000000),
Expand Down Expand Up @@ -113,7 +127,14 @@ class RayConfig {
plasma_default_release_delay_(64),
L3_cache_size_bytes_(100000000),
max_tasks_to_spillback_(10),
actor_creation_num_spillbacks_warning_(100) {}
actor_creation_num_spillbacks_warning_(100),
// TODO: Setting this to large values results in latency, which needs to
// be addressed. This timeout is often on the critical path for object
// transfers.
object_manager_pull_timeout_ms_(20),
object_manager_max_sends_(2),
object_manager_max_receives_(2),
object_manager_default_chunk_size_(100000000) {}

~RayConfig() {}

Expand Down Expand Up @@ -196,6 +217,21 @@ class RayConfig {
/// corresponding driver. Since spillback currently occurs on a 100ms timer,
/// a value of 100 corresponds to a warning every 10 seconds.
int64_t actor_creation_num_spillbacks_warning_;

/// Timeout, in milliseconds, to wait before retrying a failed pull in the
/// ObjectManager.
int object_manager_pull_timeout_ms_;

/// Maximum number of concurrent sends allowed by the object manager.
int object_manager_max_sends_;

/// Maximum number of concurrent receives allowed by the object manager.
int object_manager_max_receives_;

/// Default chunk size for multi-chunk transfers to use in the object manager.
/// In the object manager, no single thread is permitted to transfer more
/// data than what is specified by the chunk size.
uint64_t object_manager_default_chunk_size_;
};

#endif // RAY_CONFIG_H
22 changes: 11 additions & 11 deletions src/ray/object_manager/object_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,13 @@ ObjectManager::ObjectManager(asio::io_service &main_service,
std::shared_ptr<gcs::AsyncGcsClient> gcs_client)
// TODO(hme): Eliminate knowledge of GCS.
: client_id_(gcs_client->client_table().GetLocalClientId()),
config_(config),
object_directory_(new ObjectDirectory(gcs_client)),
store_notification_(main_service, config.store_socket_name),
// release_delay of 2 * config.max_sends is to ensure the pool does not release
store_notification_(main_service, config_.store_socket_name),
// release_delay of 2 * config_.max_sends is to ensure the pool does not release
// an object prematurely whenever we reach the maximum number of sends.
buffer_pool_(config.store_socket_name, config.object_chunk_size,
/*release_delay=*/2 * config.max_sends),
buffer_pool_(config_.store_socket_name, config_.object_chunk_size,
/*release_delay=*/2 * config_.max_sends),
object_manager_service_(std::move(object_manager_service)),
work_(*object_manager_service_),
connection_pool_(),
Expand All @@ -28,7 +29,6 @@ ObjectManager::ObjectManager(asio::io_service &main_service,
RAY_CHECK(config_.max_sends > 0);
RAY_CHECK(config_.max_receives > 0);
main_service_ = &main_service;
config_ = config;
store_notification_.SubscribeObjAdded(
[this](const ObjectInfoT &object_info) { NotifyDirectoryObjectAdd(object_info); });
store_notification_.SubscribeObjDeleted(
Expand All @@ -40,12 +40,13 @@ ObjectManager::ObjectManager(asio::io_service &main_service,
std::unique_ptr<asio::io_service> object_manager_service,
const ObjectManagerConfig &config,
std::unique_ptr<ObjectDirectoryInterface> od)
: object_directory_(std::move(od)),
store_notification_(main_service, config.store_socket_name),
// release_delay of 2 * config.max_sends is to ensure the pool does not release
: config_(config),
object_directory_(std::move(od)),
store_notification_(main_service, config_.store_socket_name),
// release_delay of 2 * config_.max_sends is to ensure the pool does not release
// an object prematurely whenever we reach the maximum number of sends.
buffer_pool_(config.store_socket_name, config.object_chunk_size,
/*release_delay=*/2 * config.max_sends),
buffer_pool_(config_.store_socket_name, config_.object_chunk_size,
/*release_delay=*/2 * config_.max_sends),
object_manager_service_(std::move(object_manager_service)),
work_(*object_manager_service_),
connection_pool_(),
Expand All @@ -57,7 +58,6 @@ ObjectManager::ObjectManager(asio::io_service &main_service,
RAY_CHECK(config_.max_receives > 0);
// TODO(hme) Client ID is never set with this constructor.
main_service_ = &main_service;
config_ = config;
store_notification_.SubscribeObjAdded(
[this](const ObjectInfoT &object_info) { NotifyDirectoryObjectAdd(object_info); });
store_notification_.SubscribeObjDeleted(
Expand Down
10 changes: 5 additions & 5 deletions src/ray/object_manager/object_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@ namespace ray {
struct ObjectManagerConfig {
/// The time in milliseconds to wait before retrying a pull
/// that failed due to client id lookup.
uint pull_timeout_ms = 100;
uint pull_timeout_ms;
/// Maximum number of sends allowed.
int max_sends = 2;
int max_sends;
/// Maximum number of receives allowed.
int max_receives = 2;
int max_receives;
/// Object chunk size, in bytes
uint64_t object_chunk_size = std::pow(10, 8);
uint64_t object_chunk_size;
// TODO(hme): Implement num retries (to avoid infinite retries).
std::string store_socket_name;
};
Expand Down Expand Up @@ -152,7 +152,7 @@ class ObjectManager {

private:
ClientID client_id_;
ObjectManagerConfig config_;
const ObjectManagerConfig config_;
std::unique_ptr<ObjectDirectoryInterface> object_directory_;
ObjectStoreNotificationManager store_notification_;
ObjectBufferPool buffer_pool_;
Expand Down
19 changes: 13 additions & 6 deletions src/ray/object_manager/test/object_manager_stress_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -125,23 +125,30 @@ class TestObjectManagerBase : public ::testing::Test {
store_id_1 = StartStore(UniqueID::from_random().hex());
store_id_2 = StartStore(UniqueID::from_random().hex());

uint pull_timeout_ms = 1;
int max_sends = 2;
int max_receives = 2;
uint64_t object_chunk_size = static_cast<uint64_t>(std::pow(10, 3));

// start first server
gcs_client_1 = std::shared_ptr<gcs::AsyncGcsClient>(new gcs::AsyncGcsClient());
ObjectManagerConfig om_config_1;
om_config_1.store_socket_name = store_id_1;
om_config_1.max_sends = 2;
om_config_1.max_receives = 2;
om_config_1.object_chunk_size = static_cast<uint64_t>(std::pow(10, 3));
om_config_1.pull_timeout_ms = pull_timeout_ms;
om_config_1.max_sends = max_sends;
om_config_1.max_receives = max_receives;
om_config_1.object_chunk_size = object_chunk_size;
server1.reset(new MockServer(main_service, std::move(object_manager_service_1),
om_config_1, gcs_client_1));

// start second server
gcs_client_2 = std::shared_ptr<gcs::AsyncGcsClient>(new gcs::AsyncGcsClient());
ObjectManagerConfig om_config_2;
om_config_2.store_socket_name = store_id_2;
om_config_2.max_sends = 2;
om_config_2.max_receives = 2;
om_config_2.object_chunk_size = static_cast<uint64_t>(std::pow(10, 3));
om_config_2.pull_timeout_ms = pull_timeout_ms;
om_config_2.max_sends = max_sends;
om_config_2.max_receives = max_receives;
om_config_2.object_chunk_size = object_chunk_size;
server2.reset(new MockServer(main_service, std::move(object_manager_service_2),
om_config_2, gcs_client_2));

Expand Down
13 changes: 13 additions & 0 deletions src/ray/object_manager/test/object_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -115,17 +115,30 @@ class TestObjectManager : public ::testing::Test {
store_id_1 = StartStore(UniqueID::from_random().hex());
store_id_2 = StartStore(UniqueID::from_random().hex());

uint pull_timeout_ms = 1;
int max_sends = 2;
int max_receives = 2;
uint64_t object_chunk_size = static_cast<uint64_t>(std::pow(10, 3));

// start first server
gcs_client_1 = std::shared_ptr<gcs::AsyncGcsClient>(new gcs::AsyncGcsClient());
ObjectManagerConfig om_config_1;
om_config_1.store_socket_name = store_id_1;
om_config_1.pull_timeout_ms = pull_timeout_ms;
om_config_1.max_sends = max_sends;
om_config_1.max_receives = max_receives;
om_config_1.object_chunk_size = object_chunk_size;
server1.reset(new MockServer(main_service, std::move(object_manager_service_1),
om_config_1, gcs_client_1));

// start second server
gcs_client_2 = std::shared_ptr<gcs::AsyncGcsClient>(new gcs::AsyncGcsClient());
ObjectManagerConfig om_config_2;
om_config_2.store_socket_name = store_id_2;
om_config_2.pull_timeout_ms = pull_timeout_ms;
om_config_2.max_sends = max_sends;
om_config_2.max_receives = max_receives;
om_config_2.object_chunk_size = object_chunk_size;
server2.reset(new MockServer(main_service, std::move(object_manager_service_2),
om_config_2, gcs_client_2));

Expand Down
8 changes: 4 additions & 4 deletions src/ray/object_manager/transfer_queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ namespace ray {
void TransferQueue::QueueSend(const ClientID &client_id, const ObjectID &object_id,
uint64_t data_size, uint64_t metadata_size,
uint64_t chunk_index, const RemoteConnectionInfo &info) {
std::unique_lock<std::mutex> guard(send_mutex_);
std::lock_guard<std::mutex> guard(send_mutex_);
SendRequest req = {client_id, object_id, data_size, metadata_size, chunk_index, info};
// TODO(hme): Use a set to speed this up.
if (std::find(send_queue_.begin(), send_queue_.end(), req) != send_queue_.end()) {
Expand All @@ -19,7 +19,7 @@ void TransferQueue::QueueReceive(const ClientID &client_id, const ObjectID &obje
uint64_t data_size, uint64_t metadata_size,
uint64_t chunk_index,
std::shared_ptr<TcpClientConnection> conn) {
std::unique_lock<std::mutex> guard(receive_mutex_);
std::lock_guard<std::mutex> guard(receive_mutex_);
ReceiveRequest req = {client_id, object_id, data_size,
metadata_size, chunk_index, conn};
if (std::find(receive_queue_.begin(), receive_queue_.end(), req) !=
Expand All @@ -31,7 +31,7 @@ void TransferQueue::QueueReceive(const ClientID &client_id, const ObjectID &obje
}

bool TransferQueue::DequeueSendIfPresent(TransferQueue::SendRequest *send_ptr) {
std::unique_lock<std::mutex> guard(send_mutex_);
std::lock_guard<std::mutex> guard(send_mutex_);
if (send_queue_.empty()) {
return false;
}
Expand All @@ -41,7 +41,7 @@ bool TransferQueue::DequeueSendIfPresent(TransferQueue::SendRequest *send_ptr) {
}

bool TransferQueue::DequeueReceiveIfPresent(TransferQueue::ReceiveRequest *receive_ptr) {
std::unique_lock<std::mutex> guard(receive_mutex_);
std::lock_guard<std::mutex> guard(receive_mutex_);
if (receive_queue_.empty()) {
return false;
}
Expand Down
13 changes: 13 additions & 0 deletions src/ray/object_manager/transfer_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ class TransferQueue {
///
/// \param client_id The ClientID to which the object needs to be sent.
/// \param object_id The ObjectID of the object to be sent.
/// \param data_size The actual object size + the metadata size.
/// \param metadata_size The size of the object's metadata.
/// \param chunk_index The chunk index, which corresponds to the chunk of object_id that
/// is queued for transfer.
/// \param info Connection information to the remote node, which is required if a new
/// connection needs to be established.
void QueueSend(const ClientID &client_id, const ObjectID &object_id, uint64_t data_size,
uint64_t metadata_size, uint64_t chunk_index,
const RemoteConnectionInfo &info);
Expand All @@ -77,6 +83,11 @@ class TransferQueue {
///
/// \param client_id The ClientID from which the object is being received.
/// \param object_id The ObjectID of the object to be received.
/// \param data_size The actual object size + the metadata size.
/// \param metadata_size The size of the object's metadata.
/// \param chunk_index The chunk index, which corresponds to the chunk of object_id that
/// is queued for transfer.
/// \param conn Connection to the remote object manager that's sending data.
void QueueReceive(const ClientID &client_id, const ObjectID &object_id,
uint64_t data_size, uint64_t metadata_size, uint64_t chunk_index,
std::shared_ptr<TcpClientConnection> conn);
Expand All @@ -93,7 +104,9 @@ class TransferQueue {
RAY_DISALLOW_COPY_AND_ASSIGN(TransferQueue);

private:
/// Locks access to send_queue_.
std::mutex send_mutex_;
/// Locks access to receive_queue_.
std::mutex receive_mutex_;
std::deque<SendRequest> send_queue_;
std::deque<ReceiveRequest> receive_queue_;
Expand Down
16 changes: 8 additions & 8 deletions src/ray/raylet/main.cc
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include <iostream>

#include "common/state/ray_config.h"
#include "ray/raylet/raylet.h"
#include "ray/status.h"

Expand Down Expand Up @@ -47,14 +48,13 @@ int main(int argc, char *argv[]) {
// Configuration for the object manager.
ray::ObjectManagerConfig object_manager_config;
object_manager_config.store_socket_name = store_socket_name;
// Time out in milliseconds to wait before retrying a failed pull.
object_manager_config.pull_timeout_ms = 100;
// Maximum number of sends allowed.
object_manager_config.max_sends = 2;
// Maximum number of receives allowed.
object_manager_config.max_receives = 2;
// Object chunk size, in bytes.
object_manager_config.object_chunk_size = static_cast<uint64_t>(std::pow(10, 8));
object_manager_config.pull_timeout_ms =
RayConfig::instance().object_manager_pull_timeout_ms();
object_manager_config.max_sends = RayConfig::instance().object_manager_max_sends();
object_manager_config.max_receives =
RayConfig::instance().object_manager_max_receives();
object_manager_config.object_chunk_size =
RayConfig::instance().object_manager_default_chunk_size();

// initialize mock gcs & object directory
auto gcs_client = std::make_shared<ray::gcs::AsyncGcsClient>();
Expand Down
3 changes: 1 addition & 2 deletions src/ray/test/run_object_manager_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ echo "$REDIS_DIR/redis-server --loglevel warning --loadmodule $REDIS_MODULE --po
echo "$REDIS_DIR/redis-cli -p 6379 shutdown"

# Allow cleanup commands to fail.
# killall plasma_store || true
# $REDIS_DIR/redis-cli -p 6379 shutdown || true
$REDIS_DIR/redis-cli -p 6379 shutdown || true
sleep 1s
$REDIS_DIR/redis-server --loglevel warning --loadmodule $REDIS_MODULE --port 6379 &
sleep 1s
Expand Down

0 comments on commit cff3776

Please sign in to comment.