Skip to content

Commit

Permalink
Update IB Pipelines Checker workflow (#835)
Browse files Browse the repository at this point in the history
* Update IB Pipeline Checker Workflow

Signed-off-by: Uday Beswal <uday@beswal.com>

* Fix pylint issues

Signed-off-by: Uday Beswal <uday@beswal.com>

---------

Signed-off-by: Uday Beswal <uday@beswal.com>
  • Loading branch information
UdBe authored Oct 8, 2024
1 parent 9baeb85 commit 594a136
Showing 1 changed file with 91 additions and 27 deletions.
118 changes: 91 additions & 27 deletions scripts/ib_pipelines_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import csv
from datetime import datetime, timedelta, timezone
import sys
import re
import requests

class PipelineChecker:
Expand All @@ -23,6 +24,7 @@ def __init__(self):
self.not_found_pipelines = []
self.skipped_pipelines = []
self.inactive_pipelines = []
self.partial_coverage_pipelines = []
self.total_pipelines = 0
self.passed_pipelines = 0
self.init_csv_file()
Expand Down Expand Up @@ -92,6 +94,26 @@ def get_jobs(self, endpoint, pipeline_id):
response.raise_for_status()
return response.json()

def get_job_trace(self, job_web_url):
"""
Get the log trace of a specific job using the job's web URL.
Args:
job_web_url (str): The web URL of the job.
Returns:
str: The job log trace.
"""
raw_log_url = job_web_url + '/trace'
response = requests.get(raw_log_url, timeout=20)
if response.status_code != 200:
print(
f"Failed to retrieve job trace from {raw_log_url}. "
f"Status code: {response.status_code}"
)
return ""
return response.text

def check_rapidfort_scan(self, jobs):
"""
Check the status of the rapidfort-scan job.
Expand All @@ -100,12 +122,12 @@ def check_rapidfort_scan(self, jobs):
jobs (list): The list of jobs in the pipeline.
Returns:
str: The status of the rapidfort-scan job or "not found" if the job does not exist.
tuple: (status, job) of the rapidfort-scan job.
"""
for job in jobs:
if job['name'] == 'rapidfort-scan':
return job['status']
return "not found"
return job['status'], job
return "not found", None

@staticmethod
def format_timestamp(iso_timestamp):
Expand All @@ -124,13 +146,13 @@ def format_timestamp(iso_timestamp):
@staticmethod
def is_pipeline_inactive(pipeline_created_at):
"""
Check if the pipeline was last run more than 7 days ago.
Check if the pipeline was last run more than 3 days ago.
Args:
pipeline_created_at (str): The ISO 8601 timestamp of the pipeline creation.
Returns:
bool: True if the pipeline was run more than 7 days ago, False otherwise.
bool: True if the pipeline was run more than 3 days ago, False otherwise.
"""
dt = datetime.fromisoformat(pipeline_created_at.replace("Z", "+00:00"))
return datetime.now(timezone.utc) - dt > timedelta(days=3)
Expand All @@ -150,20 +172,50 @@ def process_pipeline(self, link):
pipeline_web_url = latest_pipeline['web_url']
pipeline_time_created = self.format_timestamp(latest_pipeline['created_at'])
if self.is_pipeline_inactive(latest_pipeline['created_at']):
self.inactive_pipelines.append(f"{project_name}\nPipeline ID: {pipeline_id}\nPipeline URL: {pipeline_web_url}")
self.inactive_pipelines.append(
f"{project_name}\nPipeline ID: {pipeline_id}\n"
f"Pipeline URL: {pipeline_web_url}"
)
jobs = self.get_jobs(endpoint, pipeline_id)
rf_scan_status = self.check_rapidfort_scan(jobs)
self.write_to_csv(pipeline_time_created, pipeline_id, pipeline_web_url, rf_scan_status, project_name)
print(f"Time Created At: {pipeline_time_created}\nPipeline ID: {pipeline_id}\nURL: {pipeline_web_url}\nrapidfort-scan status: {rf_scan_status}")
rf_scan_status, rf_scan_job = self.check_rapidfort_scan(jobs)
print(
f"Time Created At: {pipeline_time_created}\n"
f"Pipeline ID: {pipeline_id}\nURL: {pipeline_web_url}\n"
f"rapidfort-scan status: {rf_scan_status}"
)
print("-" * 50)
if rf_scan_status == 'failed':
self.failed_pipelines.append(f"{project_name}\nPipeline ID: {pipeline_id}\nPipeline URL: {pipeline_web_url}")
if rf_scan_status == 'success':
job_trace = self.get_job_trace(rf_scan_job['web_url'])
if re.search(r'Partial coverage completed', job_trace, re.IGNORECASE):
self.partial_coverage_pipelines.append(
f"{project_name}\nPipeline ID: {pipeline_id}\n"
f"Pipeline URL: {pipeline_web_url}"
)
rf_scan_status = 'success (partial coverage)'
else:
self.passed_pipelines += 1
elif rf_scan_status == 'failed':
self.failed_pipelines.append(
f"{project_name}\nPipeline ID: {pipeline_id}\n"
f"Pipeline URL: {pipeline_web_url}"
)
elif rf_scan_status == 'not found':
self.not_found_pipelines.append(f"{project_name}\nPipeline ID: {pipeline_id}\nPipeline URL: {pipeline_web_url}")
self.not_found_pipelines.append(
f"{project_name}\nPipeline ID: {pipeline_id}\n"
f"Pipeline URL: {pipeline_web_url}"
)
elif rf_scan_status == 'skipped':
self.skipped_pipelines.append(f"{project_name}\nPipeline ID: {pipeline_id}\nPipeline URL: {pipeline_web_url}")
self.skipped_pipelines.append(
f"{project_name}\nPipeline ID: {pipeline_id}\n"
f"Pipeline URL: {pipeline_web_url}"
)
else:
self.passed_pipelines += 1
print(f"Unknown rapidfort-scan status: {rf_scan_status}")
print("-" * 50)
self.write_to_csv(
pipeline_time_created, pipeline_id, pipeline_web_url,
rf_scan_status, project_name
)
else:
print(f"No pipelines found for project endpoint: {endpoint}")
print("-" * 50)
Expand All @@ -174,22 +226,29 @@ def init_csv_file(self):
"""
with open(self.CSV_FILE_PATH, 'w', newline='', encoding='utf-8') as file:
writer = csv.writer(file)
writer.writerow(["Pipeline Time Created", "Pipeline ID", "Pipeline URL", "rapidfort-scan Status", "Project Name"])

def write_to_csv(self, pipeline_time_created, pipeline_id, pipeline_web_url, rf_scan_status, project_name):
writer.writerow([
"Pipeline Time Created", "Pipeline ID", "Pipeline URL",
"rapidfort-scan Status", "Project Name"
])
# pylint: disable=too-many-arguments
def write_to_csv(self, pipeline_time_created, pipeline_id, pipeline_web_url,
rf_scan_status, project_name):
"""
Write the pipeline information to the CSV file.
Write pipeline information to the CSV file.
Args:
pipeline_time_created (str): The formatted pipeline creation time.
pipeline_id (int): The pipeline ID.
pipeline_web_url (str): The pipeline URL.
pipeline_time_created (str): The time the pipeline was created.
pipeline_id (int): The ID of the pipeline.
pipeline_web_url (str): The web URL of the pipeline.
rf_scan_status (str): The status of the rapidfort-scan job.
project_name (str): The name of the project.
"""
with open(self.CSV_FILE_PATH, 'a', newline='', encoding='utf-8') as file:
writer = csv.writer(file)
writer.writerow([pipeline_time_created, pipeline_id, pipeline_web_url, rf_scan_status, project_name])
writer.writerow([
pipeline_time_created, pipeline_id, pipeline_web_url,
rf_scan_status, project_name
])

def print_summary(self):
"""
Expand All @@ -198,16 +257,20 @@ def print_summary(self):
print("Summary of Pipelines:")
print(f"Total: {self.total_pipelines}")
print(f"Passed: {self.passed_pipelines}")
print(f"Failed: {len(self.failed_pipelines)}")
print(
f"Failed (including partial coverage): "
f"{len(self.failed_pipelines) + len(self.partial_coverage_pipelines)}"
)
print(f"Skipped due to non-related failure: {len(self.skipped_pipelines)}")
print(f"Not found: {len(self.not_found_pipelines)}")
print(f"Inactive (not run in last 3 days): {len(self.inactive_pipelines)}")
print("-" * 50)

if self.failed_pipelines:
print("Failed Pipelines:")
for idx, failed_pipeline in enumerate(self.failed_pipelines, 1):
print(f"\n{idx}. {failed_pipeline}")
combined_failed_pipelines = self.failed_pipelines + self.partial_coverage_pipelines
if combined_failed_pipelines:
print("Failed Pipelines (including partial coverage):")
for idx, pipeline in enumerate(combined_failed_pipelines, 1):
print(f"\n{idx}. {pipeline}")
else:
print("No pipelines failed the rapidfort-scan.")
print("-" * 50)
Expand Down Expand Up @@ -255,6 +318,7 @@ def run(self):
print("::set-output name=workflow-status::passed")
sys.exit(0) # Exit with zero status


if __name__ == "__main__":
checker = PipelineChecker()
checker.run()

0 comments on commit 594a136

Please sign in to comment.