From 46c5d0daa6d161e52f61b8fcaa448870945bea6d Mon Sep 17 00:00:00 2001
From: Jeremy Lewi
Date: Wed, 2 Jan 2019 17:11:06 -0800
Subject: [PATCH] Python script to cleanup argo and services (cloud endpoints)
 (#261)

* Add a python function to GC old Argo workflows and cloud endpoints

kubeflow/testing#53 GC old Argo Workflows
kubeflow/testing#87 cron job to GC old resources.
kubeflow/testing#268 Maximum number of services reached.

* Fix lint.

* Revert files that shouldn't be checked in.

* Fix loop termination criterion.
---
 py/kubeflow/testing/cleanup_ci.py | 128 ++++++++++++++++++++++++++++++
 1 file changed, 128 insertions(+)

diff --git a/py/kubeflow/testing/cleanup_ci.py b/py/kubeflow/testing/cleanup_ci.py
index c70d211f514..0a8b4fc6330 100644
--- a/py/kubeflow/testing/cleanup_ci.py
+++ b/py/kubeflow/testing/cleanup_ci.py
@@ -8,6 +8,9 @@
 import subprocess
 import tempfile
 
+from kubeflow.testing import argo_client
+from kubeflow.testing import util
+from kubernetes import client as k8s_client
 from googleapiclient import discovery
 from oauth2client.client import GoogleCredentials
 
@@ -22,6 +25,113 @@ def is_match(name):
 
   return False
 
+def cleanup_workflows(args):
+  """Delete Argo workflows older than args.max_age_hours.
+
+  Args:
+    args: Parsed command line arguments; namespace and max_age_hours are read.
+  """
+  # We need to load the kube config so that we can have credentials to
+  # talk to the APIServer.
+  util.load_kube_config(persist_config=False)
+
+  client = k8s_client.ApiClient()
+  crd_api = k8s_client.CustomObjectsApi(client)
+  workflows = crd_api.list_namespaced_custom_object(
+    argo_client.GROUP, argo_client.VERSION, args.namespace, argo_client.PLURAL)
+
+  max_age = datetime.timedelta(hours=args.max_age_hours)
+
+  expired = []
+  unexpired = []
+
+  for w in workflows["items"]:
+    name = w["metadata"]["name"]
+
+    # status.startedAt can be missing (e.g. the controller has not picked the
+    # workflow up yet); fall back to the creation timestamp set by the
+    # APIServer so one such workflow does not abort the whole run.
+    started_at = (w.get("status", {}).get("startedAt") or
+                  w["metadata"]["creationTimestamp"])
+    start_time = date_parser.parse(started_at)
+    now = datetime.datetime.now(start_time.tzinfo)
+
+    if now - start_time > max_age:
+      logging.info("Deleting workflow: %s", name)
+      crd_api.delete_namespaced_custom_object(
+        argo_client.GROUP, argo_client.VERSION, args.namespace,
+        argo_client.PLURAL, name, k8s_client.V1DeleteOptions())
+      # No break here: every expired workflow must be deleted and recorded,
+      # not just the first one encountered.
+      expired.append(name)
+    else:
+      unexpired.append(name)
+
+  logging.info("Unexpired workflows:\n%s", "\n".join(unexpired))
+  logging.info("expired workflows:\n%s", "\n".join(expired))
+
+def cleanup_endpoints(args):
+  """Delete stale Cloud Endpoints services that satisfy is_match.
+
+  A matching service is deleted when it has no rollouts, or when the first
+  rollout listed for it is older than args.max_age_hours.
+
+  Args:
+    args: Parsed command line arguments; project and max_age_hours are read.
+  """
+  credentials = GoogleCredentials.get_application_default()
+
+  services_management = discovery.build('servicemanagement', 'v1', credentials=credentials)
+  services = services_management.services()
+  rollouts = services.rollouts()
+  next_page_token = None
+
+  max_age = datetime.timedelta(hours=args.max_age_hours)
+
+  expired = []
+  unexpired = []
+  unmatched = []
+
+  while True:
+    results = services.list(producerProjectId=args.project,
+                            pageToken=next_page_token).execute()
+
+    # An empty page may come back without a "services" key at all.
+    for s in results.get("services", []):
+      name = s["serviceName"]
+      if not is_match(name):
+        unmatched.append(name)
+        continue
+
+      all_rollouts = rollouts.list(serviceName=name).execute()
+      is_expired = False
+      if not all_rollouts.get("rollouts", []):
+        logging.info("Service %s has no rollouts", name)
+        is_expired = True
+      else:
+        r = all_rollouts["rollouts"][0]
+        create_time = date_parser.parse(r["createTime"])
+
+        now = datetime.datetime.now(create_time.tzinfo)
+
+        if now - create_time > max_age:
+          is_expired = True
+
+      if is_expired:
+        logging.info("Deleting service: %s", name)
+        services.delete(serviceName=name).execute()
+        expired.append(name)
+      else:
+        unexpired.append(name)
+
+    next_page_token = results.get("nextPageToken")
+    if not next_page_token:
+      break
+
+  logging.info("Unmatched services:\n%s", "\n".join(unmatched))
+  logging.info("Unexpired services:\n%s", "\n".join(unexpired))
+  logging.info("expired services:\n%s", "\n".join(expired))
+
 def cleanup_service_accounts(args):
   credentials = GoogleCredentials.get_application_default()
 
@@ -204,6 +314,24 @@ def main():
 
   subparsers = parser.add_subparsers()
 
+  ######################################################
+  # Parser for argo_workflows
+  parser_argo = subparsers.add_parser(
+    "workflows", help="Cleanup workflows")
+
+  parser_argo.add_argument(
+    "--namespace", default="kubeflow-test-infra",
+    help="Namespace to cleanup.")
+
+  parser_argo.set_defaults(func=cleanup_workflows)
+
+  ######################################################
+  # Parser for endpoints
+  parser_endpoints = subparsers.add_parser(
+    "endpoints", help="Cleanup endpoints")
+
+  parser_endpoints.set_defaults(func=cleanup_endpoints)
+
   ######################################################
   # Parser for service accounts
   parser_service_account = subparsers.add_parser(