Skip to content

Commit

Permalink
Merge pull request #4971 from ministryofjustice/certificates_renewal_…
Browse files Browse the repository at this point in the history
…migration

Certificates renewal migration
  • Loading branch information
abachleda-baca authored Nov 1, 2024
2 parents 139ff78 + d300670 commit e00276b
Show file tree
Hide file tree
Showing 6 changed files with 299 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ jobs:
steps:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@@486457dc46e82b9a740ca0ef1dac6a38a3fc272d # v4.0.2
uses: aws-actions/configure-aws-credentials@486457dc46e82b9a740ca0ef1dac6a38a3fc272d # v4.0.2
with:
role-to-assume: ${{secrets.AWS_GITHUB_DORMANT_USERS_ARN}}
aws-region: eu-west-2
Expand Down
36 changes: 36 additions & 0 deletions .github/workflows/job-certificate-expire-test-run.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
name: Certificate Expiry Test Run

on:
workflow_dispatch:
inputs:
email:
description: What is the email address of the recipient?
jobs:
certificate-expiry-check-test-run:
name: Run certificate expiry script in test mode
runs-on: ubuntu-latest
permissions:
id-token: write
contents: read
steps:
- name: checkout repo content
uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@486457dc46e82b9a740ca0ef1dac6a38a3fc272d # v4.0.2
with:
role-to-assume: ${{secrets.AWS_CERTIFICATE_EMAIL_ARN}}
aws-region: eu-west-2
- name: Python Setup
uses: actions/setup-python@0b93645e9fea7318ecaed2b359559ac225c90a2b # v5.3.0
with:
python-version: '3.11'
- name: Install Pipenv
run: |
pip install pipenv
pipenv install
- run: pipenv run python3 -m bin.check_certificate_expiry --test ${{ github.event.inputs.email }}
env:
GANDI_CERTIFICATES_TOKEN: ${{ secrets.GANDI_CERTIFICATES_TOKEN }}
NOTIFY_PROD_API_KEY: ${{ secrets.NOTIFY_PROD_API_KEY }}
S3_CERT_BUCKET_NAME: ${{ secrets.S3_CERT_BUCKET_NAME }}
S3_CERT_OBJECT_NAME: ${{ secrets.S3_CERT_OBJECT_NAME}}
101 changes: 101 additions & 0 deletions bin/check_certificate_expiry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
import os
import sys
import logging

from services.gandi_service import GandiService
from services.notify_service import NotifyService
from services.s3_service import S3Service

from config.constants import MINISTRY_OF_JUSTICE


logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)


cert_config = {
"CERT_REPLY_EMAIL": "certificates@digital.justice.gov.uk",
"CERT_EXPIRY_THRESHOLDS": [30],
"CERT_URL_EXTENSION": "v5/certificate/issued-certs",
"CERT_REPORT_TEMPLATE_ID": "04b6ca6c-2945-4a0d-a267-53fb61b370ef",
"CERT_EXPIRY_TEMPALATE_ID": "06abd028-0a8f-43d9-a122-90a92f9b62ee"
}


def get_environment_variables() -> tuple:
gandi_token = os.environ.get("GANDI_CERTIFICATES_TOKEN")
if not gandi_token:
raise ValueError("No GANDI_CERTIFICATES_TOKEN environment variable set")

notify_api_key = os.environ.get("NOTIFY_PROD_API_KEY")
if not notify_api_key:
raise ValueError("No NOTIFY_PROD_API_KEY environment variable set")

s3_bucket_name = os.environ.get("S3_CERT_BUCKET_NAME")
if not s3_bucket_name:
raise ValueError("S3_CERT_BUCKET_NAME environment variable set")

s3_object_name = os.environ.get("S3_CERT_OBJECT_NAME")
if not s3_object_name:
raise ValueError("S3_CERT_OBJECT_NAME environment variable set")

return gandi_token, notify_api_key, s3_bucket_name, s3_object_name


def main(testrun: bool = False, test_email: str = ""):

gandi_token, notify_api_key, s3_bucket_name, s3_object_name = get_environment_variables()
logger.info("Instantiating services...")
gandi_service = GandiService(gandi_token, cert_config["CERT_URL_EXTENSION"])
notify_service = NotifyService(cert_config, notify_api_key, MINISTRY_OF_JUSTICE)
s3_service = S3Service(s3_bucket_name, MINISTRY_OF_JUSTICE,)

logger.info("Extracting email map from S3")
email_mappings = s3_service.get_json_file(s3_object_name, s3_object_name)

