Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions apiserver/dora/service/code/sync/etl_code_factory.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
from dora.service.code.sync.etl_github_handler import get_github_etl_handler
from dora.service.code.sync.etl_provider_handler import ProviderETLHandler
from dora.store.models import UserIdentity
from dora.service.code.sync.etl_provider_handler import CodeProviderETLHandler
from dora.store.models.code import CodeProvider


class CodeETLFactory:
def __init__(self, org_id: str):
self.org_id = org_id

def __call__(self, provider: str) -> ProviderETLHandler:
if provider == UserIdentity.GITHUB.value:
def __call__(self, provider: str) -> CodeProviderETLHandler:
if provider == CodeProvider.GITHUB.value:
return get_github_etl_handler(self.org_id)
7 changes: 4 additions & 3 deletions apiserver/dora/service/code/sync/etl_github_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from dora.exapi.github import GithubApiService
from dora.service.code.sync.etl_code_analytics import CodeETLAnalyticsService
from dora.service.code.sync.etl_provider_handler import ProviderETLHandler
from dora.service.code.sync.etl_provider_handler import CodeProviderETLHandler
from dora.service.code.sync.revert_prs_github_sync import (
RevertPRsGitHubSyncHandler,
get_revert_prs_github_sync_handler,
Expand All @@ -25,6 +25,7 @@
PullRequestEvent,
PullRequestEventType,
PullRequestRevertPRMapping,
CodeProvider,
)
from dora.store.repos.code import CodeRepoService
from dora.store.repos.core import CoreRepoService
Expand All @@ -33,7 +34,7 @@
PR_PROCESSING_CHUNK_SIZE = 100


