Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix the bug of unregistered workers in worker pool #7343

Merged
merged 14 commits into from
Mar 2, 2020
4 changes: 4 additions & 0 deletions java/test/src/main/java/org/ray/api/TestUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ public static void skipTestUnderClusterMode() {
}
}

public static boolean isDirectActorCallEnabled() {
return ActorCreationOptions.DEFAULT_USE_DIRECT_CALL;
}

public static void skipTestIfDirectActorCallEnabled() {
skipTestIfDirectActorCallEnabled(true);
}
Expand Down
11 changes: 8 additions & 3 deletions java/test/src/main/java/org/ray/api/test/PlasmaFreeTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,14 @@ public void testDeleteObjects() {
Ray.internal().free(ImmutableList.of(helloId.getId()), true, false);

final boolean result = TestUtils.waitForCondition(() ->
TestUtils.getRuntime().getObjectStore()
.wait(ImmutableList.of(helloId.getId()), 1, 0).get(0) == false, 50);
Assert.assertTrue(result);
!TestUtils.getRuntime().getObjectStore()
.wait(ImmutableList.of(helloId.getId()), 1, 0).get(0), 50);
if (TestUtils.isDirectActorCallEnabled()) {
// Direct call will not delete object from im-memory store.
Assert.assertFalse(result);
} else {
Assert.assertTrue(result);
}
}

