Skip to content

Track Accurate PR cycle timing #661

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Jul 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 111 additions & 1 deletion backend/analytics_server/mhq/exapi/github.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,23 @@
import contextlib
from datetime import datetime
from http import HTTPStatus
from typing import Optional, Dict, Tuple, List
from typing import Optional, Dict, Tuple, List, cast

import requests

from github import Github, UnknownObjectException
from github.GithubException import GithubException
from github.Organization import Organization as GithubOrganization
from github.PaginatedList import PaginatedList as GithubPaginatedList
from github.PullRequest import PullRequest as GithubPullRequest
from github.Repository import Repository as GithubRepository

from mhq.exapi.schemas.timeline import (
GitHubPullTimelineEvent,
GitHubPrTimelineEventsDict,
)
from mhq.exapi.models.github import GitHubContributor
from mhq.exapi.models.github_timeline import GithubPullRequestTimelineEvents
from mhq.utils.log import LOG

PAGE_SIZE = 100
Expand Down Expand Up @@ -271,3 +277,107 @@ def _fetch_workflow_runs(page: int = 1):
page += 1
data = _fetch_workflow_runs(page=page)
return repo_workflows

def _fetch_timeline_events(
    self, repo_name: str, pr_number: int, page: int = 1, timeout: float = 30.0
) -> List[Dict]:
    """Fetch a single page of raw timeline events for a pull request.

    Sends a GET request to
    ``{self.base_url}/repos/{repo_name}/issues/{pr_number}/timeline`` using
    ``self.headers``, asking for ``PAGE_SIZE`` items of the given ``page``.

    Args:
        repo_name: Repository path segment placed in the URL
            (presumably ``owner/repo`` -- confirm against callers).
        pr_number: Number of the pull request whose timeline is requested.
        page: Page number to request; defaults to 1.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` can block forever on a stalled connection.

    Returns:
        The decoded JSON body of the response.

    Raises:
        GithubException: with ``SERVICE_UNAVAILABLE`` when the request itself
            fails (including timeouts), ``NOT_FOUND`` when the API answers
            404, the response status for any other non-200 answer, and
            ``INTERNAL_SERVER_ERROR`` when the body is not valid JSON.
        GithubRateLimitExceeded: when the API answers 403.
    """
    github_url = f"{self.base_url}/repos/{repo_name}/issues/{pr_number}/timeline"
    query_params = {"per_page": PAGE_SIZE, "page": page}

    try:
        response = requests.get(
            github_url,
            headers=self.headers,
            params=query_params,
            # A timeout surfaces as requests.Timeout, a RequestException,
            # so it is reported through the handler below.
            timeout=timeout,
        )
    except requests.RequestException as e:
        raise GithubException(
            HTTPStatus.SERVICE_UNAVAILABLE, f"Network error: {str(e)}"
        ) from e

    if response.status_code == HTTPStatus.NOT_FOUND:
        raise GithubException(
            HTTPStatus.NOT_FOUND,
            f"PR {pr_number} not found for repo {repo_name}",
        )

    # Every 403 is reported as a rate-limit failure.
    if response.status_code == HTTPStatus.FORBIDDEN:
        raise GithubRateLimitExceeded("GitHub API rate limit exceeded")

    if response.status_code != HTTPStatus.OK:
        raise GithubException(
            response.status_code,
            f"Failed to fetch timeline events: {response.text}",
        )

    try:
        return response.json()
    except ValueError as e:
        raise GithubException(
            HTTPStatus.INTERNAL_SERVER_ERROR, f"Invalid JSON response: {str(e)}"
        ) from e

def _create_timeline_event(self, event_data: Dict) -> GitHubPrTimelineEventsDict:
    """Pair a raw timeline payload with its event name.

    Args:
        event_data: One decoded timeline entry.

    Returns:
        A ``GitHubPrTimelineEventsDict`` whose ``event`` is the payload's
        ``"event"`` value (empty string when the key is absent) and whose
        ``data`` is the payload itself.
    """
    event_name = event_data.get("event", "")
    # cast() only informs the type checker; the dict is passed through as-is.
    typed_payload = cast(GitHubPullTimelineEvent, event_data)
    return GitHubPrTimelineEventsDict(event=event_name, data=typed_payload)

