[Core] New scheduler fixes #9186

Merged · 23 commits · Jul 9, 2020
Changes from 1 commit
.
wuisawesome committed Jun 29, 2020
commit 3a71cef3e2dff39b4ca982c6943bfaba19b76da0
2 changes: 1 addition & 1 deletion python/ray/log_monitor.py
@@ -113,7 +113,7 @@ def update_log_filenames(self):
self.logs_dir))
# segfaults and other serious errors are logged here
raylet_err_paths = glob.glob("{}/raylet*.err".format(self.logs_dir))
for file_path in log_file_paths:
for file_path in log_file_paths + raylet_err_paths:
if os.path.isfile(
file_path) and file_path not in self.log_filenames:
self.log_filenames.add(file_path)
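
As a side note for readers of this hunk, a minimal standalone sketch of the discovery loop after the change is below; the logs_dir value and the worker-log glob pattern are illustrative assumptions, only the raylet*.err glob and the combined loop come from the diff.

import glob
import os

logs_dir = "/tmp/ray/session_latest/logs"  # assumed location, for illustration only

# Worker logs plus raylet error logs (segfaults and other serious errors).
log_file_paths = glob.glob("{}/worker*.out".format(logs_dir))  # assumed pattern
raylet_err_paths = glob.glob("{}/raylet*.err".format(logs_dir))

log_filenames = set()
for file_path in log_file_paths + raylet_err_paths:
    if os.path.isfile(file_path) and file_path not in log_filenames:
        log_filenames.add(file_path)
print(sorted(log_filenames))
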
3 changes: 1 addition & 2 deletions python/ray/test_utils.py
@@ -259,8 +259,7 @@ async def locked(self):


def dicts_equal(dict1, dict2, abs_tol=1e-4):
'''Compares to dicts whose values may be floating point numbers for
equality'''
"""Compares to dicts whose values may be floating point numbers."""

if dict1.keys() != dict2.keys():
return False
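
This hunk only touches the docstring; for context, a minimal sketch of how a tolerance-based comparison like dicts_equal might look and be used (the function body below is an assumption, only the signature and docstring appear in the diff):

import math

def dicts_equal(dict1, dict2, abs_tol=1e-4):
    """Compares two dicts whose values may be floating point numbers."""
    if dict1.keys() != dict2.keys():
        return False
    # Compare each value with an absolute tolerance so tiny floating point
    # drift (e.g. fractional CPU resources) does not fail the comparison.
    return all(math.isclose(dict1[k], dict2[k], abs_tol=abs_tol) for k in dict1)

# Example: fractional CPU amounts that differ only by rounding noise.
assert dicts_equal({"CPU": 0.3333}, {"CPU": 0.33334})
assert not dicts_equal({"CPU": 0.5}, {"GPU": 0.5})
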
12 changes: 2 additions & 10 deletions python/ray/tests/test_basic.py
@@ -125,18 +125,13 @@ def f(block, accepted_resources):
resource: value[0][1]
for resource, value in ray.get_resource_ids().items()
}
print("-------------------")
print("Accepted resources:", accepted_resources)
print("True resources", true_resources)
print(ray.test_utils.dicts_equal(true_resources, accepted_resources))
print("-------------------")
if block:
ray.get(g.remote())
return ray.test_utils.dicts_equal(true_resources, accepted_resources)

# Check that the resources are assigned correctly.
result_ids = []
for rand1, rand2, rand3 in np.random.uniform(size=(10, 3)):
for rand1, rand2, rand3 in np.random.uniform(size=(100, 3)):
resource_set = {"CPU": int(rand1 * 10000) / 10000}
result_ids.append(f._remote([False, resource_set], num_cpus=rand1))

@@ -164,10 +159,7 @@ def f(block, accepted_resources):
num_cpus=rand1,
num_gpus=rand2,
resources={"Custom": rand3}))

results = ray.get(result_ids)
print("results: ", results)
assert all(results)
assert all(result_ids)

# Check that the available resources at the end are the same as at the
# beginning.
145 changes: 93 additions & 52 deletions python/ray/worker.py
@@ -46,12 +46,8 @@
ObjectStoreFullError,
)
from ray.function_manager import FunctionActorManager
from ray.utils import (
_random_string,
check_oversized_pickle,
is_cython,
setup_logger,
)
from ray.utils import (_random_string, check_oversized_pickle, is_cython,
setup_logger, create_and_init_new_worker_log, open_log)

SCRIPT_MODE = 0
WORKER_MODE = 1
@@ -895,6 +891,68 @@ def custom_excepthook(type, value, tb):
UNCAUGHT_ERROR_GRACE_PERIOD = 5


def _set_log_file(file_name, worker_pid, old_obj, setter_func):
# Line-buffer the output (mode 1).
f = create_and_init_new_worker_log(file_name, worker_pid)

# TODO (Alex): Python seems to always flush when writing. If that is no
# longer true, then we need to manually flush the old buffer.
# old_obj.flush()

# TODO (Alex): Flush the c/c++ userspace buffers if necessary.
# `fflush(stdout); cout.flush();`

fileno = old_obj.fileno()

# C++ logging requires redirecting the stdout file descriptor. Note that
# dup2 will automatically close the old file descriptor before overriding
# it.
os.dup2(f.fileno(), fileno)

# We also manually set sys.stdout and sys.stderr because that seems to
# affect the output buffering. Without doing this, stdout and stderr are
# heavily buffered, resulting in seemingly lost logging statements. We never
# want to close the stdout file descriptor; dup2 will close it when
# necessary, and we don't want Python's GC to close it.
setter_func(open_log(fileno, closefd=False))

return os.path.abspath(f.name)


def set_log_file(stdout_name, stderr_name):
"""Sets up logging for the current worker, creating the (fd backed) file and
flushing buffers as is necessary.

Args:
stdout_name (str): The file name that stdout should be written to.
stderr_name (str): The file name that stderr should be written to.

Returns:
(tuple) The absolute paths of the files that stdout and stderr will be
written to.

"""
stdout_path = ""
stderr_path = ""
worker_pid = os.getpid()

# A lambda cannot contain an assignment, so use named setter functions.
def stdout_setter(x):
sys.stdout = x

def stderr_setter(x):
sys.stderr = x

if stdout_name:
stdout_path = _set_log_file(stdout_name, worker_pid, sys.stdout, stdout_setter)

# The stderr case is analogous to the stdout case.
if stderr_name:
stderr_path = _set_log_file(stderr_name, worker_pid, sys.stderr, stderr_setter)

return stdout_path, stderr_path


def print_logs(redis_client, threads_stopped):
"""Prints log messages from workers on all of the nodes.

@@ -932,18 +990,16 @@ def color_for(data):
else:
return colorama.Fore.CYAN

# if data["ip"] == localhost:
# for line in data["lines"]:
# print("{}{}(pid={}){} {}".format(
# colorama.Style.DIM, color_for(data), data["pid"],
# colorama.Style.RESET_ALL, line))
# else:
# for line in data["lines"]:
# print("{}{}(pid={}, ip={}){} {}".format(
# colorama.Style.DIM, color_for(data), data["pid"],
# data["ip"], colorama.Style.RESET_ALL, line))
for line in data["lines"]:
print(">", line)
if data["ip"] == localhost:
for line in data["lines"]:
print("{}{}(pid={}){} {}".format(
colorama.Style.DIM, color_for(data), data["pid"],
colorama.Style.RESET_ALL, line))
else:
for line in data["lines"]:
print("{}{}(pid={}, ip={}){} {}".format(
colorama.Style.DIM, color_for(data), data["pid"],
data["ip"], colorama.Style.RESET_ALL, line))

if (num_consecutive_messages_received % 100 == 0
and num_consecutive_messages_received > 0):
@@ -1153,8 +1209,8 @@ def connect(node,
worker.lock = threading.RLock()

driver_name = ""
log_stdout_file_name = ""
log_stderr_file_name = ""
log_stdout_file_path = ""
log_stderr_file_path = ""
if mode == SCRIPT_MODE:
import __main__ as main
driver_name = (main.__file__
@@ -1166,39 +1222,24 @@ def connect(node,
redirect_worker_output_val = worker.redis_client.get("RedirectOutput")
if (redirect_worker_output_val is not None
and int(redirect_worker_output_val) == 1):
log_stdout_file, log_stderr_file = (
node.new_worker_redirected_log_file(worker.worker_id))
# Redirect stdout/stderr at the file descriptor level. If we simply
# set sys.stdout and sys.stderr, then logging from C++ can fail to
# be redirected.
if log_stdout_file is not None:
os.dup2(log_stdout_file.fileno(), sys.stdout.fileno())
if log_stderr_file is not None:
os.dup2(log_stderr_file.fileno(), sys.stderr.fileno())
# We also manually set sys.stdout and sys.stderr because that seems
# to have an affect on the output buffering. Without doing this,
# stdout and stderr are heavily buffered resulting in seemingly
# lost logging statements.
if log_stdout_file is not None:
sys.stdout = log_stdout_file
if log_stderr_file is not None:
sys.stderr = log_stderr_file
# This should always be the first message to appear in the worker's
# stdout and stderr log files. The string "Ray worker pid:" is
# parsed in the log monitor process.
print("Ray worker pid: {}".format(os.getpid()))
print("Ray worker pid: {}".format(os.getpid()), file=sys.stderr)
sys.stdout.flush()
sys.stderr.flush()
log_stdout_file_name = os.path.abspath(
(log_stdout_file
if log_stdout_file is not None else sys.stdout).name)
log_stderr_file_name = os.path.abspath(
(log_stderr_file
if log_stderr_file is not None else sys.stderr).name)
log_stdout_file_name, log_stderr_file_name = (
node.get_job_redirected_log_file(worker.worker_id))
try:
log_stdout_file_path, log_stderr_file_path = \
set_log_file(log_stdout_file_name, log_stderr_file_name)
except IOError:
raise IOError(
"Workers must be able to redirect their output at"
"the file descriptor level.")
elif not LOCAL_MODE:
raise ValueError(
"Invalid worker mode. Expected DRIVER, WORKER or LOCAL.")

# TODO (Alex): `current_logging_job` tracks the current job so that we know
# when to switch log files. If all logging functionality were moved to C++,
# the functionality in `_raylet.pyx::switch_worker_log_if_necessary` could
# be moved to `CoreWorker::SetCurrentTaskId()`.
worker.current_logging_job_id = None
redis_address, redis_port = node.redis_address.split(":")
gcs_options = ray._raylet.GcsClientOptions(
redis_address,
@@ -1218,8 +1259,8 @@ def connect(node,
node.raylet_ip_address,
(mode == LOCAL_MODE),
driver_name,
log_stdout_file_name,
log_stderr_file_name,
log_stdout_file_path,
log_stderr_file_path,
)

# Create an object for interfacing with the global state.
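
The _set_log_file helper added above relies on file-descriptor-level redirection so that C/C++ output is captured too. A minimal self-contained sketch of that technique is below; demo_redirect_stdout, the log path, and the use of plain open/os.fdopen in place of the PR's create_and_init_new_worker_log and open_log helpers are illustrative assumptions.

import os
import sys

def demo_redirect_stdout(path):
    # Hypothetical standalone demo of the fd-level redirection described in
    # _set_log_file above.
    f = open(path, "a", buffering=1)  # line-buffered, like mode 1
    stdout_fileno = sys.stdout.fileno()

    # dup2 makes fd 1 point at the log file (closing whatever it pointed to),
    # so native code writing to fd 1 is redirected as well.
    os.dup2(f.fileno(), stdout_fileno)

    # Rebind sys.stdout over fd 1 with closefd=False so Python-level writes
    # are line-buffered and the GC never closes the real stdout descriptor.
    sys.stdout = os.fdopen(stdout_fileno, "a", buffering=1, closefd=False)
    return os.path.abspath(f.name)

if __name__ == "__main__":
    log_path = demo_redirect_stdout("/tmp/demo_worker_stdout.log")
    print("this line is written to", log_path)
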
6 changes: 3 additions & 3 deletions src/ray/common/ray_config_def.h
@@ -106,7 +106,7 @@ RAY_CONFIG(bool, lineage_pinning_enabled, false)
/// Whether to enable the new scheduler. The new scheduler is designed
/// only to work with direct calls. Once direct calls become the default,
/// this scheduler will also become the default.
RAY_CONFIG(bool, new_scheduler_enabled, true)
RAY_CONFIG(bool, new_scheduler_enabled, false)

// The max allowed size in bytes of a return object from direct actor calls.
// Objects larger than this size will be spilled/promoted to plasma.
@@ -301,5 +301,5 @@ RAY_CONFIG(int32_t, ping_gcs_rpc_server_max_retries, 600)
RAY_CONFIG(bool, plasma_store_as_thread, false)

RAY_CONFIG(bool, gcs_actor_service_enabled,
true || (getenv("RAY_GCS_ACTOR_SERVICE_ENABLED") != nullptr &&
getenv("RAY_GCS_ACTOR_SERVICE_ENABLED") == std::string("true")))
getenv("RAY_GCS_ACTOR_SERVICE_ENABLED") != nullptr &&
getenv("RAY_GCS_ACTOR_SERVICE_ENABLED") == std::string("true"))
17 changes: 8 additions & 9 deletions src/ray/core_worker/core_worker.cc
@@ -1093,7 +1093,6 @@ Status CoreWorker::SetResource(const std::string &resource_name, const double ca
void CoreWorker::SubmitTask(const RayFunction &function, const std::vector<TaskArg> &args,
const TaskOptions &task_options,
std::vector<ObjectID> *return_ids, int max_retries) {
RAY_LOG(INFO) << "[CoreWorker] Received task submission.";
TaskSpecBuilder builder;
const int next_task_index = worker_context_.GetNextTaskIndex();
const auto task_id =
@@ -1172,12 +1171,12 @@ Status CoreWorker::CreateActor(const RayFunction &function,
return status;
}

Status CoreWorker::SubmitActorTask(const ActorID &actor_id, const RayFunction &function,
const std::vector<TaskArg> &args,
const TaskOptions &task_options,
std::vector<ObjectID> *return_ids) {
void CoreWorker::SubmitActorTask(const ActorID &actor_id, const RayFunction &function,
const std::vector<TaskArg> &args,
const TaskOptions &task_options,
std::vector<ObjectID> *return_ids) {
ActorHandle *actor_handle = nullptr;
RAY_RETURN_NOT_OK(GetActorHandle(actor_id, &actor_handle));
RAY_CHECK_OK(GetActorHandle(actor_id, &actor_handle));

// Add one for actor cursor object id for tasks.
const int num_returns = task_options.num_returns + 1;
@@ -1200,16 +1199,16 @@ Status CoreWorker::SubmitActorTask(const ActorID &actor_id, const RayFunction &f
return_ids->pop_back();

// Submit task.
Status status;
TaskSpecification task_spec = builder.Build();
if (options_.is_local_mode) {
ExecuteTaskLocalMode(task_spec, actor_id);
} else {
task_manager_->AddPendingTask(GetCallerId(), rpc_address_, task_spec,
CurrentCallSite(), actor_handle->MaxTaskRetries());
status = direct_actor_submitter_->SubmitTask(task_spec);
io_service_.post([this, task_spec]() {
RAY_UNUSED(direct_actor_submitter_->SubmitTask(task_spec));
});
}
return status;
}

Status CoreWorker::CancelTask(const ObjectID &object_id, bool force_kill) {
2 changes: 1 addition & 1 deletion src/ray/core_worker/task_manager.cc
@@ -28,7 +28,7 @@ void TaskManager::AddPendingTask(const TaskID &caller_id,
const rpc::Address &caller_address,
const TaskSpecification &spec,
const std::string &call_site, int max_retries) {
RAY_LOG(INFO) << "Adding pending task " << spec.TaskId() << " with " << max_retries
RAY_LOG(DEBUG) << "Adding pending task " << spec.TaskId() << " with " << max_retries
<< " retries";

// Add references for the dependencies to the task.
14 changes: 4 additions & 10 deletions src/ray/core_worker/transport/direct_task_transport.cc
@@ -19,7 +19,7 @@
namespace ray {

Status CoreWorkerDirectTaskSubmitter::SubmitTask(TaskSpecification task_spec) {
RAY_LOG(INFO) << "[CoreWorker] Submit task " << task_spec.TaskId();
RAY_LOG(DEBUG) << "Submit task " << task_spec.TaskId();
resolver_.ResolveDependencies(task_spec, [this, task_spec]() {
RAY_LOG(DEBUG) << "Task dependencies resolved " << task_spec.TaskId();
if (actor_create_callback_ && task_spec.IsActorCreationTask()) {
@@ -117,7 +117,6 @@ void CoreWorkerDirectTaskSubmitter::OnWorkerIdle(
if (queue_entry->second.empty()) {
task_queues_.erase(queue_entry);
RAY_LOG(DEBUG) << "Task queue empty, canceling lease request";
RAY_LOG(INFO) << "[OnWorkerIdle] Cancelling worker lease if needed";
CancelWorkerLeaseIfNeeded(scheduling_key);
}
}
@@ -147,13 +146,10 @@ void CoreWorkerDirectTaskSubmitter::CancelWorkerLeaseIfNeeded(
// the request queued. This can happen if: a) due to message
// reordering, the raylet has not yet received the worker lease
// request, or b) we have already returned the worker lease

// request. In the former case, we should try the cancellation
// request again. In the latter case, the in-flight lease request
// should already have been removed from our local state, so we no
// longer need to cancel.
RAY_LOG(INFO) << "[CancelWorkerLeaseIfNeeded] Retrying cancel worker lease";
RAY_LOG(WARNING) << "Cancelling worker lease failed.";
CancelWorkerLeaseIfNeeded(scheduling_key);
}
}));
@@ -199,8 +195,7 @@ void CoreWorkerDirectTaskSubmitter::RequestNewWorkerIfNeeded(
auto lease_client = GetOrConnectLeaseClient(raylet_address);
TaskSpecification &resource_spec = it->second.front();
TaskID task_id = resource_spec.TaskId();
RAY_LOG(INFO) << "[DirectTaskTransport] Lease requested " << task_id;
RAY_LOG(INFO) << "[DirectTaskTransport] Lease resources: " << resource_spec.DebugString();
RAY_LOG(DEBUG) << "Lease requested " << task_id;
RAY_UNUSED(lease_client->RequestWorkerLease(
resource_spec, [this, scheduling_key](const Status &status,
const rpc::RequestWorkerLeaseReply &reply) {
@@ -214,12 +209,12 @@ void CoreWorkerDirectTaskSubmitter::RequestNewWorkerIfNeeded(

if (status.ok()) {
if (reply.canceled()) {
RAY_LOG(INFO) << "Lease canceled " << task_id;
RAY_LOG(DEBUG) << "Lease canceled " << task_id;
RequestNewWorkerIfNeeded(scheduling_key);
} else if (!reply.worker_address().raylet_id().empty()) {
// We got a lease for a worker. Add the lease client state and try to
// assign work to the worker.
RAY_LOG(INFO) << "Lease granted " << task_id;
RAY_LOG(DEBUG) << "Lease granted " << task_id;
rpc::WorkerAddress addr(reply.worker_address());
AddWorkerLeaseClient(addr, std::move(lease_client));
auto resources_copy = reply.resource_mapping();
@@ -326,7 +321,6 @@ Status CoreWorkerDirectTaskSubmitter::CancelTask(TaskSpecification task_spec,

if (scheduled_tasks->second.empty()) {
task_queues_.erase(scheduling_key);
RAY_LOG(INFO) << "[CancelTask] Cancelling worker lease if needed";
CancelWorkerLeaseIfNeeded(scheduling_key);
}
RAY_UNUSED(task_finisher_->PendingTaskFailed(task_spec.TaskId(),