Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 37 additions & 1 deletion src/common/state/ray_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,20 @@ class RayConfig {
return actor_creation_num_spillbacks_warning_;
}

int object_manager_pull_timeout_ms() const {
return object_manager_pull_timeout_ms_;
}

int object_manager_max_sends() const { return object_manager_max_sends_; }

int object_manager_max_receives() const {
return object_manager_max_receives_;
}

uint64_t object_manager_default_chunk_size() const {
return object_manager_default_chunk_size_;
}

private:
RayConfig()
: ray_protocol_version_(0x0000000000000000),
Expand Down Expand Up @@ -113,7 +127,14 @@ class RayConfig {
plasma_default_release_delay_(64),
L3_cache_size_bytes_(100000000),
max_tasks_to_spillback_(10),
actor_creation_num_spillbacks_warning_(100) {}
actor_creation_num_spillbacks_warning_(100),
// TODO: Setting this to large values results in latency, which needs to
// be addressed. This timeout is often on the critical path for object
// transfers.
object_manager_pull_timeout_ms_(20),
object_manager_max_sends_(2),
object_manager_max_receives_(2),
object_manager_default_chunk_size_(100000000) {}

~RayConfig() {}

Expand Down Expand Up @@ -196,6 +217,21 @@ class RayConfig {
/// corresponding driver. Since spillback currently occurs on a 100ms timer,
/// a value of 100 corresponds to a warning every 10 seconds.
int64_t actor_creation_num_spillbacks_warning_;

/// Timeout, in milliseconds, to wait before retrying a failed pull in the
/// ObjectManager.
int object_manager_pull_timeout_ms_;

/// Maximum number of concurrent sends allowed by the object manager.
int object_manager_max_sends_;

/// Maximum number of concurrent receives allowed by the object manager.
int object_manager_max_receives_;

/// Default chunk size for multi-chunk transfers to use in the object manager.
/// In the object manager, no single thread is permitted to transfer more
/// data than what is specified by the chunk size.
uint64_t object_manager_default_chunk_size_;
};

#endif // RAY_CONFIG_H
22 changes: 11 additions & 11 deletions src/ray/object_manager/object_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,13 @@ ObjectManager::ObjectManager(asio::io_service &main_service,
std::shared_ptr<gcs::AsyncGcsClient> gcs_client)
// TODO(hme): Eliminate knowledge of GCS.
: client_id_(gcs_client->client_table().GetLocalClientId()),
config_(config),
object_directory_(new ObjectDirectory(gcs_client)),
store_notification_(main_service, config.store_socket_name),
// release_delay of 2 * config.max_sends is to ensure the pool does not release
store_notification_(main_service, config_.store_socket_name),
// release_delay of 2 * config_.max_sends is to ensure the pool does not release
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not just use plasma_default_release_delay_? Coupling it with max_sends seems pretty strange to me

Copy link
Contributor Author

@elibol elibol Apr 15, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here's the scenario we had in mind when we implemented this. If we have max_sends objects split into 2 chunks, and the first chunk of each is gotten and released before each second chunk is gotten, then any release delay <max_sends (or <=max_sends if the bound is inclusive) will result in the first object actually being released. When we get the second chunk of the first object that was gotten, the associated get call to the object store client will incur the usual cost of a get. Perhaps a release delay of max_sends+1 is sufficient.

// an object prematurely whenever we reach the maximum number of sends.
buffer_pool_(config.store_socket_name, config.object_chunk_size,
/*release_delay=*/2 * config.max_sends),
buffer_pool_(config_.store_socket_name, config_.object_chunk_size,
/*release_delay=*/2 * config_.max_sends),
object_manager_service_(std::move(object_manager_service)),
work_(*object_manager_service_),
connection_pool_(),
Expand All @@ -28,7 +29,6 @@ ObjectManager::ObjectManager(asio::io_service &main_service,
RAY_CHECK(config_.max_sends > 0);
RAY_CHECK(config_.max_receives > 0);
main_service_ = &main_service;
config_ = config;
store_notification_.SubscribeObjAdded(
[this](const ObjectInfoT &object_info) { NotifyDirectoryObjectAdd(object_info); });
store_notification_.SubscribeObjDeleted(
Expand All @@ -40,12 +40,13 @@ ObjectManager::ObjectManager(asio::io_service &main_service,
std::unique_ptr<asio::io_service> object_manager_service,
const ObjectManagerConfig &config,
std::unique_ptr<ObjectDirectoryInterface> od)
: object_directory_(std::move(od)),
store_notification_(main_service, config.store_socket_name),
// release_delay of 2 * config.max_sends is to ensure the pool does not release
: config_(config),
object_directory_(std::move(od)),
store_notification_(main_service, config_.store_socket_name),
// release_delay of 2 * config_.max_sends is to ensure the pool does not release
// an object prematurely whenever we reach the maximum number of sends.
buffer_pool_(config.store_socket_name, config.object_chunk_size,
/*release_delay=*/2 * config.max_sends),
buffer_pool_(config_.store_socket_name, config_.object_chunk_size,
/*release_delay=*/2 * config_.max_sends),
object_manager_service_(std::move(object_manager_service)),
work_(*object_manager_service_),
connection_pool_(),
Expand All @@ -57,7 +58,6 @@ ObjectManager::ObjectManager(asio::io_service &main_service,
RAY_CHECK(config_.max_receives > 0);
// TODO(hme) Client ID is never set with this constructor.
main_service_ = &main_service;
config_ = config;
store_notification_.SubscribeObjAdded(
[this](const ObjectInfoT &object_info) { NotifyDirectoryObjectAdd(object_info); });
store_notification_.SubscribeObjDeleted(
Expand Down
10 changes: 5 additions & 5 deletions src/ray/object_manager/object_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@ namespace ray {
struct ObjectManagerConfig {
/// The time in milliseconds to wait before retrying a pull
/// that failed due to client id lookup.
uint pull_timeout_ms = 100;
uint pull_timeout_ms;
/// Maximum number of sends allowed.
int max_sends = 2;
int max_sends;
/// Maximum number of receives allowed.
int max_receives = 2;
int max_receives;
/// Object chunk size, in bytes
uint64_t object_chunk_size = std::pow(10, 8);
uint64_t object_chunk_size;
// TODO(hme): Implement num retries (to avoid infinite retries).
std::string store_socket_name;
};
Expand Down Expand Up @@ -152,7 +152,7 @@ class ObjectManager {

private:
ClientID client_id_;
ObjectManagerConfig config_;
const ObjectManagerConfig config_;
std::unique_ptr<ObjectDirectoryInterface> object_directory_;
ObjectStoreNotificationManager store_notification_;
ObjectBufferPool buffer_pool_;
Expand Down
19 changes: 13 additions & 6 deletions src/ray/object_manager/test/object_manager_stress_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -125,23 +125,30 @@ class TestObjectManagerBase : public ::testing::Test {
store_id_1 = StartStore(UniqueID::from_random().hex());
store_id_2 = StartStore(UniqueID::from_random().hex());

uint pull_timeout_ms = 1;
int max_sends = 2;
int max_receives = 2;
uint64_t object_chunk_size = static_cast<uint64_t>(std::pow(10, 3));

// start first server
gcs_client_1 = std::shared_ptr<gcs::AsyncGcsClient>(new gcs::AsyncGcsClient());
ObjectManagerConfig om_config_1;
om_config_1.store_socket_name = store_id_1;
om_config_1.max_sends = 2;
om_config_1.max_receives = 2;
om_config_1.object_chunk_size = static_cast<uint64_t>(std::pow(10, 3));
om_config_1.pull_timeout_ms = pull_timeout_ms;
om_config_1.max_sends = max_sends;
om_config_1.max_receives = max_receives;
om_config_1.object_chunk_size = object_chunk_size;
server1.reset(new MockServer(main_service, std::move(object_manager_service_1),
om_config_1, gcs_client_1));

// start second server
gcs_client_2 = std::shared_ptr<gcs::AsyncGcsClient>(new gcs::AsyncGcsClient());
ObjectManagerConfig om_config_2;
om_config_2.store_socket_name = store_id_2;
om_config_2.max_sends = 2;
om_config_2.max_receives = 2;
om_config_2.object_chunk_size = static_cast<uint64_t>(std::pow(10, 3));
om_config_2.pull_timeout_ms = pull_timeout_ms;
om_config_2.max_sends = max_sends;
om_config_2.max_receives = max_receives;
om_config_2.object_chunk_size = object_chunk_size;
server2.reset(new MockServer(main_service, std::move(object_manager_service_2),
om_config_2, gcs_client_2));

Expand Down
13 changes: 13 additions & 0 deletions src/ray/object_manager/test/object_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -115,17 +115,30 @@ class TestObjectManager : public ::testing::Test {
store_id_1 = StartStore(UniqueID::from_random().hex());
store_id_2 = StartStore(UniqueID::from_random().hex());

uint pull_timeout_ms = 1;
int max_sends = 2;
int max_receives = 2;
uint64_t object_chunk_size = static_cast<uint64_t>(std::pow(10, 3));

// start first server
gcs_client_1 = std::shared_ptr<gcs::AsyncGcsClient>(new gcs::AsyncGcsClient());
ObjectManagerConfig om_config_1;
om_config_1.store_socket_name = store_id_1;
om_config_1.pull_timeout_ms = pull_timeout_ms;
om_config_1.max_sends = max_sends;
om_config_1.max_receives = max_receives;
om_config_1.object_chunk_size = object_chunk_size;
server1.reset(new MockServer(main_service, std::move(object_manager_service_1),
om_config_1, gcs_client_1));

// start second server
gcs_client_2 = std::shared_ptr<gcs::AsyncGcsClient>(new gcs::AsyncGcsClient());
ObjectManagerConfig om_config_2;
om_config_2.store_socket_name = store_id_2;
om_config_2.pull_timeout_ms = pull_timeout_ms;
om_config_2.max_sends = max_sends;
om_config_2.max_receives = max_receives;
om_config_2.object_chunk_size = object_chunk_size;
server2.reset(new MockServer(main_service, std::move(object_manager_service_2),
om_config_2, gcs_client_2));

Expand Down
8 changes: 4 additions & 4 deletions src/ray/object_manager/transfer_queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ namespace ray {
void TransferQueue::QueueSend(const ClientID &client_id, const ObjectID &object_id,
uint64_t data_size, uint64_t metadata_size,
uint64_t chunk_index, const RemoteConnectionInfo &info) {
std::unique_lock<std::mutex> guard(send_mutex_);
std::lock_guard<std::mutex> guard(send_mutex_);
SendRequest req = {client_id, object_id, data_size, metadata_size, chunk_index, info};
// TODO(hme): Use a set to speed this up.
if (std::find(send_queue_.begin(), send_queue_.end(), req) != send_queue_.end()) {
Expand All @@ -19,7 +19,7 @@ void TransferQueue::QueueReceive(const ClientID &client_id, const ObjectID &obje
uint64_t data_size, uint64_t metadata_size,
uint64_t chunk_index,
std::shared_ptr<TcpClientConnection> conn) {
std::unique_lock<std::mutex> guard(receive_mutex_);
std::lock_guard<std::mutex> guard(receive_mutex_);
ReceiveRequest req = {client_id, object_id, data_size,
metadata_size, chunk_index, conn};
if (std::find(receive_queue_.begin(), receive_queue_.end(), req) !=
Expand All @@ -31,7 +31,7 @@ void TransferQueue::QueueReceive(const ClientID &client_id, const ObjectID &obje
}

bool TransferQueue::DequeueSendIfPresent(TransferQueue::SendRequest *send_ptr) {
std::unique_lock<std::mutex> guard(send_mutex_);
std::lock_guard<std::mutex> guard(send_mutex_);
if (send_queue_.empty()) {
return false;
}
Expand All @@ -41,7 +41,7 @@ bool TransferQueue::DequeueSendIfPresent(TransferQueue::SendRequest *send_ptr) {
}

bool TransferQueue::DequeueReceiveIfPresent(TransferQueue::ReceiveRequest *receive_ptr) {
std::unique_lock<std::mutex> guard(receive_mutex_);
std::lock_guard<std::mutex> guard(receive_mutex_);
if (receive_queue_.empty()) {
return false;
}
Expand Down
13 changes: 13 additions & 0 deletions src/ray/object_manager/transfer_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ class TransferQueue {
///
/// \param client_id The ClientID to which the object needs to be sent.
/// \param object_id The ObjectID of the object to be sent.
/// \param data_size The actual object size + the metadata size.
/// \param metadata_size The size of the object's metadata.
/// \param chunk_index The chunk index, which corresponds to the chunk of object_id that
/// is queued for transfer.
/// \param info Connection information to the remote node, which is required if a new
/// connection needs to be established.
void QueueSend(const ClientID &client_id, const ObjectID &object_id, uint64_t data_size,
uint64_t metadata_size, uint64_t chunk_index,
const RemoteConnectionInfo &info);
Expand All @@ -77,6 +83,11 @@ class TransferQueue {
///
/// \param client_id The ClientID from which the object is being received.
/// \param object_id The ObjectID of the object to be received.
/// \param data_size The actual object size + the metadata size.
/// \param metadata_size The size of the object's metadata.
/// \param chunk_index The chunk index, which corresponds to the chunk of object_id that
/// is queued for transfer.
/// \param conn Connection to the remote object manager that's sending data.
void QueueReceive(const ClientID &client_id, const ObjectID &object_id,
uint64_t data_size, uint64_t metadata_size, uint64_t chunk_index,
std::shared_ptr<TcpClientConnection> conn);
Expand All @@ -93,7 +104,9 @@ class TransferQueue {
RAY_DISALLOW_COPY_AND_ASSIGN(TransferQueue);

private:
/// Locks access to send_queue_.
std::mutex send_mutex_;
/// Locks access to receive_queue_.
std::mutex receive_mutex_;
std::deque<SendRequest> send_queue_;
std::deque<ReceiveRequest> receive_queue_;
Expand Down
16 changes: 8 additions & 8 deletions src/ray/raylet/main.cc
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include <iostream>

#include "common/state/ray_config.h"
#include "ray/raylet/raylet.h"
#include "ray/status.h"

Expand Down Expand Up @@ -47,14 +48,13 @@ int main(int argc, char *argv[]) {
// Configuration for the object manager.
ray::ObjectManagerConfig object_manager_config;
object_manager_config.store_socket_name = store_socket_name;
// Time out in milliseconds to wait before retrying a failed pull.
object_manager_config.pull_timeout_ms = 100;
// Maximum number of sends allowed.
object_manager_config.max_sends = 2;
// Maximum number of receives allowed.
object_manager_config.max_receives = 2;
// Object chunk size, in bytes.
object_manager_config.object_chunk_size = static_cast<uint64_t>(std::pow(10, 8));
object_manager_config.pull_timeout_ms =
RayConfig::instance().object_manager_pull_timeout_ms();
object_manager_config.max_sends = RayConfig::instance().object_manager_max_sends();
object_manager_config.max_receives =
RayConfig::instance().object_manager_max_receives();
object_manager_config.object_chunk_size =
RayConfig::instance().object_manager_default_chunk_size();

// initialize mock gcs & object directory
auto gcs_client = std::make_shared<ray::gcs::AsyncGcsClient>();
Expand Down
3 changes: 1 addition & 2 deletions src/ray/test/run_object_manager_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ echo "$REDIS_DIR/redis-server --loglevel warning --loadmodule $REDIS_MODULE --po
echo "$REDIS_DIR/redis-cli -p 6379 shutdown"

# Allow cleanup commands to fail.
# killall plasma_store || true
# $REDIS_DIR/redis-cli -p 6379 shutdown || true
$REDIS_DIR/redis-cli -p 6379 shutdown || true
sleep 1s
$REDIS_DIR/redis-server --loglevel warning --loadmodule $REDIS_MODULE --port 6379 &
sleep 1s
Expand Down