Skip to content

Commit 804b6b1

Browse files
authored
Revert "[Java] Remove RayRuntimeInternal class (#25016)" (#25139)
This reverts commit 4026b38. Broke test_raydp_dataset
1 parent a7e7593 commit 804b6b1

21 files changed

+97
-49
lines changed

java/runtime/src/main/java/io/ray/runtime/AbstractRayRuntime.java

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import io.ray.api.options.PlacementGroupCreationOptions;
2222
import io.ray.api.parallelactor.ParallelActorContext;
2323
import io.ray.api.placementgroup.PlacementGroup;
24-
import io.ray.api.runtime.RayRuntime;
2524
import io.ray.api.runtimecontext.RuntimeContext;
2625
import io.ray.api.runtimeenv.RuntimeEnv;
2726
import io.ray.runtime.config.RayConfig;
@@ -32,7 +31,6 @@
3231
import io.ray.runtime.functionmanager.FunctionManager;
3332
import io.ray.runtime.functionmanager.PyFunctionDescriptor;
3433
import io.ray.runtime.functionmanager.RayFunction;
35-
import io.ray.runtime.gcs.GcsClient;
3634
import io.ray.runtime.generated.Common.Language;
3735
import io.ray.runtime.object.ObjectRefImpl;
3836
import io.ray.runtime.object.ObjectStore;
@@ -52,7 +50,7 @@
5250
import org.slf4j.LoggerFactory;
5351

5452
/** Core functionality to implement Ray APIs. */
55-
public abstract class AbstractRayRuntime implements RayRuntime {
53+
public abstract class AbstractRayRuntime implements RayRuntimeInternal {
5654

5755
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractRayRuntime.class);
5856
public static final String PYTHON_INIT_METHOD_NAME = "__init__";
@@ -84,12 +82,6 @@ public <T> ObjectRef<T> put(T obj) {
8482
/*skipAddingLocalRef=*/ true);
8583
}
8684

87-
public abstract GcsClient getGcsClient();
88-
89-
public abstract void start();
90-
91-
public abstract void run();
92-
9385
@Override
9486
public <T> ObjectRef<T> put(T obj, BaseActorHandle ownerActor) {
9587
if (LOGGER.isDebugEnabled()) {
@@ -363,22 +355,27 @@ private BaseActorHandle createActorImpl(
363355

364356
abstract List<ObjectId> getCurrentReturnIds(int numReturns, ActorId actorId);
365357

358+
@Override
366359
public WorkerContext getWorkerContext() {
367360
return workerContext;
368361
}
369362

363+
@Override
370364
public ObjectStore getObjectStore() {
371365
return objectStore;
372366
}
373367

368+
@Override
374369
public TaskExecutor getTaskExecutor() {
375370
return taskExecutor;
376371
}
377372

373+
@Override
378374
public FunctionManager getFunctionManager() {
379375
return functionManager;
380376
}
381377

378+
@Override
382379
public RayConfig getRayConfig() {
383380
return rayConfig;
384381
}

java/runtime/src/main/java/io/ray/runtime/ConcurrencyGroupImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ public ConcurrencyGroupImpl(String name, int maxConcurrency, List<RayFunc> funcs
2424
funcs.forEach(
2525
func -> {
2626
RayFunction rayFunc =
27-
((AbstractRayRuntime) Ray.internal()).getFunctionManager().getFunction(func);
27+
((RayRuntimeInternal) Ray.internal()).getFunctionManager().getFunction(func);
2828
functionDescriptors.add(rayFunc.getFunctionDescriptor());
2929
});
3030
}

java/runtime/src/main/java/io/ray/runtime/DefaultRayRuntimeFactory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,11 @@ public RayRuntime createRayRuntime() {
2828

2929
try {
3030
logger.debug("Initializing runtime with config: {}", rayConfig);
31-
AbstractRayRuntime runtime =
31+
AbstractRayRuntime innerRuntime =
3232
rayConfig.runMode == RunMode.LOCAL
3333
? new RayDevRuntime(rayConfig)
3434
: new RayNativeRuntime(rayConfig);
35+
RayRuntimeInternal runtime = innerRuntime;
3536
runtime.start();
3637
return runtime;
3738
} catch (Exception e) {

java/runtime/src/main/java/io/ray/runtime/RayNativeRuntime.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,8 @@ private static native void nativeInitialize(
288288

289289
private static native byte[] nativeGetActorIdOfNamedActor(String actorName, String namespace);
290290

291+
private static native void nativeSetCoreWorker(byte[] workerId);
292+
291293
private static native Map<String, List<ResourceValue>> nativeGetResourceIds();
292294

293295
private static native String nativeGetNamespace();
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package io.ray.runtime;
2+
3+
import io.ray.api.runtime.RayRuntime;
4+
import io.ray.runtime.config.RayConfig;
5+
import io.ray.runtime.context.WorkerContext;
6+
import io.ray.runtime.functionmanager.FunctionManager;
7+
import io.ray.runtime.gcs.GcsClient;
8+
import io.ray.runtime.object.ObjectStore;
9+
import io.ray.runtime.task.TaskExecutor;
10+
11+
/** This interface is required to make {@link RayRuntimeProxy} work. */
12+
public interface RayRuntimeInternal extends RayRuntime {
13+
14+
/** Start runtime. */
15+
void start();
16+
17+
WorkerContext getWorkerContext();
18+
19+
ObjectStore getObjectStore();
20+
21+
TaskExecutor getTaskExecutor();
22+
23+
FunctionManager getFunctionManager();
24+
25+
RayConfig getRayConfig();
26+
27+
GcsClient getGcsClient();
28+
29+
void run();
30+
}

java/runtime/src/main/java/io/ray/runtime/actor/NativeActorHandle.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
import io.ray.api.Ray;
99
import io.ray.api.id.ActorId;
1010
import io.ray.api.id.ObjectId;
11-
import io.ray.runtime.AbstractRayRuntime;
11+
import io.ray.runtime.RayRuntimeInternal;
1212
import io.ray.runtime.generated.Common.Language;
1313
import java.io.Externalizable;
1414
import java.io.IOException;
@@ -122,7 +122,7 @@ private static final class NativeActorHandleReference
122122
public NativeActorHandleReference(NativeActorHandle handle) {
123123
super(handle, REFERENCE_QUEUE);
124124
this.actorId = handle.actorId;
125-
AbstractRayRuntime runtime = (AbstractRayRuntime) Ray.internal();
125+
RayRuntimeInternal runtime = (RayRuntimeInternal) Ray.internal();
126126
this.workerId = runtime.getWorkerContext().getCurrentWorkerId().getBytes();
127127
this.removed = new AtomicBoolean(false);
128128
REFERENCES.add(this);

java/runtime/src/main/java/io/ray/runtime/context/RuntimeContextImpl.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
import io.ray.api.runtimecontext.NodeInfo;
99
import io.ray.api.runtimecontext.ResourceValue;
1010
import io.ray.api.runtimecontext.RuntimeContext;
11-
import io.ray.runtime.AbstractRayRuntime;
11+
import io.ray.runtime.RayRuntimeInternal;
1212
import io.ray.runtime.config.RunMode;
1313
import io.ray.runtime.util.ResourceUtil;
1414
import java.util.ArrayList;
@@ -21,9 +21,9 @@
2121

2222
public class RuntimeContextImpl implements RuntimeContext {
2323

24-
private AbstractRayRuntime runtime;
24+
private RayRuntimeInternal runtime;
2525

26-
public RuntimeContextImpl(AbstractRayRuntime runtime) {
26+
public RuntimeContextImpl(RayRuntimeInternal runtime) {
2727
this.runtime = runtime;
2828
}
2929

java/runtime/src/main/java/io/ray/runtime/object/NativeObjectStore.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
import io.ray.api.id.BaseId;
77
import io.ray.api.id.ObjectId;
88
import io.ray.api.id.UniqueId;
9-
import io.ray.runtime.AbstractRayRuntime;
9+
import io.ray.runtime.RayRuntimeInternal;
1010
import io.ray.runtime.context.WorkerContext;
1111
import io.ray.runtime.generated.Common.Address;
1212
import java.util.HashMap;
@@ -40,7 +40,7 @@ public ObjectId putRaw(NativeRayObject obj) {
4040
@Override
4141
public ObjectId putRaw(NativeRayObject obj, ActorId ownerActorId) {
4242
byte[] serializedOwnerAddressBytes =
43-
((AbstractRayRuntime) Ray.internal()).getGcsClient().getActorAddress(ownerActorId);
43+
((RayRuntimeInternal) Ray.internal()).getGcsClient().getActorAddress(ownerActorId);
4444
return new ObjectId(nativePut(obj, serializedOwnerAddressBytes));
4545
}
4646

java/runtime/src/main/java/io/ray/runtime/object/ObjectRefImpl.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
import io.ray.api.Ray;
99
import io.ray.api.id.ObjectId;
1010
import io.ray.api.id.UniqueId;
11-
import io.ray.runtime.AbstractRayRuntime;
11+
import io.ray.runtime.RayRuntimeInternal;
1212
import java.io.Externalizable;
1313
import java.io.IOException;
1414
import java.io.ObjectInput;
@@ -60,7 +60,7 @@ public ObjectRefImpl(ObjectId id, Class<T> type) {
6060
public void init(ObjectId id, Class<?> type, boolean skipAddingLocalRef) {
6161
this.id = id;
6262
this.type = (Class<T>) type;
63-
AbstractRayRuntime runtime = (AbstractRayRuntime) Ray.internal();
63+
RayRuntimeInternal runtime = (RayRuntimeInternal) Ray.internal();
6464
Preconditions.checkState(workerId == null);
6565
workerId = runtime.getWorkerContext().getCurrentWorkerId();
6666

@@ -106,7 +106,7 @@ public String toString() {
106106
public void writeExternal(ObjectOutput out) throws IOException {
107107
out.writeObject(this.getId());
108108
out.writeObject(this.getType());
109-
AbstractRayRuntime runtime = (AbstractRayRuntime) Ray.internal();
109+
RayRuntimeInternal runtime = (RayRuntimeInternal) Ray.internal();
110110
byte[] ownerAddress = runtime.getObjectStore().getOwnershipInfo(this.getId());
111111
out.writeInt(ownerAddress.length);
112112
out.write(ownerAddress);
@@ -121,7 +121,7 @@ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundExcept
121121
byte[] ownerAddress = new byte[len];
122122
in.readFully(ownerAddress);
123123

124-
AbstractRayRuntime runtime = (AbstractRayRuntime) Ray.internal();
124+
RayRuntimeInternal runtime = (RayRuntimeInternal) Ray.internal();
125125
Preconditions.checkState(workerId == null);
126126
workerId = runtime.getWorkerContext().getCurrentWorkerId();
127127
runtime.getObjectStore().addLocalReference(workerId, id);
@@ -156,7 +156,7 @@ public void finalizeReferent() {
156156
REFERENCES.remove(this);
157157
// It's possible that GC is executed after the runtime is shutdown.
158158
if (Ray.isInitialized()) {
159-
((AbstractRayRuntime) (Ray.internal()))
159+
((RayRuntimeInternal) (Ray.internal()))
160160
.getObjectStore()
161161
.removeLocalReference(workerId, objectId);
162162
allObjects.remove(objectId);

java/runtime/src/main/java/io/ray/runtime/runner/worker/DefaultWorker.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package io.ray.runtime.runner.worker;
22

33
import io.ray.api.Ray;
4-
import io.ray.runtime.AbstractRayRuntime;
4+
import io.ray.runtime.RayRuntimeInternal;
55

66
/** Default implementation of the worker process. */
77
public class DefaultWorker {
@@ -12,6 +12,6 @@ public static void main(String[] args) {
1212
System.setProperty("ray.run-mode", "CLUSTER");
1313
System.setProperty("ray.worker.mode", "WORKER");
1414
Ray.init();
15-
((AbstractRayRuntime) Ray.internal()).run();
15+
((RayRuntimeInternal) Ray.internal()).run();
1616
}
1717
}

0 commit comments

Comments
 (0)