def get_pr_timeline_events(
    self, repo_name: str, pr_number: int
) -> List[GithubPullRequestTimelineEvents]:
    """Collect every timeline event of a pull request, page by page.

    Pages are requested starting at 1 until a page comes back empty or
    holds fewer than ``PAGE_SIZE`` entries. The gathered entries are then
    handed to ``_adapt_github_timeline_events``.

    Args:
        repo_name: Repository identifier forwarded to the page fetcher.
        pr_number: Number of the pull request.

    Returns:
        The adapted timeline events.

    Raises:
        GithubException: re-raised unchanged when raised while paging; any
            other exception is wrapped in one with ``INTERNAL_SERVER_ERROR``.
    """
    collected: List[GitHubPrTimelineEventsDict] = []
    page_number = 1

    try:
        while True:
            batch = self._fetch_timeline_events(repo_name, pr_number, page_number)
            if not batch:
                break

            collected.extend(map(self._create_timeline_event, batch))

            # A short page means there is nothing left to request.
            if len(batch) < PAGE_SIZE:
                break
            page_number += 1
    except Exception as err:
        # NOTE(review): if GithubRateLimitExceeded is not a GithubException
        # subclass it gets wrapped below as a generic 500 -- confirm intended.
        if isinstance(err, GithubException):
            raise
        raise GithubException(
            HTTPStatus.INTERNAL_SERVER_ERROR, f"Unexpected error: {str(err)}"
        ) from err

    return self._adapt_github_timeline_events(collected)

@staticmethod
def _adapt_github_timeline_events(
    timeline_events: List[GitHubPrTimelineEventsDict],
) -> List[GithubPullRequestTimelineEvents]:
    """Turn raw timeline dicts into event objects, dropping incomplete ones.

    Entries with a falsy ``"data"`` or ``"event"`` value are skipped
    silently. The remaining ones are wrapped in
    ``GithubPullRequestTimelineEvents``; a wrapped event is kept only when
    its ``timestamp``, ``type``, ``id`` and ``user`` are all truthy,
    otherwise a warning is logged and the event is dropped.

    Args:
        timeline_events: Raw entries carrying ``"event"`` and ``"data"``.

    Returns:
        The complete events, in input order.
    """
    complete_events: List[GithubPullRequestTimelineEvents] = []

    for raw_event in timeline_events:
        payload = raw_event.get("data")
        event_name = raw_event.get("event")
        if not payload or not event_name:
            continue

        candidate = GithubPullRequestTimelineEvents(event_name, payload)

        # All four properties are evaluated up front (no short-circuit).
        required_values = [
            candidate.timestamp,
            candidate.type,
            candidate.id,
            candidate.user,
        ]
        if not all(required_values):
            LOG.warning(
                f"Skipping incomplete timeline event: {event_name} with id: {candidate.id}"
            )
            continue

        complete_events.append(candidate)

    return complete_events
139 changes: 139 additions & 0 deletions backend/analytics_server/mhq/exapi/models/github_timeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
from datetime import datetime
from dataclasses import dataclass
from typing import Any, Optional, Dict, cast


from mhq.exapi.schemas.timeline import (
GitHubPullTimelineEvent,
)
from mhq.store.models.code.enums import PullRequestEventType
from mhq.utils.log import LOG
from mhq.utils.time import dt_from_iso_time_string


@dataclass(frozen=True)
class GithubPullRequestTimelineEventConfig:
    """Where to find the actor, timestamp and id inside a timeline payload.

    Each value is a key, or a dot-separated path of keys, looked up in the
    raw event dict. Instances are frozen because they are meant to be shared
    as constants: freezing prevents one consumer from accidentally changing
    the lookup paths for everyone else, and makes instances hashable.
    """

    # Path to the acting user (e.g. "user", "actor", "author.name").
    actor_path: str
    # Path to the event time (e.g. "created_at", "author.date").
    timestamp_field: str
    # Path to the event identifier (e.g. "id", "sha").
    id_path: str


