Skip to content

Commit

Permalink
Python script to cleanup argo and services (cloud endpoints) (kubeflo…
Browse files Browse the repository at this point in the history
…w#261)

* Add a python function to GC old Argo workflows and cloud endpoints

kubeflow/testing#53 GC old Argo Workflows
kubeflow/testing#87 cron job to GC old resources.
kubeflow/testing#268 Maximum number of services reached.

* Fix lint.

* Revert files that shouldn't be checked in.

* Fix loop termination criterion.
  • Loading branch information
jlewi authored and k8s-ci-robot committed Jan 3, 2019
1 parent bccafbf commit 46c5d0d
Showing 1 changed file with 112 additions and 0 deletions.
112 changes: 112 additions & 0 deletions py/kubeflow/testing/cleanup_ci.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
import subprocess
import tempfile

from kubeflow.testing import argo_client
from kubeflow.testing import util
from kubernetes import client as k8s_client
from googleapiclient import discovery
from oauth2client.client import GoogleCredentials

Expand All @@ -22,6 +25,97 @@ def is_match(name):

return False

def cleanup_workflows(args):
# We need to load the kube config so that we can have credentials to
# talk to the APIServer.
util.load_kube_config(persist_config=False)

client = k8s_client.ApiClient()
crd_api = k8s_client.CustomObjectsApi(client)
workflows = crd_api.list_namespaced_custom_object(
argo_client.GROUP, argo_client.VERSION, args.namespace, argo_client.PLURAL)

expired = []
unexpired = []

for w in workflows["items"]:
is_expired = False

start_time = date_parser.parse(w["status"]["startedAt"])
now = datetime.datetime.now(start_time.tzinfo)

name = w["metadata"]["name"]
age = now - start_time
if age > datetime.timedelta(hours=args.max_age_hours):
logging.info("Deleting workflow: %s", name)
is_expired = True
crd_api.delete_namespaced_custom_object(
argo_client.GROUP, argo_client.VERSION, args.namespace,
argo_client.PLURAL, name, k8s_client.V1DeleteOptions())
break

if is_expired:
expired.append(name)
else:
unexpired.append(name)

logging.info("Unexpired workflows:\n%s", "\n".join(unexpired))
logging.info("expired workflows:\n%s", "\n".join(expired))

def cleanup_endpoints(args):
credentials = GoogleCredentials.get_application_default()

services_management = discovery.build('servicemanagement', 'v1', credentials=credentials)
services = services_management.services()
rollouts = services.rollouts()
next_page_token = None

expired = []
unexpired = []
unmatched = []

while True:
results = services.list(producerProjectId=args.project,
pageToken=next_page_token).execute()

for s in results["services"]:
name = s["serviceName"]
if not is_match(name):
unmatched.append(name)
continue

all_rollouts = rollouts.list(serviceName=name).execute()
is_expired = False
if not all_rollouts.get("rollouts", []):
logging.info("Service %s has no rollouts", name)
is_expired = True
else:
r = all_rollouts["rollouts"][0]
create_time = date_parser.parse(r["createTime"])

now = datetime.datetime.now(create_time.tzinfo)

age = now - create_time
if age > datetime.timedelta(hours=args.max_age_hours):
is_expired = True

if is_expired:
logging.info("Deleting service: %s", name)
is_expired = True
services.delete(serviceName=name).execute()
expired.append(name)
else:
unexpired.append(name)

if not "nextPageToken" in results:
break
next_page_token = results["nextPageToken"]


logging.info("Unmatched services:\n%s", "\n".join(unmatched))
logging.info("Unexpired services:\n%s", "\n".join(unexpired))
logging.info("expired services:\n%s", "\n".join(expired))

def cleanup_service_accounts(args):
credentials = GoogleCredentials.get_application_default()

Expand Down Expand Up @@ -204,6 +298,24 @@ def main():

subparsers = parser.add_subparsers()

######################################################
# Parser for argo_workflows
parser_argo = subparsers.add_parser(
"workflows", help="Cleanup workflows")

parser_argo.add_argument(
"--namespace", default="kubeflow-test-infra",
help="Namespace to cleanup.")

parser_argo.set_defaults(func=cleanup_workflows)

######################################################
# Parser for endpoints
parser_endpoints = subparsers.add_parser(
"endpoints", help="Cleanup endpoints")

parser_endpoints.set_defaults(func=cleanup_endpoints)

######################################################
# Parser for service accounts
parser_service_account = subparsers.add_parser(
Expand Down

0 comments on commit 46c5d0d

Please sign in to comment.