Skip to content
This repository has been archived by the owner on Nov 29, 2023. It is now read-only.

Commit

Permalink
[xray] Sets good object manager defaults. (ray-project#2255)
Browse files Browse the repository at this point in the history
* better object manager defaults. added max for number of chunks.

* change source of cores.
  • Loading branch information
elibol authored and pcmoritz committed Jun 20, 2018
1 parent 4acb77a commit 60bc3a0
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 33 deletions.
19 changes: 3 additions & 16 deletions src/common/state/ray_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,6 @@ class RayConfig {
return object_manager_push_timeout_ms_;
}

/// Returns the configured maximum number of concurrent sends allowed by the
/// object manager.
int object_manager_max_sends() const { return object_manager_max_sends_; }

/// Returns the configured maximum number of concurrent receives allowed by the
/// object manager.
int object_manager_max_receives() const {
return object_manager_max_receives_;
}

/// Returns the default chunk size the object manager uses for multi-chunk
/// transfers.
uint64_t object_manager_default_chunk_size() const {
return object_manager_default_chunk_size_;
}
Expand Down Expand Up @@ -142,9 +136,7 @@ class RayConfig {
// transfers.
object_manager_pull_timeout_ms_(20),
object_manager_push_timeout_ms_(10000),
object_manager_max_sends_(2),
object_manager_max_receives_(2),
object_manager_default_chunk_size_(100000000),
object_manager_default_chunk_size_(1000000),
num_workers_per_process_(1) {}

/// Destructor; the body is empty.
~RayConfig() {}
Expand Down Expand Up @@ -244,15 +236,10 @@ class RayConfig {
/// 0: giving up retrying immediately.
int object_manager_push_timeout_ms_;

/// Maximum number of concurrent sends allowed by the object manager.
int object_manager_max_sends_;

/// Maximum number of concurrent receives allowed by the object manager.
int object_manager_max_receives_;

/// Default chunk size for multi-chunk transfers to use in the object manager.
/// In the object manager, no single thread is permitted to transfer more
/// data than what is specified by the chunk size unless the number of object
/// chunks exceeds the number of available sending threads.
uint64_t object_manager_default_chunk_size_;

/// Number of workers per process
Expand Down
28 changes: 20 additions & 8 deletions src/ray/object_manager/object_buffer_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
namespace ray {

/// Constructs a buffer pool and connects its store client to the object store.
///
/// \param store_socket_name Socket name the store client connects to.
/// \param chunk_size Default chunk size used when splitting objects into chunks.
/// \param max_chunks The maximum number of chunks allowed per object. Expected
/// to be positive: the value is converted to an unsigned integer, and the chunk
/// size computation divides by it.
/// \param release_delay The number of release calls before objects are released
/// from the store client (FIFO).
ObjectBufferPool::ObjectBufferPool(const std::string &store_socket_name,
                                   uint64_t chunk_size, int max_chunks, int release_delay)
    : default_chunk_size_(chunk_size), max_chunks_(static_cast<uint64_t>(max_chunks)) {
  store_socket_name_ = store_socket_name;
  ARROW_CHECK_OK(store_client_.Connect(store_socket_name_.c_str(), "", release_delay));
}
Expand All @@ -24,13 +24,24 @@ ObjectBufferPool::~ObjectBufferPool() {
ARROW_CHECK_OK(store_client_.Disconnect());
}

/// Returns the chunk size to use for an object of the given size.
///
/// The default chunk size is used unless it would split the object into
/// max_chunks_ or more chunks. In that case the chunk size is raised to
/// ceil(data_size / max_chunks_), so the object is split into at most
/// max_chunks_ chunks.
///
/// \param data_size The size of the object's data.
/// \return The chunk size to use for this object.
uint64_t ObjectBufferPool::GetChunkSize(uint64_t data_size) {
  // With no chunk limit there is nothing to rebalance against, and dividing by
  // max_chunks_ below would be undefined.
  if (max_chunks_ == 0) {
    return default_chunk_size_;
  }
  if (data_size / default_chunk_size_ >= max_chunks_) {
    // Ceiling division by the chunk limit. Rounding with max_chunks_ (rather than
    // default_chunk_size_) keeps the chunk count within the limit even when the
    // default chunk size is smaller than max_chunks_. Written as quotient plus
    // remainder test so that it cannot overflow for large data_size.
    return data_size / max_chunks_ + (data_size % max_chunks_ != 0 ? 1 : 0);
  }
  return default_chunk_size_;
}

/// Returns the number of chunks an object of the given size is split into.
///
/// The count is the ceiling of data_size divided by the chunk size that
/// GetChunkSize selects for this data_size.
///
/// \param data_size The size of the object's data.
/// \return The number of chunks.
uint64_t ObjectBufferPool::GetNumChunks(uint64_t data_size) {
  const uint64_t chunk_size = GetChunkSize(data_size);
  return (data_size + chunk_size - 1) / chunk_size;
}

/// Returns the length of the chunk at chunk_index for an object of the given size.
///
/// A chunk that ends at or before data_size has the full chunk size selected by
/// GetChunkSize. A chunk that would extend past data_size has length
/// data_size % chunk_size.
///
/// \param chunk_index The index of the chunk.
/// \param data_size The size of the object's data.
/// \return The length of the chunk.
uint64_t ObjectBufferPool::GetBufferLength(uint64_t chunk_index, uint64_t data_size) {
  const uint64_t chunk_size = GetChunkSize(data_size);
  const bool extends_past_end = (chunk_index + 1) * chunk_size > data_size;
  return extends_past_end ? data_size % chunk_size : chunk_size;
}

std::pair<const ObjectBufferPool::ChunkInfo &, ray::Status> ObjectBufferPool::GetChunk(
Expand Down Expand Up @@ -168,17 +179,18 @@ void ObjectBufferPool::AbortCreate(const ObjectID &object_id) {

std::vector<ObjectBufferPool::ChunkInfo> ObjectBufferPool::BuildChunks(
const ObjectID &object_id, uint8_t *data, uint64_t data_size) {
uint64_t chunk_size = GetChunkSize(data_size);
uint64_t space_remaining = data_size;
std::vector<ChunkInfo> chunks;
int64_t position = 0;
while (space_remaining) {
position = data_size - space_remaining;
if (space_remaining < chunk_size_) {
if (space_remaining < chunk_size) {
chunks.emplace_back(chunks.size(), data + position, space_remaining);
space_remaining = 0;
} else {
chunks.emplace_back(chunks.size(), data + position, chunk_size_);
space_remaining -= chunk_size_;
chunks.emplace_back(chunks.size(), data + position, chunk_size);
space_remaining -= chunk_size;
}
}
return chunks;
Expand Down
9 changes: 7 additions & 2 deletions src/ray/object_manager/object_buffer_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class ObjectBufferPool {
/// \param release_delay The number of release calls before objects are released
/// from the store client (FIFO).
/// \param max_chunks The maximum number of chunks allowed.
ObjectBufferPool(const std::string &store_socket_name, const uint64_t chunk_size,
                 int max_chunks, const int release_delay);

~ObjectBufferPool();

Expand Down Expand Up @@ -125,6 +125,9 @@ class ObjectBufferPool {
void SealChunk(const ObjectID &object_id, uint64_t chunk_index);

private:
/// Gets the chunk size based on data size.
uint64_t GetChunkSize(uint64_t data_size);

/// Abort the create operation associated with an object. This destroys the buffer
/// state, including create operations in progress for all chunks of the object.
void AbortCreate(const ObjectID &object_id);
Expand Down Expand Up @@ -177,7 +180,9 @@ class ObjectBufferPool {
/// get_buffer_state_, create_buffer_state_, and store_client_.
std::mutex pool_mutex_;
/// Determines the maximum chunk size to be transferred by a single thread.
const uint64_t chunk_size_;
const uint64_t default_chunk_size_;
/// The maximum number of chunks allowed.
const uint64_t max_chunks_;
/// The state of a buffer that's currently being used.
std::unordered_map<ray::ObjectID, GetBufferState> get_buffer_state_;
/// The state of a buffer that's currently being used.
Expand Down
2 changes: 2 additions & 0 deletions src/ray/object_manager/object_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ ObjectManager::ObjectManager(asio::io_service &main_service,
// release_delay of 2 * config_.max_sends is to ensure the pool does not release
// an object prematurely whenever we reach the maximum number of sends.
buffer_pool_(config_.store_socket_name, config_.object_chunk_size,
config_.max_sends,
/*release_delay=*/2 * config_.max_sends),
send_work_(send_service_),
receive_work_(receive_service_),
Expand All @@ -40,6 +41,7 @@ ObjectManager::ObjectManager(asio::io_service &main_service,
// release_delay of 2 * config_.max_sends is to ensure the pool does not release
// an object prematurely whenever we reach the maximum number of sends.
buffer_pool_(config_.store_socket_name, config_.object_chunk_size,
config_.max_sends,
/*release_delay=*/2 * config_.max_sends),
send_work_(send_service_),
receive_work_(receive_service_),
Expand Down
18 changes: 14 additions & 4 deletions src/ray/object_manager/test/object_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,6 @@ class TestObjectManagerBase : public ::testing::Test {
store_id_2 = StartStore(UniqueID::from_random().hex());

uint pull_timeout_ms = 1;
int max_sends = 2;
int max_receives = 2;
uint64_t object_chunk_size = static_cast<uint64_t>(std::pow(10, 3));
push_timeout_ms = 1000;

// start first server
Expand Down Expand Up @@ -192,6 +189,10 @@ class TestObjectManagerBase : public ::testing::Test {
std::string store_id_2;

uint push_timeout_ms;

int max_sends = 2;
int max_receives = 2;
uint64_t object_chunk_size = static_cast<uint64_t>(std::pow(10, 3));
};

class TestObjectManager : public TestObjectManagerBase {
Expand Down Expand Up @@ -436,7 +437,16 @@ class TestObjectManager : public TestObjectManagerBase {
}));
}

// Runs the buffer pool check, which stops main_service once it completes.
void TestWaitComplete() { TestBufferPool(); }

void TestBufferPool() {
// Ensure the number of chunks generated do not exceed the number of send threads.
for (uint64_t i = object_chunk_size / 2; i < 10 * object_chunk_size; ++i) {
uint64_t num_chunks = server1->object_manager_.buffer_pool_.GetNumChunks(i);
ASSERT_LE(num_chunks, max_sends);
}
main_service.stop();
}

void TestConnections() {
RAY_LOG(DEBUG) << "\n"
Expand Down
15 changes: 12 additions & 3 deletions src/ray/raylet/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,23 @@ int main(int argc, char *argv[]) {
object_manager_config.store_socket_name = store_socket_name;
object_manager_config.pull_timeout_ms =
RayConfig::instance().object_manager_pull_timeout_ms();
object_manager_config.max_sends = RayConfig::instance().object_manager_max_sends();
object_manager_config.max_receives =
RayConfig::instance().object_manager_max_receives();
object_manager_config.push_timeout_ms =
RayConfig::instance().object_manager_push_timeout_ms();

int num_cpus = static_cast<int>(static_resource_conf["CPU"]);
object_manager_config.max_sends = std::max(1, num_cpus / 4);
object_manager_config.max_receives = std::max(1, num_cpus / 4);
object_manager_config.object_chunk_size =
RayConfig::instance().object_manager_default_chunk_size();

RAY_LOG(INFO) << "Starting object manager with configuration: \n"
"max_sends = "
<< object_manager_config.max_sends << "\n"
"max_receives = "
<< object_manager_config.max_receives << "\n"
"object_chunk_size = "
<< object_manager_config.object_chunk_size;

// initialize mock gcs & object directory
auto gcs_client = std::make_shared<ray::gcs::AsyncGcsClient>();
RAY_LOG(INFO) << "Initializing GCS client "
Expand Down

0 comments on commit 60bc3a0

Please sign in to comment.