diff --git a/java/runtime/src/main/java/io/ray/runtime/AbstractRayRuntime.java b/java/runtime/src/main/java/io/ray/runtime/AbstractRayRuntime.java index d19b25ed48d5a..36126f630da05 100644 --- a/java/runtime/src/main/java/io/ray/runtime/AbstractRayRuntime.java +++ b/java/runtime/src/main/java/io/ray/runtime/AbstractRayRuntime.java @@ -21,6 +21,7 @@ import io.ray.api.options.PlacementGroupCreationOptions; import io.ray.api.parallelactor.ParallelActorContext; import io.ray.api.placementgroup.PlacementGroup; +import io.ray.api.runtime.RayRuntime; import io.ray.api.runtimecontext.RuntimeContext; import io.ray.api.runtimeenv.RuntimeEnv; import io.ray.runtime.config.RayConfig; @@ -31,6 +32,7 @@ import io.ray.runtime.functionmanager.FunctionManager; import io.ray.runtime.functionmanager.PyFunctionDescriptor; import io.ray.runtime.functionmanager.RayFunction; +import io.ray.runtime.gcs.GcsClient; import io.ray.runtime.generated.Common.Language; import io.ray.runtime.object.ObjectRefImpl; import io.ray.runtime.object.ObjectStore; @@ -50,7 +52,7 @@ import org.slf4j.LoggerFactory; /** Core functionality to implement Ray APIs. */ -public abstract class AbstractRayRuntime implements RayRuntimeInternal { +public abstract class AbstractRayRuntime implements RayRuntime { private static final Logger LOGGER = LoggerFactory.getLogger(AbstractRayRuntime.class); public static final String PYTHON_INIT_METHOD_NAME = "__init__"; @@ -82,6 +84,12 @@ public ObjectRef put(T obj) { /*skipAddingLocalRef=*/ true); } + public abstract GcsClient getGcsClient(); + + public abstract void start(); + + public abstract void run(); + @Override public ObjectRef put(T obj, BaseActorHandle ownerActor) { if (LOGGER.isDebugEnabled()) { @@ -355,27 +363,22 @@ private BaseActorHandle createActorImpl( abstract List getCurrentReturnIds(int numReturns, ActorId actorId); - @Override public WorkerContext getWorkerContext() { return workerContext; } - @Override public ObjectStore getObjectStore() { return objectStore; } - @Override public TaskExecutor getTaskExecutor() { return taskExecutor; } - @Override public FunctionManager getFunctionManager() { return functionManager; } - @Override public RayConfig getRayConfig() { return rayConfig; } diff --git a/java/runtime/src/main/java/io/ray/runtime/ConcurrencyGroupImpl.java b/java/runtime/src/main/java/io/ray/runtime/ConcurrencyGroupImpl.java index 53ac57da52e93..5403666e83210 100644 --- a/java/runtime/src/main/java/io/ray/runtime/ConcurrencyGroupImpl.java +++ b/java/runtime/src/main/java/io/ray/runtime/ConcurrencyGroupImpl.java @@ -24,7 +24,7 @@ public ConcurrencyGroupImpl(String name, int maxConcurrency, List funcs funcs.forEach( func -> { RayFunction rayFunc = - ((RayRuntimeInternal) Ray.internal()).getFunctionManager().getFunction(func); + ((AbstractRayRuntime) Ray.internal()).getFunctionManager().getFunction(func); functionDescriptors.add(rayFunc.getFunctionDescriptor()); }); } diff --git a/java/runtime/src/main/java/io/ray/runtime/DefaultRayRuntimeFactory.java b/java/runtime/src/main/java/io/ray/runtime/DefaultRayRuntimeFactory.java index 806ec020951fe..e9ecc0889d9a3 100644 --- a/java/runtime/src/main/java/io/ray/runtime/DefaultRayRuntimeFactory.java +++ b/java/runtime/src/main/java/io/ray/runtime/DefaultRayRuntimeFactory.java @@ -28,11 +28,10 @@ public RayRuntime createRayRuntime() { try { logger.debug("Initializing runtime with config: {}", rayConfig); - AbstractRayRuntime innerRuntime = + AbstractRayRuntime runtime = rayConfig.runMode == RunMode.LOCAL ? new RayDevRuntime(rayConfig) : new RayNativeRuntime(rayConfig); - RayRuntimeInternal runtime = innerRuntime; runtime.start(); return runtime; } catch (Exception e) { diff --git a/java/runtime/src/main/java/io/ray/runtime/RayNativeRuntime.java b/java/runtime/src/main/java/io/ray/runtime/RayNativeRuntime.java index 2f245273028d4..6b0032594fcd9 100644 --- a/java/runtime/src/main/java/io/ray/runtime/RayNativeRuntime.java +++ b/java/runtime/src/main/java/io/ray/runtime/RayNativeRuntime.java @@ -288,8 +288,6 @@ private static native void nativeInitialize( private static native byte[] nativeGetActorIdOfNamedActor(String actorName, String namespace); - private static native void nativeSetCoreWorker(byte[] workerId); - private static native Map> nativeGetResourceIds(); private static native String nativeGetNamespace(); diff --git a/java/runtime/src/main/java/io/ray/runtime/RayRuntimeInternal.java b/java/runtime/src/main/java/io/ray/runtime/RayRuntimeInternal.java deleted file mode 100644 index fd1a23b90b3be..0000000000000 --- a/java/runtime/src/main/java/io/ray/runtime/RayRuntimeInternal.java +++ /dev/null @@ -1,30 +0,0 @@ -package io.ray.runtime; - -import io.ray.api.runtime.RayRuntime; -import io.ray.runtime.config.RayConfig; -import io.ray.runtime.context.WorkerContext; -import io.ray.runtime.functionmanager.FunctionManager; -import io.ray.runtime.gcs.GcsClient; -import io.ray.runtime.object.ObjectStore; -import io.ray.runtime.task.TaskExecutor; - -/** This interface is required to make {@link RayRuntimeProxy} work. */ -public interface RayRuntimeInternal extends RayRuntime { - - /** Start runtime. */ - void start(); - - WorkerContext getWorkerContext(); - - ObjectStore getObjectStore(); - - TaskExecutor getTaskExecutor(); - - FunctionManager getFunctionManager(); - - RayConfig getRayConfig(); - - GcsClient getGcsClient(); - - void run(); -} diff --git a/java/runtime/src/main/java/io/ray/runtime/actor/NativeActorHandle.java b/java/runtime/src/main/java/io/ray/runtime/actor/NativeActorHandle.java index 10b1137505b5a..f5ee9deffea84 100644 --- a/java/runtime/src/main/java/io/ray/runtime/actor/NativeActorHandle.java +++ b/java/runtime/src/main/java/io/ray/runtime/actor/NativeActorHandle.java @@ -8,7 +8,7 @@ import io.ray.api.Ray; import io.ray.api.id.ActorId; import io.ray.api.id.ObjectId; -import io.ray.runtime.RayRuntimeInternal; +import io.ray.runtime.AbstractRayRuntime; import io.ray.runtime.generated.Common.Language; import java.io.Externalizable; import java.io.IOException; @@ -122,7 +122,7 @@ private static final class NativeActorHandleReference public NativeActorHandleReference(NativeActorHandle handle) { super(handle, REFERENCE_QUEUE); this.actorId = handle.actorId; - RayRuntimeInternal runtime = (RayRuntimeInternal) Ray.internal(); + AbstractRayRuntime runtime = (AbstractRayRuntime) Ray.internal(); this.workerId = runtime.getWorkerContext().getCurrentWorkerId().getBytes(); this.removed = new AtomicBoolean(false); REFERENCES.add(this); diff --git a/java/runtime/src/main/java/io/ray/runtime/context/RuntimeContextImpl.java b/java/runtime/src/main/java/io/ray/runtime/context/RuntimeContextImpl.java index ba10acc0a1053..41648ad0753b2 100644 --- a/java/runtime/src/main/java/io/ray/runtime/context/RuntimeContextImpl.java +++ b/java/runtime/src/main/java/io/ray/runtime/context/RuntimeContextImpl.java @@ -8,7 +8,7 @@ import io.ray.api.runtimecontext.NodeInfo; import io.ray.api.runtimecontext.ResourceValue; import io.ray.api.runtimecontext.RuntimeContext; -import io.ray.runtime.RayRuntimeInternal; +import io.ray.runtime.AbstractRayRuntime; import io.ray.runtime.config.RunMode; import io.ray.runtime.util.ResourceUtil; import java.util.ArrayList; @@ -21,9 +21,9 @@ public class RuntimeContextImpl implements RuntimeContext { - private RayRuntimeInternal runtime; + private AbstractRayRuntime runtime; - public RuntimeContextImpl(RayRuntimeInternal runtime) { + public RuntimeContextImpl(AbstractRayRuntime runtime) { this.runtime = runtime; } diff --git a/java/runtime/src/main/java/io/ray/runtime/object/NativeObjectStore.java b/java/runtime/src/main/java/io/ray/runtime/object/NativeObjectStore.java index bf99e6f2ac7b0..ef48447d5ac84 100644 --- a/java/runtime/src/main/java/io/ray/runtime/object/NativeObjectStore.java +++ b/java/runtime/src/main/java/io/ray/runtime/object/NativeObjectStore.java @@ -6,7 +6,7 @@ import io.ray.api.id.BaseId; import io.ray.api.id.ObjectId; import io.ray.api.id.UniqueId; -import io.ray.runtime.RayRuntimeInternal; +import io.ray.runtime.AbstractRayRuntime; import io.ray.runtime.context.WorkerContext; import io.ray.runtime.generated.Common.Address; import java.util.HashMap; @@ -40,7 +40,7 @@ public ObjectId putRaw(NativeRayObject obj) { @Override public ObjectId putRaw(NativeRayObject obj, ActorId ownerActorId) { byte[] serializedOwnerAddressBytes = - ((RayRuntimeInternal) Ray.internal()).getGcsClient().getActorAddress(ownerActorId); + ((AbstractRayRuntime) Ray.internal()).getGcsClient().getActorAddress(ownerActorId); return new ObjectId(nativePut(obj, serializedOwnerAddressBytes)); } diff --git a/java/runtime/src/main/java/io/ray/runtime/object/ObjectRefImpl.java b/java/runtime/src/main/java/io/ray/runtime/object/ObjectRefImpl.java index 6fb64e8055ca6..51bf3c20dc7dc 100644 --- a/java/runtime/src/main/java/io/ray/runtime/object/ObjectRefImpl.java +++ b/java/runtime/src/main/java/io/ray/runtime/object/ObjectRefImpl.java @@ -8,7 +8,7 @@ import io.ray.api.Ray; import io.ray.api.id.ObjectId; import io.ray.api.id.UniqueId; -import io.ray.runtime.RayRuntimeInternal; +import io.ray.runtime.AbstractRayRuntime; import java.io.Externalizable; import java.io.IOException; import java.io.ObjectInput; @@ -60,7 +60,7 @@ public ObjectRefImpl(ObjectId id, Class type) { public void init(ObjectId id, Class type, boolean skipAddingLocalRef) { this.id = id; this.type = (Class) type; - RayRuntimeInternal runtime = (RayRuntimeInternal) Ray.internal(); + AbstractRayRuntime runtime = (AbstractRayRuntime) Ray.internal(); Preconditions.checkState(workerId == null); workerId = runtime.getWorkerContext().getCurrentWorkerId(); @@ -106,7 +106,7 @@ public String toString() { public void writeExternal(ObjectOutput out) throws IOException { out.writeObject(this.getId()); out.writeObject(this.getType()); - RayRuntimeInternal runtime = (RayRuntimeInternal) Ray.internal(); + AbstractRayRuntime runtime = (AbstractRayRuntime) Ray.internal(); byte[] ownerAddress = runtime.getObjectStore().getOwnershipInfo(this.getId()); out.writeInt(ownerAddress.length); out.write(ownerAddress); @@ -121,7 +121,7 @@ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundExcept byte[] ownerAddress = new byte[len]; in.readFully(ownerAddress); - RayRuntimeInternal runtime = (RayRuntimeInternal) Ray.internal(); + AbstractRayRuntime runtime = (AbstractRayRuntime) Ray.internal(); Preconditions.checkState(workerId == null); workerId = runtime.getWorkerContext().getCurrentWorkerId(); runtime.getObjectStore().addLocalReference(workerId, id); @@ -156,7 +156,7 @@ public void finalizeReferent() { REFERENCES.remove(this); // It's possible that GC is executed after the runtime is shutdown. if (Ray.isInitialized()) { - ((RayRuntimeInternal) (Ray.internal())) + ((AbstractRayRuntime) (Ray.internal())) .getObjectStore() .removeLocalReference(workerId, objectId); allObjects.remove(objectId); diff --git a/java/runtime/src/main/java/io/ray/runtime/runner/worker/DefaultWorker.java b/java/runtime/src/main/java/io/ray/runtime/runner/worker/DefaultWorker.java index 6352011474431..3e88bfe484480 100644 --- a/java/runtime/src/main/java/io/ray/runtime/runner/worker/DefaultWorker.java +++ b/java/runtime/src/main/java/io/ray/runtime/runner/worker/DefaultWorker.java @@ -1,7 +1,7 @@ package io.ray.runtime.runner.worker; import io.ray.api.Ray; -import io.ray.runtime.RayRuntimeInternal; +import io.ray.runtime.AbstractRayRuntime; /** Default implementation of the worker process. */ public class DefaultWorker { @@ -12,6 +12,6 @@ public static void main(String[] args) { System.setProperty("ray.run-mode", "CLUSTER"); System.setProperty("ray.worker.mode", "WORKER"); Ray.init(); - ((RayRuntimeInternal) Ray.internal()).run(); + ((AbstractRayRuntime) Ray.internal()).run(); } } diff --git a/java/runtime/src/main/java/io/ray/runtime/task/ArgumentsBuilder.java b/java/runtime/src/main/java/io/ray/runtime/task/ArgumentsBuilder.java index 1c45d934038c4..af46661a708a6 100644 --- a/java/runtime/src/main/java/io/ray/runtime/task/ArgumentsBuilder.java +++ b/java/runtime/src/main/java/io/ray/runtime/task/ArgumentsBuilder.java @@ -5,7 +5,7 @@ import io.ray.api.ObjectRef; import io.ray.api.Ray; import io.ray.api.id.ObjectId; -import io.ray.runtime.RayRuntimeInternal; +import io.ray.runtime.AbstractRayRuntime; import io.ray.runtime.generated.Common.Address; import io.ray.runtime.generated.Common.Language; import io.ray.runtime.object.NativeRayObject; @@ -41,7 +41,7 @@ public static List wrap(Object[] args, Language language) { if (arg instanceof ObjectRef) { Preconditions.checkState(arg instanceof ObjectRefImpl); id = ((ObjectRefImpl) arg).getId(); - address = ((RayRuntimeInternal) Ray.internal()).getObjectStore().getOwnerAddress(id); + address = ((AbstractRayRuntime) Ray.internal()).getObjectStore().getOwnerAddress(id); } else { value = ObjectSerializer.serialize(arg); if (language != Language.JAVA) { @@ -60,8 +60,8 @@ public static List wrap(Object[] args, Language language) { } } if (value.data.length > LARGEST_SIZE_PASS_BY_VALUE) { - id = ((RayRuntimeInternal) Ray.internal()).getObjectStore().putRaw(value); - address = ((RayRuntimeInternal) Ray.internal()).getWorkerContext().getRpcAddress(); + id = ((AbstractRayRuntime) Ray.internal()).getObjectStore().putRaw(value); + address = ((AbstractRayRuntime) Ray.internal()).getWorkerContext().getRpcAddress(); value = null; } } diff --git a/java/runtime/src/main/java/io/ray/runtime/task/LocalModeTaskExecutor.java b/java/runtime/src/main/java/io/ray/runtime/task/LocalModeTaskExecutor.java index 90d93b0a405a1..94830b316f490 100644 --- a/java/runtime/src/main/java/io/ray/runtime/task/LocalModeTaskExecutor.java +++ b/java/runtime/src/main/java/io/ray/runtime/task/LocalModeTaskExecutor.java @@ -1,7 +1,7 @@ package io.ray.runtime.task; import io.ray.api.id.UniqueId; -import io.ray.runtime.RayRuntimeInternal; +import io.ray.runtime.AbstractRayRuntime; /** Task executor for local mode. */ public class LocalModeTaskExecutor extends TaskExecutor { @@ -20,7 +20,7 @@ public UniqueId getWorkerId() { } } - public LocalModeTaskExecutor(RayRuntimeInternal runtime) { + public LocalModeTaskExecutor(AbstractRayRuntime runtime) { super(runtime); } diff --git a/java/runtime/src/main/java/io/ray/runtime/task/LocalModeTaskSubmitter.java b/java/runtime/src/main/java/io/ray/runtime/task/LocalModeTaskSubmitter.java index c6376c6390826..a44df04c12110 100644 --- a/java/runtime/src/main/java/io/ray/runtime/task/LocalModeTaskSubmitter.java +++ b/java/runtime/src/main/java/io/ray/runtime/task/LocalModeTaskSubmitter.java @@ -14,8 +14,8 @@ import io.ray.api.options.CallOptions; import io.ray.api.options.PlacementGroupCreationOptions; import io.ray.api.placementgroup.PlacementGroup; +import io.ray.runtime.AbstractRayRuntime; import io.ray.runtime.ConcurrencyGroupImpl; -import io.ray.runtime.RayRuntimeInternal; import io.ray.runtime.actor.LocalModeActorHandle; import io.ray.runtime.context.LocalModeWorkerContext; import io.ray.runtime.functionmanager.FunctionDescriptor; @@ -59,7 +59,7 @@ public class LocalModeTaskSubmitter implements TaskSubmitter { private final Map> waitingTasks = new HashMap<>(); private final Object taskAndObjectLock = new Object(); - private final RayRuntimeInternal runtime; + private final AbstractRayRuntime runtime; private final TaskExecutor taskExecutor; private final LocalModeObjectStore objectStore; @@ -169,7 +169,7 @@ public synchronized void shutdown() { } public LocalModeTaskSubmitter( - RayRuntimeInternal runtime, TaskExecutor taskExecutor, LocalModeObjectStore objectStore) { + AbstractRayRuntime runtime, TaskExecutor taskExecutor, LocalModeObjectStore objectStore) { this.runtime = runtime; this.taskExecutor = taskExecutor; this.objectStore = objectStore; diff --git a/java/runtime/src/main/java/io/ray/runtime/task/NativeTaskExecutor.java b/java/runtime/src/main/java/io/ray/runtime/task/NativeTaskExecutor.java index e13e98fd87161..755dde2e3dad5 100644 --- a/java/runtime/src/main/java/io/ray/runtime/task/NativeTaskExecutor.java +++ b/java/runtime/src/main/java/io/ray/runtime/task/NativeTaskExecutor.java @@ -1,14 +1,14 @@ package io.ray.runtime.task; import io.ray.api.id.UniqueId; -import io.ray.runtime.RayRuntimeInternal; +import io.ray.runtime.AbstractRayRuntime; /** Task executor for cluster mode. */ public class NativeTaskExecutor extends TaskExecutor { static class NativeActorContext extends TaskExecutor.ActorContext {} - public NativeTaskExecutor(RayRuntimeInternal runtime) { + public NativeTaskExecutor(AbstractRayRuntime runtime) { super(runtime); } diff --git a/java/runtime/src/main/java/io/ray/runtime/task/TaskExecutor.java b/java/runtime/src/main/java/io/ray/runtime/task/TaskExecutor.java index 1f13734e9423c..8a99006aa8711 100644 --- a/java/runtime/src/main/java/io/ray/runtime/task/TaskExecutor.java +++ b/java/runtime/src/main/java/io/ray/runtime/task/TaskExecutor.java @@ -8,7 +8,7 @@ import io.ray.api.id.JobId; import io.ray.api.id.TaskId; import io.ray.api.id.UniqueId; -import io.ray.runtime.RayRuntimeInternal; +import io.ray.runtime.AbstractRayRuntime; import io.ray.runtime.functionmanager.JavaFunctionDescriptor; import io.ray.runtime.functionmanager.RayFunction; import io.ray.runtime.generated.Common.TaskType; @@ -32,7 +32,7 @@ public abstract class TaskExecutor { private static final Logger LOGGER = LoggerFactory.getLogger(TaskExecutor.class); - protected final RayRuntimeInternal runtime; + protected final AbstractRayRuntime runtime; // TODO(qwang): Use actorContext instead later. private final ConcurrentHashMap actorContextMap = new ConcurrentHashMap<>(); @@ -44,7 +44,7 @@ static class ActorContext { Object currentActor = null; } - TaskExecutor(RayRuntimeInternal runtime) { + TaskExecutor(AbstractRayRuntime runtime) { this.runtime = runtime; } diff --git a/java/runtime/src/main/java/io/ray/runtime/util/MethodUtils.java b/java/runtime/src/main/java/io/ray/runtime/util/MethodUtils.java index b6523562f5e68..aad31daf1890f 100644 --- a/java/runtime/src/main/java/io/ray/runtime/util/MethodUtils.java +++ b/java/runtime/src/main/java/io/ray/runtime/util/MethodUtils.java @@ -1,7 +1,7 @@ package io.ray.runtime.util; import io.ray.api.Ray; -import io.ray.runtime.RayRuntimeInternal; +import io.ray.runtime.AbstractRayRuntime; import java.lang.reflect.Array; import java.lang.reflect.Field; import java.lang.reflect.Method; @@ -47,7 +47,7 @@ public static Class getReturnTypeFromSignature(String signature) { /// This code path indicates that here might be in another thread of a worker. /// So try to load the class from URLClassLoader of this worker. ClassLoader cl = - ((RayRuntimeInternal) Ray.internal()).getFunctionManager().getClassLoader(); + ((AbstractRayRuntime) Ray.internal()).getFunctionManager().getClassLoader(); actorClz = Class.forName(className, true, cl); } } catch (Exception e) { diff --git a/java/runtime/src/main/java/io/ray/runtime/utils/parallelactor/ParallelActorContextImpl.java b/java/runtime/src/main/java/io/ray/runtime/utils/parallelactor/ParallelActorContextImpl.java index 9da1f4cd0e3e9..6bcd5c18b5b9e 100644 --- a/java/runtime/src/main/java/io/ray/runtime/utils/parallelactor/ParallelActorContextImpl.java +++ b/java/runtime/src/main/java/io/ray/runtime/utils/parallelactor/ParallelActorContextImpl.java @@ -8,7 +8,7 @@ import io.ray.api.function.RayFunc; import io.ray.api.function.RayFuncR; import io.ray.api.parallelactor.*; -import io.ray.runtime.RayRuntimeInternal; +import io.ray.runtime.AbstractRayRuntime; import io.ray.runtime.functionmanager.FunctionManager; import io.ray.runtime.functionmanager.JavaFunctionDescriptor; @@ -26,7 +26,7 @@ public ParallelActorHandle createParallelActorExecutor( .build(); } - FunctionManager functionManager = ((RayRuntimeInternal) Ray.internal()).getFunctionManager(); + FunctionManager functionManager = ((AbstractRayRuntime) Ray.internal()).getFunctionManager(); JavaFunctionDescriptor functionDescriptor = functionManager.getFunction(ctorFunc).getFunctionDescriptor(); ActorHandle parallelExecutorHandle = @@ -42,7 +42,7 @@ public ObjectRef submitTask( ParallelActorHandle parallelActorHandle, int instanceId, RayFunc func, Object[] args) { ActorHandle parallelExecutor = ((ParallelActorHandleImpl) parallelActorHandle).getExecutor(); - FunctionManager functionManager = ((RayRuntimeInternal) Ray.internal()).getFunctionManager(); + FunctionManager functionManager = ((AbstractRayRuntime) Ray.internal()).getFunctionManager(); JavaFunctionDescriptor functionDescriptor = functionManager.getFunction(func).getFunctionDescriptor(); ObjectRef ret = diff --git a/java/runtime/src/main/java/io/ray/runtime/utils/parallelactor/ParallelActorExecutorImpl.java b/java/runtime/src/main/java/io/ray/runtime/utils/parallelactor/ParallelActorExecutorImpl.java index 3836303e13e94..91020366fd24a 100644 --- a/java/runtime/src/main/java/io/ray/runtime/utils/parallelactor/ParallelActorExecutorImpl.java +++ b/java/runtime/src/main/java/io/ray/runtime/utils/parallelactor/ParallelActorExecutorImpl.java @@ -2,7 +2,7 @@ import com.google.common.base.Preconditions; import io.ray.api.Ray; -import io.ray.runtime.RayRuntimeInternal; +import io.ray.runtime.AbstractRayRuntime; import io.ray.runtime.functionmanager.FunctionManager; import io.ray.runtime.functionmanager.JavaFunctionDescriptor; import io.ray.runtime.functionmanager.RayFunction; @@ -22,7 +22,7 @@ public class ParallelActorExecutorImpl { public ParallelActorExecutorImpl(int parallelism, JavaFunctionDescriptor javaFunctionDescriptor) throws InvocationTargetException, IllegalAccessException { - functionManager = ((RayRuntimeInternal) Ray.internal()).getFunctionManager(); + functionManager = ((AbstractRayRuntime) Ray.internal()).getFunctionManager(); RayFunction init = functionManager.getFunction(javaFunctionDescriptor); Thread.currentThread().setContextClassLoader(init.classLoader); for (int i = 0; i < parallelism; ++i) { diff --git a/java/test/src/main/java/io/ray/test/TestUtils.java b/java/test/src/main/java/io/ray/test/TestUtils.java index d302a0baee57f..408f189447924 100644 --- a/java/test/src/main/java/io/ray/test/TestUtils.java +++ b/java/test/src/main/java/io/ray/test/TestUtils.java @@ -3,7 +3,7 @@ import com.google.common.base.Preconditions; import io.ray.api.ObjectRef; import io.ray.api.Ray; -import io.ray.runtime.RayRuntimeInternal; +import io.ray.runtime.AbstractRayRuntime; import io.ray.runtime.config.RayConfig; import io.ray.runtime.config.RunMode; import io.ray.runtime.task.ArgumentsBuilder; @@ -122,12 +122,8 @@ public static void warmUpCluster() { Assert.assertEquals(obj.get(), "hi"); } - public static RayRuntimeInternal getRuntime() { - return (RayRuntimeInternal) Ray.internal(); - } - - public static RayRuntimeInternal getUnderlyingRuntime() { - return (RayRuntimeInternal) Ray.internal(); + public static AbstractRayRuntime getRuntime() { + return (AbstractRayRuntime) Ray.internal(); } public static ProcessBuilder buildDriver(Class mainClass, String[] args) { diff --git a/src/ray/core_worker/lib/java/io_ray_runtime_RayNativeRuntime.cc b/src/ray/core_worker/lib/java/io_ray_runtime_RayNativeRuntime.cc index c467b4ecaba0c..1521b5d3f82db 100644 --- a/src/ray/core_worker/lib/java/io_ray_runtime_RayNativeRuntime.cc +++ b/src/ray/core_worker/lib/java/io_ray_runtime_RayNativeRuntime.cc @@ -390,12 +390,6 @@ JNIEXPORT void JNICALL Java_io_ray_runtime_RayNativeRuntime_nativeKillActor( THROW_EXCEPTION_AND_RETURN_IF_NOT_OK(env, status, (void)0); } -JNIEXPORT void JNICALL Java_io_ray_runtime_RayNativeRuntime_nativeSetCoreWorker( - JNIEnv *env, jclass, jbyteArray workerId) { - const auto worker_id = JavaByteArrayToId(env, workerId); - CoreWorkerProcess::SetCurrentThreadWorkerId(worker_id); -} - JNIEXPORT jobject JNICALL Java_io_ray_runtime_RayNativeRuntime_nativeGetResourceIds(JNIEnv *env, jclass) { auto key_converter = [](JNIEnv *env, const std::string &str) -> jstring { diff --git a/src/ray/core_worker/lib/java/io_ray_runtime_RayNativeRuntime.h b/src/ray/core_worker/lib/java/io_ray_runtime_RayNativeRuntime.h index 6650799ce2488..b620a21e355a9 100644 --- a/src/ray/core_worker/lib/java/io_ray_runtime_RayNativeRuntime.h +++ b/src/ray/core_worker/lib/java/io_ray_runtime_RayNativeRuntime.h @@ -79,14 +79,6 @@ Java_io_ray_runtime_RayNativeRuntime_nativeGetActorIdOfNamedActor(JNIEnv *, jstring, jstring); -/* - * Class: io_ray_runtime_RayNativeRuntime - * Method: nativeSetCoreWorker - * Signature: ([B)V - */ -JNIEXPORT void JNICALL -Java_io_ray_runtime_RayNativeRuntime_nativeSetCoreWorker(JNIEnv *, jclass, jbyteArray); - /* * Class: io_ray_runtime_RayNativeRuntime * Method: nativeGetResourceIds