Skip to content

Commit

Permalink
1. Unify the Controller.deploy_application for x-lang.
Browse files Browse the repository at this point in the history
2. Include some audit content.
3. DeploymentHandle.remote use the requestId from context.

Signed-off-by: chuhan.ly <chuhan.ly@antgroup.com>
  • Loading branch information
chuhan.ly committed Oct 7, 2023
1 parent 1f3676b commit fe346be
Show file tree
Hide file tree
Showing 16 changed files with 293 additions and 277 deletions.
92 changes: 60 additions & 32 deletions java/serve/src/main/java/io/ray/serve/api/Serve.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import io.ray.serve.deployment.DeploymentRoute;
import io.ray.serve.exception.RayServeException;
import io.ray.serve.generated.ActorNameList;
import io.ray.serve.generated.StatusOverview;
import io.ray.serve.handle.DeploymentHandle;
import io.ray.serve.poll.LongPollClientFactory;
import io.ray.serve.replica.ReplicaContext;
Expand Down Expand Up @@ -54,7 +53,7 @@ public static synchronized ServeControllerClient start(Map<String, String> confi
return serveStart(config);
}

public static synchronized ServeControllerClient serveStart(Map<String, String> config) {
private static synchronized ServeControllerClient serveStart(Map<String, String> config) {

try {
ServeControllerClient client = getGlobalClient(true);
Expand Down Expand Up @@ -108,7 +107,8 @@ public static synchronized ServeControllerClient serveStart(Map<String, String>
}
} catch (RayTimeoutException e) {
String errMsg =
MessageFormatter.format("Proxies not available after {}s.", Constants.PROXY_TIMEOUT_S);
MessageFormatter.format(
"HTTP proxies not available after {}s.", Constants.PROXY_TIMEOUT_S);
LOGGER.error(errMsg, e);
throw new RayServeException(errMsg, e);
}
Expand All @@ -121,24 +121,6 @@ public static synchronized ServeControllerClient serveStart(Map<String, String>
return client;
}

public static synchronized ServeControllerClient start(
boolean detached, boolean dedicatedCpu, Map<String, String> config) {

if (!detached) {
throw new IllegalArgumentException(
"`detached=false` is no longer supported. "
+ "In a future release, it will be removed altogether.");
}

if (dedicatedCpu) {
throw new IllegalArgumentException(
"`dedicatedCpu=true` is no longer supported. "
+ "In a future release, it will be removed altogether.");
}

return start(config);
}

/**
* Completely shut down the connected Serve instance.
*
Expand All @@ -155,7 +137,7 @@ public static void shutdown() {
}

LongPollClientFactory.stop();
client.shutdown();
client.shutdown(null);
clearContext();
}

Expand Down Expand Up @@ -194,6 +176,11 @@ public static void setInternalReplicaContext(
deploymentName, replicaTag, controllerName, servableObject, config, appName);
}

/**
* Set replica information to global context.
*
* @param replicaContext
*/
public static void setInternalReplicaContext(ReplicaContext replicaContext) {
INTERNAL_REPLICA_CONTEXT = replicaContext;
}
Expand All @@ -219,7 +206,8 @@ public static ReplicaContext getReplicaContext() {
*
* @param healthCheckController If True, run a health check on the cached controller if it exists.
* If the check fails, try reconnecting to the controller.
* @return
* @return ServeControllerClient to the running Serve controller. If there is no running
* controller and raise_if_no_controller_running is set to False, returns None.
*/
public static ServeControllerClient getGlobalClient(boolean healthCheckController) {
try {
Expand All @@ -238,6 +226,12 @@ public static ServeControllerClient getGlobalClient(boolean healthCheckControlle
return connect();
}

/**
* Gets the global client, which stores the controller's handle.
*
* @return ServeControllerClient to the running Serve controller. If there is no running
* controller and raise_if_no_controller_running is set to False, returns None.
*/
public static ServeControllerClient getGlobalClient() {
return getGlobalClient(false);
}
Expand Down Expand Up @@ -300,9 +294,11 @@ private static synchronized ServeControllerClient connect() {
*
* @param name name of the deployment. This must have already been deployed.
* @return Deployment
* @deprecated {@value Constants#MIGRATION_MESSAGE}
*/
@Deprecated
public static Deployment getDeployment(String name) {
LOGGER.warn(Constants.MIGRATION_MESSAGE);
DeploymentRoute deploymentRoute = getGlobalClient().getDeploymentInfo(name);
if (deploymentRoute == null) {
throw new RayServeException(
Expand All @@ -325,9 +321,11 @@ public static Deployment getDeployment(String name) {
* <p>Dictionary maps deployment name to Deployment objects.
*
* @return
* @deprecated {@value Constants#MIGRATION_MESSAGE}
*/
@Deprecated
public static Map<String, Deployment> listDeployments() {
LOGGER.warn(Constants.MIGRATION_MESSAGE);
Map<String, DeploymentRoute> infos = getGlobalClient().listDeployments();
if (infos == null || infos.size() == 0) {
return Collections.emptyMap();
Expand All @@ -346,10 +344,29 @@ public static Map<String, Deployment> listDeployments() {
return deployments;
}

/**
* Run an application and return a handle to its ingress deployment.
*
* @param target A Serve application returned by `Deployment.bind()`.
* @return A handle that can be used to call the application.
*/
public static Optional<DeploymentHandle> run(Application target) {
return run(target, true, Constants.SERVE_DEFAULT_APP_NAME, null, null);
}

/**
* Run an application and return a handle to its ingress deployment.
*
* @param target A Serve application returned by `Deployment.bind()`.
* @param blocking
* @param name Application name. If not provided, this will be the only application running on the
* cluster (it will delete all others).
* @param routePrefix Route prefix for HTTP requests. If not provided, it will use route_prefix of
* the ingress deployment. If specified neither as an argument nor in the ingress deployment,
* the route prefix will default to '/'.
* @param config
* @return A handle that can be used to call the application.
*/
public static Optional<DeploymentHandle> run(
Application target,
boolean blocking,
Expand All @@ -368,8 +385,8 @@ public static Optional<DeploymentHandle> run(

for (Deployment deployment : deployments) {
// Overwrite route prefix
if (StringUtils.isNotBlank(routePrefix)
&& StringUtils.isNotBlank(deployment.getRoutePrefix())) {
if (StringUtils.isNotBlank(deployment.getRoutePrefix())
&& StringUtils.isNotBlank(routePrefix)) {
Preconditions.checkArgument(
routePrefix.startsWith("/"), "The route_prefix must start with a forward slash ('/')");
deployment.setRoutePrefix(routePrefix);
Expand All @@ -393,6 +410,12 @@ private static void init() {
Ray.init();
}

/**
* Get a handle to the application's ingress deployment by name.
*
* @param name application name
* @return
*/
public static DeploymentHandle getAppHandle(String name) {
ServeControllerClient client = getGlobalClient();
String ingress =
Expand All @@ -409,21 +432,26 @@ public static DeploymentHandle getAppHandle(String name) {
return client.getHandle(ingress, name, false);
}

/**
* Delete an application by its name.
*
* <p>Deletes the app with all corresponding deployments.
*
* @param name application name
*/
public static void delete(String name) {
delete(name, true);
}

/**
* Delete an application by its name. Deletes the app with all corresponding deployments.
* Delete an application by its name.
*
* @param name
* @param blocking
* <p>Deletes the app with all corresponding deployments.
*
* @param name application name
* @param blocking Wait for the application to be deleted or not.
*/
public static void delete(String name, boolean blocking) {
getGlobalClient().deleteApps(Arrays.asList(name), blocking);
}

public static StatusOverview status(String name) {
return getGlobalClient().getServeStatus(name);
}
}
Loading

0 comments on commit fe346be

Please sign in to comment.