Skip to content

Commit

Permalink
[Java]Add actor id when throw RayActorException (#10886)
Browse files Browse the repository at this point in the history
  • Loading branch information
ffbin authored Sep 20, 2020
1 parent 3f90ec5 commit 1d06e02
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 2 deletions.
2 changes: 1 addition & 1 deletion java/api/src/main/java/io/ray/api/id/TaskId.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
*/
public class TaskId extends BaseId implements Serializable {

private static final int UNIQUE_BYTES_LENGTH = 8;
public static final int UNIQUE_BYTES_LENGTH = 8;

public static final int LENGTH = ActorId.LENGTH + UNIQUE_BYTES_LENGTH;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.ray.runtime.exception;

import io.ray.api.id.ActorId;

/**
* Indicates that the actor died unexpectedly before finishing a task.
*
Expand All @@ -8,10 +10,18 @@
*/
public class RayActorException extends RayException {

public ActorId actorId;

public RayActorException() {
super("The actor died unexpectedly before finishing this task.");
}

public RayActorException(ActorId actorId) {
super(String.format(
"The actor %s died unexpectedly before finishing this task.", actorId));
this.actorId = actorId;
}

public RayActorException(String message) {
super(message);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.ray.runtime.exception.UnreconstructableException;
import io.ray.runtime.generated.Common.ErrorType;
import io.ray.runtime.serializer.Serializer;
import io.ray.runtime.util.IdUtil;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -74,7 +75,7 @@ public static Object deserialize(NativeRayObject nativeRayObject, ObjectId objec
} else if (Arrays.equals(meta, WORKER_EXCEPTION_META)) {
return new RayWorkerException();
} else if (Arrays.equals(meta, ACTOR_EXCEPTION_META)) {
return new RayActorException();
return new RayActorException(IdUtil.getActorIdFromObjectId(objectId));
} else if (Arrays.equals(meta, UNRECONSTRUCTABLE_EXCEPTION_META)) {
return new UnreconstructableException(objectId);
} else if (Arrays.equals(meta, TASK_EXECUTION_EXCEPTION_META)) {
Expand Down
17 changes: 17 additions & 0 deletions java/runtime/src/main/java/io/ray/runtime/util/IdUtil.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package io.ray.runtime.util;

import io.ray.api.id.ActorId;
import io.ray.api.id.BaseId;
import io.ray.api.id.ObjectId;
import io.ray.api.id.TaskId;

/**
* Helper method for different Ids. Note: any changes to these methods must be synced with C++
Expand Down Expand Up @@ -75,4 +78,18 @@ private static long murmurHash64A(byte[] data, int length, int seed) {

return h;
}

/**
* Compute the actor ID of the task which created this object.
* @return The actor ID of the task which created this object.
*/
public static ActorId getActorIdFromObjectId(ObjectId objectId) {
byte[] taskIdBytes = new byte[TaskId.LENGTH];
System.arraycopy(objectId.getBytes(), 0, taskIdBytes, 0, TaskId.LENGTH);
TaskId taskId = TaskId.fromBytes(taskIdBytes);
byte[] actorIdBytes = new byte[ActorId.LENGTH];
System.arraycopy(taskId.getBytes(), TaskId.UNIQUE_BYTES_LENGTH,
actorIdBytes, 0, ActorId.LENGTH);
return ActorId.fromBytes(actorIdBytes);
}
}
1 change: 1 addition & 0 deletions java/test/src/main/java/io/ray/test/FailureTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ public void testActorProcessDying() {
} catch (RayActorException e) {
// When the actor process dies while executing a task, we should receive an
// RayActorException.
Assert.assertEquals(e.actorId, actor.getId());
}
try {
actor.task(BadActor::badMethod).remote().get();
Expand Down

0 comments on commit 1d06e02

Please sign in to comment.