logger.info("Extracting certificate list from Gandi...")
certificate_list = gandi_service.get_certificate_list()
valid_certificate_list = gandi_service.get_certificates_in_valid_state(
certificate_list, email_mappings)
if expired_certificate_list := gandi_service.get_expired_certificates_from_valid_certificate_list(
valid_certificate_list, email_mappings, cert_config["CERT_EXPIRY_THRESHOLDS"]
):

print("Building parameters to send emails...")
email_parameter_list = notify_service.build_email_parameter_list_crs(
expired_certificate_list)

if testrun:
logger.info("Sending test email to {test_email}...")
notify_service.send_test_email_from_parameters_crs(
email_parameter_list, test_email)
logger.info("Building main report...")
report = notify_service.build_main_report_string_crs(
email_parameter_list)
logger.info("Sending test report to %s...", test_email)
notify_service.send_report_email_crs(
report, cert_config["CERT_REPORT_TEMPLATE_ID"], test_email)

else:
logger.info("Sending live emails...")
notify_service.send_emails_from_parameters_crs(email_parameter_list)
print("Building live report...")
report = notify_service.build_main_report_string_crs(
email_parameter_list)
print("Sending live report to Operations Engineering...")
notify_service.send_report_email_crs(
report, cert_config["CERT_REPORT_TEMPLATE_ID"], cert_config["CERT_REPLY_EMAIL"])
else:
logger.info("No expiring certificates found.")


if __name__ == "__main__":
if len(sys.argv) > 1 and sys.argv[1] == '--test':
if len(sys.argv) > 2:
main(True, sys.argv[2])
else:
raise SystemExit('Email address of recipient expected.')
else:
main()
75 changes: 75 additions & 0 deletions services/gandi_service.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import requests
import datetime


class GandiService:
def __init__(self, token, url_extension) -> None:
self.headers = {'Authorization': f'Bearer {token}'}
self.url = "https://api.gandi.net/" + url_extension
self.params = {'per_page': 1000}

def get_current_account_balance_from_org(self, org_id):
try:
Expand All @@ -18,3 +20,76 @@ def get_current_account_balance_from_org(self, org_id):
except TypeError as api_key_error:
raise TypeError(
f"Gandi API key does not exist or is in the wrong format:\n {api_key_error}") from api_key_error

def _get_email_address_of_domain_owners(self, domain_name, email_list):
domain_name_to_check = self._remove_suffix_if_present(domain_name)
if email_list[domain_name_to_check]['external_cname']:
return email_list[domain_name_to_check]['external_cname']
email_addresses_of_domain_owners = [
email_list[domain_name_to_check]['recipient']]
if email_list[domain_name_to_check]['recipientcc']:
email_addresses_of_domain_owners.extend(
iter(email_list[domain_name_to_check]['recipientcc'])
)
return email_addresses_of_domain_owners

def _remove_suffix_if_present(self, domain_name):
base, sep, suffix = domain_name.rpartition('.')
return base if sep == '.' and suffix.isdigit() else domain_name

def _check_certificate_state(self, domain_item, email_list, certificate_state) -> bool:
return domain_item['cn'] in email_list and domain_item['status'] == certificate_state

def _is_certificate_owned_by_operations_engineering(self, domain_item, email_list):
return email_list[domain_item['cn']]['owner'] == 'OE'

def _get_days_between_now_and_expiry_date(self, expiry_date):
return (expiry_date - (datetime.datetime.now().date())).days

def _format_expiry_date(self, date_string: str) -> datetime.date:
return datetime.datetime.strptime(date_string, '%Y-%m-%dT%H:%M:%SZ').date()

def get_certificate_list(self):
try:
response = requests.get(
url=self.url, params=self.params, headers=self.headers, timeout=60)
response.raise_for_status()
return response.json()
except requests.exceptions.HTTPError as authentication_error:
raise requests.exceptions.HTTPError(
f"You may need to export your Gandi API key:\n {authentication_error}") from authentication_error
except TypeError as api_key_error:
raise TypeError(
f"Gandi API key does not exist or is in the wrong format:\n {api_key_error}") from api_key_error

def get_certificates_in_valid_state(self, certificate_list, email_list):
valid_state_certificates = {}
for domain_item in certificate_list:
if self._check_certificate_state(domain_item, email_list, 'valid') and \
self._is_certificate_owned_by_operations_engineering(domain_item, email_list):
expiry_date = self._format_expiry_date(
domain_item['dates']['ends_at'])
base_cn = domain_item['cn']
suffix = 0
while base_cn in valid_state_certificates:
suffix += 1
base_cn = f"{domain_item['cn']}.{suffix}"
valid_state_certificates[base_cn] = {
"expiry_date": expiry_date
}
return valid_state_certificates