@Test
Expand Down
8 changes: 5 additions & 3 deletions src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,11 @@ NodeManager::NodeManager(boost::asio::io_service &io_service,
object_manager_profile_timer_(io_service),
initial_config_(config),
local_available_resources_(config.resource_config),
worker_pool_(io_service, config.num_initial_workers,
config.maximum_startup_concurrency, gcs_client_,
config.worker_commands),
worker_pool_(
io_service, config.num_initial_workers, config.maximum_startup_concurrency,
gcs_client_, config.worker_commands,
/*starting_worker_timeout_callback=*/
[this]() { this->DispatchTasks(this->local_queues_.GetReadyTasksByClass()); }),
scheduling_policy_(local_queues_),
reconstruction_policy_(
io_service_,
Expand Down
30 changes: 28 additions & 2 deletions src/ray/raylet/worker_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <sys/wait.h>

#include <algorithm>
#include <boost/date_time/posix_time/posix_time.hpp>

#include "ray/common/constants.h"
#include "ray/common/ray_config.h"
Expand Down Expand Up @@ -44,10 +45,12 @@ namespace raylet {
WorkerPool::WorkerPool(boost::asio::io_service &io_service, int num_workers,
int maximum_startup_concurrency,
std::shared_ptr<gcs::GcsClient> gcs_client,
const WorkerCommandMap &worker_commands)
const WorkerCommandMap &worker_commands,
std::function<void()> starting_worker_timeout_callback)
: io_service_(&io_service),
maximum_startup_concurrency_(maximum_startup_concurrency),
gcs_client_(std::move(gcs_client)) {
gcs_client_(std::move(gcs_client)),
starting_worker_timeout_callback_(starting_worker_timeout_callback) {
RAY_CHECK(maximum_startup_concurrency > 0);
#ifndef _WIN32
// Ignore SIGCHLD signals. If we don't do this, then worker processes will
Expand Down Expand Up @@ -190,10 +193,33 @@ Process WorkerPool::StartWorkerProcess(const Language &language,
Process proc = StartProcess(worker_command_args);
RAY_LOG(DEBUG) << "Started worker process of " << workers_to_start
<< " worker(s) with pid " << proc.GetId();
MonitorStartingWorkerProcess(proc, language);
state.starting_worker_processes.emplace(proc, workers_to_start);
return proc;
}

void WorkerPool::MonitorStartingWorkerProcess(const Process &proc, const Language &language) {
constexpr static size_t timeout_seconds = 30;
jovany-wang marked this conversation as resolved.
Show resolved Hide resolved
auto timer = std::make_shared<boost::asio::deadline_timer>(
*io_service_, boost::posix_time::seconds(timeout_seconds));
// Capture timer in lambda to copy it once, so that it can avoid destructing timer.
timer->async_wait([timer, language, proc, this] (const boost::system::error_code e) -> void {
// check the error code.
auto &state = this->GetStateForLanguage(language);
// Since this process is timeout to start, remove it from starting_worker_processes
jovany-wang marked this conversation as resolved.
Show resolved Hide resolved
// to avoid the zombie worker.
auto it = state.starting_worker_processes.find(proc);
if (it != state.starting_worker_processes.end()) {
RAY_LOG(INFO) << "Some workers of the worker process(" << proc.GetId()
<< ") have not been registering to raylet.";
jovany-wang marked this conversation as resolved.
Show resolved Hide resolved
state.starting_worker_processes.erase(it);
if (starting_worker_timeout_callback_ != nullptr) {
starting_worker_timeout_callback_();
}
}
});
}

Process WorkerPool::StartProcess(const std::vector<std::string> &worker_command_args) {
if (RAY_LOG_ENABLED(DEBUG)) {
std::stringstream stream;
Expand Down
16 changes: 14 additions & 2 deletions src/ray/raylet/worker_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,12 @@ class WorkerPool {
/// resources on the machine).
/// \param worker_commands The commands used to start the worker process, grouped by
/// language.
/// \param starting_worker_timeout_callback The callback will be triggered once it's
jovany-wang marked this conversation as resolved.
Show resolved Hide resolved
/// timeout to start a worker.
WorkerPool(boost::asio::io_service &io_service, int num_workers,
int maximum_startup_concurrency, std::shared_ptr<gcs::GcsClient> gcs_client,
const WorkerCommandMap &worker_commands);
const WorkerCommandMap &worker_commands,
std::function<void()> starting_worker_timeout_callback);

/// Destructor responsible for freeing a set of workers owned by this class.
virtual ~WorkerPool();
Expand Down Expand Up @@ -221,13 +224,22 @@ class WorkerPool {
/// for a given language.
State &GetStateForLanguage(const Language &language);

/// Start a timer to monitor the starting worker process.
///
/// If any workers in this process don't register within the timeout
/// (due to worker process crash or any other reasons), remove them
/// from `starting_worker_processes`. Otherwise if we'll mistakenly
/// think there are unregistered workers, and won't start new workers.
void MonitorStartingWorkerProcess(const Process &proc, const Language &language);

/// For Process class for managing subprocesses (e.g. reaping zombies).
boost::asio::io_service *io_service_;
/// The maximum number of worker processes that can be started concurrently.
int maximum_startup_concurrency_;
/// A client connection to the GCS.
std::shared_ptr<gcs::GcsClient> gcs_client_;

/// The callback will be triggered once it's timeout to start a worker.
jovany-wang marked this conversation as resolved.
Show resolved Hide resolved
std::function<void()> starting_worker_timeout_callback_;
FRIEND_TEST(WorkerPoolTest, InitialWorkerProcessCount);
};

Expand Down
3 changes: 2 additions & 1 deletion src/ray/raylet/worker_pool_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ class WorkerPoolMock : public WorkerPool {

explicit WorkerPoolMock(boost::asio::io_service &io_service,
const WorkerCommandMap &worker_commands)
: WorkerPool(io_service, 0, MAXIMUM_STARTUP_CONCURRENCY, nullptr, worker_commands),
: WorkerPool(io_service, 0, MAXIMUM_STARTUP_CONCURRENCY, nullptr, worker_commands,
nullptr),
jovany-wang marked this conversation as resolved.
Show resolved Hide resolved
last_worker_process_() {
for (auto &entry : states_by_lang_) {
entry.second.num_workers_per_process = NUM_WORKERS_PER_PROCESS;
Expand Down