diff --git a/docs/samples/uri/README.md b/docs/samples/uri/README.md new file mode 100644 index 00000000000..cabbf7e6044 --- /dev/null +++ b/docs/samples/uri/README.md @@ -0,0 +1,231 @@ +# Predict on an `InferenceService` with a saved model from a URI +This allows you to specify a model object via the URI (Uniform Resource Identifier) of the model object exposed via an `http` or `https` endpoint. + +This `storageUri` option supports single file models, like `sklearn` which is specified by a [joblib](https://joblib.readthedocs.io/en/latest/) file, or artifacts (e.g. `tar` or `zip`) which contain all the necessary dependencies for other model types (e.g. `tensorflow` or `pytorch`). Here, we'll show examples from both of the above. + +## Setup +1. Your ~/.kube/config should point to a cluster with [KFServing installed](https://github.com/kubeflow/kfserving/#install-kfserving). +2. Your cluster's Istio Ingress gateway must be network accessible. +3. Your cluster's Istio Egress gateway must [allow http / https traffic](https://knative.dev/docs/serving/outbound-network-access/) + +## Sklearn +### Train and freeze the model +Here, we'll train a simple iris model. Please note that `kfserving` requires `sklearn==0.20.3`. + +```python +from sklearn import svm +from sklearn import datasets +import joblib + +def train(X, y): + clf = svm.SVC(gamma='auto') + clf.fit(X, y) + return clf + +def freeze(clf, path='../frozen'): + joblib.dump(clf, f'{path}/model.joblib') + return True + +if __name__ == '__main__': + iris = datasets.load_iris() + X, y = iris.data, iris.target + clf = train(X, y) + freeze(clf) +``` +Now, you'll need to take that frozen model object and put it somewhere on the web to expose it. For instance, pushing the `model.joblib` file to some repo on GitHub. 
+ +### Specify and create the `InferenceService` +```yaml +apiVersion: serving.kubeflow.org/v1alpha2 +kind: InferenceService +metadata: + name: sklearn-from-uri +spec: + default: + predictor: + sklearn: + storageUri: https://github.com/tduffy000/kfserving-uri-examples/blob/master/sklearn/frozen/model.joblib?raw=true + +``` + +Apply the CRD, +```bash +kubectl apply -f sklearn.yaml +``` +Expected Output +``` +$ inferenceservice.serving.kubeflow.org/sklearn-from-uri created +``` +### Run a prediction +The first step is to [determine the ingress IP and ports](https://github.com/kubeflow/kfserving/blob/master/README.md#determine-the-ingress-ip-and-ports) and set `INGRESS_HOST` and `INGRESS_PORT`. + +Now, if everything went according to plan you should be able to hit the endpoint exposing the model we just uploaded. + +```bash +MODEL_NAME=sklearn-from-uri +INPUT_PATH=@./input.json +curl -v -H "Host: ${SERVICE_HOSTNAME}" http://${INGRESS_HOST}:${INGRESS_PORT}/v1/models/$MODEL_NAME:predict -d $INPUT_PATH +``` +Expected Output +``` +$ * Trying 10.0.1.16... 
+* TCP_NODELAY set + % Total % Received % Xferd Average Speed Time Time Time Current + Dload Upload Total Spent Left Speed + 0 0 0 0 0 0 0 0 --:--:-- --:--:-- --:--:-- 0* Connected to 10.0.1.16 (10.0.1.16) port 30749 (#0) +> POST /v1/models/sklearn-from-uri:predict HTTP/1.1 +> Host: sklearn-from-uri.kfserving-uri-storage.example.com +> User-Agent: curl/7.58.0 +> Accept: */* +> Content-Length: 86 +> Content-Type: application/x-www-form-urlencoded +> +} [86 bytes data] +* upload completely sent off: 86 out of 86 bytes +< HTTP/1.1 200 OK +< content-length: 23 +< content-type: application/json; charset=UTF-8 +< date: Thu, 06 Aug 2020 23:13:42 GMT +< server: istio-envoy +< x-envoy-upstream-service-time: 7 +< +{ [23 bytes data] +100 109 100 23 100 86 605 2263 --:--:-- --:--:-- --:--:-- 2868 +* Connection #0 to host 10.0.1.16 left intact +{ + "predictions": [ + 1, + 1 + ] +} +``` + +## Tensorflow +This will serve as an example of the ability to also pull in a tarball containing all of the +required model dependencies, for instance `tensorflow` requires multiple files in a strict directory structure in order to be servable. 
+### Train and freeze the model + +```python +from sklearn import datasets +import numpy as np +import tensorflow as tf + +def _ohe(targets): + y = np.zeros((150, 3)) + for i, label in enumerate(targets): + y[i, label] = 1.0 + return y + +def train(X, y, epochs, batch_size=16): + model = tf.keras.Sequential([ + tf.keras.layers.InputLayer(input_shape=(4,)), + tf.keras.layers.Dense(16, activation=tf.nn.relu), + tf.keras.layers.Dense(16, activation=tf.nn.relu), + tf.keras.layers.Dense(3, activation='softmax') + ]) + model.compile(tf.keras.optimizers.RMSprop(learning_rate=0.001), loss='categorical_crossentropy', metrics=['accuracy']) + model.fit(X, y, epochs=epochs) + return model + +def freeze(model, path='../frozen'): + model.save(f'{path}/0001') + return True + +if __name__ == '__main__': + iris = datasets.load_iris() + X, targets = iris.data, iris.target + y = _ohe(targets) + model = train(X, y, epochs=50) + freeze(model) +``` +The post-training procedure here is a bit different. Instead of directly pushing the frozen output to some URI, we'll need to package them into a tarball. To do so, +```bash +cd ../frozen +tar -cvf artifacts.tar 0001/ +gzip < artifacts.tar > artifacts.tgz +``` +Where we assume the `0001/` directory has the structure: +``` +|-- 0001/ +|-- saved_model.pb +|-- variables/ +|--- variables.data-00000-of-00001 +|--- variables.index +``` +Note that building the tarball from the directory specifying a version number is required for `tensorflow`. + +Now, you can either push the `.tar` or `.tgz` file to some remote uri. +### Specify and create the `InferenceService` +And again, if everything went to plan we should be able to pull down the tarball and expose the endpoint. 
+ +```yaml +apiVersion: serving.kubeflow.org/v1alpha2 +kind: InferenceService +metadata: + name: tensorflow-from-uri +spec: + default: + predictor: + tensorflow: + storageUri: https://raw.githubusercontent.com/tduffy000/kfserving-uri-examples/master/tensorflow/frozen/model_artifacts.tar.gz +``` +Apply the CRD, +```bash +kubectl apply -f tensorflow.yaml +``` +Expected Output +``` +$ inferenceservice.serving.kubeflow.org/tensorflow-from-uri created +``` + +## Run a prediction +Again, make sure to first [determine the ingress IP and ports](https://github.com/kubeflow/kfserving/blob/master/README.md#determine-the-ingress-ip-and-ports) and set `INGRESS_HOST` and `INGRESS_PORT`. + +Now that our endpoint is up and running, we can get some predictions. + +```bash +MODEL_NAME=tensorflow-from-uri +INPUT_PATH=@./input.json +curl -v -H "Host: ${SERVICE_HOSTNAME}" http://${INGRESS_HOST}:${INGRESS_PORT}/v1/models/$MODEL_NAME:predict -d $INPUT_PATH +``` +Expected Output +``` +$ * Trying 10.0.1.16... 
+* TCP_NODELAY set + % Total % Received % Xferd Average Speed Time Time Time Current + Dload Upload Total Spent Left Speed + 0 0 0 0 0 0 0 0 --:--:-- --:--:-- --:--:-- 0* Connected to 10.0.1.16 (10.0.1.16) port 30749 (#0) +> POST /v1/models/tensorflow-from-uri:predict HTTP/1.1 +> Host: tensorflow-from-uri.default.example.com +> User-Agent: curl/7.58.0 +> Accept: */* +> Content-Length: 86 +> Content-Type: application/x-www-form-urlencoded +> +} [86 bytes data] +* upload completely sent off: 86 out of 86 bytes +< HTTP/1.1 200 OK +< content-length: 112 +< content-type: application/json +< date: Thu, 06 Aug 2020 23:21:19 GMT +< x-envoy-upstream-service-time: 151 +< server: istio-envoy +< +{ [112 bytes data] +100 198 100 112 100 86 722 554 --:--:-- --:--:-- --:--:-- 1285 +* Connection #0 to host 10.0.1.16 left intact +{ + "predictions": [ + [ + 0.0204100646, + 0.680984616, + 0.298605353 + ], + [ + 0.0296604875, + 0.658412039, + 0.311927497 + ] + ] +} +``` \ No newline at end of file diff --git a/docs/samples/uri/input.json b/docs/samples/uri/input.json new file mode 100644 index 00000000000..77839728a0b --- /dev/null +++ b/docs/samples/uri/input.json @@ -0,0 +1,6 @@ +{ + "instances": [ + [6.8, 2.8, 4.8, 1.4], + [6.0, 3.4, 4.5, 1.6] + ] +} diff --git a/docs/samples/uri/sklearn.yaml b/docs/samples/uri/sklearn.yaml new file mode 100644 index 00000000000..0e78c5d26e5 --- /dev/null +++ b/docs/samples/uri/sklearn.yaml @@ -0,0 +1,9 @@ +apiVersion: serving.kubeflow.org/v1alpha2 +kind: InferenceService +metadata: + name: sklearn-from-uri +spec: + default: + predictor: + sklearn: + storageUri: https://github.com/tduffy000/kfserving-uri-examples/blob/master/sklearn/frozen/model.joblib?raw=true diff --git a/docs/samples/uri/tensorflow.yaml b/docs/samples/uri/tensorflow.yaml new file mode 100644 index 00000000000..29c56da7e61 --- /dev/null +++ b/docs/samples/uri/tensorflow.yaml @@ -0,0 +1,9 @@ +apiVersion: serving.kubeflow.org/v1alpha2 +kind: InferenceService +metadata: + name: 
tensorflow-from-uri +spec: + default: + predictor: + tensorflow: + storageUri: https://raw.githubusercontent.com/tduffy000/kfserving-uri-examples/master/tensorflow/frozen/model_artifacts.tar.gz diff --git a/pkg/apis/serving/v1alpha2/inferenceservice_validation.go b/pkg/apis/serving/v1alpha2/inferenceservice_validation.go index 6521a003270..e932ce459d0 100644 --- a/pkg/apis/serving/v1alpha2/inferenceservice_validation.go +++ b/pkg/apis/serving/v1alpha2/inferenceservice_validation.go @@ -35,7 +35,8 @@ const ( ) var ( - SupportedStorageURIPrefixList = []string{"gs://", "s3://", "pvc://", "file://"} + SupportedStorageURIPrefixList = []string{"gs://", "s3://", "pvc://", "file://", "https://", "http://"} + AzureBlobURL = "blob.core.windows.net" AzureBlobURIRegEx = "https://(.+?).blob.core.windows.net/(.+)" ) diff --git a/pkg/apis/serving/v1alpha2/inferenceservice_validation_test.go b/pkg/apis/serving/v1alpha2/inferenceservice_validation_test.go index 41062153ef1..00e731e9e6f 100644 --- a/pkg/apis/serving/v1alpha2/inferenceservice_validation_test.go +++ b/pkg/apis/serving/v1alpha2/inferenceservice_validation_test.go @@ -99,6 +99,15 @@ func TestAzureBlobNoContainerFails(t *testing.T) { g.Expect(isvc.ValidateCreate(c)).ShouldNot(gomega.Succeed()) } +func TestHttpStorageURIPrefixOK(t *testing.T) { + g := gomega.NewGomegaWithT(t) + isvc := makeTestInferenceService() + isvc.Spec.Default.Predictor.Tensorflow.StorageURI = "https://raw.githubusercontent.com/someOrg/someRepo/model.tar.gz" + g.Expect(isvc.ValidateCreate(c)).Should(gomega.Succeed()) + isvc.Spec.Default.Predictor.Tensorflow.StorageURI = "http://raw.githubusercontent.com/someOrg/someRepo/model.tar.gz" + g.Expect(isvc.ValidateCreate(c)).Should(gomega.Succeed()) +} + func TestUnkownStorageURIPrefixFails(t *testing.T) { g := gomega.NewGomegaWithT(t) isvc := makeTestInferenceService() diff --git a/pkg/apis/serving/v1alpha2/utils.go b/pkg/apis/serving/v1alpha2/utils.go index 66e6351413d..07e6e5d3d08 100644 --- 
a/pkg/apis/serving/v1alpha2/utils.go +++ b/pkg/apis/serving/v1alpha2/utils.go @@ -82,16 +82,18 @@ func validateStorageURI(storageURI string) error { return nil } - // one of the prefixes we know? - for _, prefix := range SupportedStorageURIPrefixList { - if strings.HasPrefix(storageURI, prefix) { + // need to verify Azure Blob first, because it uses http(s):// prefix + if strings.Contains(storageURI, AzureBlobURL) { + azureURIMatcher := regexp.MustCompile(AzureBlobURIRegEx) + if parts := azureURIMatcher.FindStringSubmatch(storageURI); parts != nil { return nil } - } - - azureURIMatcher := regexp.MustCompile(AzureBlobURIRegEx) - if parts := azureURIMatcher.FindStringSubmatch(storageURI); parts != nil { - return nil + } else { + for _, prefix := range SupportedStorageURIPrefixList { + if strings.HasPrefix(storageURI, prefix) { + return nil + } + } } return fmt.Errorf(UnsupportedStorageURIFormatError, strings.Join(SupportedStorageURIPrefixList, ", "), storageURI) diff --git a/python/kfserving/README.md b/python/kfserving/README.md index 1e046e212e0..661833da36f 100644 --- a/python/kfserving/README.md +++ b/python/kfserving/README.md @@ -49,7 +49,9 @@ KFServing supports the following storage providers: * The `pvcname` is the name of the PVC that contains the model. * The `[path]` is the relative path to the model on the PVC. * For e.g. `pvc://mypvcname/model/path/on/pvc` - +* Generic URI, over either `HTTP`, prefixed with `http://` or `HTTPS`, prefixed with `https://`. 
For example: + * `https://<some_parsable_url>.com/model.joblib` + * `http://<some_parsable_url>.com/model.joblib` ## KFServing Client diff --git a/python/kfserving/kfserving/storage.py b/python/kfserving/kfserving/storage.py index 02c89ad073a..dfda1f5353b 100644 --- a/python/kfserving/kfserving/storage.py +++ b/python/kfserving/kfserving/storage.py @@ -15,9 +15,15 @@ import glob import logging import tempfile +import mimetypes import os import re +import shutil +import tarfile +import zipfile +import gzip from urllib.parse import urlparse +import requests from azure.storage.blob import BlockBlobService from google.auth import exceptions from google.cloud import storage @@ -27,7 +33,8 @@ _S3_PREFIX = "s3://" _BLOB_RE = "https://(.+?).blob.core.windows.net/(.+)" _LOCAL_PREFIX = "file://" - +_URI_RE = "https?://(.+)/(.+)" +_HTTP_PREFIX = "http(s)://" class Storage(object): # pylint: disable=too-few-public-methods @staticmethod @@ -52,10 +59,12 @@ def download(uri: str, out_dir: str = None) -> str: Storage._download_blob(uri, out_dir) elif is_local: return Storage._download_local(uri, out_dir) + elif re.search(_URI_RE, uri): + return Storage._download_from_uri(uri, out_dir) else: raise Exception("Cannot recognize storage type for " + uri + - "\n'%s', '%s', and '%s' are the current available storage type." % - (_GCS_PREFIX, _S3_PREFIX, _LOCAL_PREFIX)) + "\n'%s', '%s', '%s', and '%s' are the current available storage type." 
% + (_GCS_PREFIX, _S3_PREFIX, _LOCAL_PREFIX, _HTTP_PREFIX)) logging.info("Successfully copied %s to %s", uri, out_dir) return out_dir @@ -206,6 +215,43 @@ def _download_local(uri, out_dir=None): os.symlink(src, dest_path) return out_dir + @staticmethod + def _download_from_uri(uri, out_dir=None): + url = urlparse(uri) + filename = os.path.basename(url.path) + mimetype, encoding = mimetypes.guess_type(uri) + local_path = os.path.join(out_dir, filename) + + if filename == '': + raise ValueError('No filename contained in URI: %s' % (uri)) + + with requests.get(uri, stream=True) as response: + if response.status_code != 200: + raise RuntimeError("URI: %s returned a %s response code." % (uri, response.status_code)) + if mimetype == 'application/zip' and not response.headers.get('Content-Type', '').startswith('application/zip'): + raise RuntimeError("URI: %s did not respond with \'Content-Type\': \'application/zip\'" % (uri)) + if mimetype != 'application/zip' and not response.headers.get('Content-Type', '').startswith('application/octet-stream'): + raise RuntimeError("URI: %s did not respond with \'Content-Type\': \'application/octet-stream\'" % (uri)) + + if encoding == 'gzip': + stream = gzip.GzipFile(fileobj=response.raw) + local_path = os.path.join(out_dir, f'{filename}.tar') + else: + stream = response.raw + with open(local_path, 'wb') as out: + shutil.copyfileobj(stream, out) + + if mimetype in ["application/x-tar", "application/zip"]: + if mimetype == "application/x-tar": + archive = tarfile.open(local_path, 'r', encoding='utf-8') + else: + archive = zipfile.ZipFile(local_path, 'r') + archive.extractall(out_dir) + archive.close() + os.remove(local_path) + + return out_dir + + @staticmethod + def _create_minio_client(): # Adding prefixing "http" in urlparse is necessary for it to be the netloc diff --git a/python/kfserving/test/test_storage.py b/python/kfserving/test/test_storage.py index 65e4a4cd250..523876b2037 --- a/python/kfserving/test/test_storage.py +++ 
b/python/kfserving/test/test_storage.py @@ -12,6 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +import io +import os import pytest import kfserving from minio import Minio, error @@ -40,6 +42,57 @@ def test_no_prefix_local_path(): assert kfserving.Storage.download(abs_path) == abs_path assert kfserving.Storage.download(relative_path) == relative_path +class MockHttpResponse(object): + def __init__( + self, + status_code=404, + raw=b'', + content_type='' + ): + self.status_code = status_code + self.raw = io.BytesIO(raw) + self.headers = {'Content-Type': content_type} + + def __enter__(self): + return self + def __exit__(self, ex_type, ex_val, traceback): + pass + +@mock.patch('requests.get', return_value=MockHttpResponse(status_code=200, content_type='application/octet-stream')) +def test_http_uri_path(_): + http_uri = 'http://foo.bar/model.joblib' + http_with_query_uri = 'http://foo.bar/model.joblib?foo=bar' + out_dir = '.' + assert kfserving.Storage.download(http_uri, out_dir=out_dir) == out_dir + assert kfserving.Storage.download(http_with_query_uri, out_dir=out_dir) == out_dir + os.remove('./model.joblib') + +@mock.patch('requests.get', return_value=MockHttpResponse(status_code=200, content_type='application/octet-stream')) +def test_https_uri_path(_): + https_uri = 'https://foo.bar/model.joblib' + https_with_query_uri = 'https://foo.bar/model.joblib?foo=bar' + out_dir = '.' 
+ assert kfserving.Storage.download(https_uri, out_dir=out_dir) == out_dir + assert kfserving.Storage.download(https_with_query_uri, out_dir=out_dir) == out_dir + os.remove('./model.joblib') + +@mock.patch('requests.get', return_value=MockHttpResponse(status_code=404)) +def test_nonexistent_uri(_): + non_existent_uri = 'https://theabyss.net/model.joblib' + with pytest.raises(RuntimeError): + kfserving.Storage.download(non_existent_uri) + +@mock.patch('requests.get', return_value=MockHttpResponse(status_code=200)) +def test_uri_no_filename(_): + bad_uri = 'https://foo.bar/test/' + with pytest.raises(ValueError): + kfserving.Storage.download(bad_uri) + +@mock.patch('requests.get', return_value=MockHttpResponse(status_code=200, content_type='text/html')) +def test_html_content_type(_): + bad_uri = 'https://some.site.com/test.model' + with pytest.raises(RuntimeError): + kfserving.Storage.download(bad_uri) @mock.patch(STORAGE_MODULE + '.storage') def test_mock_gcs(mock_storage):