Skip to content

Commit

Permalink
Fix the bug of unregistered workers in worker pool (ray-project#7343)
Browse files Browse the repository at this point in the history
* Fix

* Fix

* Fix compile

* Fix lint

* Fix linting

* Fix testDeleteObject

* Fix linting

* Update src/ray/raylet/worker_pool.cc

Co-Authored-By: Hao Chen <chenh1024@gmail.com>

* Update src/ray/raylet/worker_pool.cc

Co-Authored-By: Hao Chen <chenh1024@gmail.com>

* Update src/ray/raylet/worker_pool.h

Co-Authored-By: Hao Chen <chenh1024@gmail.com>

* Update src/ray/raylet/worker_pool.cc

Co-Authored-By: Hao Chen <chenh1024@gmail.com>

* Address comments.

* Fix linting

Co-authored-by: Hao Chen <chenh1024@gmail.com>
  • Loading branch information
jovany-wang and raulchen authored Mar 2, 2020
1 parent 0792b5c commit 2771af1
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 11 deletions.
4 changes: 4 additions & 0 deletions java/test/src/main/java/org/ray/api/TestUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ public static void skipTestUnderClusterMode() {
}
}

/**
 * Reports whether direct actor calls are enabled by default.
 *
 * @return the value of {@code ActorCreationOptions.DEFAULT_USE_DIRECT_CALL}.
 */
public static boolean isDirectActorCallEnabled() {
  final boolean directCallByDefault = ActorCreationOptions.DEFAULT_USE_DIRECT_CALL;
  return directCallByDefault;
}

/**
 * Convenience overload that forwards to
 * {@code skipTestIfDirectActorCallEnabled(boolean)} with {@code true}.
 */
public static void skipTestIfDirectActorCallEnabled() {
  final boolean flag = true;
  skipTestIfDirectActorCallEnabled(flag);
}
Expand Down
11 changes: 8 additions & 3 deletions java/test/src/main/java/org/ray/api/test/PlasmaFreeTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,14 @@ public void testDeleteObjects() {
Ray.internal().free(ImmutableList.of(helloId.getId()), true, false);

final boolean result = TestUtils.waitForCondition(() ->
TestUtils.getRuntime().getObjectStore()
.wait(ImmutableList.of(helloId.getId()), 1, 0).get(0) == false, 50);
Assert.assertTrue(result);
!TestUtils.getRuntime().getObjectStore()
.wait(ImmutableList.of(helloId.getId()), 1, 0).get(0), 50);
if (TestUtils.isDirectActorCallEnabled()) {
// Direct call will not delete the object from the in-memory store.
Assert.assertFalse(result);
} else {
Assert.assertTrue(result);
}
}

