Skip to content

Commit

Permalink
Add test to test raylet client connection when raylet crashes. (ray-p…
Browse files Browse the repository at this point in the history
  • Loading branch information
guoyuhong authored and robertnishihara committed Dec 14, 2018
1 parent e7b51cb commit a4abe6c
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import java.util.concurrent.ConcurrentHashMap;
import org.ray.api.RayObject;
import org.ray.api.WaitResult;
import org.ray.api.exception.RayException;
import org.ray.api.id.UniqueId;
import org.ray.runtime.RayDevRuntime;
import org.ray.runtime.objectstore.MockObjectStore;
Expand Down Expand Up @@ -68,7 +67,7 @@ public TaskSpec getTask() {

@Override
public void fetchOrReconstruct(List<UniqueId> objectIds, boolean fetchOnly,
UniqueId currentTaskId) throws RayException {
UniqueId currentTaskId) {

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import java.util.List;
import org.ray.api.RayObject;
import org.ray.api.WaitResult;
import org.ray.api.exception.RayException;
import org.ray.api.id.UniqueId;
import org.ray.runtime.task.TaskSpec;

Expand All @@ -16,8 +15,7 @@ public interface RayletClient {

TaskSpec getTask();

void fetchOrReconstruct(List<UniqueId> objectIds, boolean fetchOnly, UniqueId currentTaskId)
throws RayException;
void fetchOrReconstruct(List<UniqueId> objectIds, boolean fetchOnly, UniqueId currentTaskId);

void notifyUnblocked(UniqueId currentTaskId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public TaskSpec getTask() {

@Override
public void fetchOrReconstruct(List<UniqueId> objectIds, boolean fetchOnly,
UniqueId currentTaskId) throws RayException {
UniqueId currentTaskId) {
if (RayLog.core.isDebugEnabled()) {
RayLog.core.debug("Blocked on objects for task {}, object IDs are {}",
UniqueIdUtil.computeTaskId(objectIds.get(0)), objectIds);
Expand Down
2 changes: 1 addition & 1 deletion src/ray/raylet/lib/python/raylet_extension.cc
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ static PyObject *PyRayletClient_FetchOrReconstruct(PyRayletClient *self, PyObjec
<< "raylet client may be closed, check raylet status. error message: "
<< status.ToString();
PyErr_SetString(CommonError, stream.str().c_str());
Py_RETURN_NONE;
return NULL;
}
}

Expand Down
17 changes: 17 additions & 0 deletions test/failure_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@
import ray
import sys
import tempfile
import threading
import time

import ray.ray_constants as ray_constants
from ray.utils import _random_string
import pytest


Expand Down Expand Up @@ -611,3 +613,18 @@ def test_warning_for_dead_node(ray_start_two_nodes):
}

assert client_ids == warning_client_ids


def test_raylet_crash_when_get(ray_start_regular):
nonexistent_id = ray.ObjectID(_random_string())

def sleep_to_kill_raylet():
# Don't kill raylet before default workers get connected.
time.sleep(2)
ray.services.all_processes[ray.services.PROCESS_TYPE_RAYLET][0].kill()

thread = threading.Thread(target=sleep_to_kill_raylet)
thread.start()
with pytest.raises(Exception, match=r".*raylet client may be closed.*"):
ray.get(nonexistent_id)
thread.join()

0 comments on commit a4abe6c

Please sign in to comment.