diff --git a/py/kubeflow/testing/argo_client.py b/py/kubeflow/testing/argo_client.py index 6872e3dfac2..59e789b49de 100644 --- a/py/kubeflow/testing/argo_client.py +++ b/py/kubeflow/testing/argo_client.py @@ -1,8 +1,10 @@ """Some utility functions for working with TfJobs.""" import datetime +import json import logging from retrying import retry +import six import time from kubernetes import client as k8s_client @@ -35,17 +37,35 @@ def log_status(workflow): GATEWAY_TIMEOUT = 504 def handle_retriable_exception(exception): - if (isinstance(exception, rest.ApiException) and + if isinstance(exception, rest.ApiException): + # ApiException could store the exit code in status or it might + # store it in HTTP response body + # see: https://github.com/kubernetes-client/python/blob/5e512ff564c244c50cab780d821542ed56aa965a/kubernetes/client/rest.py#L289 # pylint: disable=line-too-long + code = None + if exception.body: + if isinstance(exception.body, six.string_types): + body = {} + try: + logging.info("Parsing ApiException body: %s", exception.body) + body = json.loads(exception.body) + except json.JSONDecodeError as e: + logging.error("Error parsing body: %s", e) + else: + body = exception.body + code = body.get("code") + else: + code = exception.status + # UNAUTHORIZED and FORBIDDEN errors can be an indication we need to # refresh credentials - (exception.status == UNAUTHORIZED or exception.status == FORBIDDEN or - exception.status == GATEWAY_TIMEOUT)): - # Due to https://github.com/kubernetes-client/python-base/issues/59, - # we need to reload the kube config (which refreshes the GCP token). - # TODO(richardsliu): Remove this workaround when the k8s client issue - # is resolved. - util.load_kube_config() - return True + logging.info("ApiException code=%s", code) + if code in [UNAUTHORIZED, FORBIDDEN, GATEWAY_TIMEOUT]: + # Due to https://github.com/kubernetes-client/python-base/issues/59, + # we need to reload the kube config (which refreshes the GCP token). + # TODO(richardsliu): Remove this workaround when the k8s client issue + # is resolved. + util.load_kube_config() + return True return not isinstance(exception, util.TimeoutError)