diff --git a/components/kubeflow/kfserving/Dockerfile b/components/kubeflow/kfserving/Dockerfile
index 82f655f0a84..8021a63e12b 100644
--- a/components/kubeflow/kfserving/Dockerfile
+++ b/components/kubeflow/kfserving/Dockerfile
@@ -1,6 +1,6 @@
 FROM python:3.6-slim
 
-RUN pip3 install kubernetes==10.0.1 kfserving==0.2.1 requests==2.22.0 Flask==1.1.1 flask-cors==3.0.8
+RUN pip3 install kubernetes==10.0.1 kfserving==0.3.0 requests==2.22.0 Flask==1.1.1 flask-cors==3.0.8
 
 ENV APP_HOME /app
 COPY src $APP_HOME
diff --git a/components/kubeflow/kfserving/component.yaml b/components/kubeflow/kfserving/component.yaml
index ea59664ae87..e6fa3cd2c36 100644
--- a/components/kubeflow/kfserving/component.yaml
+++ b/components/kubeflow/kfserving/component.yaml
@@ -12,6 +12,7 @@ inputs:
   - {name: Canary Custom Model Spec, type: String, default: '{}', description: 'Custom runtime canary custom model container spec.'}
   - {name: Autoscaling Target, type: String, default: '0', description: 'Autoscaling Target Number'}
   - {name: KFServing Endpoint, type: String, default: '', description: 'KFServing remote deployer API endpoint'}
+  - {name: Service Account, type: String, default: '', description: 'Model Service Account'}
 outputs:
   - {name: Service Endpoint URI, type: String, description: 'URI of the deployed prediction service.'}
 implementation:
@@ -31,5 +32,6 @@ implementation:
       --canary-custom-model-spec, {inputValue: Canary Custom Model Spec},
       --kfserving-endpoint, {inputValue: KFServing Endpoint},
       --autoscaling-target, {inputValue: Autoscaling Target},
-      --output_path, {outputPath: Service Endpoint URI}
+      --service-account, {inputValue: Service Account},
+      --output-path, {outputPath: Service Endpoint URI}
     ]
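For context, a minimal sketch of consuming the updated component from a KFP pipeline. The component path, bucket URI, and the service-account name ("kfserving-s3-sa") are assumptions, not part of this PR, and the keyword names assume kfp's usual sanitization of the YAML input names:

```python
# Hedged usage sketch: all resource names and URIs here are illustrative.
import kfp
from kfp import components, dsl

# Load the component definition updated in this PR (path assumed).
kfserving_op = components.load_component_from_file(
    "components/kubeflow/kfserving/component.yaml"
)

@dsl.pipeline(name="kfserving-deploy", description="Deploy a model via KFServing")
def deploy_pipeline():
    kfserving_op(
        action="create",
        model_name="flowers-sample",
        default_model_uri="s3://models/flowers",
        framework="tensorflow",
        namespace="kubeflow",
        # New input: service account bound to the storage credentials.
        service_account="kfserving-s3-sa",
    )

if __name__ == "__main__":
    kfp.compiler.Compiler().compile(deploy_pipeline, "kfserving_deploy.yaml")
```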
diff --git a/components/kubeflow/kfserving/src/app.py b/components/kubeflow/kfserving/src/app.py
index 21aa7f0d687..f0eaa71eb50 100644
--- a/components/kubeflow/kfserving/src/app.py
+++ b/components/kubeflow/kfserving/src/app.py
@@ -23,33 +23,36 @@
 CORS(app)
 
-@app.route('/deploy-model', methods=['POST'])
+@app.route("/deploy-model", methods=["POST"])
 def deploy_model_post():
     if not request.json:
         abort(400)
-    return json.dumps(deploy_model(
-        action=request.json['action'],
-        model_name=request.json['model_name'],
-        default_model_uri=request.json['default_model_uri'],
-        canary_model_uri=request.json['canary_model_uri'],
-        canary_model_traffic=request.json['canary_model_traffic'],
-        namespace=request.json['namespace'],
-        framework=request.json['framework'],
-        default_custom_model_spec=request.json['default_custom_model_spec'],
-        canary_custom_model_spec=request.json['canary_custom_model_spec'],
-        autoscaling_target=request.json['autoscaling_target']
-    ))
-
-
-@app.route('/', methods=['GET'])
+    return json.dumps(
+        deploy_model(
+            action=request.json["action"],
+            model_name=request.json["model_name"],
+            default_model_uri=request.json["default_model_uri"],
+            canary_model_uri=request.json["canary_model_uri"],
+            canary_model_traffic=request.json["canary_model_traffic"],
+            namespace=request.json["namespace"],
+            framework=request.json["framework"],
+            default_custom_model_spec=request.json["default_custom_model_spec"],
+            canary_custom_model_spec=request.json["canary_custom_model_spec"],
+            autoscaling_target=request.json["autoscaling_target"],
+            service_account=request.json["service_account"],
+        )
+    )
+
+
+@app.route("/", methods=["GET"])
 def root_get():
     return 200
 
-@app.route('/', methods=['OPTIONS'])
+@app.route("/", methods=["OPTIONS"])
 def root_options():
     return "200"
 
 
 if __name__ == "__main__":
-    app.run(debug=True, 
-            host='0.0.0.0', port=int(os.environ.get('PORT', 8080)))
+    app.run(debug=True, host="0.0.0.0", port=int(os.environ.get("PORT", 8080)))
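A hedged smoke test for the updated route; the host, port, and all payload values are illustrative. Note that the handler indexes request.json directly, so every key, including the new service_account, must be present (an empty string works when no service account is needed):

```python
# Hypothetical request against a locally running deployer (host/port assumed).
import requests

payload = {
    "action": "create",
    "model_name": "flowers-sample",
    "default_model_uri": "s3://models/flowers",
    "canary_model_uri": "",
    "canary_model_traffic": "0",
    "namespace": "kubeflow",
    "framework": "tensorflow",
    "default_custom_model_spec": {},
    "canary_custom_model_spec": {},
    "autoscaling_target": "0",
    "service_account": "kfserving-s3-sa",  # new field introduced by this PR
}

response = requests.post("http://localhost:8080/deploy-model", json=payload)
print(response.json())
```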
diff --git a/components/kubeflow/kfserving/src/kfservingdeployer.py b/components/kubeflow/kfserving/src/kfservingdeployer.py
index 4803c85b896..08f8d9cf012 100644
--- a/components/kubeflow/kfserving/src/kfservingdeployer.py
+++ b/components/kubeflow/kfserving/src/kfservingdeployer.py
@@ -34,103 +34,236 @@
 from kfserving import V1alpha2InferenceServiceSpec
 from kfserving import V1alpha2InferenceService
 
-def EndpointSpec(framework, storage_uri):
-    if framework == 'tensorflow':
-        return V1alpha2EndpointSpec(predictor=V1alpha2PredictorSpec(tensorflow=V1alpha2TensorflowSpec(storage_uri=storage_uri)))
-    elif framework == 'pytorch':
-        return V1alpha2EndpointSpec(predictor=V1alpha2PredictorSpec(pytorch=V1alpha2PyTorchSpec(storage_uri=storage_uri)))
-    elif framework == 'sklearn':
-        return V1alpha2EndpointSpec(predictor=V1alpha2PredictorSpec(sklearn=V1alpha2SKLearnSpec(storage_uri=storage_uri)))
-    elif framework == 'xgboost':
-        return V1alpha2EndpointSpec(predictor=V1alpha2PredictorSpec(xgboost=V1alpha2XGBoostSpec(storage_uri=storage_uri)))
-    elif framework == 'onnx':
-        return V1alpha2EndpointSpec(predictor=V1alpha2PredictorSpec(onnx=V1alpha2ONNXSpec(storage_uri=storage_uri)))
-    elif framework == 'tensorrt':
-        return V1alpha2EndpointSpec(predictor=V1alpha2PredictorSpec(tensorrt=V1alpha2TensorRTSpec(storage_uri=storage_uri)))
+
+def EndpointSpec(framework, storage_uri, service_account):
+    if framework == "tensorflow":
+        return V1alpha2EndpointSpec(
+            predictor=V1alpha2PredictorSpec(
+                tensorflow=V1alpha2TensorflowSpec(storage_uri=storage_uri),
+                service_account_name=service_account,
+            )
+        )
+    elif framework == "pytorch":
+        return V1alpha2EndpointSpec(
+            predictor=V1alpha2PredictorSpec(
+                pytorch=V1alpha2PyTorchSpec(storage_uri=storage_uri),
+                service_account_name=service_account,
+            )
+        )
+    elif framework == "sklearn":
+        return V1alpha2EndpointSpec(
+            predictor=V1alpha2PredictorSpec(
+                sklearn=V1alpha2SKLearnSpec(storage_uri=storage_uri),
+                service_account_name=service_account,
+            )
+        )
+    elif framework == "xgboost":
+        return V1alpha2EndpointSpec(
+            predictor=V1alpha2PredictorSpec(
+                xgboost=V1alpha2XGBoostSpec(storage_uri=storage_uri),
+                service_account_name=service_account,
+            )
+        )
+    elif framework == "onnx":
+        return V1alpha2EndpointSpec(
+            predictor=V1alpha2PredictorSpec(
+                onnx=V1alpha2ONNXSpec(storage_uri=storage_uri),
+                service_account_name=service_account,
+            )
+        )
+    elif framework == "tensorrt":
+        return V1alpha2EndpointSpec(
+            predictor=V1alpha2PredictorSpec(
+                tensorrt=V1alpha2TensorRTSpec(storage_uri=storage_uri),
+                service_account_name=service_account,
+            )
+        )
     else:
-        raise("Error: No matching framework: " + framework)
+        raise Exception("Error: No matching framework: " + framework)
 
 
-def customEndpointSpec(custom_model_spec):
-    env = [client.V1EnvVar(name=i['name'], value=i['value']) for i in custom_model_spec['env']] if custom_model_spec.get('env', '') else None
-    ports = [client.V1ContainerPort(container_port=int(custom_model_spec.get('port', '')))] if custom_model_spec.get('port', '') else None
+def customEndpointSpec(custom_model_spec, service_account):
+    env = (
+        [
+            client.V1EnvVar(name=i["name"], value=i["value"])
+            for i in custom_model_spec["env"]
+        ]
+        if custom_model_spec.get("env", "")
+        else None
+    )
+    ports = (
+        [client.V1ContainerPort(container_port=int(custom_model_spec.get("port", "")))]
+        if custom_model_spec.get("port", "")
+        else None
+    )
     containerSpec = client.V1Container(
-        name=custom_model_spec.get('name', 'custom-container'),
-        image=custom_model_spec['image'],
+        name=custom_model_spec.get("name", "custom-container"),
+        image=custom_model_spec["image"],
         env=env,
         ports=ports,
-        command=custom_model_spec.get('command', None),
-        args=custom_model_spec.get('args', None),
-        image_pull_policy=custom_model_spec.get('image_pull_policy', None),
-        working_dir=custom_model_spec.get('working_dir', None)
+        command=custom_model_spec.get("command", None),
+        args=custom_model_spec.get("args", None),
+        image_pull_policy=custom_model_spec.get("image_pull_policy", None),
+        working_dir=custom_model_spec.get("working_dir", None),
+    )
+    return V1alpha2EndpointSpec(
+        predictor=V1alpha2PredictorSpec(
+            custom=V1alpha2CustomSpec(container=containerSpec),
+            service_account_name=service_account,
+        )
     )
-    return V1alpha2EndpointSpec(predictor=V1alpha2PredictorSpec(custom=V1alpha2CustomSpec(container=containerSpec)))
 
 
-def InferenceService(metadata, default_model_spec, canary_model_spec=None, canary_model_traffic=None):
-    return V1alpha2InferenceService(api_version=constants.KFSERVING_GROUP + '/' + constants.KFSERVING_VERSION,
-                                    kind=constants.KFSERVING_KIND,
-                                    metadata=metadata,
-                                    spec=V1alpha2InferenceServiceSpec(default=default_model_spec,
-                                                                      canary=canary_model_spec,
-                                                                      canary_traffic_percent=canary_model_traffic))
+def InferenceService(
+    metadata, default_model_spec, canary_model_spec=None, canary_model_traffic=None
+):
+    return V1alpha2InferenceService(
+        api_version=constants.KFSERVING_GROUP + "/" + constants.KFSERVING_VERSION,
+        kind=constants.KFSERVING_KIND,
+        metadata=metadata,
+        spec=V1alpha2InferenceServiceSpec(
+            default=default_model_spec,
+            canary=canary_model_spec,
+            canary_traffic_percent=canary_model_traffic,
+        ),
+    )
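For reference, a hedged example of the dict that customEndpointSpec consumes (every value here is illustrative). The keys mirror exactly what the function reads: "image" is required, "name" defaults to "custom-container", and the rest default to None:

```python
# Illustrative custom model spec; the image, paths, and values are assumptions.
custom_model_spec = {
    "name": "my-custom-model",
    "image": "docker.io/example/model-server:latest",  # required key
    "port": "8080",  # becomes a V1ContainerPort
    "env": [  # becomes a list of V1EnvVar
        {"name": "MODEL_PATH", "value": "/mnt/models"}
    ],
    "command": ["python"],
    "args": ["-m", "server"],
    "image_pull_policy": "IfNotPresent",
    "working_dir": "/app",
}

# endpoint = customEndpointSpec(custom_model_spec, service_account="kfserving-s3-sa")
```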
 
 
-def deploy_model(action, model_name, default_model_uri, canary_model_uri, canary_model_traffic, namespace, framework, default_custom_model_spec, canary_custom_model_spec, autoscaling_target=0):
+def deploy_model(
+    action,
+    model_name,
+    default_model_uri,
+    canary_model_uri,
+    canary_model_traffic,
+    namespace,
+    framework,
+    default_custom_model_spec,
+    canary_custom_model_spec,
+    service_account,
+    autoscaling_target=0,
+):
     if int(autoscaling_target) != 0:
         annotations = {"autoscaling.knative.dev/target": str(autoscaling_target)}
     else:
         annotations = None
-    metadata = client.V1ObjectMeta(name=model_name, namespace=namespace, annotations=annotations)
-
+    metadata = client.V1ObjectMeta(
+        name=model_name, namespace=namespace, annotations=annotations
+    )
+
     # Create Default deployment if default model uri is provided.
-    if framework != 'custom' and default_model_uri:
-        default_model_spec = EndpointSpec(framework, default_model_uri)
-    elif framework == 'custom' and default_custom_model_spec:
-        default_model_spec = customEndpointSpec(default_custom_model_spec)
-
+    if framework != "custom" and default_model_uri:
+        default_model_spec = EndpointSpec(framework, default_model_uri, service_account)
+    elif framework == "custom" and default_custom_model_spec:
+        default_model_spec = customEndpointSpec(
+            default_custom_model_spec, service_account
+        )
+
     # Create Canary deployment if canary model uri is provided.
-    if framework != 'custom' and canary_model_uri:
-        canary_model_spec = EndpointSpec(framework, canary_model_uri)
-        kfsvc = InferenceService(metadata, default_model_spec, canary_model_spec, canary_model_traffic)
-    elif framework == 'custom' and canary_custom_model_spec:
-        canary_model_spec = customEndpointSpec(canary_custom_model_spec)
-        kfsvc = InferenceService(metadata, default_model_spec, canary_model_spec, canary_model_traffic)
+    if framework != "custom" and canary_model_uri:
+        canary_model_spec = EndpointSpec(framework, canary_model_uri, service_account)
+        kfsvc = InferenceService(
+            metadata, default_model_spec, canary_model_spec, canary_model_traffic
+        )
+    elif framework == "custom" and canary_custom_model_spec:
+        canary_model_spec = customEndpointSpec(
+            canary_custom_model_spec, service_account
+        )
+        kfsvc = InferenceService(
+            metadata, default_model_spec, canary_model_spec, canary_model_traffic
+        )
     else:
         kfsvc = InferenceService(metadata, default_model_spec)
 
     KFServing = KFServingClient()
 
-    if action == 'create':
+    if action == "create":
         KFServing.create(kfsvc, watch=True, timeout_seconds=120)
-    elif action == 'update':
+    elif action == "update":
         KFServing.patch(model_name, kfsvc)
-    elif action == 'rollout':
-        KFServing.rollout_canary(model_name, canary=canary_model_spec, percent=canary_model_traffic, namespace=namespace, watch=True, timeout_seconds=120)
-    elif action == 'promote':
-        KFServing.promote(model_name, namespace=namespace, watch=True, timeout_seconds=120)
-    elif action == 'delete':
+    elif action == "rollout":
+        KFServing.rollout_canary(
+            model_name,
+            canary=canary_model_spec,
+            percent=canary_model_traffic,
+            namespace=namespace,
+            watch=True,
+            timeout_seconds=120,
+        )
+    elif action == "promote":
+        KFServing.promote(
+            model_name, namespace=namespace, watch=True, timeout_seconds=120
+        )
+    elif action == "delete":
         KFServing.delete(model_name, namespace=namespace)
     else:
-        raise("Error: No matching action: " + action)
+        raise Exception("Error: No matching action: " + action)
 
     model_status = KFServing.get(model_name, namespace=namespace)
     return model_status
 
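A hedged sketch of driving a canary rollout through deploy_model; the model URIs, names, and service account are assumptions, and the call requires a reachable cluster with KFServing installed. canary_model_traffic is the percentage of requests routed to the canary revision:

```python
# Illustrative call; all values are assumptions, not part of this PR.
status = deploy_model(
    action="rollout",
    model_name="flowers-sample",
    default_model_uri="s3://models/flowers/v1",
    canary_model_uri="s3://models/flowers/v2",
    canary_model_traffic=10,
    namespace="kubeflow",
    framework="tensorflow",
    default_custom_model_spec={},
    canary_custom_model_spec={},
    service_account="kfserving-s3-sa",
)
# The returned object is the InferenceService resource as a dict; the URL may
# not be populated until the revision is ready.
print(status.get("status", {}).get("url", "URL not ready yet"))
```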
 
+
 if __name__ == "__main__":
     parser = argparse.ArgumentParser()
-    parser.add_argument('--action', type=str, help='Action to execute on KFServing', default='create')
-    parser.add_argument('--model-name', type=str, help='Name to give to the deployed model', default="")
-    parser.add_argument('--default-model-uri', type=str, help='Path of the S3, GCS or PVC directory containing default model.')
-    parser.add_argument('--canary-model-uri', type=str, help='Optional path of the S3, GCS or PVC directory containing canary model.', default="")
-    parser.add_argument('--canary-model-traffic', type=str, help='Optional Traffic to be sent to the default model', default='0')
-    parser.add_argument('--namespace', type=str, help='Kubernetes namespace where the KFServing service is deployed.', default='kubeflow')
-    parser.add_argument('--framework', type=str, help='Model Serving Framework', default='tensorflow')
-    parser.add_argument('--default-custom-model-spec', type=json.loads, help='Custom runtime default custom model container spec', default={})
-    parser.add_argument('--canary-custom-model-spec', type=json.loads, help='Custom runtime canary custom model container spec', default={})
-    parser.add_argument('--kfserving-endpoint', type=str, help='kfserving remote deployer api endpoint', default='')
-    parser.add_argument('--autoscaling-target', type=str, help='Autoscaling target number', default='0')
-    parser.add_argument('--output_path', type=str,
-                        help='Path to store URI output')
+    parser.add_argument(
+        "--action", type=str, help="Action to execute on KFServing", default="create"
+    )
+    parser.add_argument(
+        "--model-name", type=str, help="Name to give to the deployed model", default=""
+    )
+    parser.add_argument(
+        "--default-model-uri",
+        type=str,
+        help="Path of the S3, GCS or PVC directory containing the default model.",
+    )
+    parser.add_argument(
+        "--canary-model-uri",
+        type=str,
+        help="Optional path of the S3, GCS or PVC directory containing the canary model.",
+        default="",
+    )
+    parser.add_argument(
+        "--canary-model-traffic",
+        type=str,
+        help="Optional percentage of traffic to be sent to the canary model",
+        default="0",
+    )
+    parser.add_argument(
+        "--namespace",
+        type=str,
+        help="Kubernetes namespace where the KFServing service is deployed.",
+        default="kubeflow",
+    )
+    parser.add_argument(
+        "--framework", type=str, help="Model serving framework", default="tensorflow"
+    )
+    parser.add_argument(
+        "--default-custom-model-spec",
+        type=json.loads,
+        help="Custom runtime default custom model container spec",
+        default={},
+    )
+    parser.add_argument(
+        "--canary-custom-model-spec",
+        type=json.loads,
+        help="Custom runtime canary custom model container spec",
+        default={},
+    )
+    parser.add_argument(
+        "--kfserving-endpoint",
+        type=str,
+        help="KFServing remote deployer API endpoint",
+        default="",
+    )
+    parser.add_argument(
+        "--autoscaling-target", type=str, help="Autoscaling target number", default="0"
+    )
+    parser.add_argument(
+        "--service-account",
+        type=str,
+        help="Service account containing S3 credentials",
+        default="",
+    )
+    parser.add_argument("--output-path", type=str, help="Path to store URI output")
     args = parser.parse_args()
 
     url = re.compile(r"https?://")
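And a hedged local invocation of the entrypoint with the renamed and added flags; the script path, bucket, output file, and service-account value are all illustrative:

```python
# Runs the deployer CLI roughly as component.yaml now wires it up; assumed values.
import subprocess

subprocess.run(
    [
        "python", "kfservingdeployer.py",
        "--action", "create",
        "--model-name", "flowers-sample",
        "--default-model-uri", "s3://models/flowers",
        "--framework", "tensorflow",
        "--namespace", "kubeflow",
        "--service-account", "kfserving-s3-sa",  # new flag from this PR
        "--output-path", "/tmp/kfserving/service_uri.txt",  # renamed from --output_path
    ],
    check=True,
)
```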
@@ -145,8 +278,9 @@ def deploy_model(action, model_name, default_model_uri, canary_model_uri, canary
     output_path = args.output_path
     default_custom_model_spec = args.default_custom_model_spec
     canary_custom_model_spec = args.canary_custom_model_spec
-    kfserving_endpoint = url.sub('', args.kfserving_endpoint)
+    kfserving_endpoint = url.sub("", args.kfserving_endpoint)
     autoscaling_target = int(args.autoscaling_target)
+    service_account = args.service_account
 
     if kfserving_endpoint:
         formData = {
@@ -159,9 +293,12 @@ def deploy_model(action, model_name, default_model_uri, canary_model_uri, canary
             "framework": framework,
             "default_custom_model_spec": default_custom_model_spec,
             "canary_custom_model_spec": canary_custom_model_spec,
-            "autoscaling_target": autoscaling_target
-        }
-        response = requests.post("http://" + kfserving_endpoint + "/deploy-model", json=formData)
+            "autoscaling_target": autoscaling_target,
+            "service_account": service_account,
+        }
+        response = requests.post(
+            "http://" + kfserving_endpoint + "/deploy-model", json=formData
+        )
         model_status = response.json()
     else:
         model_status = deploy_model(
@@ -174,19 +311,29 @@ def deploy_model(action, model_name, default_model_uri, canary_model_uri, canary
             framework=framework,
             default_custom_model_spec=default_custom_model_spec,
             canary_custom_model_spec=canary_custom_model_spec,
-            autoscaling_target=autoscaling_target
+            autoscaling_target=autoscaling_target,
+            service_account=service_account,
         )
     print(model_status)
     try:
-        print(model_status['status']['url'] + ' is the knative domain header. $ISTIO_INGRESS_ENDPOINT are defined in the below commands')
-        print('Sample test commands: ')
-        print('# Note: If Istio Ingress gateway is not served with LoadBalancer, use $CLUSTER_NODE_IP:31380 as the ISTIO_INGRESS_ENDPOINT')
-        print('ISTIO_INGRESS_ENDPOINT=$(kubectl -n istio-system get service istio-ingressgateway -o jsonpath=\'{.status.loadBalancer.ingress[0].ip}\')')
+        print(
+            model_status["status"]["url"]
+            + " is the Knative domain header. $ISTIO_INGRESS_ENDPOINT is defined in the commands below."
+        )
+        print("Sample test commands:")
+        print(
+            "# Note: If the Istio ingress gateway is not served with a LoadBalancer, use $CLUSTER_NODE_IP:31380 as the ISTIO_INGRESS_ENDPOINT"
+        )
+        print(
+            "ISTIO_INGRESS_ENDPOINT=$(kubectl -n istio-system get service istio-ingressgateway -o jsonpath='{.status.loadBalancer.ingress[0].ip}')"
+        )
         # model_status['status']['url'] is like http://flowers-sample.kubeflow.example.com/v1/models/flowers-sample
-        host, path = url.sub('', model_status['status']['url']).split("/", 1)
-        print('curl -X GET -H "Host: ' + host + '" http://$ISTIO_INGRESS_ENDPOINT/' + path)
+        host, path = url.sub("", model_status["status"]["url"]).split("/", 1)
+        print(
+            'curl -X GET -H "Host: ' + host + '" http://$ISTIO_INGRESS_ENDPOINT/' + path
+        )
     except:
-        print('Model is not ready, check the logs for the Knative URL status.')
+        print("Model is not ready, check the logs for the Knative URL status.")
     if not os.path.exists(os.path.dirname(output_path)):
         os.makedirs(os.path.dirname(output_path))
     with open(output_path, "w") as report: