TaskCancellation #7669

Merged
merged 50 commits into from
Apr 25, 2020
Changes from 12 commits
Commits
169c540
Smol comment
ijrsvt Mar 19, 2020
682c5b5
Merge branch 'master' into TaskCancellation
ijrsvt Mar 25, 2020
2d020ba
WIP, not passing ray.init
ijrsvt Mar 25, 2020
1958e05
Fixed small problem
ijrsvt Mar 25, 2020
4fdeb5a
wip
ijrsvt Mar 31, 2020
40b2bb5
Pseudo interrupt things
ijrsvt Mar 31, 2020
d1295c3
Basic prototype operational
ijrsvt Mar 31, 2020
269a3b1
Merge branch 'master' of github.com:ijrsvt/ray into TaskCancellation
ijrsvt Apr 1, 2020
028d9f7
correct proc title
ijrsvt Apr 2, 2020
a4b58e5
Mostly done
ijrsvt Apr 7, 2020
33ad6a1
Cleanup
ijrsvt Apr 7, 2020
4f7eec7
cleaner raylet error
ijrsvt Apr 7, 2020
cc3ca28
Cleaning up a few loose ends
ijrsvt Apr 7, 2020
bd47066
Fixing Race Conds
ijrsvt Apr 7, 2020
c0b5ab4
Prelim testing
ijrsvt Apr 7, 2020
58c8bed
Fixing comments and adding second_check for kill
ijrsvt Apr 8, 2020
bae435f
Working_new_impl
ijrsvt Apr 9, 2020
9ab039d
demo_ready
ijrsvt Apr 9, 2020
d85496d
Fixing my english
ijrsvt Apr 10, 2020
d0ba816
Merge branch 'master' into TaskCancellation
ijrsvt Apr 10, 2020
652a0fe
Fixing a few problems
ijrsvt Apr 10, 2020
daac610
Small problems
ijrsvt Apr 10, 2020
b050b28
Cleaning up
ijrsvt Apr 10, 2020
b0457a3
Response to changes
ijrsvt Apr 15, 2020
18b3dbc
Fixing error passing
ijrsvt Apr 15, 2020
b813faf
Merge branch 'master' into TaskCancellation
ijrsvt Apr 15, 2020
112d7d8
Merged to master
ijrsvt Apr 15, 2020
ff8bbd3
fixing lock
ijrsvt Apr 15, 2020
af35898
Cleaning up print statements
ijrsvt Apr 15, 2020
b015c51
Format
ijrsvt Apr 15, 2020
616f487
Fixing Unit test build failure
ijrsvt Apr 16, 2020
2361273
mock_worker fix
ijrsvt Apr 16, 2020
9dba915
java_fix
ijrsvt Apr 16, 2020
9a43056
Canel
ijrsvt Apr 16, 2020
68a6458
Switching to Cancel
ijrsvt Apr 17, 2020
46545e1
Responding to Review
ijrsvt Apr 21, 2020
7308225
FixFormatting
ijrsvt Apr 21, 2020
1f95492
Merge branch 'master' into TaskCancellation
ijrsvt Apr 21, 2020
9a0d120
Lease cancellation
ijrsvt Apr 22, 2020
82a6248
FInal comments?
ijrsvt Apr 22, 2020
3270f92
Moving exist check to CoreWorker
ijrsvt Apr 23, 2020
794f146
Fix Actor Transport Test
ijrsvt Apr 23, 2020
e43ea33
Fixing task manager test
ijrsvt Apr 23, 2020
9beea80
chaning clock repr
ijrsvt Apr 23, 2020
2a789f5
Fix build
ijrsvt Apr 24, 2020
8c75a83
fix white space
ijrsvt Apr 24, 2020
8f7bdfe
lint fix
ijrsvt Apr 24, 2020
6f1ef56
Updating to medium size
ijrsvt Apr 24, 2020
f7fb69f
Fixing Java test compilation issue
ijrsvt Apr 25, 2020
e8cd360
lengthen bad timeouts
ijrsvt Apr 25, 2020
31 changes: 26 additions & 5 deletions python/ray/_raylet.pyx
@@ -17,6 +17,8 @@ import logging
import os
import pickle
import sys
+import _thread
+import setproctitle

from libc.stdint cimport (
int32_t,
@@ -90,6 +92,7 @@ from ray.exceptions import (
RayTaskError,
ObjectStoreFullError,
RayTimeoutError,
+RayCancellationError
)
from ray.utils import decode

@@ -449,8 +452,11 @@ cdef execute_task(
with ray.worker._changeproctitle(title, next_title):
with core_worker.profile_event(b"task:execute"):
task_exception = True
-outputs = function_executor(*args, **kwargs)
-task_exception = False
+try:
+outputs = function_executor(*args, **kwargs)
+task_exception = False
+except KeyboardInterrupt as e:
Contributor

Couldn't this get raised outside of the try block?

Contributor

Yes, I believe this can be raised on any line of Python code.

Contributor Author

It can only be raised in regular Python code. In compiled Cython or C++ code, interrupts are only observed through PyErr_CheckSignals (which check_signals calls). As long as I call check_signals before any Python code runs (the next place it is called is in store_task_output), any lingering interrupts will be cleared.
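
A minimal standard-library sketch of the mechanism discussed in this thread: a background thread calls `_thread.interrupt_main()`, the main thread sees `KeyboardInterrupt` the next time it runs Python bytecode, and a wrapper translates it into a cancellation error. `CancelledError`, `run_cancellable`, `busy_task`, and `cancel_after` are hypothetical names used for illustration only; they are not Ray APIs, and the sketch assumes it runs on the main thread with the default SIGINT handler installed.

```python
import _thread
import threading
import time


class CancelledError(Exception):
    """Hypothetical stand-in for RayCancellationError."""


def run_cancellable(task, *args):
    """Run task on the calling (main) thread; map an interrupt to CancelledError."""
    try:
        return task(*args)
    except KeyboardInterrupt:
        raise CancelledError("task was cancelled") from None


def busy_task(seconds):
    # Sleep in short slices so control returns to the interpreter often;
    # a pending interrupt is only delivered while Python code is running.
    deadline = time.monotonic() + seconds
    while time.monotonic() < deadline:
        time.sleep(0.001)
    return "finished"


def cancel_after(delay):
    # Plays the role of the RPC handler thread: after `delay` seconds it asks
    # the main thread to raise KeyboardInterrupt via _thread.interrupt_main().
    timer = threading.Timer(delay, _thread.interrupt_main)
    timer.daemon = True
    timer.start()
    return timer
```

Calling `cancel_after(0.05)` and then `run_cancellable(busy_task, 5)` from the main thread should raise `CancelledError` long before the five seconds elapse. A task that spends its time inside a long-running C call would not be interrupted until it returns to Python code, which is the limitation the thread above is pointing at.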

+raise RayCancellationError("")
if c_return_ids.size() == 1:
outputs = (outputs,)
# Store the outputs in the object store.
@@ -466,10 +472,12 @@ cdef execute_task(
if isinstance(error, RayTaskError):
# Avoid recursive nesting of RayTaskError.
failure_object = RayTaskError(function_name, backtrace,
-error.cause_cls)
+error.cause_cls, proctitle=title)
+elif isinstance(error, RayCancellationError):
+failure_object = RayCancellationError("")
else:
failure_object = RayTaskError(function_name, backtrace,
-error.__class__)
+error.__class__, proctitle=title)
errors = []
for _ in range(c_return_ids.size()):
errors.append(failure_object)
@@ -545,6 +553,12 @@ cdef void async_plasma_callback(CObjectID object_id,
event_handler._loop.call_soon_threadsafe(
event_handler._complete_future, obj_id)

+cdef void kill_main_task() nogil:
+with gil:
+if setproctitle.getproctitle() != "ray::IDLE":
+_thread.interrupt_main()


cdef CRayStatus check_signals() nogil:
with gil:
try:
@@ -656,7 +670,7 @@ cdef class CoreWorker:
gcs_options.native()[0], log_dir.encode("utf-8"),
node_ip_address.encode("utf-8"), node_manager_port,
task_execution_handler, check_signals, gc_collect,
-get_py_stack, True))
+get_py_stack, True, kill_main_task))

def run_task_loop(self):
with nogil:
@@ -924,6 +938,13 @@ cdef class CoreWorker:
check_status(self.core_worker.get().KillActor(
c_actor_id, True, no_reconstruction))

+def kill_task(self, ObjectID object_id):
+cdef:
+CObjectID c_object_id = object_id.native()
+
+with nogil:
+check_status(self.core_worker.get().KillTask(c_object_id))

def resource_ids(self):
cdef:
ResourceMappingType resource_mapping = (
5 changes: 5 additions & 0 deletions python/ray/exceptions.py
@@ -16,6 +16,11 @@ class RayConnectionError(RayError):
pass


+class RayCancellationError(RayError):
+"""Raised when this task or a dependency is cancelled"""
+pass


class RayTaskError(RayError):
"""Indicates that a task threw an exception during execution.

4 changes: 3 additions & 1 deletion python/ray/includes/libcoreworker.pxd
@@ -99,7 +99,8 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
CRayStatus() nogil,
void() nogil,
void(c_string *stack_out) nogil,
-c_bool ref_counting_enabled)
+c_bool ref_counting_enabled,
+void() nogil)
CWorkerType &GetWorkerType()
CLanguage &GetLanguage()

@@ -120,6 +121,7 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
CRayStatus KillActor(
const CActorID &actor_id, c_bool force_kill,
c_bool no_reconstruction)
+CRayStatus KillTask(const CObjectID &object_id)

unique_ptr[CProfileEvent] CreateProfileEvent(
const c_string &event_type)
2 changes: 2 additions & 0 deletions python/ray/includes/unique_ids.pxd
@@ -122,6 +122,8 @@ cdef extern from "ray/common/id.h" namespace "ray" nogil:
CTaskID ForNormalTask(CJobID job_id, CTaskID parent_task_id,
int64_t parent_task_counter)

+CActorID ActorId() const

cdef cppclass CObjectID" ray::ObjectID"(CBaseID[CObjectID]):

@staticmethod
3 changes: 3 additions & 0 deletions python/ray/includes/unique_ids.pxi
@@ -219,6 +219,9 @@ cdef class TaskID(BaseID):
def is_nil(self):
return self.data.IsNil()

+def actor_id(self):
+return ActorID(self.data.ActorId().Binary())

cdef size_t hash(self):
return self.data.Hash()

12 changes: 5 additions & 7 deletions python/ray/serialization.py
@@ -8,13 +8,9 @@
import ray.utils
from ray.utils import _random_string
from ray.gcs_utils import ErrorType
-from ray.exceptions import (
-PlasmaObjectNotAvailable,
-RayTaskError,
-RayActorError,
-RayWorkerError,
-UnreconstructableError,
-)
+from ray.exceptions import (PlasmaObjectNotAvailable, RayTaskError,
+RayActorError, RayWorkerError,
+UnreconstructableError, RayCancellationError)
from ray._raylet import Pickle5Writer, unpack_pickle5_buffers

logger = logging.getLogger(__name__)
@@ -288,6 +284,8 @@ def _deserialize_object(self, data, metadata, object_id):
return RayWorkerError()
elif error_type == ErrorType.Value("ACTOR_DIED"):
return RayActorError()
+elif error_type == ErrorType.Value("TASK_CANCELLED"):
+return RayCancellationError()
elif error_type == ErrorType.Value("OBJECT_UNRECONSTRUCTABLE"):
return UnreconstructableError(ray.ObjectID(object_id.binary()))
else:
21 changes: 21 additions & 0 deletions python/ray/tests/test_advanced.py
@@ -708,6 +708,27 @@ def method(self):
assert ray.worker.global_worker.core_worker.object_exists(x_id)


+def test_task_cancellation(shutdown_only):
+# import ray, time
+ray.init(num_cpus=1)
+
+@ray.remote
+def micro_sleep_for(t):
+for _ in range(t * 1000):
+time.sleep(1 / 1000)
+return t
+
+obj1 = micro_sleep_for.remote(100)
+ray.kill(obj1)
+ray.get(obj1, 1)
+obj2 = micro_sleep_for.remote(200)
+obj3 = micro_sleep_for.remote(300)
+ray.kill(obj3)
+ray.get(obj3)
+ray.kill(obj2)
+ray.get(obj2)

if __name__ == "__main__":
import pytest
sys.exit(pytest.main(["-v", __file__]))
27 changes: 18 additions & 9 deletions python/ray/worker.py
@@ -1371,8 +1371,10 @@ def disconnect(exiting_interpreter=False):
@contextmanager
def _changeproctitle(title, next_title):
setproctitle.setproctitle(title)
-yield
-setproctitle.setproctitle(next_title)
+try:
+yield
+finally:
+setproctitle.setproctitle(next_title)
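
The effect of wrapping the `yield` in `try`/`finally` can be shown with a small standalone sketch. It uses a plain dictionary as a hypothetical stand-in for the process title (so there is no `setproctitle` dependency), and all helper names are illustrative. Because cancellation surfaces as an exception raised inside the `with` body, cleanup placed after a bare `yield` never runs.

```python
from contextlib import contextmanager

# Hypothetical stand-in for the real process title.
state = {"title": "idle"}


@contextmanager
def title_without_finally(title, next_title):
    state["title"] = title
    yield
    state["title"] = next_title  # skipped when the body raises


@contextmanager
def title_with_finally(title, next_title):
    state["title"] = title
    try:
        yield
    finally:
        state["title"] = next_title  # runs even when the body raises


def run_and_interrupt(manager):
    """Raise KeyboardInterrupt inside the managed block; return the final title."""
    state["title"] = "idle"
    try:
        with manager("busy", "idle"):
            raise KeyboardInterrupt
    except KeyboardInterrupt:
        pass
    return state["title"]
```

With the first variant the title stays at `"busy"` after the interrupt; with the second it is restored to `"idle"`. This presumably matters for this PR because `kill_main_task` compares the process title against `"ray::IDLE"` before interrupting.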


def register_custom_serializer(cls,
@@ -1655,7 +1657,7 @@ def wait(object_ids, num_returns=1, timeout=None):
return ready_ids, remaining_ids


-def kill(actor):
+def kill(id):
"""Kill an actor forcefully.

This will interrupt any running tasks on the actor, causing them to fail
@@ -1668,15 +1670,22 @@ def kill(actor):
If this actor is reconstructable, it will be attempted to be reconstructed.

Args:
-actor (ActorHandle): Handle to the actor to kill.
+id (ActorHandle or ObjectID): Handle for the actor to kill or ObjectID
+of the task to kill.
"""
-if not isinstance(actor, ray.actor.ActorHandle):
-raise ValueError("ray.kill() only supported for actors. "
-"Got: {}.".format(type(actor)))

worker = ray.worker.global_worker
worker.check_connected()
-worker.core_worker.kill_actor(actor._ray_actor_id, False)

+if isinstance(id, ray.actor.ActorHandle):
Contributor Author

@ijrsvt ijrsvt Apr 7, 2020

I'll move this to a new PR later

+worker.core_worker.kill_actor(id._ray_actor_id, False)
+elif isinstance(id, ray.ObjectID):
+if id.task_id().actor_id().hex() in ray.actors().keys():
+raise ValueError(
+"Please use ray.kill(ActorHandle) to kill an actor task")
+worker.core_worker.kill_task(id)
+else:
+raise ValueError("ray.kill() only supported for actors and objects. "
+"Got: {}.".format(type(id)))


def _mode(worker=global_worker):
35 changes: 32 additions & 3 deletions src/ray/core_worker/core_worker.cc
@@ -84,12 +84,13 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language,
std::function<Status()> check_signals,
std::function<void()> gc_collect,
std::function<void(std::string *)> get_lang_stack,
-bool ref_counting_enabled)
+bool ref_counting_enabled, std::function<void()> kill_main)
: worker_type_(worker_type),
language_(language),
log_dir_(log_dir),
ref_counting_enabled_(ref_counting_enabled),
check_signals_(check_signals),
+kill_main_thread_(kill_main),
gc_collect_(gc_collect),
get_call_site_(RayConfig::instance().record_ref_creation_sites() ? get_lang_stack
: nullptr),
@@ -356,10 +357,10 @@ void CoreWorker::RunIOService() {

void CoreWorker::SetCurrentTaskId(const TaskID &task_id) {
worker_context_.SetCurrentTaskId(task_id);
-main_thread_task_id_ = task_id;
bool not_actor_task = false;
{
absl::MutexLock lock(&mutex_);
+main_thread_task_id_ = task_id;
not_actor_task = actor_id_.IsNil();
}
if (not_actor_task && task_id.IsNil()) {
@@ -374,7 +375,7 @@ void CoreWorker::SetCurrentTaskId(const TaskID &task_id) {
}

void CoreWorker::CheckForRayletFailure() {
-// If the raylet fails, we will be reassigned to init (PID=1).
+// If the raylet fails, we will be reassigned to init (PID=1).
if (getppid() == 1) {
RAY_LOG(ERROR) << "Raylet failed. Shutting down.";
Shutdown();
@@ -896,6 +897,16 @@ Status CoreWorker::SubmitActorTask(const ActorID &actor_id, const RayFunction &f
return status;
}

+Status CoreWorker::KillTask(const ObjectID &object_id) {
+auto task_id = object_id.TaskId();
+if (task_manager_->IsTaskPending(task_id)) {
+auto task_spec = task_manager_->GetTaskSpec(object_id.TaskId());
+if (!task_spec.IsActorCreationTask())
Contributor

Let's use a RAY_CHECK instead of an if statement here, because this is definitely checked by the Python code.

Contributor Author

I don't think it actually is.

+return direct_task_submitter_->KillTask(task_spec);
+}
+return Status::OK();
+}

Status CoreWorker::KillActor(const ActorID &actor_id, bool force_kill,
bool no_reconstruction) {
ActorHandle *actor_handle = nullptr;
@@ -1078,6 +1089,7 @@ Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec,
if (resource_ids != nullptr) {
resource_ids_ = resource_ids;
}
+RAY_LOG(ERROR) << "Setting current task id: " << task_spec.TaskId();
worker_context_.SetCurrentTask(task_spec);
SetCurrentTaskId(task_spec.TaskId());

@@ -1389,6 +1401,23 @@ void CoreWorker::HandleWaitForRefRemoved(const rpc::WaitForRefRemovedRequest &re
reference_counter_->SetRefRemovedCallback(object_id, contained_in_id, owner_id,
owner_address, ref_removed_callback);
}
+void CoreWorker::HandleKillTask(const rpc::KillTaskRequest &request,
Contributor

What happens if the task is in the worker's scheduling queue and the timeout expires?

Contributor Author

For direct tasks, the task never enters the scheduling queue; it is executed immediately.

Contributor

What about for actor tasks? Is that not supported at all?

Contributor Author

Correct.

+rpc::KillTaskReply *reply,
+rpc::SendReplyCallback send_reply_callback) {
+TaskID intended_task_id = TaskID::FromBinary(request.intended_task_id());
+for (int i = 0; i < 3; i++) {
+{
+absl::MutexLock lock(&mutex_);
+if (main_thread_task_id_ == intended_task_id) {
+RAY_LOG(ERROR) << "Calling Kill Main";
+kill_main_thread_();
+return;
+}
+}
+RAY_LOG(ERROR) << "Failed: " << i;
+usleep(5 * 1000);
+}
+}
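
The retry loop in `HandleKillTask` can be modelled in a few lines of Python. This is a sketch for illustration, not the C++ implementation: the `Worker` class, its method names, and the interrupt counter are hypothetical, while the three attempts and the 5 ms pause mirror the values in the diff.

```python
import threading
import time


class Worker:
    """Hypothetical stand-in for the CoreWorker state used by HandleKillTask."""

    def __init__(self):
        self._lock = threading.Lock()
        self._main_thread_task_id = None
        self.interrupts = 0

    def set_current_task_id(self, task_id):
        # Updated under the lock, as in SetCurrentTaskId after this change.
        with self._lock:
            self._main_thread_task_id = task_id

    def _kill_main_thread(self):
        # The PR calls back into Python here; this sketch only counts calls.
        self.interrupts += 1

    def handle_kill_task(self, intended_task_id, attempts=3, delay=0.005):
        """Interrupt the main thread only if it is running the intended task."""
        for _ in range(attempts):
            with self._lock:
                if self._main_thread_task_id == intended_task_id:
                    self._kill_main_thread()
                    return True
            # Sleep outside the lock so set_current_task_id is never blocked.
            time.sleep(delay)
        return False
```

Comparing the task ID under the same lock that guards its update is what keeps a kill request from interrupting a different task that has just started on the same worker.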

void CoreWorker::HandleKillActor(const rpc::KillActorRequest &request,
rpc::KillActorReply *reply,
14 changes: 13 additions & 1 deletion src/ray/core_worker/core_worker.h
@@ -92,7 +92,8 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
std::function<Status()> check_signals = nullptr,
std::function<void()> gc_collect = nullptr,
std::function<void(std::string *)> get_lang_stack = nullptr,
-bool ref_counting_enabled = false);
+bool ref_counting_enabled = false,
+std::function<void()> kill_main = nullptr);

virtual ~CoreWorker();

@@ -402,6 +403,11 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
/// \param[out] Status
Status KillActor(const ActorID &actor_id, bool force_kill, bool no_reconstruction);

+/// Stops the task associated with the given Object ID.
+///
+/// \param[in] object_id of the task to kill (must be a Non-Actor task)
+/// \param[out] Status
+Status KillTask(const ObjectID &object_id);
/// Decrease the reference count for this actor. Should be called by the
/// language frontend when a reference to the ActorHandle destroyed.
///
@@ -513,6 +519,10 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
void HandleKillActor(const rpc::KillActorRequest &request, rpc::KillActorReply *reply,
rpc::SendReplyCallback send_reply_callback) override;

+/// Implements gRPC server handler.
+void HandleKillTask(const rpc::KillTaskRequest &request, rpc::KillTaskReply *reply,
+rpc::SendReplyCallback send_reply_callback);

/// Implements gRPC server handler.
void HandlePlasmaObjectReady(const rpc::PlasmaObjectReadyRequest &request,
rpc::PlasmaObjectReadyReply *reply,
@@ -691,6 +701,8 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
/// 1s) during long-running operations.
std::function<Status()> check_signals_;

+std::function<void()> kill_main_thread_;

/// Application-language callback to trigger garbage collection in the language
/// runtime. This is required to free distributed references that may otherwise
/// be held up in garbage objects.
11 changes: 11 additions & 0 deletions src/ray/core_worker/task_manager.cc
@@ -202,6 +202,17 @@ void TaskManager::CompletePendingTask(const TaskID &task_id,
ShutdownIfNeeded();
}

+void TaskManager::CancelTask(const TaskID &task_id) {
+{
+absl::MutexLock lock(&mu_);
+auto it = submissible_tasks_.find(task_id);
+if (it != submissible_tasks_.end()) {
+it->second.num_retries_left = 0;
+}
+}
+PendingTaskFailed(task_id, rpc::ErrorType::TASK_CANCELLED);
+}
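
A simplified model of what `CancelTask` appears to do: zero the retry budget under the lock, then report the task as failed with `TASK_CANCELLED`. The sketch assumes that `PendingTaskFailed` resubmits a task while retries remain, which this diff does not show; the Python class, its method names, and the string return values are hypothetical.

```python
import threading


class TaskManager:
    """Hypothetical, simplified model of the retry bookkeeping."""

    def __init__(self):
        self._lock = threading.Lock()
        self._retries_left = {}  # task_id -> remaining retries
        self.failed = {}         # task_id -> error type

    def add_pending_task(self, task_id, max_retries):
        with self._lock:
            self._retries_left[task_id] = max_retries

    def pending_task_failed(self, task_id, error_type):
        """Retry while retries remain (assumed); otherwise record the failure."""
        with self._lock:
            retries = self._retries_left.get(task_id)
            if retries is None:
                return "unknown"
            if retries > 0:
                self._retries_left[task_id] = retries - 1
                return "retried"
            del self._retries_left[task_id]
            self.failed[task_id] = error_type
            return "failed"

    def cancel_task(self, task_id):
        # Zero the retry budget first so the failure reported below is final.
        with self._lock:
            if task_id in self._retries_left:
                self._retries_left[task_id] = 0
        return self.pending_task_failed(task_id, "TASK_CANCELLED")
```

Under that assumption, a cancelled task with retries left would otherwise be resubmitted instead of surfacing the cancellation error, which is why the budget is cleared before the failure is reported.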

void TaskManager::PendingTaskFailed(const TaskID &task_id, rpc::ErrorType error_type,
Status *status) {
// Note that this might be the __ray_terminate__ task, so we don't log
4 changes: 4 additions & 0 deletions src/ray/core_worker/task_manager.h
@@ -35,6 +35,8 @@ class TaskFinisherInterface {
virtual void PendingTaskFailed(const TaskID &task_id, rpc::ErrorType error_type,
Status *status = nullptr) = 0;

+virtual void CancelTask(const TaskID &task_id) = 0;

virtual void OnTaskDependenciesInlined(
const std::vector<ObjectID> &inlined_dependency_ids,
const std::vector<ObjectID> &contained_ids) = 0;
@@ -108,6 +110,8 @@ class TaskManager : public TaskFinisherInterface {
void OnTaskDependenciesInlined(const std::vector<ObjectID> &inlined_dependency_ids,
const std::vector<ObjectID> &contained_ids) override;

+void CancelTask(const TaskID &task_id);

/// Return the spec for a pending task.
TaskSpecification GetTaskSpec(const TaskID &task_id) const;
