Skip to content

Perf fix #2110

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 21, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/common/task.cc
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ TaskExecutionSpec::TaskExecutionSpec(TaskExecutionSpec *other)
spec_ = std::unique_ptr<TaskSpec[]>(spec_copy);
}

std::vector<ObjectID> TaskExecutionSpec::ExecutionDependencies() const {
const std::vector<ObjectID> &TaskExecutionSpec::ExecutionDependencies() const {
return execution_dependencies_;
}

Expand Down
2 changes: 1 addition & 1 deletion src/common/task.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class TaskExecutionSpec {
///
/// @return A vector of object IDs representing this task's execution
/// dependencies.
std::vector<ObjectID> ExecutionDependencies() const;
const std::vector<ObjectID> &ExecutionDependencies() const;

/// Set the task's execution dependencies.
///
Expand Down
8 changes: 4 additions & 4 deletions src/ray/object_manager/object_store_notification_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,25 +63,25 @@ void ObjectStoreNotificationManager::ProcessStoreNotification(
}

void ObjectStoreNotificationManager::ProcessStoreAdd(const ObjectInfoT &object_info) {
for (auto handler : add_handlers_) {
for (auto &handler : add_handlers_) {
handler(object_info);
}
}

void ObjectStoreNotificationManager::ProcessStoreRemove(const ObjectID &object_id) {
for (auto handler : rem_handlers_) {
for (auto &handler : rem_handlers_) {
handler(object_id);
}
}

void ObjectStoreNotificationManager::SubscribeObjAdded(
std::function<void(const ObjectInfoT &)> callback) {
add_handlers_.push_back(callback);
add_handlers_.push_back(std::move(callback));
}

void ObjectStoreNotificationManager::SubscribeObjDeleted(
std::function<void(const ObjectID &)> callback) {
rem_handlers_.push_back(callback);
rem_handlers_.push_back(std::move(callback));
}

} // namespace ray
10 changes: 6 additions & 4 deletions src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -229,13 +229,14 @@ void NodeManager::HeartbeatAdded(gcs::AsyncGcsClient *client, const ClientID &cl
}
// Locate the client id in remote client table and update available resources based on
// the received heartbeat information.
if (this->cluster_resource_map_.count(client_id) == 0) {
auto it = this->cluster_resource_map_.find(client_id);
if (it == cluster_resource_map_.end()) {
// Haven't received the client registration for this client yet, skip this heartbeat.
RAY_LOG(INFO) << "[HeartbeatAdded]: received heartbeat from unknown client id "
<< client_id;
return;
}
SchedulingResources &resources = this->cluster_resource_map_[client_id];
SchedulingResources &resources = it->second;
ResourceSet heartbeat_resource_available(heartbeat_data.resources_available_label,
heartbeat_data.resources_available_capacity);
resources.SetAvailableResources(
Expand Down Expand Up @@ -786,13 +787,14 @@ ray::Status NodeManager::ForwardTask(const Task &task, const ClientID &node_id)
auto client_info = gcs_client_->client_table().GetClient(node_id);

// Lookup remote server connection for this node_id and use it to send the request.
if (remote_server_connections_.count(node_id) == 0) {
auto it = remote_server_connections_.find(node_id);
if (it == remote_server_connections_.end()) {
// TODO(atumanov): caller must handle failure to ensure tasks are not lost.
RAY_LOG(INFO) << "No NodeManager connection found for GCS client id " << node_id;
return ray::Status::IOError("NodeManager connection not found");
}

auto &server_conn = remote_server_connections_.at(node_id);
auto &server_conn = it->second;
auto status = server_conn.WriteMessage(protocol::MessageType_ForwardTaskRequest,
fbb.GetSize(), fbb.GetBufferPointer());
if (status.ok()) {
Expand Down
2 changes: 1 addition & 1 deletion src/ray/raylet/scheduling_policy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ std::unordered_map<TaskID, ClientID> SchedulingPolicy::Schedule(
for (const auto &client_resource_pair : cluster_resources) {
// pair = ClientID, SchedulingResources
ClientID node_client_id = client_resource_pair.first;
SchedulingResources node_resources = client_resource_pair.second;
const auto &node_resources = client_resource_pair.second;
RAY_LOG(DEBUG) << "client_id " << node_client_id << " resources: "
<< node_resources.GetAvailableResources().ToString();
if (resource_demand.IsSubset(node_resources.GetTotalResources())) {
Expand Down
6 changes: 2 additions & 4 deletions src/ray/raylet/scheduling_queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,8 @@ void removeTasksFromQueue(std::list<Task> &queue, std::unordered_set<TaskID> &ta
}

// Helper function to queue the given tasks to the given queue.
void queueTasks(std::list<Task> &queue, const std::vector<Task> &tasks) {
for (auto &task : tasks) {
queue.push_back(task);
}
inline void queueTasks(std::list<Task> &queue, const std::vector<Task> &tasks) {
queue.insert(queue.end(), tasks.begin(), tasks.end());
}

std::vector<Task> SchedulingQueue::RemoveTasks(std::unordered_set<TaskID> task_ids) {
Expand Down
5 changes: 3 additions & 2 deletions src/ray/raylet/worker_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,9 @@ void WorkerPool::RegisterWorker(std::shared_ptr<Worker> worker) {
auto pid = worker->Pid();
RAY_LOG(DEBUG) << "Registering worker with pid " << pid;
registered_workers_.push_back(std::move(worker));
RAY_CHECK(started_worker_pids_.count(pid) > 0);
started_worker_pids_.erase(pid);
auto it = started_worker_pids_.find(pid);
RAY_CHECK(it != started_worker_pids_.end());
started_worker_pids_.erase(it);
}

std::shared_ptr<Worker> WorkerPool::GetRegisteredWorker(
Expand Down