Skip to content

Commit

Permalink
add API watch for SDK client (kubeflow#305)
Browse files Browse the repository at this point in the history
  • Loading branch information
jinchihe authored and k8s-ci-robot committed Aug 27, 2019
1 parent f31141e commit 7068489
Show file tree
Hide file tree
Showing 5 changed files with 167 additions and 33 deletions.
49 changes: 37 additions & 12 deletions python/kfserving/docs/KFServingClient.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ Class | Method | Description
------------ | ------------- | -------------
KFServingClient | [set_credentials](#set_credentials) | Set Credentials|
KFServingClient | [create](#create) | Create KFService|
KFServingClient | [get](#get) | Get the specified KFService|
KFServingClient | [get](#get) | Get or watch the specified KFService or all KFServices in the namespace |
KFServingClient | [patch](#patch) | Patch the specified KFService|
KFServingClient | [delete](#delete) | Delete the specified KFService |

Expand Down Expand Up @@ -87,7 +87,7 @@ s3_verify_ssl | str | S3 only|Optional. If HTTPS is used, SSL verification coul


## create
> create(kfservice, namespace=None)
> create(kfservice, namespace=None, watch=False, timeout_seconds=600)
Create the provided KFService in the specified namespace

Expand Down Expand Up @@ -115,19 +115,25 @@ kfsvc = V1alpha2KFService(api_version=constants.KFSERVING_GROUP + '/' + constant

KFServing = KFServingClient()
KFServing.create(kfsvc)

# The API also supports watching the created KFService status till it's READY.
# KFServing.create(kfsvc, watch=True)
```


### Parameters
Name | Type | Description | Notes
------------ | ------------- | ------------- | -------------
kfservice | [V1alpha2KFService](V1alpha2KFService.md) | kfservice defination| |
namespace | str | Namespace for kfservice deploying to. If the `namespace` is not defined, will align with kfservice definition, or use current or default namespace if namespace is not specified in kfservice definition. | |
kfservice | [V1alpha2KFService](V1alpha2KFService.md) | KFService defination| Required |
namespace | str | Namespace for KFService deploying to. If the `namespace` is not defined, will align with KFService definition, or use current or default namespace if namespace is not specified in KFService definition. | Optional |
watch | bool | Watch the created KFService if `True`, otherwise will return the created KFService object. Stop watching if KFService reaches the optional specified `timeout_seconds` or once the KFService overall status `READY` is `True`. | Optional |
timeout_seconds | int | Timeout seconds for watching. Defaults to 600. | Optional |

### Return type
object

## get
> get(name, namespace=None)
> get(name=None, namespace=None, watch=False, timeout_seconds=600)
Get the created KFService in the specified namespace

Expand All @@ -139,19 +145,33 @@ from kfserving import KFServingClient
KFServing = KFServingClient()
KFServing.get('flower-sample', namespace='kubeflow')
```
The API also support watching the specified KFService or all KFService in the namespace.
```python
KFServing.get('flower-sample', namespace='kubeflow', watch=True, timeout_seconds=120)
```
The outputs will be as following. Stop watching if KFService reaches the optional specified `timeout_seconds` or once the KFService overall status `READY` is `True`.
```sh
NAME READY DEFAULT_TRAFFIC CANARY_TRAFFIC URL
flower-sample Unknown http://flower-sample.kubeflow.example.com
flower-sample Unknown 90 10 http://flower-sample.kubeflow.example.com
flower-sample True 90 10 http://flower-sample.kubeflow.example.com
```


### Parameters
Name | Type | Description | Notes
------------ | ------------- | ------------- | -------------
name | str | kfservice name| |
namespace | str | The kfservice's namespace. Defaults to current or default namespace.| |
name | str | KFService name. If the `name` is not specified, it will get or watch all KFServices in the namespace.| Optional. |
namespace | str | The KFService's namespace. Defaults to current or default namespace.| Optional |
watch | bool | Watch the specified KFService or all KFService in the namespace if `True`, otherwise will return object for the specified KFService or all KFService in the namespace. Stop watching if KFService reaches the optional specified `timeout_seconds` or once the speficed KFService overall status `READY` is `True` (Only if the `name` is speficed). | Optional |
timeout_seconds | int | Timeout seconds for watching. Defaults to 600. | Optional |

### Return type
object


## patch
> patch(name, kfservice, namespace=None)
> patch(name, kfservice, namespace=None, watch=False, timeout_seconds=600)
Patch the created KFService in the specified namespace

Expand Down Expand Up @@ -180,13 +200,18 @@ kfsvc = V1alpha2KFService(api_version=constants.KFSERVING_GROUP + '/' + constant

KFServing = KFServingClient()
KFServing.patch('flower-sample', kfsvc)

# The API also supports watching the patached KFService status till it's READY.
# KFServing.patch('flower-sample', kfsvc, watch=True)
```

### Parameters
Name | Type | Description | Notes
------------ | ------------- | ------------- | -------------
kfservice | [V1alpha2KFService](V1alpha2KFService.md) | kfservice defination| |
namespace | str | The kfservice's namespace for patching. If the `namespace` is not defined, will align with kfservice definition, or use current or default namespace if namespace is not specified in kfservice definition. | |
kfservice | [V1alpha2KFService](V1alpha2KFService.md) | KFService defination| Required |
namespace | str | The KFService's namespace for patching. If the `namespace` is not defined, will align with KFService definition, or use current or default namespace if namespace is not specified in KFService definition. | Optional|
watch | bool | Watch the patched KFService if `True`, otherwise will return the patched KFService object. Stop watching if KFService reaches the optional specified `timeout_seconds` or once the KFService overall status `READY` is `True`. | Optional |
timeout_seconds | int | Timeout seconds for watching. Defaults to 600. | Optional |

### Return type
object
Expand All @@ -209,8 +234,8 @@ KFServing.get('flower-sample', namespace='kubeflow')
### Parameters
Name | Type | Description | Notes
------------ | ------------- | ------------- | -------------
Name | str | kfservice name| |
namespace | str | The kfservice's namespace. Defaults to current or default namespace. | |
Name | str | KFService name| |
namespace | str | The kfservice's namespace. Defaults to current or default namespace. | Optional|

### Return type
object
75 changes: 59 additions & 16 deletions python/kfserving/kfserving/api/kf_serving_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from ..constants import constants
from ..utils import utils
from .creds_utils import set_gcs_credentials, set_s3_credentials, set_azure_credentials
from .kf_serving_watch import watch as kfsvc_watch


class KFServingClient(object):
Expand Down Expand Up @@ -73,14 +74,14 @@ def set_credentials(self, storage_type, namespace=None, credentials_file=None,
currently.\n" % storage_type)


def create(self, kfservice, namespace=None):
def create(self, kfservice, namespace=None, watch=False, timeout_seconds=600): #pylint:disable=inconsistent-return-statements
"""Create the provided KFService in the specified namespace"""

if namespace is None:
namespace = utils.set_kfsvc_namespace(kfservice)

try:
return self.api_instance.create_namespaced_custom_object(
outputs = self.api_instance.create_namespaced_custom_object(
constants.KFSERVING_GROUP,
constants.KFSERVING_VERSION,
namespace,
Expand All @@ -91,31 +92,65 @@ def create(self, kfservice, namespace=None):
"Exception when calling CustomObjectsApi->create_namespaced_custom_object:\
%s\n" % e)

def get(self, name, namespace=None):
if watch:
kfsvc_watch(
name=outputs['metadata']['name'],
namespace=namespace,
timeout_seconds=timeout_seconds)
else:
return outputs


def get(self, name=None, namespace=None, watch=False, timeout_seconds=600): #pylint:disable=inconsistent-return-statements
"""Get the created KFService in the specified namespace"""

if namespace is None:
namespace = utils.get_default_target_namespace()

try:
return self.api_instance.get_namespaced_custom_object(
constants.KFSERVING_GROUP,
constants.KFSERVING_VERSION,
namespace,
constants.KFSERVING_PLURAL,
name)
except client.rest.ApiException as e:
raise RuntimeError(
"Exception when calling CustomObjectsApi->get_namespaced_custom_object: %s\n" % e)

def patch(self, name, kfservice, namespace=None):
if name:
if watch:
kfsvc_watch(
name=name,
namespace=namespace,
timeout_seconds=timeout_seconds)
else:
try:
return self.api_instance.get_namespaced_custom_object(
constants.KFSERVING_GROUP,
constants.KFSERVING_VERSION,
namespace,
constants.KFSERVING_PLURAL,
name)
except client.rest.ApiException as e:
raise RuntimeError(
"Exception when calling CustomObjectsApi->get_namespaced_custom_object:\
%s\n" % e)
else:
if watch:
kfsvc_watch(
namespace=namespace,
timeout_seconds=timeout_seconds)
else:
try:
return self.api_instance.list_namespaced_custom_object(
constants.KFSERVING_GROUP,
constants.KFSERVING_VERSION,
namespace,
constants.KFSERVING_PLURAL)
except client.rest.ApiException as e:
raise RuntimeError(
"Exception when calling CustomObjectsApi->list_namespaced_custom_object:\
%s\n" % e)


def patch(self, name, kfservice, namespace=None, watch=False, timeout_seconds=600): # pylint:disable=too-many-arguments,inconsistent-return-statements
"""Patch the created KFService in the specified namespace"""

if namespace is None:
namespace = utils.set_kfsvc_namespace(kfservice)

try:
return self.api_instance.patch_namespaced_custom_object(
outputs = self.api_instance.patch_namespaced_custom_object(
constants.KFSERVING_GROUP,
constants.KFSERVING_VERSION,
namespace,
Expand All @@ -127,6 +162,14 @@ def patch(self, name, kfservice, namespace=None):
"Exception when calling CustomObjectsApi->patch_namespaced_custom_object:\
%s\n" % e)

if watch:
kfsvc_watch(
name=outputs['metadata']['name'],
namespace=namespace,
timeout_seconds=timeout_seconds)
else:
return outputs

def delete(self, name, namespace=None):
"""Delete the provided KFService in the specified namespace"""

Expand Down
58 changes: 58 additions & 0 deletions python/kfserving/kfserving/api/kf_serving_watch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# Copyright 2019 The Kubeflow Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from kubernetes import client
from kubernetes import watch as k8s_watch
from table_logger import TableLogger

from ..constants import constants
from ..utils import utils


def watch(name=None, namespace=None, timeout_seconds=600):
"""Watch the created or patched KFService in the specified namespace"""

if namespace is None:
namespace = utils.get_default_target_namespace()

tbl = TableLogger(
columns='NAME,READY,DEFAULT_TRAFFIC,CANARY_TRAFFIC,URL',
colwidth={'NAME': 20, 'READY':10, 'DEFAULT_TRAFFIC':15, 'CANARY_TRAFFIC':15, 'URL': 50},
border=False)

stream = k8s_watch.Watch().stream(
client.CustomObjectsApi().list_namespaced_custom_object,
constants.KFSERVING_GROUP,
constants.KFSERVING_VERSION,
namespace,
constants.KFSERVING_PLURAL,
timeout_seconds=timeout_seconds)

for event in stream:
kfserivce = event['object']
kfsvc_name = kfserivce['metadata']['name']
if name and name != kfsvc_name:
continue
else:
url = kfserivce['status'].get('url', '')
default_traffic = kfserivce['status'].get('default', {}).get('traffic', '')
canary_traffic = kfserivce['status'].get('canary', {}).get('traffic', '')
status = 'Unknown'
for condition in kfserivce['status'].get('conditions', {}):
if condition.get('type', '') == 'Ready':
status = condition.get('status', 'Unknown')
tbl(kfsvc_name, status, default_traffic, canary_traffic, url)

if name == kfsvc_name and status == 'True':
break
3 changes: 2 additions & 1 deletion python/kfserving/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ six >= 1.10
python_dateutil >= 2.5.3
setuptools >= 21.0.0
urllib3 >= 1.15.1
kubernetes >= 9.0.0
kubernetes >= 10.0.1
tornado >= 1.4.1
argparse >= 1.4.0
minio >= 4.0.9
google-cloud-storage >= 1.16.0
azure-storage-blob >= 2.1.0
adal >= 1.2.2
table_logger >= 0.3.5
numpy
15 changes: 11 additions & 4 deletions python/kfserving/test/test_kfservice_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,28 +39,35 @@ def generate_kfservice():
spec=V1alpha2KFServiceSpec(default=default_model_spec))
return kfsvc

# Unit test for kfserving create api
def test_kfservice_client_creat():
'''Unit test for kfserving create api'''
with patch('kfserving.api.kf_serving_client.KFServingClient.create',
return_value=mocked_unit_result):
kfsvc = generate_kfservice()
assert mocked_unit_result == KFServing.create(kfsvc, namespace='kubeflow')

# Unit test for kfserving get api
def test_kfservice_client_get():
'''Unit test for kfserving get api'''
with patch('kfserving.api.kf_serving_client.KFServingClient.get',
return_value=mocked_unit_result):
assert mocked_unit_result == KFServing.get('flower-sample', namespace='kubeflow')

# Unit test for kfserving patch api
def test_kfservice_client_watch():
'''Unit test for kfserving get api'''
with patch('kfserving.api.kf_serving_client.KFServingClient.get',
return_value=mocked_unit_result):
assert mocked_unit_result == KFServing.get('flower-sample', namespace='kubeflow',
watch=True, timeout_seconds=120)

def test_kfservice_client_patch():
'''Unit test for kfserving patch api'''
with patch('kfserving.api.kf_serving_client.KFServingClient.patch',
return_value=mocked_unit_result):
kfsvc = generate_kfservice()
assert mocked_unit_result == KFServing.patch('flower-sample', kfsvc, namespace='kubeflow')

# Unit test for kfserving delete api
def test_kfservice_client_delete():
'''Unit test for kfserving delete api'''
with patch('kfserving.api.kf_serving_client.KFServingClient.delete',
return_value=mocked_unit_result):
assert mocked_unit_result == KFServing.delete('flower-sample', namespace='kubeflow')

0 comments on commit 7068489

Please sign in to comment.