@Test
Expand Down
8 changes: 5 additions & 3 deletions src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,11 @@ NodeManager::NodeManager(boost::asio::io_service &io_service,
object_manager_profile_timer_(io_service),
initial_config_(config),
local_available_resources_(config.resource_config),
worker_pool_(io_service, config.num_initial_workers,
config.maximum_startup_concurrency, gcs_client_,
config.worker_commands),
worker_pool_(
io_service, config.num_initial_workers, config.maximum_startup_concurrency,
gcs_client_, config.worker_commands,
/*starting_worker_timeout_callback=*/
[this]() { this->DispatchTasks(this->local_queues_.GetReadyTasksByClass()); }),
scheduling_policy_(local_queues_),
reconstruction_policy_(
io_service_,
Expand Down
30 changes: 28 additions & 2 deletions src/ray/raylet/worker_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <sys/wait.h>

#include <algorithm>
#include <boost/date_time/posix_time/posix_time.hpp>

#include "ray/common/constants.h"
#include "ray/common/ray_config.h"
Expand Down Expand Up @@ -44,10 +45,12 @@ namespace raylet {
WorkerPool::WorkerPool(boost::asio::io_service &io_service, int num_workers,
int maximum_startup_concurrency,
std::shared_ptr<gcs::GcsClient> gcs_client,
const WorkerCommandMap &worker_commands)
const WorkerCommandMap &worker_commands,
std::function<void()> starting_worker_timeout_callback)
: io_service_(&io_service),
maximum_startup_concurrency_(maximum_startup_concurrency),
gcs_client_(std::move(gcs_client)) {
gcs_client_(std::move(gcs_client)),
starting_worker_timeout_callback_(starting_worker_timeout_callback) {
RAY_CHECK(maximum_startup_concurrency > 0);
#ifndef _WIN32
// Ignore SIGCHLD signals. If we don't do this, then worker processes will
Expand Down Expand Up @@ -190,10 +193,33 @@ Process WorkerPool::StartWorkerProcess(const Language &language,
Process proc = StartProcess(worker_command_args);
RAY_LOG(DEBUG) << "Started worker process of " << workers_to_start
<< " worker(s) with pid " << proc.GetId();
MonitorStartingWorkerProcess(proc, language);
state.starting_worker_processes.emplace(proc, workers_to_start);
return proc;
}

/// Arm a one-shot timer that watches a worker process which is being started.
///
/// When the timer expires and `proc` is still present in the
/// `starting_worker_processes` map of the state for `language`, the entry is
/// erased and `starting_worker_timeout_callback_` is invoked. If the entry is
/// no longer present when the timer fires, nothing happens.
///
/// \param proc The worker process to monitor.
/// \param language The language whose per-language state tracks `proc`.
void WorkerPool::MonitorStartingWorkerProcess(const Process &proc,
                                              const Language &language) {
  constexpr static size_t worker_register_timeout_seconds = 30;
  auto timer = std::make_shared<boost::asio::deadline_timer>(
      *io_service_, boost::posix_time::seconds(worker_register_timeout_seconds));
  // The handler keeps its own copy of the shared_ptr so that the timer is not
  // destroyed when this function returns, before the wait completes.
  timer->async_wait(
      [timer, language, proc, this](const boost::system::error_code &e) -> void {
        if (e) {
          // The wait did not complete normally (for example it was aborted), so
          // the timeout has not really elapsed. Leave the pool state untouched.
          return;
        }
        auto &state = this->GetStateForLanguage(language);
        auto it = state.starting_worker_processes.find(proc);
        if (it == state.starting_worker_processes.end()) {
          // The process is no longer tracked as starting; nothing to clean up.
          return;
        }
        // Since this process timed out while starting, remove it from
        // starting_worker_processes to avoid the zombie worker.
        RAY_LOG(INFO) << "Some workers of the worker process(" << proc.GetId()
                      << ") have not registered to raylet within timeout.";
        state.starting_worker_processes.erase(it);
        // Guard against an empty std::function, which would throw when called.
        if (starting_worker_timeout_callback_) {
          starting_worker_timeout_callback_();
        }
      });
}

Process WorkerPool::StartProcess(const std::vector<std::string> &worker_command_args) {
if (RAY_LOG_ENABLED(DEBUG)) {
std::stringstream stream;
Expand Down
16 changes: 14 additions & 2 deletions src/ray/raylet/worker_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,12 @@ class WorkerPool {
/// resources on the machine).
/// \param worker_commands The commands used to start the worker process, grouped by
/// language.
/// \param starting_worker_timeout_callback The callback that will be triggered when
/// a starting worker process fails to register within the timeout.
WorkerPool(boost::asio::io_service &io_service, int num_workers,
int maximum_startup_concurrency, std::shared_ptr<gcs::GcsClient> gcs_client,
const WorkerCommandMap &worker_commands);
const WorkerCommandMap &worker_commands,
std::function<void()> starting_worker_timeout_callback);

/// Destructor responsible for freeing a set of workers owned by this class.
virtual ~WorkerPool();
Expand Down Expand Up @@ -221,13 +224,22 @@ class WorkerPool {
/// for a given language.
State &GetStateForLanguage(const Language &language);

/// Start a timer to monitor the starting worker process.
///
/// If any workers in this process don't register within the timeout
/// (due to a worker process crash or any other reason), remove the process
/// from `starting_worker_processes`. Otherwise we would mistakenly think
/// there are still unregistered workers, and would not start new workers.
///
/// \param proc The worker process to monitor.
/// \param language The language of the worker process, used to look up the
/// per-language state that tracks it.
void MonitorStartingWorkerProcess(const Process &proc, const Language &language);

/// For Process class for managing subprocesses (e.g. reaping zombies).
boost::asio::io_service *io_service_;
/// The maximum number of worker processes that can be started concurrently.
int maximum_startup_concurrency_;
/// A client connection to the GCS.
std::shared_ptr<gcs::GcsClient> gcs_client_;

/// The callback that will be triggered when a starting worker process times out.
std::function<void()> starting_worker_timeout_callback_;
FRIEND_TEST(WorkerPoolTest, InitialWorkerProcessCount);
};

Expand Down
3 changes: 2 additions & 1 deletion src/ray/raylet/worker_pool_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ class WorkerPoolMock : public WorkerPool {

explicit WorkerPoolMock(boost::asio::io_service &io_service,
const WorkerCommandMap &worker_commands)
: WorkerPool(io_service, 0, MAXIMUM_STARTUP_CONCURRENCY, nullptr, worker_commands),
: WorkerPool(io_service, 0, MAXIMUM_STARTUP_CONCURRENCY, nullptr, worker_commands,
[]() {}),
last_worker_process_() {
for (auto &entry : states_by_lang_) {
entry.second.num_workers_per_process = NUM_WORKERS_PER_PROCESS;
Expand Down

0 comments on commit 2771af1

Please sign in to comment.