Skip to content

Commit

Permalink
Suppress duplicate pre-emptive object pushes. (ray-project#3276)
Browse files Browse the repository at this point in the history
* Suppress duplicate pre-emptive object pushes.

* Add test.

* Fix linting

* Remove timer and inline recent_pushes_ into local_objects_.

* Improve test.

* Fix

* Fix linting

* Enable retrying pull from same object manager. Randomize object manager.

* Speed up test

* Linting

* Add test.

* Minor

* Lengthen pull timeout and reissue pull every time a new object becomes available.

* Increase pull timeout in test.

* Wait for nodes to start in object manager test.

* Wait longer for nodes to start up in test.

* Small fixes.

* _submit -> _remote

* Change assert to warning.
  • Loading branch information
robertnishihara authored and pcmoritz committed Nov 17, 2018
1 parent ab1e0f5 commit 5cbc597
Show file tree
Hide file tree
Showing 12 changed files with 417 additions and 44 deletions.
2 changes: 1 addition & 1 deletion src/ray/common/client_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ ClientConnection<T>::ClientConnection(MessageHandler<T> &message_handler,
debug_label_(debug_label) {}

template <class T>
const ClientID &ClientConnection<T>::GetClientID() {
const ClientID &ClientConnection<T>::GetClientId() {
return client_id_;
}

Expand Down
2 changes: 1 addition & 1 deletion src/ray/common/client_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ class ClientConnection : public ServerConnection<T> {
}

/// \return The ClientID of the remote client.
const ClientID &GetClientID();
const ClientID &GetClientId();

/// \param client_id The ClientID of the remote client.
void SetClientID(const ClientID &client_id);
Expand Down
6 changes: 3 additions & 3 deletions src/ray/object_manager/connection_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ void ConnectionPool::RegisterReceiver(ConnectionType type, const ClientID &clien

void ConnectionPool::RemoveReceiver(std::shared_ptr<TcpClientConnection> conn) {
std::unique_lock<std::mutex> guard(connection_mutex);
ClientID client_id = conn->GetClientID();
const ClientID client_id = conn->GetClientId();
if (message_receive_connections_.count(client_id) != 0) {
Remove(message_receive_connections_, client_id, conn);
}
Expand All @@ -40,7 +40,7 @@ void ConnectionPool::RegisterSender(ConnectionType type, const ClientID &client_

void ConnectionPool::RemoveSender(const std::shared_ptr<SenderConnection> &conn) {
std::unique_lock<std::mutex> guard(connection_mutex);
ClientID client_id = conn->GetClientID();
const ClientID client_id = conn->GetClientId();
if (message_send_connections_.count(client_id) != 0) {
Remove(message_send_connections_, client_id, conn);
}
Expand Down Expand Up @@ -68,7 +68,7 @@ void ConnectionPool::ReleaseSender(ConnectionType type,
SenderMapType &conn_map = (type == ConnectionType::MESSAGE)
? available_message_send_connections_
: available_transfer_send_connections_;
Return(conn_map, conn->GetClientID(), conn);
Return(conn_map, conn->GetClientId(), conn);
}

void ConnectionPool::Add(ReceiverMapType &conn_map, const ClientID &client_id,
Expand Down
97 changes: 66 additions & 31 deletions src/ray/object_manager/object_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ ObjectManager::ObjectManager(asio::io_service &main_service,
/*release_delay=*/2 * config_.max_sends),
send_work_(send_service_),
receive_work_(receive_service_),
connection_pool_() {
connection_pool_(),
gen_(std::chrono::high_resolution_clock::now().time_since_epoch().count()) {
RAY_CHECK(config_.max_sends > 0);
RAY_CHECK(config_.max_receives > 0);
main_service_ = &main_service;
Expand All @@ -47,7 +48,8 @@ ObjectManager::ObjectManager(asio::io_service &main_service,
/*release_delay=*/2 * config_.max_sends),
send_work_(send_service_),
receive_work_(receive_service_),
connection_pool_() {
connection_pool_(),
gen_(std::chrono::high_resolution_clock::now().time_since_epoch().count()) {
RAY_CHECK(config_.max_sends > 0);
RAY_CHECK(config_.max_receives > 0);
// TODO(hme) Client ID is never set with this constructor.
Expand Down Expand Up @@ -94,7 +96,7 @@ void ObjectManager::HandleObjectAdded(
// Notify the object directory that the object has been added to this node.
ObjectID object_id = ObjectID::from_binary(object_info.object_id);
RAY_CHECK(local_objects_.count(object_id) == 0);
local_objects_[object_id] = object_info;
local_objects_[object_id].object_info = object_info;
ray::Status status =
object_directory_->ReportObjectAdded(object_id, client_id_, object_info);

Expand Down Expand Up @@ -174,15 +176,10 @@ ray::Status ObjectManager::Pull(const ObjectID &object_id) {
it->second.timer_set = false;
}
} else {
// New object locations were found.
if (!it->second.timer_set) {
// The timer was not set, which means that we weren't trying any
// clients. We now have some clients to try, so begin trying to
// Pull from one. If we fail to receive an object within the pull
// timeout, then this will try the rest of the clients in the list
// in succession.
TryPull(object_id);
}
// New object locations were found, so begin trying to pull from a
// client. This will be called every time a new client location
// appears.
TryPull(object_id);
}
});
}
Expand All @@ -193,19 +190,30 @@ void ObjectManager::TryPull(const ObjectID &object_id) {
return;
}

auto &client_vector = it->second.client_locations;

// The timer should never fire if there are no expected client locations.
RAY_CHECK(!it->second.client_locations.empty());
RAY_CHECK(!client_vector.empty());
RAY_CHECK(local_objects_.count(object_id) == 0);

// Get the next client to try.
const ClientID client_id = std::move(it->second.client_locations.back());
it->second.client_locations.pop_back();
// Make sure that there is at least one client which is not the local client.
// TODO(rkn): It may actually be possible for this check to fail.
RAY_CHECK(client_vector.size() != 1 || client_vector[0] != client_id_);

// Choose a random client to pull the object from.
// Generate a random index.
std::uniform_int_distribution<int> distribution(0, client_vector.size() - 1);
int client_index = distribution(gen_);
ClientID client_id = client_vector[client_index];
// If the object manager somehow ended up choosing itself, choose a different
// object manager.
if (client_id == client_id_) {
// If we're trying to pull from ourselves, skip this client and try the
// next one.
RAY_LOG(ERROR) << client_id_ << " attempted to pull an object from itself.";
const ClientID client_id = std::move(it->second.client_locations.back());
it->second.client_locations.pop_back();
std::swap(client_vector[client_index], client_vector[client_vector.size() - 1]);
client_vector.pop_back();
RAY_LOG(ERROR) << "The object manager with client ID " << client_id_
<< " is trying to pull object " << object_id
<< " but the object table suggests that this object manager "
<< "already has the object.";
client_id = client_vector[client_index % client_vector.size()];
RAY_CHECK(client_id != client_id_);
}

Expand Down Expand Up @@ -379,10 +387,33 @@ void ObjectManager::Push(const ObjectID &object_id, const ClientID &client_id) {
return;
}

// If we haven't pushed this object to this same object manager yet, then push
// it. If we have, but it was a long time ago, then push it. If we have and it
// was recent, then don't do it again.
auto &recent_pushes = local_objects_[object_id].recent_pushes;
auto it = recent_pushes.find(client_id);
if (it == recent_pushes.end()) {
// We haven't pushed this specific object to this specific object manager
// yet (or if we have then the object must have been evicted and recreated
// locally).
recent_pushes[client_id] = current_sys_time_ms();
} else {
int64_t current_time = current_sys_time_ms();
if (current_time - it->second <=
RayConfig::instance().object_manager_repeated_push_delay_ms()) {
// We pushed this object to the object manager recently, so don't do it
// again.
return;
} else {
it->second = current_time;
}
}

RemoteConnectionInfo connection_info(client_id);
object_directory_->LookupRemoteConnectionInfo(connection_info);
if (connection_info.Connected()) {
const object_manager::protocol::ObjectInfoT &object_info = local_objects_[object_id];
const object_manager::protocol::ObjectInfoT &object_info =
local_objects_[object_id].object_info;
uint64_t data_size =
static_cast<uint64_t>(object_info.data_size + object_info.metadata_size);
uint64_t metadata_size = static_cast<uint64_t>(object_info.metadata_size);
Expand All @@ -397,11 +428,11 @@ void ObjectManager::Push(const ObjectID &object_id, const ClientID &client_id) {
// object manager from another object manager.
ray::Status status = ExecuteSendObject(
client_id, object_id, data_size, metadata_size, chunk_index, connection_info);
double end_time = current_sys_time_seconds();

// Notify the main thread that we have finished sending the chunk.
main_service_->post(
[this, object_id, client_id, chunk_index, start_time, status]() {
double end_time = current_sys_time_seconds();
[this, object_id, client_id, chunk_index, start_time, end_time, status]() {
HandleSendFinished(object_id, client_id, chunk_index, start_time, end_time,
status);
});
Expand Down Expand Up @@ -746,6 +777,9 @@ void ObjectManager::ConnectClient(std::shared_ptr<TcpClientConnection> &conn,
/// Remove a disconnecting connection from the connection pool's receiver
/// connections.
///
/// \param conn The connection to remove from the connection pool.
/// \param message Not read by this handler.
void ObjectManager::DisconnectClient(std::shared_ptr<TcpClientConnection> &conn,
const uint8_t *message) {
connection_pool_.RemoveReceiver(conn);

// We don't need to clean up unfulfilled_push_requests_ because the
// unfulfilled push timers will fire and clean it up.
}

void ObjectManager::ReceivePullRequest(std::shared_ptr<TcpClientConnection> &conn,
Expand Down Expand Up @@ -777,15 +811,16 @@ void ObjectManager::ReceivePushRequest(std::shared_ptr<TcpClientConnection> &con
uint64_t metadata_size = object_header->metadata_size();
receive_service_.post([this, object_id, data_size, metadata_size, chunk_index, conn]() {
double start_time = current_sys_time_seconds();
const ClientID client_id = conn->GetClientID();
const ClientID client_id = conn->GetClientId();
auto status = ExecuteReceiveObject(client_id, object_id, data_size, metadata_size,
chunk_index, *conn);
double end_time = current_sys_time_seconds();
// Notify the main thread that we have finished receiving the object.
main_service_->post([this, object_id, client_id, chunk_index, start_time, status]() {
double end_time = current_sys_time_seconds();
HandleReceiveFinished(object_id, client_id, chunk_index, start_time, end_time,
status);
});
main_service_->post(
[this, object_id, client_id, chunk_index, start_time, end_time, status]() {
HandleReceiveFinished(object_id, client_id, chunk_index, start_time, end_time,
status);
});

});
}
Expand Down
23 changes: 20 additions & 3 deletions src/ray/object_manager/object_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <deque>
#include <map>
#include <memory>
#include <random>
#include <thread>

#include <boost/asio.hpp>
Expand Down Expand Up @@ -51,6 +52,14 @@ struct ObjectManagerConfig {
int push_timeout_ms;
};

/// Per-object bookkeeping that the object manager keeps for each locally
/// available object: the object's store metadata plus a record of when the
/// object was last pushed to each remote object manager.
struct LocalObjectInfo {
/// Information from the object store about the object.
object_manager::protocol::ObjectInfoT object_info;
/// A map from the ID of a remote object manager to the timestamp of when
/// the object was last pushed to that object manager (if a push took place).
/// Timestamps are system times in milliseconds. An absent entry means that no
/// push of this object to that object manager has been recorded. Push
/// consults this map to skip a repeated push that falls within
/// object_manager_repeated_push_delay_ms of the previous one.
std::unordered_map<ClientID, int64_t> recent_pushes;
};

class ObjectManagerInterface {
public:
virtual ray::Status Pull(const ObjectID &object_id) = 0;
Expand Down Expand Up @@ -102,7 +111,9 @@ class ObjectManager : public ObjectManagerInterface {
/// \return Status of whether adding the subscription succeeded.
ray::Status SubscribeObjDeleted(std::function<void(const ray::ObjectID &)> callback);

/// Push an object to the node manager on the node corresponding to client id.
/// Consider pushing an object to a remote object manager. This object manager
/// may choose to ignore the Push call (e.g., if Push is called twice in a row
/// on the same object, the second one might be ignored).
///
/// \param object_id The object's object id.
/// \param client_id The remote node's client id.
Expand Down Expand Up @@ -382,8 +393,9 @@ class ObjectManager : public ObjectManagerInterface {
/// Connection pool for reusing outgoing connections to remote object managers.
ConnectionPool connection_pool_;

/// Cache of locally available objects.
std::unordered_map<ObjectID, object_manager::protocol::ObjectInfoT> local_objects_;
/// Mapping from locally available objects to information about those objects
/// including when the object was last pushed to other object managers.
std::unordered_map<ObjectID, LocalObjectInfo> local_objects_;

/// This is used as the callback identifier in Pull for
/// SubscribeObjectLocations. We only need one identifier because we never need to
Expand All @@ -400,11 +412,16 @@ class ObjectManager : public ObjectManagerInterface {
std::unordered_map<ClientID, std::unique_ptr<boost::asio::deadline_timer>>>
unfulfilled_push_requests_;

/// The objects that this object manager is currently trying to fetch from
/// remote object managers.
std::unordered_map<ObjectID, PullRequest> pull_requests_;

/// Profiling events that are to be batched together and added to the profile
/// table in the GCS.
std::vector<ProfileEventT> profile_events_;

/// Internally maintained random number generator.
std::mt19937_64 gen_;
};

} // namespace ray
Expand Down
2 changes: 1 addition & 1 deletion src/ray/object_manager/object_manager_client_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ class SenderConnection : public boost::enable_shared_from_this<SenderConnection>
}

/// \return The ClientID of this connection.
const ClientID &GetClientID() { return client_id_; }
const ClientID &GetClientId() { return client_id_; }

private:
bool operator==(const SenderConnection &rhs) const {
Expand Down
2 changes: 1 addition & 1 deletion src/ray/object_manager/test/object_manager_stress_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ class TestObjectManagerBase : public ::testing::Test {
store_id_1 = StartStore(UniqueID::from_random().hex());
store_id_2 = StartStore(UniqueID::from_random().hex());

uint pull_timeout_ms = 1;
uint pull_timeout_ms = 1000;
int max_sends_a = 2;
int max_receives_a = 2;
int max_sends_b = 3;
Expand Down
12 changes: 11 additions & 1 deletion src/ray/ray_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ class RayConfig {

int object_manager_push_timeout_ms() const { return object_manager_push_timeout_ms_; }

/// \return The period of time, in milliseconds, that an object manager will
/// wait before pushing the same object again to a specific object manager.
int object_manager_repeated_push_delay_ms() const {
return object_manager_repeated_push_delay_ms_;
}
uint64_t object_manager_default_chunk_size() const {
return object_manager_default_chunk_size_;
}
Expand Down Expand Up @@ -183,6 +186,8 @@ class RayConfig {
object_manager_push_timeout_ms_ = pair.second;
} else if (pair.first == "object_manager_default_chunk_size") {
object_manager_default_chunk_size_ = pair.second;
} else if (pair.first == "object_manager_repeated_push_delay_ms") {
object_manager_repeated_push_delay_ms_ = pair.second;
} else {
RAY_LOG(FATAL) << "Received unexpected config parameter " << pair.first;
}
Expand Down Expand Up @@ -224,8 +229,9 @@ class RayConfig {
max_tasks_to_spillback_(10),
actor_creation_num_spillbacks_warning_(100),
node_manager_forward_task_retry_timeout_milliseconds_(1000),
object_manager_pull_timeout_ms_(100),
object_manager_pull_timeout_ms_(10000),
object_manager_push_timeout_ms_(10000),
object_manager_repeated_push_delay_ms_(60000),
object_manager_default_chunk_size_(1000000),
num_workers_per_process_(1),
initialized_(false) {}
Expand Down Expand Up @@ -348,6 +354,10 @@ class RayConfig {
/// 0: giving up retrying immediately.
int object_manager_push_timeout_ms_;

/// The period of time that an object manager will wait before pushing the
/// same object again to a specific object manager.
int object_manager_repeated_push_delay_ms_;

/// Default chunk size for multi-chunk transfers to use in the object manager.
/// In the object manager, no single thread is permitted to transfer more
/// data than what is specified by the chunk size unless the number of object
Expand Down
2 changes: 1 addition & 1 deletion src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -763,7 +763,7 @@ void NodeManager::ProcessDisconnectClientMessage(
DispatchTasks(local_queues_.GetReadyTasks());
} else if (is_driver) {
// The client is a driver.
RAY_CHECK_OK(gcs_client_->driver_table().AppendDriverData(client->GetClientID(),
RAY_CHECK_OK(gcs_client_->driver_table().AppendDriverData(client->GetClientId(),
/*is_dead=*/true));
auto driver_id = worker->GetAssignedTaskId();
RAY_CHECK(!driver_id.is_nil());
Expand Down
2 changes: 2 additions & 0 deletions test/jenkins_tests/run_multi_node_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,8 @@ docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA \
--stop '{"training_iteration": 2}' \
--config '{"num_workers": 2, "use_pytorch": true, "sample_async": false}'

docker run --rm --shm-size=10G --memory=10G $DOCKER_SHA python -m pytest /ray/test/object_manager_test.py

python3 $ROOT_DIR/multi_node_docker_test.py \
--docker-image=$DOCKER_SHA \
--num-nodes=5 \
Expand Down
Loading

0 comments on commit 5cbc597

Please sign in to comment.