From e3c7cd4268e5ba956d6046e4730715a2042e3d3d Mon Sep 17 00:00:00 2001 From: Ash Berlin-Taylor Date: Thu, 18 Feb 2021 09:55:49 +0000 Subject: [PATCH] Lambda function to scale ASG based on Github webhooks (#2) --- .pre-commit-config.yaml | 2 +- lambdas/scale_out_runner/.chalice/config.json | 19 ++ .../.chalice/deployed/prod.json | 28 +++ .../scale_out_runner/.chalice/prod_iam.json | 33 +++ lambdas/scale_out_runner/.gitignore | 20 ++ lambdas/scale_out_runner/app.py | 209 ++++++++++++++++++ lambdas/scale_out_runner/requirements.txt | 18 ++ requirements-test.txt | 2 + requirements.txt | 1 + tests/conftest.py | 16 ++ tests/lambdas/scale_out_runner/conftest.py | 35 +++ tests/lambdas/scale_out_runner/test_app.py | 61 +++++ 12 files changed, 443 insertions(+), 1 deletion(-) create mode 100644 lambdas/scale_out_runner/.chalice/config.json create mode 100644 lambdas/scale_out_runner/.chalice/deployed/prod.json create mode 100644 lambdas/scale_out_runner/.chalice/prod_iam.json create mode 100644 lambdas/scale_out_runner/.gitignore create mode 100644 lambdas/scale_out_runner/app.py create mode 100644 lambdas/scale_out_runner/requirements.txt create mode 100644 requirements-test.txt create mode 100644 tests/conftest.py create mode 100644 tests/lambdas/scale_out_runner/conftest.py create mode 100644 tests/lambdas/scale_out_runner/test_app.py diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 02a11bd..fa32fe1 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -31,7 +31,7 @@ repos: - id: forbid-tabs - id: insert-license name: Add license - exclude: ^\.github/.*$|^license-templates/ + exclude: ^\.github/.*$|^license-templates/|\.json$ args: - --comment-style - "|#|" diff --git a/lambdas/scale_out_runner/.chalice/config.json b/lambdas/scale_out_runner/.chalice/config.json new file mode 100644 index 0000000..bd29d8c --- /dev/null +++ b/lambdas/scale_out_runner/.chalice/config.json @@ -0,0 +1,19 @@ +{ + "version": "2.0", + "app_name": "scale_out_runner", + "stages": { + "dev": { + "api_gateway_stage": "api" + }, + "prod": { + "api_gateway_stage": "api", + "autogen_policy": false, + "iam_policy_file": "prod_iam.json", + "environment_variables": { + "GH_WEBHOOK_TOKEN_ENCRYPTED": "AQICAHg1MGVq8MAqYXSkkgy6iL19KSI14nJw8DelmHFuRpAfvQEh/mniWicOD0N1aVuHhp+VAAAAfDB6BgkqhkiG9w0BBwagbTBrAgEAMGYGCSqGSIb3DQEHATAeBglghkgBZQMEAS4wEQQMVqkAu70BaR2rDQ4qAgEQgDlIAxDrBlblL6XNKkJIs+zrNEMcGpS68JnoVh2s1oKEzbBXzYS16/9ZrW9CwUV4ULAd2EsGb90iz74=", + "ACTIONS_SQS_URL": "https://sqs.eu-central-1.amazonaws.com/827901512104/actions-runner-requests" + } + } + }, + "automatic_layer": true +} diff --git a/lambdas/scale_out_runner/.chalice/deployed/prod.json b/lambdas/scale_out_runner/.chalice/deployed/prod.json new file mode 100644 index 0000000..7f1a411 --- /dev/null +++ b/lambdas/scale_out_runner/.chalice/deployed/prod.json @@ -0,0 +1,28 @@ +{ + "resources": [ + { + "name": "managed-layer", + "resource_type": "lambda_layer", + "layer_version_arn": "arn:aws:lambda:eu-central-1:827901512104:layer:scale_out_runner-prod-managed-layer:35" + }, + { + "name": "api_handler_role", + "resource_type": "iam_role", + "role_arn": "arn:aws:iam::827901512104:role/scale_out_runner-prod-api_handler", + "role_name": "scale_out_runner-prod-api_handler" + }, + { + "name": "api_handler", + "resource_type": "lambda_function", + "lambda_arn": "arn:aws:lambda:eu-central-1:827901512104:function:scale_out_runner-prod" + }, + { + "name": "rest_api", + "resource_type": "rest_api", + "rest_api_id": "2onm92olq7", + "rest_api_url": "https://2onm92olq7.execute-api.eu-central-1.amazonaws.com/api/" + } + ], + "schema_version": "2.0", + "backend": "api" +} diff --git a/lambdas/scale_out_runner/.chalice/prod_iam.json b/lambdas/scale_out_runner/.chalice/prod_iam.json new file mode 100644 index 0000000..539bac3 --- /dev/null +++ b/lambdas/scale_out_runner/.chalice/prod_iam.json @@ -0,0 +1,33 @@ +{ + "Version": "2012-10-17", + "Statement": [ + { + "Sid": "VisualEditor0", + "Effect": "Allow", + "Action": [ + "logs:CreateLogStream", + "kms:Decrypt", + "autoscaling:SetDesiredCapacity", + "ssm:GetParameter", + "logs:CreateLogGroup", + "logs:PutLogEvents", + "dynamodb:UpdateItem" + ], + "Resource": [ + "arn:aws:ssm:*:827901512104:parameter/runners/*/configOverlay", + "arn:aws:autoscaling:*:827901512104:autoScalingGroup:*:autoScalingGroupName/AshbRunnerASG", + "arn:aws:kms:*:827901512104:key/48a58710-7ac6-4f88-995f-758a6a450faa", + "arn:aws:dynamodb:*:827901512104:table/GithubRunnerQueue", + "arn:*:logs:*:*:*" + ] + }, + { + "Sid": "VisualEditor1", + "Effect": "Allow", + "Action": [ + "autoscaling:DescribeAutoScalingGroups" + ], + "Resource": "*" + } + ] +} diff --git a/lambdas/scale_out_runner/.gitignore b/lambdas/scale_out_runner/.gitignore new file mode 100644 index 0000000..9e4df26 --- /dev/null +++ b/lambdas/scale_out_runner/.gitignore @@ -0,0 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +.chalice/deployments/ +.chalice/venv/ +__pycache__/ diff --git a/lambdas/scale_out_runner/app.py b/lambdas/scale_out_runner/app.py new file mode 100644 index 0000000..2f40fb1 --- /dev/null +++ b/lambdas/scale_out_runner/app.py @@ -0,0 +1,209 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import codecs +import hmac +import json +import logging +import os +from typing import cast + +import boto3 +from chalice import BadRequestError, Chalice, ForbiddenError +from chalice.app import Request + +app = Chalice(app_name='scale_out_runner') +app.log.setLevel(logging.INFO) + +ASG_GROUP_NAME = os.getenv('ASG_NAME', 'AshbRunnerASG') +TABLE_NAME = os.getenv('COUNTER_TABLE', 'GithubRunnerQueue') +_commiters = set() +GH_WEBHOOK_TOKEN = None + +REPOS = os.getenv('REPOS') +if REPOS: + REPO_CONFIGURATION = json.loads(REPOS) +else: + REPO_CONFIGURATION = { + # : [list-of-branches-to-use-self-hosted-on] + 'apache/airflow': {'main', 'master'}, + } +del REPOS + + +@app.route('/', methods=['POST']) +def index(): + validate_gh_sig(app.current_request) + + if app.current_request.headers.get('X-GitHub-Event', None) != "check_run": + # Ignore things about installs/permissions etc + return {'ignored': 'not about check_runs'} + + body = app.current_request.json_body + + repo = body['repository']['full_name'] + + sender = body['sender']['login'] + + # Other repos configured with this app, but we don't do anything with them + # yet. + if repo not in REPO_CONFIGURATION: + app.log.info("Ignoring event for %r", repo) + return {'ignored': 'Other repo'} + + interested_branches = REPO_CONFIGURATION[repo] + + branch = body['check_run']['check_suite']['head_branch'] + + use_self_hosted = sender in commiters() or branch in interested_branches + payload = {'sender': sender, 'use_self_hosted': use_self_hosted} + + if body['action'] == 'completed' and body['check_run']['conclusion'] == 'cancelled': + if use_self_hosted: + # The only time we get a "cancelled" job is when it wasn't yet running. + queue_length = increment_dynamodb_counter(-1) + # Don't scale in the ASG -- let the CloudWatch alarm do that. + payload['new_queue'] = queue_length + else: + payload = {'ignored': 'unknown sender'} + + elif body['action'] != 'created': + payload = {'ignored': "action is not 'created'"} + + elif body['check_run']['status'] != 'queued': + # Skipped runs are "created", but are instantly completed. Ignore anything that is not queued + payload = {'ignored': "check_run.status is not 'queued'"} + else: + if use_self_hosted: + # Increment counter in DynamoDB + queue_length = increment_dynamodb_counter() + payload.update(**scale_asg_if_needed(queue_length)) + app.log.info( + "delivery=%s branch=%s: %r", + app.current_request.headers.get('X-GitHub-Delivery', None), + branch, + payload, + ) + return payload + + +def commiters(ssm_repo_name: str = os.getenv('SSM_REPO_NAME', 'apache/airflow')): + global _commiters + + if not _commiters: + client = boto3.client('ssm') + param_path = os.path.join('/runners/', ssm_repo_name, 'configOverlay') + app.log.info("Loading config overlay from %s", param_path) + + try: + + resp = client.get_parameter(Name=param_path, WithDecryption=True) + except client.exceptions.ParameterNotFound: + app.log.debug("Failed to load config overlay", exc_info=True) + return set() + + try: + overlay = json.loads(resp['Parameter']['Value']) + except ValueError: + app.log.debug("Failed to parse config overlay", exc_info=True) + return set() + + _commiters = set(overlay['pullRequestSecurity']['allowedAuthors']) + + return _commiters + + +def validate_gh_sig(request: Request): + sig = request.headers.get('X-Hub-Signature-256', None) + if not sig or not sig.startswith('sha256='): + raise BadRequestError('X-Hub-Signature-256 not of expected format') + + sig = sig[len('sha256=') :] + calculated_sig = sign_request_body(request) + + app.log.debug('Checksum verification - expected %s got %s', calculated_sig, sig) + + if not hmac.compare_digest(sig, calculated_sig): + raise ForbiddenError('Spoofed request') + + +def sign_request_body(request: Request) -> str: + global GH_WEBHOOK_TOKEN + if GH_WEBHOOK_TOKEN is None: + if 'GH_WEBHOOK_TOKEN' in os.environ: + # Local dev support: + GH_WEBHOOK_TOKEN = os.environ['GH_WEBHOOK_TOKEN'].encode('utf-8') + else: + encrypted = os.environb[b'GH_WEBHOOK_TOKEN_ENCRYPTED'] + + kms = boto3.client('kms') + response = kms.decrypt(CiphertextBlob=codecs.decode(encrypted, 'base64')) + GH_WEBHOOK_TOKEN = response['Plaintext'] + body = cast(bytes, request.raw_body) + return hmac.new(GH_WEBHOOK_TOKEN, body, digestmod='SHA256').hexdigest() # type: ignore + + +def increment_dynamodb_counter(delta: int = 1) -> int: + dynamodb = boto3.client('dynamodb') + args = dict( + TableName=TABLE_NAME, + Key={'id': {'S': 'queued_jobs'}}, + ExpressionAttributeValues={':delta': {'N': str(delta)}}, + UpdateExpression='ADD queued :delta', + ReturnValues='UPDATED_NEW', + ) + + if delta < 0: + # Make sure it never goes below zero! + args['ExpressionAttributeValues'][':limit'] = {'N': str(-delta)} + args['ConditionExpression'] = 'queued >= :limit' + + resp = dynamodb.update_item(**args) + return int(resp['Attributes']['queued']['N']) + + +def scale_asg_if_needed(num_queued_jobs: int) -> dict: + asg = boto3.client('autoscaling') + + resp = asg.describe_auto_scaling_groups( + AutoScalingGroupNames=[ASG_GROUP_NAME], + ) + + asg_info = resp['AutoScalingGroups'][0] + + current = asg_info['DesiredCapacity'] + max_size = asg_info['MaxSize'] + + busy = 0 + for instance in asg_info['Instances']: + if instance['LifecycleState'] == 'InService' and instance['ProtectedFromScaleIn']: + busy += 1 + app.log.info("Busy instances: %d, num_queued_jobs: %d, current_size: %d", busy, num_queued_jobs, current) + + new_size = num_queued_jobs + busy + if new_size > current: + if new_size <= max_size or current < max_size: + try: + new_size = min(new_size, max_size) + asg.set_desired_capacity(AutoScalingGroupName=ASG_GROUP_NAME, DesiredCapacity=new_size) + return {'new_capcity': new_size} + except asg.exceptions.ScalingActivityInProgressFault as e: + return {'error': str(e)} + else: + return {'capacity_at_max': True} + else: + return {'idle_instances': True} diff --git a/lambdas/scale_out_runner/requirements.txt b/lambdas/scale_out_runner/requirements.txt new file mode 100644 index 0000000..35f005b --- /dev/null +++ b/lambdas/scale_out_runner/requirements.txt @@ -0,0 +1,18 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +boto3 diff --git a/requirements-test.txt b/requirements-test.txt new file mode 100644 index 0000000..4e2748b --- /dev/null +++ b/requirements-test.txt @@ -0,0 +1,2 @@ +pytest~=6.0 +moto diff --git a/requirements.txt b/requirements.txt index 432240d..95528c3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -18,3 +18,4 @@ boto3 click~=7.1 requests +pytest~=6.0 diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..13a8339 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/tests/lambdas/scale_out_runner/conftest.py b/tests/lambdas/scale_out_runner/conftest.py new file mode 100644 index 0000000..378e130 --- /dev/null +++ b/tests/lambdas/scale_out_runner/conftest.py @@ -0,0 +1,35 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import os +import sys + +import pytest +from chalice.test import Client + +path = os.path.dirname(__file__) +idx = path.rfind('/tests/') +path = path[:idx] + path[idx + 6 :] +sys.path.append(path) + + +@pytest.fixture +def client(request): + app = getattr(request.module, "app") + + with Client(app) as client: + yield client diff --git a/tests/lambdas/scale_out_runner/test_app.py b/tests/lambdas/scale_out_runner/test_app.py new file mode 100644 index 0000000..52f8e99 --- /dev/null +++ b/tests/lambdas/scale_out_runner/test_app.py @@ -0,0 +1,61 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import json + +import pytest +from app import app # noqa + + +@pytest.fixture(autouse=True) +def no_requests(monkeypatch): + monkeypatch.setenv("GH_WEBHOOK_TOKEN", "abc") + + +def test_no_auth(client): + response = client.http.post('/', body=json.dumps({'hello': 'world'})) + assert response.status_code == 400 + + +@pytest.mark.parametrize( + "sig", + [ + "md5=", + # Valid, but not prefixed + "160156e060356c9444613b224fc5613a0a25315b7898fd5d8c7656bd8a6654af", + ], +) +def test_bad_auth(sig, client): + response = client.http.post( + '/', + headers={ + 'X-Hub-Signature-256': sig, + }, + body=json.dumps({'hello': 'world'}), + ) + assert response.status_code == 400 + + +def test_auth(client): + response = client.http.post( + '/', + headers={ + 'X-Hub-Signature-256': 'sha256=160156e060356c9444613b224fc5613a0a25315b7898fd5d8c7656bd8a6654af' + }, + body=json.dumps({'hello': 'world'}), + ) + assert response.status_code == 200