Skip to content

Commit

Permalink
Serve Java API Improvement (#38961)
Browse files Browse the repository at this point in the history
This PR improves the Serve Java API. The goal is to make the Java API consistent with the Python API.

The design of the user API is discussed in this proposal: ray-project/enhancements#42.
This PR only covers the Java programming API part, including Deployment.bind, Serve.run, DeploymentHandle, and so on, ensuring that these APIs can be used properly. The alignment of some internal core logic, documentation enhancements, and support for the config file will be submitted in subsequent PRs.

Signed-off-by: chuhan.ly <chuhan.ly@antgroup.com>
Co-authored-by: chuhan.ly <chuhan.ly@antgroup.com>
  • Loading branch information
liuyang-my and chuhan.ly authored Oct 10, 2023
1 parent 1562330 commit 99f76fe
Show file tree
Hide file tree
Showing 55 changed files with 1,756 additions and 760 deletions.
222 changes: 172 additions & 50 deletions java/serve/src/main/java/io/ray/serve/api/Serve.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,27 @@
import io.ray.api.options.ActorLifetime;
import io.ray.serve.common.Constants;
import io.ray.serve.config.RayServeConfig;
import io.ray.serve.dag.Graph;
import io.ray.serve.deployment.Application;
import io.ray.serve.deployment.Deployment;
import io.ray.serve.deployment.DeploymentCreator;
import io.ray.serve.deployment.DeploymentRoute;
import io.ray.serve.exception.RayServeException;
import io.ray.serve.generated.ActorNameList;
import io.ray.serve.handle.DeploymentHandle;
import io.ray.serve.poll.LongPollClientFactory;
import io.ray.serve.replica.ReplicaContext;
import io.ray.serve.util.CollectionUtil;
import io.ray.serve.util.LogUtil;
import io.ray.serve.util.MessageFormatter;
import io.ray.serve.util.ServeProtoUtil;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -43,25 +50,30 @@ public class Serve {
* @return
*/
public static synchronized ServeControllerClient start(Map<String, String> config) {
// Initialize ray if needed.
if (!Ray.isInitialized()) {
System.setProperty("ray.job.namespace", Constants.SERVE_NAMESPACE);
Ray.init();
}
return serveStart(config);
}

private static synchronized ServeControllerClient serveStart(Map<String, String> config) {

try {
ServeControllerClient client = getGlobalClient(true);
LOGGER.info("Connecting to existing Serve app in namespace {}", Constants.SERVE_NAMESPACE);
return client;
} catch (RayServeException | IllegalStateException e) {
LOGGER.info("There is no instance running on this Ray cluster. A new one will be started.");
LOGGER.info(
"There is no Serve instance running on this Ray cluster. A new one will be started.");
}

// Initialize ray if needed.
if (!Ray.isInitialized()) {
init();
}

int httpPort =
Optional.ofNullable(config)
.map(m -> m.get(RayServeConfig.PROXY_HTTP_PORT))
.map(Integer::parseInt)
.orElse(8000);
.orElse(Integer.valueOf(System.getProperty(RayServeConfig.PROXY_HTTP_PORT, "8000")));
PyActorHandle controllerAvatar =
Ray.actor(
PyActorClass.of("ray.serve._private.controller", "ServeControllerAvatar"),
Expand Down Expand Up @@ -95,7 +107,8 @@ public static synchronized ServeControllerClient start(Map<String, String> confi
}
} catch (RayTimeoutException e) {
String errMsg =
LogUtil.format("Proxies not available after {}s.", Constants.PROXY_TIMEOUT_S);
MessageFormatter.format(
"HTTP proxies not available after {}s.", Constants.PROXY_TIMEOUT_S);
LOGGER.error(errMsg, e);
throw new RayServeException(errMsg, e);
}
Expand All @@ -108,24 +121,6 @@ public static synchronized ServeControllerClient start(Map<String, String> confi
return client;
}

public static synchronized ServeControllerClient start(
boolean detached, boolean dedicatedCpu, Map<String, String> config) {

if (!detached) {
throw new IllegalArgumentException(
"`detached=false` is no longer supported. "
+ "In a future release, it will be removed altogether.");
}

if (dedicatedCpu) {
throw new IllegalArgumentException(
"`dedicatedCpu=true` is no longer supported. "
+ "In a future release, it will be removed altogether.");
}

return start(config);
}

/**
* Completely shut down the connected Serve instance.
*
Expand All @@ -142,7 +137,7 @@ public static void shutdown() {
}

LongPollClientFactory.stop();
client.shutdown();
client.shutdown(null);
clearContext();
}

Expand Down Expand Up @@ -181,6 +176,11 @@ public static void setInternalReplicaContext(
deploymentName, replicaTag, controllerName, servableObject, config, appName);
}

/**
 * Set replica information to global context.
 *
 * <p>Stores the given context in the static {@code INTERNAL_REPLICA_CONTEXT} field, replacing
 * whatever value was there before. No validation is performed on the argument.
 *
 * @param replicaContext the replica context to store as the global internal replica context
 */
public static void setInternalReplicaContext(ReplicaContext replicaContext) {
INTERNAL_REPLICA_CONTEXT = replicaContext;
}
Expand All @@ -206,7 +206,8 @@ public static ReplicaContext getReplicaContext() {
*
* @param healthCheckController If True, run a health check on the cached controller if it exists.
* If the check fails, try reconnecting to the controller.
* @return
* @return ServeControllerClient to the running Serve controller. If there is no running
* controller and raise_if_no_controller_running is set to False, returns None.
*/
public static ServeControllerClient getGlobalClient(boolean healthCheckController) {
try {
Expand All @@ -222,14 +223,15 @@ public static ServeControllerClient getGlobalClient(boolean healthCheckControlle
LOGGER.info("The cached controller has died. Reconnecting.");
setGlobalClient(null);
}
synchronized (ServeControllerClient.class) {
if (GLOBAL_CLIENT != null) {
return GLOBAL_CLIENT;
}
return connect();
}
return connect();
}

/**
 * Gets the global client, which stores the controller's handle.
 *
 * <p>Convenience overload equivalent to {@code getGlobalClient(false)}, i.e. it passes
 * {@code healthCheckController = false}.
 *
 * <p>NOTE(review): earlier wording mentioned a {@code raise_if_no_controller_running} flag and a
 * {@code None} return value; this Java overload takes no such parameter. What happens when no
 * controller is running is decided by {@code getGlobalClient(boolean)} -- confirm there.
 *
 * @return ServeControllerClient to the running Serve controller.
 */
public static ServeControllerClient getGlobalClient() {
return getGlobalClient(false);
}
Expand All @@ -249,13 +251,19 @@ private static void setGlobalClient(ServeControllerClient client) {
*
* @return
*/
public static ServeControllerClient connect() {
private static synchronized ServeControllerClient connect() {

if (GLOBAL_CLIENT != null) {
return GLOBAL_CLIENT;
}

// Initialize ray if needed.
if (!Ray.isInitialized()) {
System.setProperty("ray.job.namespace", Constants.SERVE_NAMESPACE);
Ray.init();
init();
}

// When running inside of a replica, _INTERNAL_REPLICA_CONTEXT is set to ensure that the correct
// instance is connected to.
String controllerName =
INTERNAL_REPLICA_CONTEXT != null
? INTERNAL_REPLICA_CONTEXT.getInternalControllerName()
Expand All @@ -264,7 +272,7 @@ public static ServeControllerClient connect() {
Optional<BaseActorHandle> optional = Ray.getActor(controllerName, Constants.SERVE_NAMESPACE);
Preconditions.checkState(
optional.isPresent(),
LogUtil.format(
MessageFormatter.format(
"There is no instance running on this Ray cluster. "
+ "Please call `serve.start() to start one."));
LOGGER.info(
Expand All @@ -286,24 +294,25 @@ public static ServeControllerClient connect() {
*
* @param name name of the deployment. This must have already been deployed.
* @return Deployment
* @deprecated {@value Constants#MIGRATION_MESSAGE}
*/
@Deprecated
public static Deployment getDeployment(String name) {
LOGGER.warn(Constants.MIGRATION_MESSAGE);
DeploymentRoute deploymentRoute = getGlobalClient().getDeploymentInfo(name);
if (deploymentRoute == null) {
throw new RayServeException(
LogUtil.format("Deployment {} was not found. Did you call Deployment.deploy?", name));
MessageFormatter.format(
"Deployment {} was not found. Did you call Deployment.deploy?", name));
}

// TODO use DeploymentCreator
return new Deployment(
deploymentRoute.getDeploymentInfo().getReplicaConfig().getDeploymentDef(),
name,
deploymentRoute.getDeploymentInfo().getDeploymentConfig(),
deploymentRoute.getDeploymentInfo().getReplicaConfig(),
deploymentRoute.getDeploymentInfo().getVersion(),
null,
deploymentRoute.getDeploymentInfo().getReplicaConfig().getInitArgs(),
deploymentRoute.getRoute(),
deploymentRoute.getDeploymentInfo().getReplicaConfig().getRayActorOptions());
deploymentRoute.getRoute());
}

/**
Expand All @@ -312,8 +321,11 @@ public static Deployment getDeployment(String name) {
* <p>Dictionary maps deployment name to Deployment objects.
*
* @return
* @deprecated {@value Constants#MIGRATION_MESSAGE}
*/
@Deprecated
public static Map<String, Deployment> listDeployments() {
LOGGER.warn(Constants.MIGRATION_MESSAGE);
Map<String, DeploymentRoute> infos = getGlobalClient().listDeployments();
if (infos == null || infos.size() == 0) {
return Collections.emptyMap();
Expand All @@ -323,15 +335,125 @@ public static Map<String, Deployment> listDeployments() {
deployments.put(
entry.getKey(),
new Deployment(
entry.getValue().getDeploymentInfo().getReplicaConfig().getDeploymentDef(),
entry.getKey(),
entry.getValue().getDeploymentInfo().getDeploymentConfig(),
entry.getValue().getDeploymentInfo().getReplicaConfig(),
entry.getValue().getDeploymentInfo().getVersion(),
null,
entry.getValue().getDeploymentInfo().getReplicaConfig().getInitArgs(),
entry.getValue().getRoute(),
entry.getValue().getDeploymentInfo().getReplicaConfig().getRayActorOptions()));
entry.getValue().getRoute()));
}
return deployments;
}

/**
 * Deploy an application using the default settings and return a handle to its ingress
 * deployment.
 *
 * <p>Shorthand for the five-argument {@code run} with {@code blocking = true}, the default
 * application name ({@code Constants.SERVE_DEFAULT_APP_NAME}), no route prefix override and no
 * extra config.
 *
 * @param target A Serve application returned by `Deployment.bind()`.
 * @return A handle that can be used to call the application.
 */
public static Optional<DeploymentHandle> run(Application target) {
  final boolean waitForDeployment = true;
  final String defaultAppName = Constants.SERVE_DEFAULT_APP_NAME;
  final String noRoutePrefixOverride = null;
  final Map<String, String> noConfig = null;
  return run(target, waitForDeployment, defaultAppName, noRoutePrefixOverride, noConfig);
}

/**
 * Run an application and return a handle to its ingress deployment.
 *
 * <p>Starts Serve (or connects to an already running instance) via {@code serveStart(config)},
 * builds the deployments from the application's DAG node, deploys them under {@code name}, and
 * finally looks up a handle for the ingress deployment.
 *
 * @param target A Serve application returned by `Deployment.bind()`. Must not be null.
 * @param blocking Forwarded unchanged to {@code ServeControllerClient.deployApplication};
 *     presumably controls whether the call waits for the deployment to complete -- confirm
 *     against the client.
 * @param name Application name. Must be a non-blank string.
 * @param routePrefix Route prefix for HTTP requests. When non-blank it overrides the route prefix
 *     of every deployment that already has a non-blank route prefix; otherwise the deployments'
 *     own route prefixes are left untouched.
 * @param config Configuration forwarded to {@code serveStart}; may be null.
 * @return A handle that can be used to call the application, or an empty {@code Optional} when
 *     no ingress deployment was resolved.
 * @throws RayServeException if {@code name} is blank.
 * @throws NullPointerException if {@code target} is null.
 * @throws IllegalArgumentException if {@code routePrefix} is applied to a deployment and does not
 *     start with '/'.
 */
public static Optional<DeploymentHandle> run(
    Application target,
    boolean blocking,
    String name,
    String routePrefix,
    Map<String, String> config) {

  if (StringUtils.isBlank(name)) {
    throw new RayServeException("Application name must be a non-empty string.");
  }
  // Fail fast: without this check a null target would only surface after serveStart() had
  // already connected to (or started) a Serve instance as a side effect.
  if (target == null) {
    throw new NullPointerException("The application to run must not be null.");
  }

  ServeControllerClient client = serveStart(config);

  List<Deployment> deployments = Graph.build(target.getInternalDagNode(), name);
  Deployment ingress = Graph.getAndValidateIngressDeployment(deployments);

  for (Deployment deployment : deployments) {
    // Overwrite route prefix, but only for deployments that expose one already.
    if (StringUtils.isNotBlank(deployment.getRoutePrefix())
        && StringUtils.isNotBlank(routePrefix)) {
      Preconditions.checkArgument(
          routePrefix.startsWith("/"), "The route_prefix must start with a forward slash ('/')");
      deployment.setRoutePrefix(routePrefix);
    }
    // Deployments without an explicit version get a random 6-letter one.
    deployment
        .getDeploymentConfig()
        .setVersion(
            StringUtils.isNotBlank(deployment.getVersion())
                ? deployment.getVersion()
                : RandomStringUtils.randomAlphabetic(6));
  }

  client.deployApplication(name, deployments, blocking);

  return Optional.ofNullable(ingress)
      .map(
          ingressDeployment ->
              client.getDeploymentHandle(ingressDeployment.getName(), name, true));
}

/**
 * Initializes Ray for Serve.
 *
 * <p>Sets the {@code ray.job.namespace} system property to {@code Constants.SERVE_NAMESPACE}
 * and then calls {@code Ray.init()}. The property is set first so that it is in place before
 * Ray is initialized.
 */
private static void init() {
System.setProperty("ray.job.namespace", Constants.SERVE_NAMESPACE);
Ray.init();
}

/**
 * Get a handle to the application's ingress deployment by name.
 *
 * <p>Asks the Serve controller (via its {@code get_ingress_deployment_name} method) for the
 * ingress deployment of the application, then obtains a handle to that deployment from the
 * global client.
 *
 * @param name application name
 * @return a handle to the ingress deployment of the application
 * @throws RayServeException if the controller returns a blank ingress deployment name for
 *     {@code name}, which is treated as "application does not exist".
 */
public static DeploymentHandle getAppHandle(String name) {
  ServeControllerClient client = getGlobalClient();
  String ingress =
      (String)
          ((PyActorHandle) client.getController())
              .task(PyActorMethod.of("get_ingress_deployment_name"), name)
              .remote()
              .get();

  if (StringUtils.isBlank(ingress)) {
    // Report the requested application name: `ingress` is always blank on this path, so
    // formatting it would produce a message with no useful identifier.
    throw new RayServeException(
        MessageFormatter.format("Application '{}' does not exist.", name));
  }
  return client.getDeploymentHandle(ingress, name, false);
}

/**
 * Delete an application by its name, waiting for the deletion.
 *
 * <p>Removes the app together with all of its deployments. Shorthand for {@code delete(name,
 * true)}.
 *
 * @param name application name
 */
public static void delete(String name) {
  final boolean blocking = true;
  delete(name, blocking);
}

/**
 * Delete an application by its name.
 *
 * <p>Removes the app together with all of its deployments by asking the global client to delete
 * a single-element list of application names.
 *
 * @param name application name
 * @param blocking Wait for the application to be deleted or not.
 */
public static void delete(String name, boolean blocking) {
  ServeControllerClient client = getGlobalClient();
  List<String> appNames = Arrays.asList(name);
  client.deleteApps(appNames, blocking);
}
}
Loading

0 comments on commit 99f76fe

Please sign in to comment.