Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Serve Java API Improvement #38961

Merged
merged 13 commits into from
Oct 10, 2023
222 changes: 172 additions & 50 deletions java/serve/src/main/java/io/ray/serve/api/Serve.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,27 @@
import io.ray.api.options.ActorLifetime;
import io.ray.serve.common.Constants;
import io.ray.serve.config.RayServeConfig;
import io.ray.serve.dag.Graph;
import io.ray.serve.deployment.Application;
import io.ray.serve.deployment.Deployment;
import io.ray.serve.deployment.DeploymentCreator;
import io.ray.serve.deployment.DeploymentRoute;
import io.ray.serve.exception.RayServeException;
import io.ray.serve.generated.ActorNameList;
import io.ray.serve.handle.DeploymentHandle;
import io.ray.serve.poll.LongPollClientFactory;
import io.ray.serve.replica.ReplicaContext;
import io.ray.serve.util.CollectionUtil;
import io.ray.serve.util.LogUtil;
import io.ray.serve.util.MessageFormatter;
import io.ray.serve.util.ServeProtoUtil;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -43,25 +50,30 @@ public class Serve {
* @return
*/
public static synchronized ServeControllerClient start(Map<String, String> config) {
// Initialize ray if needed.
if (!Ray.isInitialized()) {
System.setProperty("ray.job.namespace", Constants.SERVE_NAMESPACE);
Ray.init();
}
return serveStart(config);
}

private static synchronized ServeControllerClient serveStart(Map<String, String> config) {

try {
ServeControllerClient client = getGlobalClient(true);
LOGGER.info("Connecting to existing Serve app in namespace {}", Constants.SERVE_NAMESPACE);
return client;
} catch (RayServeException | IllegalStateException e) {
LOGGER.info("There is no instance running on this Ray cluster. A new one will be started.");
LOGGER.info(
"There is no Serve instance running on this Ray cluster. A new one will be started.");
}

// Initialize ray if needed.
if (!Ray.isInitialized()) {
init();
}

int httpPort =
Optional.ofNullable(config)
.map(m -> m.get(RayServeConfig.PROXY_HTTP_PORT))
.map(Integer::parseInt)
.orElse(8000);
.orElse(Integer.valueOf(System.getProperty(RayServeConfig.PROXY_HTTP_PORT, "8000")));
PyActorHandle controllerAvatar =
Ray.actor(
PyActorClass.of("ray.serve._private.controller", "ServeControllerAvatar"),
Expand Down Expand Up @@ -95,7 +107,8 @@ public static synchronized ServeControllerClient start(Map<String, String> confi
}
} catch (RayTimeoutException e) {
String errMsg =
LogUtil.format("Proxies not available after {}s.", Constants.PROXY_TIMEOUT_S);
MessageFormatter.format(
"HTTP proxies not available after {}s.", Constants.PROXY_TIMEOUT_S);
LOGGER.error(errMsg, e);
throw new RayServeException(errMsg, e);
}
Expand All @@ -108,24 +121,6 @@ public static synchronized ServeControllerClient start(Map<String, String> confi
return client;
}

public static synchronized ServeControllerClient start(
boolean detached, boolean dedicatedCpu, Map<String, String> config) {

if (!detached) {
throw new IllegalArgumentException(
"`detached=false` is no longer supported. "
+ "In a future release, it will be removed altogether.");
}

if (dedicatedCpu) {
throw new IllegalArgumentException(
"`dedicatedCpu=true` is no longer supported. "
+ "In a future release, it will be removed altogether.");
}

return start(config);
}

/**
* Completely shut down the connected Serve instance.
*
Expand All @@ -142,7 +137,7 @@ public static void shutdown() {
}

LongPollClientFactory.stop();
client.shutdown();
client.shutdown(null);
clearContext();
}

Expand Down Expand Up @@ -181,6 +176,11 @@ public static void setInternalReplicaContext(
deploymentName, replicaTag, controllerName, servableObject, config, appName);
}

/**
* Set replica information to global context.
*
* <p>Stores the given context in the static {@code INTERNAL_REPLICA_CONTEXT} field, replacing any
* previously stored value.
*
* @param replicaContext the replica context to store globally
*/
public static void setInternalReplicaContext(ReplicaContext replicaContext) {
INTERNAL_REPLICA_CONTEXT = replicaContext;
}
Expand All @@ -206,7 +206,8 @@ public static ReplicaContext getReplicaContext() {
*
* @param healthCheckController If True, run a health check on the cached controller if it exists.
* If the check fails, try reconnecting to the controller.
* @return
* @return ServeControllerClient to the running Serve controller. If there is no running
* controller and raise_if_no_controller_running is set to False, returns None.
*/
public static ServeControllerClient getGlobalClient(boolean healthCheckController) {
try {
Expand All @@ -222,14 +223,15 @@ public static ServeControllerClient getGlobalClient(boolean healthCheckControlle
LOGGER.info("The cached controller has died. Reconnecting.");
setGlobalClient(null);
}
synchronized (ServeControllerClient.class) {
if (GLOBAL_CLIENT != null) {
return GLOBAL_CLIENT;
}
return connect();
}
return connect();
}

/**
 * Gets the global client, which stores the controller's handle.
 *
 * <p>Shorthand for {@code getGlobalClient(false)}, i.e. the lookup is performed without requesting
 * a health check of the cached controller.
 *
 * @return ServeControllerClient to the running Serve controller.
 */
public static ServeControllerClient getGlobalClient() {
  final boolean healthCheckController = false;
  return getGlobalClient(healthCheckController);
}
Expand All @@ -249,13 +251,19 @@ private static void setGlobalClient(ServeControllerClient client) {
*
* @return
*/
public static ServeControllerClient connect() {
private static synchronized ServeControllerClient connect() {

if (GLOBAL_CLIENT != null) {
return GLOBAL_CLIENT;
}

// Initialize ray if needed.
if (!Ray.isInitialized()) {
System.setProperty("ray.job.namespace", Constants.SERVE_NAMESPACE);
Ray.init();
init();
}

// When running inside of a replica, _INTERNAL_REPLICA_CONTEXT is set to ensure that the correct
// instance is connected to.
String controllerName =
INTERNAL_REPLICA_CONTEXT != null
? INTERNAL_REPLICA_CONTEXT.getInternalControllerName()
Expand All @@ -264,7 +272,7 @@ public static ServeControllerClient connect() {
Optional<BaseActorHandle> optional = Ray.getActor(controllerName, Constants.SERVE_NAMESPACE);
Preconditions.checkState(
optional.isPresent(),
LogUtil.format(
MessageFormatter.format(
"There is no instance running on this Ray cluster. "
+ "Please call `serve.start() to start one."));
LOGGER.info(
Expand All @@ -286,24 +294,25 @@ public static ServeControllerClient connect() {
*
* @param name name of the deployment. This must have already been deployed.
* @return Deployment
* @deprecated {@value Constants#MIGRATION_MESSAGE}
*/
@Deprecated
liuyang-my marked this conversation as resolved.
Show resolved Hide resolved
public static Deployment getDeployment(String name) {
LOGGER.warn(Constants.MIGRATION_MESSAGE);
DeploymentRoute deploymentRoute = getGlobalClient().getDeploymentInfo(name);
if (deploymentRoute == null) {
throw new RayServeException(
LogUtil.format("Deployment {} was not found. Did you call Deployment.deploy?", name));
MessageFormatter.format(
"Deployment {} was not found. Did you call Deployment.deploy?", name));
}

// TODO use DeploymentCreator
return new Deployment(
deploymentRoute.getDeploymentInfo().getReplicaConfig().getDeploymentDef(),
name,
deploymentRoute.getDeploymentInfo().getDeploymentConfig(),
deploymentRoute.getDeploymentInfo().getReplicaConfig(),
deploymentRoute.getDeploymentInfo().getVersion(),
null,
deploymentRoute.getDeploymentInfo().getReplicaConfig().getInitArgs(),
deploymentRoute.getRoute(),
deploymentRoute.getDeploymentInfo().getReplicaConfig().getRayActorOptions());
deploymentRoute.getRoute());
}

/**
Expand All @@ -312,8 +321,11 @@ public static Deployment getDeployment(String name) {
* <p>Dictionary maps deployment name to Deployment objects.
*
* @return
* @deprecated {@value Constants#MIGRATION_MESSAGE}
*/
@Deprecated
public static Map<String, Deployment> listDeployments() {
LOGGER.warn(Constants.MIGRATION_MESSAGE);
Map<String, DeploymentRoute> infos = getGlobalClient().listDeployments();
if (infos == null || infos.size() == 0) {
return Collections.emptyMap();
Expand All @@ -323,15 +335,125 @@ public static Map<String, Deployment> listDeployments() {
deployments.put(
entry.getKey(),
new Deployment(
entry.getValue().getDeploymentInfo().getReplicaConfig().getDeploymentDef(),
entry.getKey(),
entry.getValue().getDeploymentInfo().getDeploymentConfig(),
entry.getValue().getDeploymentInfo().getReplicaConfig(),
entry.getValue().getDeploymentInfo().getVersion(),
null,
entry.getValue().getDeploymentInfo().getReplicaConfig().getInitArgs(),
entry.getValue().getRoute(),
entry.getValue().getDeploymentInfo().getReplicaConfig().getRayActorOptions()));
entry.getValue().getRoute()));
}
return deployments;
}

/**
 * Run an application under the default application name and return a handle to its ingress
 * deployment.
 *
 * <p>Delegates to the five-argument overload with {@code blocking = true}, the name {@code
 * Constants.SERVE_DEFAULT_APP_NAME}, and neither a route prefix override nor a config map.
 *
 * @param target A Serve application returned by `Deployment.bind()`.
 * @return A handle that can be used to call the application.
 */
public static Optional<DeploymentHandle> run(Application target) {
  final boolean blocking = true;
  final String appName = Constants.SERVE_DEFAULT_APP_NAME;
  return run(target, blocking, appName, null, null);
}

/**
* Run an application and return a handle to its ingress deployment.
*
* @param target A Serve application returned by `Deployment.bind()`.
* @param blocking
* @param name Application name. If not provided, this will be the only application running on the
* cluster (it will delete all others).
* @param routePrefix Route prefix for HTTP requests. If not provided, it will use route_prefix of
* the ingress deployment. If specified neither as an argument nor in the ingress deployment,
* the route prefix will default to '/'.
* @param config
* @return A handle that can be used to call the application.
*/
public static Optional<DeploymentHandle> run(
liuyang-my marked this conversation as resolved.
Show resolved Hide resolved
Application target,
boolean blocking,
String name,
String routePrefix,
Map<String, String> config) {

if (StringUtils.isBlank(name)) {
throw new RayServeException("Application name must a non-empty string.");
}

ServeControllerClient client = serveStart(config);

List<Deployment> deployments = Graph.build(target.getInternalDagNode(), name);
Deployment ingress = Graph.getAndValidateIngressDeployment(deployments);

for (Deployment deployment : deployments) {
// Overwrite route prefix
if (StringUtils.isNotBlank(deployment.getRoutePrefix())
&& StringUtils.isNotBlank(routePrefix)) {
Preconditions.checkArgument(
routePrefix.startsWith("/"), "The route_prefix must start with a forward slash ('/')");
deployment.setRoutePrefix(routePrefix);
}
deployment
.getDeploymentConfig()
.setVersion(
StringUtils.isNotBlank(deployment.getVersion())
? deployment.getVersion()
: RandomStringUtils.randomAlphabetic(6));
}

client.deployApplication(name, deployments, blocking);

return Optional.ofNullable(ingress)
.map(
ingressDeployment ->
client.getDeploymentHandle(ingressDeployment.getName(), name, true));
}

/**
 * Initializes Ray for Serve: sets the {@code ray.job.namespace} system property to {@code
 * Constants.SERVE_NAMESPACE} and then calls {@code Ray.init()}.
 */
private static void init() {
  final String namespaceProperty = "ray.job.namespace";
  final String serveNamespace = Constants.SERVE_NAMESPACE;
  // The property is set before Ray.init() is invoked.
  System.setProperty(namespaceProperty, serveNamespace);
  Ray.init();
}

/**
 * Get a handle to the application's ingress deployment by name.
 *
 * <p>Asks the Serve controller actor for the application's ingress deployment name (remote call to
 * {@code get_ingress_deployment_name}) and builds a handle for that deployment.
 *
 * @param name application name
 * @return a handle to the ingress deployment of the given application
 * @throws RayServeException if the controller returns a blank ingress deployment name for {@code
 *     name}
 */
public static DeploymentHandle getAppHandle(String name) {
  ServeControllerClient client = getGlobalClient();
  String ingress =
      (String)
          ((PyActorHandle) client.getController())
              .task(PyActorMethod.of("get_ingress_deployment_name"), name)
              .remote()
              .get();

  if (StringUtils.isBlank(ingress)) {
    // Report the requested application name; `ingress` is blank on this path and would yield an
    // uninformative message.
    throw new RayServeException(
        MessageFormatter.format("Application '{}' does not exist.", name));
  }
  return client.getDeploymentHandle(ingress, name, false);
}

/**
 * Delete an application by its name.
 *
 * <p>Deletes the app with all corresponding deployments. Delegates to {@code delete(name, true)}.
 *
 * @param name application name
 */
public static void delete(String name) {
  final boolean blocking = true;
  delete(name, blocking);
}

/**
 * Delete an application by its name.
 *
 * <p>Deletes the app with all corresponding deployments by passing a one-element list containing
 * {@code name} to the global controller client.
 *
 * @param name application name
 * @param blocking Wait for the application to be deleted or not.
 */
public static void delete(String name, boolean blocking) {
  ServeControllerClient client = getGlobalClient();
  client.deleteApps(Arrays.asList(name), blocking);
}
}
Loading
Loading