def get_expired_certificates_from_valid_certificate_list(self, valid_state_certificate_list: dict, email_list, cert_expiry_thresholds: list[int]):
expired_certificates = {}
for domain_item in valid_state_certificate_list:
days_between_now_and_expiry_date = self._get_days_between_now_and_expiry_date(
valid_state_certificate_list[domain_item]['expiry_date'])
if days_between_now_and_expiry_date in cert_expiry_thresholds:
email_addresses_of_domain_owners = \
self._get_email_address_of_domain_owners(
domain_item, email_list)
expired_certificates[domain_item] = {
"expiry_date": valid_state_certificate_list[domain_item]['expiry_date'],
"emails": email_addresses_of_domain_owners
}
return expired_certificates
68 changes: 68 additions & 0 deletions services/notify_service.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
# pylint: disable=C0411
from datetime import datetime, timezone
import requests
from notifications_python_client.notifications import NotificationsAPIClient
Expand Down Expand Up @@ -103,3 +104,70 @@ def _send_email_reply_to_ops_eng(self, template_id: str, email: str, personalisa
raise requests.exceptions.HTTPError(
f"You may need to export your Notify API Key:\n {api_key_error}"
) from api_key_error

def build_email_parameter_list_crs(self, valid_certificate_list):
emails_parameter_list = []
for valid_certificate in valid_certificate_list:
params = {
'email_addresses': valid_certificate_list.get(valid_certificate).get('emails'),
'domain_name': valid_certificate,
'csr_email': self.config['CERT_REPLY_EMAIL'],
'end_date': valid_certificate_list.get(valid_certificate).get('expiry_date')
}
emails_parameter_list.append(params)
return emails_parameter_list

def _send_email_crs(self, email_params, recipients):
for email in recipients:
domain_name = self._remove_suffix_if_present(
email_params['domain_name'])
try:
self.client.send_email_notification(
email_address=email,
template_id=self.config["CERT_EXPIRY_TEMPALATE_ID"],
personalisation={
"domain_name": domain_name,
"csr_email": email_params['csr_email'],
"end_date": email_params['end_date'].strftime('%d/%m/%Y')
}
)
except requests.exceptions.HTTPError as api_key_error:
raise requests.exceptions.HTTPError(
f"You may need to export your Notify API Key:\n {api_key_error}"
) from api_key_error

def send_emails_from_parameters_crs(self, email_parameter_list):
for email_parameters in email_parameter_list:
self._send_email_crs(email_parameters,
email_parameters['email_addresses'])

def send_test_email_from_parameters_crs(self, email_parameter_list, test_email):
for email_parameters in email_parameter_list:
self._send_email_crs(email_parameters, [test_email])

def _remove_suffix_if_present(self, domain_name):
base, sep, suffix = domain_name.rpartition('.')
return base if sep == '.' and suffix.isdigit() else domain_name

def build_main_report_string_crs(self, email_parameter_list):
new_line = '\n'
return "".join(
f"Domain Name: {self._remove_suffix_if_present(email_parameter['domain_name'])}\n"
f"Sent to:\n{''.join([f'{address}{new_line}' for address in email_parameter['email_addresses']])}"
f"\nExpiry Date: {email_parameter['end_date']} \n\n"
for email_parameter in email_parameter_list
)

def send_report_email_crs(self, report, template_id, email):
try:
self.client.send_email_notification(
email_address=email,
template_id=template_id,
personalisation={
"report": report
}
)
except requests.exceptions.HTTPError as api_key_error:
raise requests.exceptions.HTTPError(
f"You may need to export your Notify API Key:\n {api_key_error}"
) from api_key_error
18 changes: 18 additions & 0 deletions services/s3_service.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
# pylint: disable=wrong-import-order
import os
import boto3
import json
from botocore.exceptions import ClientError

from json import JSONDecodeError


class S3Service:
def __init__(self, bucket_name: str, organisation_name: str) -> None:
Expand Down Expand Up @@ -32,3 +35,18 @@ def is_well_known_mta_sts_enforce(self, domain: str) -> bool:
return any(line.startswith("mode: enforce") for line in sts_content.split('\n'))
except ClientError:
return False

def get_json_file(self, object_name: str, file_path: str):

try:
with open(file_path, 'wb') as file:
self.client.download_fileobj(self.bucket_name, object_name, file)
with open(file_path, 'r', encoding="utf-8") as file:
mappings = file.read()
return json.loads(mappings)

except FileNotFoundError as e:
raise FileNotFoundError("Error downloading file") from e

except JSONDecodeError as e:
raise ValueError("File not in JSON Format") from e

0 comments on commit e00276b

Please sign in to comment.