class GithubETLHandler(ProviderETLHandler):
class GithubETLHandler(CodeProviderETLHandler):
def __init__(
self,
org_id: str,
Expand All @@ -51,7 +52,7 @@ def __init__(
self.github_revert_pr_sync_handler: RevertPRsGitHubSyncHandler = (
github_revert_pr_sync_handler
)
self.provider: str = UserIdentityProvider.GITHUB.value
self.provider: str = CodeProvider.GITHUB.value

def check_pat_validity(self) -> bool:
"""
Expand Down
7 changes: 5 additions & 2 deletions apiserver/dora/service/code/sync/etl_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@
import pytz

from dora.service.code.integration import get_code_integration_service
from dora.service.code.sync.etl_code_factory import ProviderETLHandler, CodeETLFactory
from dora.service.code.sync.etl_code_factory import (
CodeProviderETLHandler,
CodeETLFactory,
)
from dora.store.models.code import OrgRepo, BookmarkType, Bookmark, PullRequest
from dora.store.repos.code import CodeRepoService
from dora.utils.log import LOG
Expand All @@ -14,7 +17,7 @@ class CodeETLHandler:
def __init__(
self,
code_repo_service: CodeRepoService,
etl_service: ProviderETLHandler,
etl_service: CodeProviderETLHandler,
):
self.code_repo_service = code_repo_service
self.etl_service = etl_service
Expand Down
2 changes: 1 addition & 1 deletion apiserver/dora/service/code/sync/etl_provider_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
)


class ProviderETLHandler(ABC):
class CodeProviderETLHandler(ABC):
@abstractmethod
def check_pat_validity(self) -> bool:
"""
Expand Down
Empty file.
Empty file.
168 changes: 168 additions & 0 deletions apiserver/dora/service/workflows/sync/etl_github_actions_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
from datetime import datetime
from typing import Dict, Optional, List

import pytz

from dora.exapi.github import GithubApiService
from dora.service.workflows.sync.etl_provider_handler import WorkflowProviderETLHandler
from dora.store.models import UserIdentityProvider
from dora.store.models.code import (
RepoWorkflowProviders,
RepoWorkflowRunsStatus,
RepoWorkflowRuns,
OrgRepo,
RepoWorkflowRunsBookmark,
RepoWorkflow,
)
from dora.utils.log import LOG
from dora.utils.time import ISO_8601_DATE_FORMAT, time_now

# Presumably the default look-back window, in days, for a workflow sync.
# NOTE(review): not referenced in the visible part of this module — confirm where it is consumed.
DEFAULT_WORKFLOW_SYNC_DAYS = 31
# Presumably the batch size used when processing workflow runs.
# NOTE(review): not referenced in the visible part of this module — confirm where it is consumed.
WORKFLOW_PROCESSING_CHUNK_SIZE = 100


class GithubActionsETLHandler(WorkflowProviderETLHandler):
    """
    Workflow ETL handler for GitHub Actions.

    Fetches workflow runs through the injected GithubApiService and converts
    the raw run payloads into RepoWorkflowRuns records.
    """

    def __init__(self, org_id: str, github_api_service: GithubApiService):
        self.org_id = org_id
        self._api: GithubApiService = github_api_service
        self._provider = RepoWorkflowProviders.GITHUB_ACTIONS.value

    def check_pat_validity(self) -> bool:
        """
        This method checks if the PAT is valid.
        :returns: True when the PAT is valid (an invalid PAT never returns, it raises)
        :raises: Exception if PAT is invalid
        """
        is_valid = self._api.check_pat()
        if not is_valid:
            raise Exception("Github Personal Access Token is invalid")
        return is_valid

    def get_workflow_runs(
        self,
        org_repo: OrgRepo,
        repo_workflow: RepoWorkflow,
        bookmark: RepoWorkflowRunsBookmark,
    ) -> List[RepoWorkflowRuns]:
        """
        This method returns all workflow runs of a repo's workflow. After the bookmark date.

        Side effect: when at least one run is fetched, ``bookmark.bookmark`` is
        updated in place to the new bookmark timestamp (ISO format). When no
        runs are fetched the bookmark is left untouched.

        :param org_repo: OrgRepo object to get workflow runs for
        :param repo_workflow: RepoWorkflow object to get workflow runs for
        :param bookmark: Bookmark object to get all workflow runs after this date
        :return: Workflow runs
        :raises: Exception if fetching the runs from the GitHub API fails
        """
        bookmark_time_stamp = datetime.fromisoformat(bookmark.bookmark)
        try:
            github_workflow_runs = self._api.get_workflow_runs(
                org_repo.org_name,
                org_repo.name,
                repo_workflow.provider_workflow_id,
                bookmark_time_stamp,
            )
        except Exception as e:
            # Chain the original error so the root-cause traceback is preserved.
            raise Exception(
                f"[GitHub Sync Repo Workflow Worker] Error fetching workflow {str(repo_workflow.id)} "
                f"for repo {str(org_repo.repo_id)}: {str(e)}"
            ) from e

        if not github_workflow_runs:
            LOG.info(
                f"[GitHub Sync Repo Workflow Worker] No Workflow Runs found for "
                f"Workflow: {str(repo_workflow.provider_workflow_id)}. Repo: {org_repo.org_name}/{org_repo.name}. "
                f"Org: {self.org_id}"
            )
            return []

        bookmark.bookmark = self._get_new_bookmark_time_stamp(
            github_workflow_runs
        ).isoformat()

        return self._get_db_workflows(github_workflow_runs, str(repo_workflow.id))

    def _get_new_bookmark_time_stamp(
        self, github_workflow_runs: List[Dict]
    ) -> datetime:
        """
        This method returns the new bookmark timestamp for the workflow runs.
        It returns the minimum timestamp of the pending jobs if there are any pending jobs.
        This is done because there might be a workflow run that is still pending, and we
        want to fetch it in the next sync.
        If every run is completed, the current time is returned instead.
        """
        pending_job_timestamps = [
            self._get_datetime_from_gh_datetime(workflow_run["created_at"])
            for workflow_run in github_workflow_runs
            if workflow_run["status"] != "completed"
        ]
        return min(pending_job_timestamps) if pending_job_timestamps else time_now()

    def _get_db_workflows(
        self, github_workflows_runs: List[Dict], repo_workflow_id: str
    ) -> List[RepoWorkflowRuns]:
        """
        Convert raw GitHub workflow run payloads into RepoWorkflowRuns records.
        :param github_workflows_runs: Workflow run dicts as returned by the GitHub API service
        :param repo_workflow_id: Id of the RepoWorkflow the runs belong to
        :return: List of RepoWorkflowRuns objects, in the same order as the input
        """
        return [
            self._to_db_workflow_run(run, repo_workflow_id)
            for run in github_workflows_runs
        ]

    def _to_db_workflow_run(self, run: Dict, repo_workflow_id: str) -> RepoWorkflowRuns:
        """Build a single RepoWorkflowRuns record from one GitHub workflow run payload."""
        # One timestamp per record so created_at and updated_at are identical.
        now = time_now()
        return RepoWorkflowRuns(
            repo_workflow_id=repo_workflow_id,
            provider_workflow_run_id=str(run["id"]),
            event_actor=run["actor"]["login"],
            head_branch=run["head_branch"],
            status=self._get_repo_workflow_status(run),
            created_at=now,
            updated_at=now,
            conducted_at=self._get_datetime_from_gh_datetime(run["run_started_at"]),
            duration=self._get_repo_workflow_run_duration(run),
            meta=run,
            html_url=run["html_url"],
        )

    def _get_repo_workflow_status(
        self, github_workflow: Dict
    ) -> RepoWorkflowRunsStatus:
        """
        Map a GitHub run's ``status``/``conclusion`` to a RepoWorkflowRunsStatus.
        Anything not yet "completed" is PENDING; a completed run is SUCCESS only
        when its conclusion is "success", otherwise FAILURE.
        """
        if github_workflow["status"] != "completed":
            return RepoWorkflowRunsStatus.PENDING
        if github_workflow["conclusion"] == "success":
            return RepoWorkflowRunsStatus.SUCCESS
        return RepoWorkflowRunsStatus.FAILURE

    def _get_repo_workflow_run_duration(
        self, github_workflow_run: Dict
    ) -> Optional[int]:
        """
        Return the run duration in whole seconds (``updated_at`` - ``run_started_at``),
        or None when either timestamp is missing or empty.
        """
        updated_at = github_workflow_run.get("updated_at")
        run_started_at = github_workflow_run.get("run_started_at")
        if not (updated_at and run_started_at):
            return None

        workflow_run_updated_at = self._get_datetime_from_gh_datetime(updated_at)
        workflow_run_conducted_at = self._get_datetime_from_gh_datetime(
            run_started_at
        )
        return int(
            (workflow_run_updated_at - workflow_run_conducted_at).total_seconds()
        )

    def _get_datetime_from_gh_datetime(self, datetime_str: str) -> datetime:
        """
        Parse a GitHub timestamp string into a timezone-aware UTC datetime.
        :param datetime_str: Timestamp string in ISO_8601_DATE_FORMAT
        :return: Timezone-aware datetime in UTC
        """
        parsed = datetime.strptime(datetime_str, ISO_8601_DATE_FORMAT)
        if parsed.tzinfo is None:
            # GitHub reports timestamps in UTC. astimezone() on a naive datetime
            # would interpret it as the server's local time and shift it, so a
            # naive result is tagged as UTC instead of being converted.
            return parsed.replace(tzinfo=pytz.UTC)
        return parsed.astimezone(tz=pytz.UTC)


def get_github_actions_etl_handler(core_repo_service, org_id):
    """
    Build a GithubActionsETLHandler for the given org.

    Looks up the org's GitHub access token through ``core_repo_service`` and
    wires it into a GithubApiService for the handler.

    :param core_repo_service: Service exposing ``get_access_token(org_id, provider)``
    :param org_id: Id of the org to build the handler for
    :return: GithubActionsETLHandler backed by a GithubApiService using the org's token
    :raises: Exception if no access token is found for the org
    """
    token = core_repo_service.get_access_token(org_id, UserIdentityProvider.GITHUB)

    # Guard: without a token there is nothing to authenticate the API service with.
    if not token:
        raise Exception(
            f"Access token not found for org {org_id} and provider {UserIdentityProvider.GITHUB.value}"
        )

    api_service = GithubApiService(token)
    return GithubActionsETLHandler(org_id, api_service)
36 changes: 36 additions & 0 deletions apiserver/dora/service/workflows/sync/etl_provider_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from abc import ABC, abstractmethod
from typing import List

from dora.store.models.code import (
OrgRepo,
RepoWorkflow,
RepoWorkflowRunsBookmark,
RepoWorkflowRuns,
)


class WorkflowProviderETLHandler(ABC):
    """
    Interface every workflow-provider ETL handler has to implement.
    """

    @abstractmethod
    def check_pat_validity(self) -> bool:
        """
        Verify the provider's Personal Access Token.
        :return: Whether the PAT is valid
        :raises: Exception if PAT is invalid
        """
        ...

    @abstractmethod
    def get_workflow_runs(
        self,
        org_repo: OrgRepo,
        repo_workflow: RepoWorkflow,
        bookmark: RepoWorkflowRunsBookmark,
    ) -> List[RepoWorkflowRuns]:
        """
        Fetch the runs of one workflow of a repo that come after the bookmark date.
        :param org_repo: OrgRepo object whose workflow runs are requested
        :param repo_workflow: RepoWorkflow object whose runs are requested
        :param bookmark: Bookmark object; only runs after its date are returned
        :return: List of RepoWorkflowRuns objects
        """
        ...