From 5d8c8bc90a5cdd513462488fc9f8e12c43fa0f0e Mon Sep 17 00:00:00 2001 From: Jialei Date: Wed, 4 Jan 2023 19:14:40 +0800 Subject: [PATCH] feat(controller): online eval support gc --- .../starwhale/mlops/common/ProxyServlet.java | 3 +- .../mlops/domain/job/ModelServingService.java | 160 ++++++++++++++++-- .../domain/job/mapper/ModelServingMapper.java | 5 + .../domain/job/po/ModelServingEntity.java | 1 + .../schedule/k8s/ContainerOverwriteSpec.java | 3 + .../mlops/schedule/k8s/K8sClient.java | 10 ++ .../mlops/schedule/k8s/K8sJobTemplate.java | 30 +++- .../v0_3_2/V0_3_2_008__add_last_visit.sql | 18 ++ 8 files changed, 211 insertions(+), 19 deletions(-) create mode 100644 server/controller/src/main/resources/db/migration/v0_3_2/V0_3_2_008__add_last_visit.sql diff --git a/server/controller/src/main/java/ai/starwhale/mlops/common/ProxyServlet.java b/server/controller/src/main/java/ai/starwhale/mlops/common/ProxyServlet.java index f5197233fd..2b9d49184b 100644 --- a/server/controller/src/main/java/ai/starwhale/mlops/common/ProxyServlet.java +++ b/server/controller/src/main/java/ai/starwhale/mlops/common/ProxyServlet.java @@ -24,6 +24,7 @@ import java.net.URI; import java.net.URISyntaxException; import java.net.UnknownHostException; +import java.util.Date; import javax.servlet.ServletException; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; @@ -134,10 +135,10 @@ public String getTarget(String uri) { } var id = Long.parseLong(parts[1]); - // TODO add cache if (modelServingMapper.find(id) == null) { throw new IllegalArgumentException("can not find model serving entry " + parts[1]); } + modelServingMapper.updateLastVisitTime(id, new Date()); var svc = ModelServingService.getServiceName(id); var handler = ""; diff --git a/server/controller/src/main/java/ai/starwhale/mlops/domain/job/ModelServingService.java b/server/controller/src/main/java/ai/starwhale/mlops/domain/job/ModelServingService.java index 497d0ce1f0..9090340f73 100644 --- a/server/controller/src/main/java/ai/starwhale/mlops/domain/job/ModelServingService.java +++ b/server/controller/src/main/java/ai/starwhale/mlops/domain/job/ModelServingService.java @@ -39,20 +39,27 @@ import ai.starwhale.mlops.exception.SwProcessException; import ai.starwhale.mlops.schedule.k8s.K8sClient; import ai.starwhale.mlops.schedule.k8s.K8sJobTemplate; -import com.google.protobuf.Api; import io.kubernetes.client.custom.IntOrString; import io.kubernetes.client.openapi.ApiException; import io.kubernetes.client.openapi.models.V1ObjectMeta; +import io.kubernetes.client.openapi.models.V1OwnerReference; import io.kubernetes.client.openapi.models.V1Service; import io.kubernetes.client.openapi.models.V1ServicePort; import io.kubernetes.client.openapi.models.V1ServiceSpec; +import io.kubernetes.client.openapi.models.V1StatefulSet; +import io.kubernetes.client.util.labels.EqualityMatcher; +import io.kubernetes.client.util.labels.LabelSelector; +import java.util.Date; import java.util.List; import java.util.Map; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; +import java.util.Objects; +import java.util.TreeMap; +import java.util.regex.Pattern; import javax.servlet.http.HttpServletResponse; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Value; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; @Slf4j @@ -77,6 +84,7 @@ public class ModelServingService { public static final String MODEL_SERVICE_PREFIX = "model-serving"; + private static final Pattern modelServingNamePattern = Pattern.compile(MODEL_SERVICE_PREFIX + "-(\\d+)"); public ModelServingService( ModelServingMapper modelServingMapper, @@ -136,16 +144,23 @@ public ModelServingVo create( .modelVersionId(modelVersionId) .jobStatus(JobStatus.CREATED) .resourcePool(resourcePool) + .lastVisitTime(new Date()) .build(); - modelServingMapper.add(entity); + long id; + synchronized (this) { + modelServingMapper.add(entity); - var services = modelServingMapper.list(projectId, modelVersionId, runtimeVersionId, resourcePool); - if (services.size() != 1) { - // this can not happen - throw new SwProcessException(SwProcessException.ErrorType.DB, "duplicate entries, size " + services.size()); + var services = modelServingMapper.list(projectId, modelVersionId, runtimeVersionId, resourcePool); + if (services.size() != 1) { + // this can not happen + throw new SwProcessException(SwProcessException.ErrorType.DB, + "duplicate entries, size " + services.size()); + } + id = services.get(0).getId(); + // update last visit time, prevents garbage collected + modelServingMapper.updateLastVisitTime(id, new Date()); } - var id = services.get(0).getId(); log.info("Model serving job has been created. ID={}", id); @@ -199,7 +214,7 @@ private void deploy( ); var ss = k8sJobTemplate.renderModelServingOrch(envs, image, name); try { - k8sClient.deployStatefulSet(ss); + ss = k8sClient.deployStatefulSet(ss); } catch (ApiException e) { if (e.getCode() != HttpServletResponse.SC_CONFLICT) { throw e; @@ -214,24 +229,141 @@ private void deploy( svc.metadata(meta); var spec = new V1ServiceSpec(); svc.spec(spec); - var selector = Map.of("app", name); + var selector = Map.of(K8sJobTemplate.LABEL_APP, name); spec.selector(selector); var port = new V1ServicePort(); port.name("model-serving-port"); port.protocol("TCP"); port.port(80); - port.targetPort(new IntOrString(8080)); + port.targetPort(new IntOrString(K8sJobTemplate.ONLINE_EVAL_PORT_IN_POD)); spec.ports(List.of(port)); + + // add owner reference for svc and we can just delete the stateful-set when gc is needed + var ownerRef = new V1OwnerReference(); + ownerRef.kind(ss.getKind()); + Objects.requireNonNull(ss.getMetadata()); + ownerRef.uid(ss.getMetadata().getUid()); + meta.ownerReferences(List.of(ownerRef)); + + // add svc to k8s k8sClient.deployService(svc); - // TODO add owner reference for svc - // TODO garbage collection when svc fails + + // if operations of svc failed, the gc thread will delete the zombie stateful-set, + // so we do not need to delete the previous stateful-set when this fails } public static String getServiceName(long id) { return String.format("%s-%d", MODEL_SERVICE_PREFIX, id); } + public static Long getServiceIdFromName(String name) { + var match = modelServingNamePattern.matcher(name); + if (match.matches()) { + return Long.parseLong(match.group(1)); + } + return null; + } + public static String getServiceBaseUri(long id) { return String.format("/gateway/%s/%d", MODEL_SERVICE_PREFIX, id); } + + + @Scheduled(initialDelay = 10000, fixedDelay = 10000) + public void gc() throws ApiException { + var labelSelector = LabelSelector.and(EqualityMatcher.equal(K8sJobTemplate.LABEL_WORKLOAD_TYPE, + K8sJobTemplate.WORKLOAD_TYPE_ONLINE_EVAL)).toString(); + var statefulSetList = k8sClient.getStatefulSetList(labelSelector); + + boolean hasPending = false; + Map running = new TreeMap<>((t1, t2) -> { + // oldest at the beginning + return t1 == t2 ? 0 : t1.before(t2) ? -1 : 1; + }); + + for (var statefulSet : statefulSetList.getItems()) { + // check if the stateful set is outdated + var meta = statefulSet.getMetadata(); + if (meta == null || StringUtils.isEmpty(meta.getName())) { + continue; + } + // parse entity id from stateful set name + var name = meta.getName(); + var id = getServiceIdFromName(name); + if (id == null) { + log.warn("can not get entity id from name {}", name); + continue; + } + + // check if in db record + ModelServingEntity entity; + synchronized (this) { + entity = modelServingMapper.find(id); + } + if (entity == null) { + // delete the unknown stateful set + log.info("delete stateful set {} when there is no entry in db", name); + deleteStatefulSet(name); + continue; + } + + var now = System.currentTimeMillis(); + // TODO use duration from system settings + if (now - entity.getLastVisitTime().getTime() > 12 * 3600 * 1000) { + log.info("delete stateful set {} when it reaches the max TTL", name); + deleteStatefulSet(name); + } + + var createTime = statefulSet.getMetadata().getCreationTimestamp(); + // TODO use duration from system settings + if (createTime != null && now - createTime.toInstant().toEpochMilli() < 1800 * 1000) { + // just been deployed, ignore + log.info("ignore stateful set {} (just been deployed)", name); + continue; + } + + var status = statefulSet.getStatus(); + // check if the stateful set is pending + if (status == null) { + // may have just been deployed, ignore + log.info("ignore stateful set {} (no status found)", name); + continue; + } + if (status.getReadyReplicas() == null || status.getReadyReplicas() == 0) { + hasPending = true; + log.info("found pending stateful set {}", name); + continue; + } + + running.put(entity.getLastVisitTime(), statefulSet); + } + + if (!hasPending) { + log.info("no pending stateful set, done"); + return; + } + + // kill the oldest stateful set + if (running.isEmpty()) { + log.info("no stateful set to gc"); + return; + } + var key = running.keySet().iterator().next(); + var oldest = running.get(key); + var name = Objects.requireNonNull(oldest.getMetadata()).getName(); + k8sClient.deleteStatefulSet(name); + log.info("delete stateful set {}", name); + } + + private void deleteStatefulSet(String name) { + try { + k8sClient.deleteStatefulSet(name); + } catch (ApiException e) { + if (e.getCode() == HttpServletResponse.SC_NOT_FOUND) { + log.info("stateful set {} not found", name); + return; + } + log.error("delete stateful set {} failed, reason {}", name, e.getResponseBody(), e); + } + } } diff --git a/server/controller/src/main/java/ai/starwhale/mlops/domain/job/mapper/ModelServingMapper.java b/server/controller/src/main/java/ai/starwhale/mlops/domain/job/mapper/ModelServingMapper.java index b29bf21be7..6aba5c0962 100644 --- a/server/controller/src/main/java/ai/starwhale/mlops/domain/job/mapper/ModelServingMapper.java +++ b/server/controller/src/main/java/ai/starwhale/mlops/domain/job/mapper/ModelServingMapper.java @@ -18,6 +18,7 @@ import ai.starwhale.mlops.domain.job.po.ModelServingEntity; import java.util.Arrays; +import java.util.Date; import java.util.List; import org.apache.commons.text.CaseUtils; import org.apache.ibatis.annotations.InsertProvider; @@ -26,6 +27,7 @@ import org.apache.ibatis.annotations.Param; import org.apache.ibatis.annotations.Select; import org.apache.ibatis.annotations.SelectProvider; +import org.apache.ibatis.annotations.Update; import org.apache.ibatis.jdbc.SQL; @Mapper @@ -49,6 +51,9 @@ public interface ModelServingMapper { @Select("select * from " + TABLE + " where id=#{id}") ModelServingEntity find(long id); + @Update("update " + TABLE + " set last_visit_time = #{date} where id = #{id}") + void updateLastVisitTime(long id, Date date); + @SelectProvider(value = SqlProviderAdapter.class, method = "listByConditions") List list( @Param("projectId") Long projectId, diff --git a/server/controller/src/main/java/ai/starwhale/mlops/domain/job/po/ModelServingEntity.java b/server/controller/src/main/java/ai/starwhale/mlops/domain/job/po/ModelServingEntity.java index d44b13d70e..dda9d42e2c 100644 --- a/server/controller/src/main/java/ai/starwhale/mlops/domain/job/po/ModelServingEntity.java +++ b/server/controller/src/main/java/ai/starwhale/mlops/domain/job/po/ModelServingEntity.java @@ -41,4 +41,5 @@ public class ModelServingEntity extends BaseEntity { private Long runtimeVersionId; private Integer isDeleted; private String resourcePool; + private Date lastVisitTime; } diff --git a/server/controller/src/main/java/ai/starwhale/mlops/schedule/k8s/ContainerOverwriteSpec.java b/server/controller/src/main/java/ai/starwhale/mlops/schedule/k8s/ContainerOverwriteSpec.java index d8eaa2c9f5..9a9c1919d9 100644 --- a/server/controller/src/main/java/ai/starwhale/mlops/schedule/k8s/ContainerOverwriteSpec.java +++ b/server/controller/src/main/java/ai/starwhale/mlops/schedule/k8s/ContainerOverwriteSpec.java @@ -17,6 +17,7 @@ package ai.starwhale.mlops.schedule.k8s; import io.kubernetes.client.openapi.models.V1EnvVar; +import io.kubernetes.client.openapi.models.V1Probe; import java.util.List; import lombok.AllArgsConstructor; import lombok.Builder; @@ -39,6 +40,8 @@ public class ContainerOverwriteSpec { List envs; + V1Probe readinessProbe; + public ContainerOverwriteSpec(String name) { this.name = name; } diff --git a/server/controller/src/main/java/ai/starwhale/mlops/schedule/k8s/K8sClient.java b/server/controller/src/main/java/ai/starwhale/mlops/schedule/k8s/K8sClient.java index a2caca7d4a..79e70b1b44 100644 --- a/server/controller/src/main/java/ai/starwhale/mlops/schedule/k8s/K8sClient.java +++ b/server/controller/src/main/java/ai/starwhale/mlops/schedule/k8s/K8sClient.java @@ -33,6 +33,7 @@ import io.kubernetes.client.openapi.models.V1PodList; import io.kubernetes.client.openapi.models.V1Service; import io.kubernetes.client.openapi.models.V1StatefulSet; +import io.kubernetes.client.openapi.models.V1StatefulSetList; import io.kubernetes.client.util.CallGeneratorParams; import io.kubernetes.client.util.labels.LabelSelector; import java.io.IOException; @@ -102,6 +103,10 @@ public void deleteJob(String id) throws ApiException { batchV1Api.deleteNamespacedJob(id, ns, null, null, 1, false, null, null); } + public void deleteStatefulSet(String name) throws ApiException { + appsV1Api.deleteNamespacedStatefulSet(name, ns, null, null, 1, false, null, null); + } + /** * get all jobs with in this.ns * @@ -111,6 +116,11 @@ public V1JobList getJobs(String labelSelector) throws ApiException { return batchV1Api.listNamespacedJob(ns, null, null, null, null, labelSelector, null, null, null, 30, null); } + public V1StatefulSetList getStatefulSetList(String labelSelector) throws ApiException { + return appsV1Api.listNamespacedStatefulSet(ns, null, null, null, labelSelector, + null, null, null, null, 30, null); + } + public void watchJob(ResourceEventHandler eventH, String selector) { SharedIndexInformer jobInformer = informerFactory.sharedIndexInformerFor( (CallGeneratorParams params) -> batchV1Api.listNamespacedJobCall(ns, null, null, null, null, diff --git a/server/controller/src/main/java/ai/starwhale/mlops/schedule/k8s/K8sJobTemplate.java b/server/controller/src/main/java/ai/starwhale/mlops/schedule/k8s/K8sJobTemplate.java index 8b23180f98..c38d7c0635 100644 --- a/server/controller/src/main/java/ai/starwhale/mlops/schedule/k8s/K8sJobTemplate.java +++ b/server/controller/src/main/java/ai/starwhale/mlops/schedule/k8s/K8sJobTemplate.java @@ -16,14 +16,17 @@ package ai.starwhale.mlops.schedule.k8s; +import io.kubernetes.client.custom.IntOrString; import io.kubernetes.client.openapi.models.V1Container; import io.kubernetes.client.openapi.models.V1EmptyDirVolumeSource; import io.kubernetes.client.openapi.models.V1EnvVar; +import io.kubernetes.client.openapi.models.V1HTTPGetAction; import io.kubernetes.client.openapi.models.V1Job; import io.kubernetes.client.openapi.models.V1JobSpec; import io.kubernetes.client.openapi.models.V1ObjectMeta; import io.kubernetes.client.openapi.models.V1PodSpec; import io.kubernetes.client.openapi.models.V1PodTemplateSpec; +import io.kubernetes.client.openapi.models.V1Probe; import io.kubernetes.client.openapi.models.V1StatefulSet; import io.kubernetes.client.openapi.models.V1Volume; import io.kubernetes.client.util.Yaml; @@ -55,6 +58,10 @@ public class K8sJobTemplate { private final String pipCacheHostPath; public static final String DEVICE_LABEL_NAME_PREFIX = "device.starwhale.ai-"; + public static final String LABEL_APP = "app"; + public static final String LABEL_WORKLOAD_TYPE = "starwhale-workload-type"; + public static final String WORKLOAD_TYPE_ONLINE_EVAL = "online-eval"; + public static final int ONLINE_EVAL_PORT_IN_POD = 8080; final String evalJobTemplate; final String modelServingJobTemplate; @@ -111,24 +118,37 @@ public V1Job renderJob( public V1StatefulSet renderModelServingOrch(Map envs, String image, String name) { var ss = Yaml.loadAs(this.modelServingJobTemplate, V1StatefulSet.class); Objects.requireNonNull(ss.getMetadata()); + + // set name and labels ss.getMetadata().name(name); var spec = ss.getSpec(); Objects.requireNonNull(spec); - var labels = Map.of("app", name); + var labels = Map.of(LABEL_APP, name, LABEL_WORKLOAD_TYPE, WORKLOAD_TYPE_ONLINE_EVAL); spec.getSelector().matchLabels(labels); Objects.requireNonNull(spec.getTemplate().getMetadata()); spec.getTemplate().getMetadata().labels(labels); - var podSpec = spec.getTemplate().getSpec(); - Objects.requireNonNull(podSpec); + // set container spec final String containerName = "worker"; var cos = new ContainerOverwriteSpec(); cos.setName(containerName); cos.setImage(image); cos.setEnvs(envs.entrySet().stream().map(K8sJobTemplate::toEnvVar).collect(Collectors.toList())); + // add readiness probe + var readiness = new V1Probe(); + cos.setReadinessProbe(readiness); + readiness.failureThreshold(3); + var httpGet = new V1HTTPGetAction(); + readiness.httpGet(httpGet); + httpGet.path("/"); + httpGet.port(new IntOrString(ONLINE_EVAL_PORT_IN_POD)); + httpGet.scheme("http"); + var containerSpecMap = new HashMap(); containerSpecMap.put(containerName, cos); + var podSpec = spec.getTemplate().getSpec(); + Objects.requireNonNull(podSpec); patchPodSpec("Always", containerSpecMap, null, podSpec); patchPipCacheVolume(ss.getSpec().getTemplate().getSpec().getVolumes()); addDeviceInfoLabel(spec.getTemplate(), containerSpecMap); @@ -172,7 +192,9 @@ private static void patchPodSpec( if (!CollectionUtils.isEmpty(containerOverwriteSpec.envs)) { c.env(containerOverwriteSpec.envs); } - + if (containerOverwriteSpec.readinessProbe != null) { + c.readinessProbe(containerOverwriteSpec.readinessProbe); + } }); } diff --git a/server/controller/src/main/resources/db/migration/v0_3_2/V0_3_2_008__add_last_visit.sql b/server/controller/src/main/resources/db/migration/v0_3_2/V0_3_2_008__add_last_visit.sql new file mode 100644 index 0000000000..e4d8648cd5 --- /dev/null +++ b/server/controller/src/main/resources/db/migration/v0_3_2/V0_3_2_008__add_last_visit.sql @@ -0,0 +1,18 @@ +/* + * Copyright 2022 Starwhale, Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +ALTER TABLE `model_serving_info` + ADD last_visit_time DATETIME NOT NULL;