From 6cd34358161c1b3d3a6d1ad62c3a8ccb1ece1f7a Mon Sep 17 00:00:00 2001 From: "xiaoman.gao@extremevision.com.cn" <825485697@qq.com> Date: Tue, 11 Jun 2024 14:01:02 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E9=83=A8=E5=88=86=E6=98=BE?= =?UTF-8?q?=E7=A4=BAbug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- myapp/models/model_dimension.py | 19 ++++++---- myapp/models/model_job.py | 2 +- myapp/models/model_notebook.py | 9 ----- myapp/models/model_train_model.py | 7 +++- myapp/tools/start.sh | 7 ---- myapp/utils/core.py | 9 ++--- myapp/utils/py/py_k8s.py | 52 +++++++++++++++++++++++----- myapp/utils/sqllab/base_impl.py | 2 +- myapp/views/baseApi.py | 2 ++ myapp/views/home.py | 2 +- myapp/views/view_inferenceserving.py | 4 +-- myapp/views/view_nni.py | 5 ++- myapp/views/view_notebook.py | 2 +- myapp/views/view_pipeline.py | 25 ++++++++----- myapp/views/view_serving.py | 2 +- myapp/views/view_sqllab.py | 17 +++++---- myapp/views/view_task.py | 6 ++-- myapp/views/view_team.py | 9 +++-- myapp/views/view_train_model.py | 20 ++++++++++- myapp/views/view_workflow.py | 2 +- 20 files changed, 131 insertions(+), 72 deletions(-) delete mode 100644 myapp/tools/start.sh diff --git a/myapp/models/model_dimension.py b/myapp/models/model_dimension.py index 0b2c4f3c..a8cddd9f 100644 --- a/myapp/models/model_dimension.py +++ b/myapp/models/model_dimension.py @@ -36,16 +36,21 @@ def table_html(self): users = [user.strip() for user in users if user.strip()] url_path = conf.get('MODEL_URLS',{}).get("dimension") if g.user.is_admin() or g.user.username in users or '*' in self.owner: - return Markup(f'{self.table_name}') - else: - return self.table_name + if self.sqllchemy_uri: + return Markup(f'{self.table_name}') + + return self.table_name @property def operate_html(self): - url=f''' - {__("更新远程表")} | {__("建外表示例")} - '''%(self.id,self.id) - return Markup(url) + if self.sqllchemy_uri: + url=f''' + {__("更新远程表")} | {__("建外表示例")} + '''%(self.id,self.id) + return Markup(url) + else: + return __("更新远程表")+" | "+__("建外表示例") + diff --git a/myapp/models/model_job.py b/myapp/models/model_job.py index 0b1d1209..69428c60 100644 --- a/myapp/models/model_job.py +++ b/myapp/models/model_job.py @@ -470,7 +470,7 @@ class Task(Model,ImportMixin,AuditMixinNullable,MyappModelBase): job_template = relationship( "Job_Template", foreign_keys=[job_template_id] ) - pipeline_id = Column(Integer, ForeignKey('pipeline.id'),comment='项目组id') + pipeline_id = Column(Integer, ForeignKey('pipeline.id'),comment='任务流id') pipeline = relationship( "Pipeline", foreign_keys=[pipeline_id] ) diff --git a/myapp/models/model_notebook.py b/myapp/models/model_notebook.py index 5de285ca..9e0f7977 100644 --- a/myapp/models/model_notebook.py +++ b/myapp/models/model_notebook.py @@ -59,15 +59,6 @@ def name_url(self): if self.ide_type=='theia': url = "/notebook/"+self.namespace + "/" + self.name+"/" + "#"+self.mount - elif self.ide_type=='matlab': - url = "/notebook/"+self.namespace + "/" + self.name+"/index.html" - elif self.ide_type=='rstudio' and not SERVICE_EXTERNAL_IP: - url1 = host+"/notebook/" + self.namespace + "/" + self.name + "/auth-sign-in?appUri=%2F" - url2 = host+"/notebook/" + self.namespace + "/" + self.name+"/" - a_html='''%s'''%(url1,url2,self.name) - return Markup(a_html) - - # url = "/notebook/" + self.namespace + "/" + self.name+"/" else: if root: url = '/notebook/jupyter/%s/lab/tree/%s' % (self.name,root.lstrip('/')) diff --git a/myapp/models/model_train_model.py b/myapp/models/model_train_model.py index 788adc52..dd9fde79 100644 --- a/myapp/models/model_train_model.py +++ b/myapp/models/model_train_model.py @@ -59,7 +59,12 @@ def project_url(self): @property def deploy(self): - ops=f''' + download_url = '' + if self.path or self.download_url: + download_url = f'{__("下载")} |' + else: + download_url = f'{__("下载")} |' + ops=download_url+f''' {__("发布")} ''' return Markup(ops) diff --git a/myapp/tools/start.sh b/myapp/tools/start.sh deleted file mode 100644 index bdfbe61b..00000000 --- a/myapp/tools/start.sh +++ /dev/null @@ -1,7 +0,0 @@ - - -nohup python myapp/tools/watch_workflow.py > workflow.log 2>&1 & - -nohup python myapp/tools/watch_service.py > service.log 2>&1 & - -tail -f workflow.log service.log diff --git a/myapp/utils/core.py b/myapp/utils/core.py index 1b70acf8..00b80992 100644 --- a/myapp/utils/core.py +++ b/myapp/utils/core.py @@ -1533,21 +1533,22 @@ def check_max_gpu(gpu_num, src_gpu_num=0): return gpu_num gpu_num, gpu_type, resource_name = get_gpu(resource_gpu) - src_gpu_num,_,_ = get_gpu(src_resource_gpu) if src_resource_gpu else 0,0,0 + src_gpu_num = 0 + if src_resource_gpu: + src_gpu_num, _, _ = get_gpu(src_resource_gpu) - if hasattr(g,'user') and not g.user.is_admin(): + if hasattr(g,'user') and not g.user.is_admin(): resource_gpu = check_max_gpu(gpu_num, src_gpu_num) if math.ceil(float(resource_gpu))==resource_gpu: resource_gpu = math.ceil(float(resource_gpu)) if gpu_type: - resource_gpu +=f'({gpu_type})' + resource_gpu = str(resource_gpu)+f'({gpu_type})' else: resource_gpu = resource_gpu return str(resource_gpu) - def check_resource(resource_memory,resource_cpu,resource_gpu,src_resource_memory=None,src_resource_cpu=None,src_resource_gpu=None): new_resource_memory = check_resource_memory(resource_memory,src_resource_memory) new_resource_memory = str(math.ceil(float(str(new_resource_memory).replace('G',''))))+"G" diff --git a/myapp/utils/py/py_k8s.py b/myapp/utils/py/py_k8s.py index c1b5378e..05ad2824 100755 --- a/myapp/utils/py/py_k8s.py +++ b/myapp/utils/py/py_k8s.py @@ -74,14 +74,21 @@ def get_pods(self, namespace=None, service_name=None, pod_name=None, labels={},s pod_name_temp = address.target_ref.name pod = self.v1.read_namespaced_pod(name=pod_name_temp, namespace=namespace) all_pods.append(pod) - elif (namespace and status and status.lower()=='running'): - all_endpoints = self.v1.list_namespaced_endpoints(namespace=namespace) # 先查询入口点, - subsets = all_endpoints.subsets - addresses = subsets[0].addresses # 只取第一个子网 - for address in addresses: - pod_name_temp = address.target_ref.name - pod = self.v1.read_namespaced_pod(name=pod_name_temp, namespace=namespace) - all_pods.append(pod) + elif (namespace and status): + if status.lower()=='running': + all_endpoints = self.v1.list_namespaced_endpoints(namespace=namespace) # 先查询入口点, + subsets = all_endpoints.subsets + addresses = subsets[0].addresses # 只取第一个子网 + for address in addresses: + pod_name_temp = address.target_ref.name + pod = self.v1.read_namespaced_pod(name=pod_name_temp, namespace=namespace) + all_pods.append(pod) + else: + src_pods = self.v1.list_namespaced_pod(namespace).items + for pod in src_pods: + if pod.status and pod.status.phase == status: + all_pods.append(pod) + elif (namespace and labels): src_pods = self.v1.list_namespaced_pod(namespace).items @@ -1068,9 +1075,27 @@ def make_pod(self, namespace, name, labels, command, args, volume_mount, working # 设置卡型 if gpu_type and gpu_type.strip(): nodeSelector['gpu-type'] = gpu_type + # 独占模式,尽量聚集在一个,避免卡零碎 if gpu_num >= 1: nodeSelector['gpu'] = 'true' - + labels['gpu']='true' + # 优先选择gpu卡占用的地方,这样不容易造成卡的零碎化占用 + affinity = client.V1Affinity( + node_affinity=None, + pod_anti_affinity=None, + pod_affinity=client.V1PodAffinity( + preferred_during_scheduling_ignored_during_execution=[ + client.V1WeightedPodAffinityTerm( + pod_affinity_term=client.V1PodAffinityTerm( + topology_key="kubernetes.io/hostname", + label_selector = client.V1LabelSelector( + match_labels={ + "gpu": 'true' + } + ) + ), + weight=10)]) + ) k8s_volumes, k8s_volume_mounts = self.get_volume_mounts(volume_mount, username) containers = [self.make_container(name=name, @@ -1290,6 +1315,15 @@ def delete_deployment(self, namespace, name=None, labels=None): except Exception as e: print(e) + # deploymnet伸缩容 + def scale_deployment(self,namespace, name, replicas): + try: + deployment = self.AppsV1Api.read_namespaced_deployment(name=name, namespace=namespace) + deployment.spec.replicas = int(replicas) + self.AppsV1Api.replace_namespaced_deployment(name=name, namespace=namespace, body=deployment) + except Exception as e: + print(e) + # @pysnooper.snoop() def create_deployment(self, namespace, name, replicas, labels, command, args, volume_mount,working_dir, node_selector, resource_memory, resource_cpu, resource_gpu, image_pull_policy, diff --git a/myapp/utils/sqllab/base_impl.py b/myapp/utils/sqllab/base_impl.py index 95ca0588..c0f6947d 100644 --- a/myapp/utils/sqllab/base_impl.py +++ b/myapp/utils/sqllab/base_impl.py @@ -103,7 +103,7 @@ def submit_task(self, qid, enable_async=True): async_task = handle_task.delay(qid, username=g.user.username) else: stage, status, _err_msg = handle_task(qid, username=g.user.username) - if _err_msg != "": + if _err_msg: raise RuntimeError(_err_msg) res = self.get_result(qid) diff --git a/myapp/views/baseApi.py b/myapp/views/baseApi.py index 7ed14084..4aec9019 100644 --- a/myapp/views/baseApi.py +++ b/myapp/views/baseApi.py @@ -1849,6 +1849,8 @@ def make_ui_info(self, ret_src): local_validators.append(val) ret['validators'] = local_validators + # 去除重复 + ret['validators'] = [json.loads(x) for x in list(set([json.dumps(x) for x in ret['validators']]))] # 统一规范前端type和选择时value # 选择器 diff --git a/myapp/views/home.py b/myapp/views/home.py index 5f7413c6..091f0178 100644 --- a/myapp/views/home.py +++ b/myapp/views/home.py @@ -495,7 +495,7 @@ def menu(self): "children": [ { "name": 'chat', - "title": __('知识库'), + "title": __('智能体'), "icon": '', "menu_type": "api", "url": '/chat_modelview/api/', diff --git a/myapp/views/view_inferenceserving.py b/myapp/views/view_inferenceserving.py index f0eef9f7..e3c0ad9e 100644 --- a/myapp/views/view_inferenceserving.py +++ b/myapp/views/view_inferenceserving.py @@ -95,7 +95,7 @@ class InferenceService_ModelView_base(): "model_status": {"type": "ellip2", "width": 100}, "modified": {"type": "ellip2", "width": 150}, "operate_html": {"type": "ellip2", "width": 350}, - "resource": {"type": "ellip2", "width": 300}, + "resource": {"type": "ellip2", "width": 350}, } search_columns = ['name', 'created_by', 'project', 'service_type', 'label', 'model_name', 'model_version', 'model_path', 'host', 'model_status', 'resource_gpu'] @@ -187,7 +187,7 @@ class InferenceService_ModelView_base(): ), "hpa": StringField( _('弹性伸缩'), - default='cpu:50%,gpu:50%', + default='', description= _('弹性伸缩容的触发条件:可以使用cpu/mem/gpu/qps等信息,可以使用其中一个指标或者多个指标,示例:cpu:50%,mem:50%,gpu:50%'), widget=BS3TextFieldWidget() ), diff --git a/myapp/views/view_nni.py b/myapp/views/view_nni.py index 2820723c..001e4605 100644 --- a/myapp/views/view_nni.py +++ b/myapp/views/view_nni.py @@ -352,8 +352,7 @@ def make_nnijob(self,k8s_client,namespace,nni): "name": nni.name, "namespace": namespace, "labels": { - "type": "nni", - "component": "nni", + "pod-type": "nni", "role": "master", "app": nni.name, "username": nni.created_by.username @@ -725,7 +724,7 @@ class NNI_ModelView_Api(NNI_ModelView_Base, MyappModelRestApi): 'parallel_trial_count', 'max_trial_count', 'objective_type', 'objective_goal', 'objective_metric_name', 'algorithm_name', 'algorithm_setting', 'parameters', 'job_json', 'working_dir', 'node_selector', - 'resource_memory', 'resource_cpu', 'alert_status', 'job_worker_image', 'job_worker_command'] + 'resource_memory', 'resource_cpu','resource_gpu', 'alert_status', 'job_worker_image', 'job_worker_command'] edit_columns = add_columns diff --git a/myapp/views/view_notebook.py b/myapp/views/view_notebook.py index 770e5f08..ecf3726f 100644 --- a/myapp/views/view_notebook.py +++ b/myapp/views/view_notebook.py @@ -73,7 +73,7 @@ class Notebook_ModelView_Base(): "resource": {"type": "ellip2", "width": 300}, "status": {"type": "ellip2", "width": 100}, "renew": {"type": "ellip2", "width": 200}, - "save": {"type": "ellip2", "width": 200} + "save": {"type": "ellip2", "width": 100} } add_form_query_rel_fields = { "project": [["name", Project_Join_Filter, 'org']] diff --git a/myapp/views/view_pipeline.py b/myapp/views/view_pipeline.py index f645201a..c10299a2 100644 --- a/myapp/views/view_pipeline.py +++ b/myapp/views/view_pipeline.py @@ -921,8 +921,12 @@ def pre_delete(self, pipeline): db.session.delete(task) # 删除所有的workflow - db.session.query(Workflow).filter_by(foreign_key=str(pipeline.id)).delete() - db.session.query(RunHistory).filter_by(pipeline_id=pipeline.id).delete() + # 只是删除了数据库记录,但是实例并没有删除,会重新监听更新的。 + db.session.query(Workflow).filter_by(foreign_key=str(pipeline.id)).delete(synchronize_session=False) + db.session.commit() + db.session.query(Workflow).filter(Workflow.labels.contains(f'"pipeline-id": "{str(pipeline.id)}"')).delete(synchronize_session=False) + db.session.commit() + db.session.query(RunHistory).filter_by(pipeline_id=pipeline.id).delete(synchronize_session=False) db.session.commit() @@ -1067,6 +1071,10 @@ def run_pipeline(self, pipeline_id): # return print('begin upload and run pipeline %s' % pipeline.name) pipeline.version_id = '' + if not pipeline.pipeline_file: + flash("请先编排任务,并进行保存后再运行整个任务流",'warning') + return redirect('/pipeline_modelview/api/web/%s' % pipeline.id) + crd_name = run_pipeline(pipeline, json.loads(pipeline.pipeline_file)) # 会根据版本号是否为空决定是否上传 pipeline.pipeline_argo_id = crd_name db.session.commit() # 更新 @@ -1124,12 +1132,12 @@ def web_log(self, pipeline_id): @expose("/web/monitoring/", methods=["GET"]) def web_monitoring(self, pipeline_id): pipeline = db.session.query(Pipeline).filter_by(id=int(pipeline_id)).first() - if pipeline.run_id: - url = "http://"+pipeline.project.cluster.get('HOST', request.host)+conf.get('GRAFANA_TASK_PATH')+ pipeline.name - return redirect(url) - else: - flash('no running instance', 'warning') - return redirect('/pipeline_modelview/api/web/%s' % pipeline.id) + + url = "http://"+pipeline.project.cluster.get('HOST', request.host)+conf.get('GRAFANA_TASK_PATH')+ pipeline.name + return redirect(url) + # else: + # flash('no running instance', 'warning') + # return redirect('/pipeline_modelview/api/web/%s' % pipeline.id) # # @event_logger.log_this @expose("/web/pod/", methods=["GET"]) @@ -1197,6 +1205,7 @@ def change_node(src_task_id, des_task_id): change_node(task.id, new_task.id) new_pipeline.expand = json.dumps(expand) + new_pipeline.parameter="{}" # 扩展参数不进行复制,这样demo的pipeline不会被复制一遍 db.session.commit() return new_pipeline diff --git a/myapp/views/view_serving.py b/myapp/views/view_serving.py index ce21a94f..8f0ee0fc 100644 --- a/myapp/views/view_serving.py +++ b/myapp/views/view_serving.py @@ -174,7 +174,7 @@ def deploy(self, service_id): image=service.images, hostAliases=conf.get('HOSTALIASES', ''), env=env, - privileged=False, + privileged=True, accounts=None, username=service.created_by.username, ports=[int(port) for port in service.ports.split(',')] diff --git a/myapp/views/view_sqllab.py b/myapp/views/view_sqllab.py index 4f83948b..b816b1dd 100644 --- a/myapp/views/view_sqllab.py +++ b/myapp/views/view_sqllab.py @@ -29,20 +29,19 @@ def db_commit_helper(dbsession): engine_impls = { 'mysql': Base_Impl(), 'presto': Base_Impl(), - 'clikchouse': Base_Impl(), + 'clickhouse': Base_Impl(), 'postgres': Base_Impl(), "impala": Base_Impl(), - "oracle": Base_Impl(), - "mssql": Base_Impl() + "hive": Base_Impl() } db_uri_demo = { - 'mysql': ['mysql+pymysql://username:password@host:port/database'], - 'postgres': ['postgresql+psycopg2://username:password@host:port/database'], - 'presto': ['presto://username:password@host:port/database'], - 'clikchouse': ['clickhouse+native://username:password@host:port/database'], + 'mysql': ['mysql+pymysql://$username:$password@$host:3306/$database'], + 'postgres': ['postgresql+psycopg2://$username:$password@$host:5432/$database'], + 'presto': ['presto://$username:$password@$host:8080/$catalog/$schema'], # presto://presto-coordinator-service.default:8080/hive/default + 'clickhouse': ['clickhouse+native://$username:$password@$host:9000/$database'], + # "hive":['hive://username:passwd@host:10000/default?auth=LDAP'], + "hive":['hive://$host:10000/default?auth=NOSASL'], "impala": ['impala://host:port/database'], - "oracle": ['oracle://username:password@host:port/database'], - "mssql": ['mssql+pymssql://username:password@host:port/database'] } # @pysnooper.snoop() diff --git a/myapp/views/view_task.py b/myapp/views/view_task.py index c94c0cf6..741fd784 100644 --- a/myapp/views/view_task.py +++ b/myapp/views/view_task.py @@ -169,7 +169,7 @@ class Task_ModelView_Base(): ), } - add_form_extra_fields['resource_gpu'] = StringField('gpu', default='0', description= _('gpu的资源使用限制(单位卡),示例:1,2,训练任务每个容器独占整卡。申请具体的卡型号,可以类似 1(V100),目前支持T4/V100/A100/VGPU'),widget=BS3TextFieldWidget()) + add_form_extra_fields['resource_gpu'] = StringField('gpu', default='0', description= _('gpu的资源使用限制(单位卡),示例:1,2,训练任务每个容器独占整卡。申请具体的卡型号,可以类似 1(V100)'),widget=BS3TextFieldWidget()) add_form_extra_fields['resource_rdma'] = StringField('rdma', default='0', description= _('RDMA的资源使用限制,示例 0,1,10,填写方式咨询管理员'), widget=BS3TextFieldWidget()) edit_form_extra_fields = add_form_extra_fields @@ -451,9 +451,7 @@ def run_pod(self, task, k8s_client, run_id, namespace, pod_name, image, working_ # 将哈希值映射到指定范围 hostPort = 40000 + 10*(hash_value % 1000) - if HostNetwork: - task_env += 'PORT1=' + str(hostPort + 1)+ "\n" - task_env += 'PORT2=' + str(hostPort + 2)+ "\n" + _, _, resource_name = core.get_gpu(task.resource_gpu) diff --git a/myapp/views/view_team.py b/myapp/views/view_team.py index a87da9a8..ab091733 100644 --- a/myapp/views/view_team.py +++ b/myapp/views/view_team.py @@ -89,12 +89,13 @@ class Project_User_ModelView_Base(): } edit_form_extra_fields = add_form_extra_fields - # @pysnooper.snoop() def pre_add_req(self,req_json): user_roles = [role.name.lower() for role in list(get_user_roles())] if "admin" in user_roles: return req_json creators = db.session().query(Project_User).filter_by(project_id=req_json.get('project')).all() + creators = [creator.user.username for creator in creators] + if g.user.username not in creators: raise MyappException('just creator can add/edit user') @@ -184,7 +185,7 @@ class Project_ModelView_Base(): } edit_form_extra_fields = add_form_extra_fields - + pre_update_more=None # @pysnooper.snoop() def pre_add_web(self): @@ -196,7 +197,11 @@ def pre_add_web(self): ) self.add_form_extra_fields = self.edit_form_extra_fields + # @pysnooper.snoop() def pre_update(self, item): + if self.pre_add: + self.pre_add(item) + if not item.type: item.type = self.project_type if item.expand: diff --git a/myapp/views/view_train_model.py b/myapp/views/view_train_model.py index 123eb5da..93184d5d 100644 --- a/myapp/views/view_train_model.py +++ b/myapp/views/view_train_model.py @@ -14,7 +14,7 @@ from flask import ( flash, g, - redirect + redirect, request ) from .base import ( DeleteMixin, @@ -164,6 +164,24 @@ def pre_update(self, item): item.path = self.src_item_json['path'] self.pre_add(item) + import pysnooper + @expose("/download/", methods=["GET", 'POST']) + # @pysnooper.snoop() + def download_model(self, model_id): + train_model = db.session.query(Training_Model).filter_by(id=model_id).first() + if train_model.download_url: + return redirect(train_model.download_url) + if train_model.path: + if 'http://' in train_model.path or 'https://' in train_model.path: + return redirect(train_model.path) + if '/mnt' in train_model.path: + download_url = request.host_url + 'static/' + train_model.path.strip('/') + return redirect(download_url) + flash(__('未发现模型存储地址'),'warning') + + return redirect(conf.get('train_model')) + + @expose("/deploy/", methods=["GET", 'POST']) def deploy(self, model_id): train_model = db.session.query(Training_Model).filter_by(id=model_id).first() diff --git a/myapp/views/view_workflow.py b/myapp/views/view_workflow.py index b89b4731..a47d860b 100644 --- a/myapp/views/view_workflow.py +++ b/myapp/views/view_workflow.py @@ -786,7 +786,7 @@ def web_node_detail(self,cluster_name,namespace,workflow_name,node_name): "tabName": __("pod信息"), "content": [ { - "groupName": __("yaml信息"), + "groupName": __("pod信息"), "groupContent": { "value": Markup(pod_yaml), "type": 'text'