@dataclass
class GithubPullRequestTimelineEvents:
    """Typed accessor over one raw GitHub pull-request timeline event.

    The raw payload stores the actor, timestamp and identifier under
    different keys depending on the event type; ``EVENT_CONFIG`` records
    where to look for each type and the properties below resolve the values
    through it.

    Args:
        event_type: The event name as found in the payload's ``"event"`` key.
        data: The raw event payload.
    """

    # Declared as real dataclass fields so the generated __init__, __repr__
    # and __eq__ reflect the event contents. With no fields declared, the
    # dataclass-generated __eq__ would consider any two events equal.
    event_type: str
    data: GitHubPullTimelineEvent

    # The un-annotated names below are plain class-level lookup tables; the
    # dataclass machinery only treats annotated names as fields.
    REVIEWED_CONFIG = GithubPullRequestTimelineEventConfig(
        actor_path="user", timestamp_field="submitted_at", id_path="id"
    )

    READY_FOR_REVIEW_CONFIG = GithubPullRequestTimelineEventConfig(
        actor_path="actor", timestamp_field="created_at", id_path="id"
    )

    COMMENTED_CONFIG = GithubPullRequestTimelineEventConfig(
        actor_path="user", timestamp_field="created_at", id_path="id"
    )

    COMMITTED_CONFIG = GithubPullRequestTimelineEventConfig(
        actor_path="author.name", timestamp_field="author.date", id_path="sha"
    )

    DEFAULT_CONFIG = GithubPullRequestTimelineEventConfig(
        actor_path="actor", timestamp_field="created_at", id_path="id"
    )

    # Event name -> lookup paths; "default" is the fallback for every event
    # name that has no entry of its own.
    EVENT_CONFIG = {
        "reviewed": REVIEWED_CONFIG,
        "ready_for_review": READY_FOR_REVIEW_CONFIG,
        "commented": COMMENTED_CONFIG,
        "committed": COMMITTED_CONFIG,
        "default": DEFAULT_CONFIG,
    }

    # GitHub event name -> internal event type enum.
    EVENT_TYPE_MAPPING = {
        "assigned": PullRequestEventType.ASSIGNED,
        "closed": PullRequestEventType.CLOSED,
        "commented": PullRequestEventType.COMMENTED,
        "committed": PullRequestEventType.COMMITTED,
        "convert_to_draft": PullRequestEventType.CONVERT_TO_DRAFT,
        "head_ref_deleted": PullRequestEventType.HEAD_REF_DELETED,
        "head_ref_force_pushed": PullRequestEventType.HEAD_REF_FORCE_PUSHED,
        "labeled": PullRequestEventType.LABELED,
        "locked": PullRequestEventType.LOCKED,
        "merged": PullRequestEventType.MERGED,
        "ready_for_review": PullRequestEventType.READY_FOR_REVIEW,
        "referenced": PullRequestEventType.REFERENCED,
        "reopened": PullRequestEventType.REOPENED,
        "review_dismissed": PullRequestEventType.REVIEW_DISMISSED,
        "review_requested": PullRequestEventType.REVIEW_REQUESTED,
        "review_request_removed": PullRequestEventType.REVIEW_REQUEST_REMOVED,
        "reviewed": PullRequestEventType.REVIEW,
        "unassigned": PullRequestEventType.UNASSIGNED,
        "unlabeled": PullRequestEventType.UNLABELED,
        "unlocked": PullRequestEventType.UNLOCKED,
    }

    @property
    def _config(self) -> GithubPullRequestTimelineEventConfig:
        """Return the lookup paths for this event type, or the default ones."""
        return self.EVENT_CONFIG.get(self.event_type, self.EVENT_CONFIG["default"])

    def _get_nested_value(self, path: str) -> Optional[Any]:
        """Resolve a dot-separated key path against the raw payload.

        Returns ``None`` as soon as a step is not a dict or lacks the key.
        """
        current: Any = self.data

        for key in path.split("."):
            if not isinstance(current, dict) or key not in current:
                return None
            current = current[key]
        return current

    @property
    def user(self) -> Optional[str]:
        """Return the actor of the event, or ``None`` when it is unavailable.

        For ``committed`` events the value found at the actor path is
        returned directly. For every other event the value is expected to
        expose a ``login`` (as a dict key or an attribute); a warning is
        logged when it does not.
        """
        actor_path = self._config.actor_path
        if not actor_path:
            return None

        actor = self._get_nested_value(actor_path)

        # The committed config points straight at "author.name", so there
        # is no login-bearing object to unwrap.
        if self.event_type == "committed":
            return actor

        if not actor:
            return None
        if isinstance(actor, dict) and "login" in actor:
            return actor["login"]
        if hasattr(actor, "login"):
            return actor.login

        LOG.warning(
            f"User data does not contain login field for event type: {self.event_type}"
        )
        return None

    @property
    def timestamp(self) -> Optional[datetime]:
        """Return the event time parsed from the payload, or ``None``."""
        raw_timestamp = self._get_nested_value(self._config.timestamp_field)
        if not raw_timestamp:
            return None
        return dt_from_iso_time_string(str(raw_timestamp))

    @property
    def raw_data(self) -> Dict:
        """Return the raw payload, typed as a plain dict."""
        return cast(Dict[str, Any], self.data)

    @property
    def id(self) -> Optional[str]:
        """Return the event identifier as a string, or ``None`` if absent."""
        id_value = self._get_nested_value(self._config.id_path)
        return str(id_value) if id_value is not None else None

    @property
    def type(self) -> PullRequestEventType:
        """Return the internal event type; unmapped names yield ``UNKNOWN``."""
        return self.EVENT_TYPE_MAPPING.get(
            self.event_type, PullRequestEventType.UNKNOWN
        )
Empty file.
Loading