
Commit 6e7c10c

Feature/178 improve performance of fetching missing issues (#187)
* #178 - Improve performance of fetching missing issues
  - Added multiple threads to fetching of missing issues.
  - Added multiple threads to record creation.
1 parent 2fd72de commit 6e7c10c

2 files changed: +210 -102 lines changed


release_notes_generator/data/miner.py

Lines changed: 116 additions & 46 deletions
@@ -21,6 +21,7 @@
 import logging
 import sys
 import traceback
+from concurrent.futures import ThreadPoolExecutor, as_completed
 from typing import Optional
 
 import semver
@@ -96,6 +97,9 @@ def mine_missing_sub_issues(self, data: MinedData) -> tuple[dict[Issue, Reposito
         logger.info("Mapping sub-issues...")
         data.parents_sub_issues = self._scan_sub_issues_for_parents([get_id(i, r) for i, r in data.issues.items()])
 
+        logger.info("Fetch all repositories in cache...")
+        self._fetch_all_repositories_in_cache(data)
+
         logger.info("Fetching missing issues...")
         fetched_issues = self._fetch_missing_issues(data)
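
The new _fetch_all_repositories_in_cache step (shown in the next hunk) runs sequentially, before any worker threads start, so the repository cache is written from a single thread only; the threaded fetch that follows treats it as read-only. A minimal sketch of that pre-warm-then-read pattern (the cache, prewarm, and worker names are invented for illustration, not taken from the repository):

from concurrent.futures import ThreadPoolExecutor

cache: dict[str, str] = {}

def prewarm(keys: list[str]) -> None:
    # Single-threaded phase: the only place the cache is written.
    for key in keys:
        cache.setdefault(key, f"repo-object({key})")

def worker(key: str) -> str:
    # Threaded phase: reads only, so no locking is needed.
    return cache[key]

prewarm(["org/repo-a", "org/repo-b"])
with ThreadPoolExecutor(max_workers=4) as ex:
    print(list(ex.map(worker, ["org/repo-a", "org/repo-b"])))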

@@ -126,65 +130,131 @@ def _scan_sub_issues_for_parents(self, parents_to_check: list[str]) -> dict[str,
 
         return parents_sub_issues
 
-    def _fetch_missing_issues(self, data: MinedData) -> dict[Issue, Repository]:
+    def _fetch_all_repositories_in_cache(self, data: MinedData) -> None:
+        def _check_repo_and_add(iid: str):
+            org, repo, _num = parse_issue_id(iid)
+            full_name = f"{org}/{repo}"
+            if data.get_repository(full_name) is None:
+                new_repo = self._fetch_repository(full_name)
+                if new_repo is None:
+                    logger.error("Repository fetch returned None for %s", full_name)
+                    return
+
+                data.add_repository(new_repo)
+                logger.debug("Fetched missing repository: %s", full_name)
+
+        # check keys
+        for iid in data.parents_sub_issues.keys():
+            _check_repo_and_add(iid)
+
+        # check values
+        for ids in data.parents_sub_issues.values():
+            for iid in ids:
+                _check_repo_and_add(iid)
+
+    def _fetch_missing_issues(
+        self,
+        data: MinedData,
+        max_workers: int = 8,
+    ) -> dict[Issue, Repository]:
         """
-        Fetch missing issues.
-
-        Parameters:
-            data (MinedData): The mined data containing origin sets of issues and pull requests.
-        Returns:
-            dict[Issue, Repository]: A dictionary mapping fetched issues to their repositories.
+        Parallel version of _fetch_missing_issues.
+        Threaded to speed up GitHub API calls while avoiding data races.
         """
         fetched_issues: dict[Issue, Repository] = {}
-
         origin_issue_ids = {get_id(i, r) for i, r in data.issues.items()}
-        issues_for_remove: list[str] = []
-        for parent_id in data.parents_sub_issues.keys():
-            if parent_id in origin_issue_ids:
-                continue
-
-            # fetch issue by id
-            org, repo, num = parse_issue_id(parent_id)
 
-            if data.get_repository(f"{org}/{repo}") is None:
-                new_repo = self._fetch_repository(f"{org}/{repo}")
-                if new_repo is not None:
-                    # cache for subsequent lookups
-                    data.add_repository(new_repo)
+        # Worklist: only parents not already present among origin_issue_ids
+        to_check: list[str] = [pid for pid in data.parents_sub_issues.keys() if pid not in origin_issue_ids]
+
+        # Collect IDs to remove (those that don't meet criteria) and errors
+        issues_for_remove: set[str] = set()
+        errors: list[tuple[str, str]] = []  # (parent_id, error_msg)
+
+        def should_fetch(issue: Issue) -> bool:
+            # Mirrors original logic
+            if not issue.closed_at:
+                return False
+            if data.since:
+                # if since > closed_at => skip
+                if issue.closed_at and data.since > issue.closed_at:
+                    return False
+            return True
+
+        def worker(parent_id: str) -> tuple[str, Optional[Issue], Optional[Repository], Optional[str]]:
+            """
+            Returns (parent_id, issue|None, repo|None, error|None)
+            - issue=None & error=None => mark for remove (didn't meet criteria)
+            - issue=None & error!=None => log error
+            """
+            try:
+                org, repo, num = parse_issue_id(parent_id)
+            except Exception as e:  # defensive
+                return (parent_id, None, None, f"parse_issue_id failed: {e}")
 
-            issue = None
             r = data.get_repository(f"{org}/{repo}")
-            if r is not None:
+            if r is None:
+                return (parent_id, None, None, f"Cannot get repository for {org}/{repo}")
+
+            # GitHub call
+            try:
                 logger.debug("Fetching missing issue: %s", parent_id)
                 issue = self._safe_call(r.get_issue)(num)
-                if issue is None:
-                    logger.error("Issue not found: %s", parent_id)
+            except Exception as e:
+                return (parent_id, None, r, f"get_issue failed: {e}")
+
+            if issue is None:
+                return (parent_id, None, r, "Issue not found")
+
+            # Criteria
+            if should_fetch(issue):
+                return (parent_id, issue, r, None)
+
+            return (parent_id, None, r, None)  # means: mark for remove
+
+        if not to_check:
+            logger.debug("Fetched 0 missing issues (nothing to check).")
+            return fetched_issues
+
+        # Thread pool
+        with ThreadPoolExecutor(max_workers=max_workers, thread_name_prefix="fetch-issue") as ex:
+            futures = {ex.submit(worker, pid): pid for pid in to_check}
+            for fut in as_completed(futures):
+                parent_id = futures[fut]
+                try:
+                    pid, issue, repo, err = fut.result()
+                except Exception as e:
+                    errors.append((parent_id, f"worker crash: {e}"))
                     continue
 
-                fetch: bool = True
-                if not issue.closed_at:
-                    fetch = False
-                elif data.since:
-                    if issue.closed_at and data.since > issue.closed_at:
-                        fetch = False
+                if err:
+                    # Log and skip; don't remove mapping unless you're sure you want to drop errored items
+                    logger.error("Error fetching %s: %s", pid, err)
+                    continue
 
-                if fetch:
-                    # add to issues list
-                    fetched_issues[issue] = r
+                if issue is None:
+                    # Did not meet criteria => schedule removal
+                    issues_for_remove.add(pid)
                 else:
-                    logger.debug("Skipping issue %s since it does not meet criteria.", parent_id)
-                    issues_for_remove.append(parent_id)
-            else:
-                logger.error("Cannot get repository for issue %s. Skipping...", parent_id)
-
-        # remove issue which does not meet criteria
-        for iid in issues_for_remove:
-            data.parents_sub_issues.pop(iid, None)
-            for sub_issues in data.parents_sub_issues.values():
-                if iid in sub_issues:
-                    sub_issues.remove(iid)
-
-        logger.debug("Fetched %d missing issues.", len(fetched_issues))
+                    # Add to results
+                    fetched_issues[issue] = repo  # type: ignore[assignment]
+
+        # Apply removals AFTER parallelism to avoid concurrent mutation
+        if issues_for_remove:
+            for iid in issues_for_remove:
+                data.parents_sub_issues.pop(iid, None)
+                for sub_issues in data.parents_sub_issues.values():
+                    # parents_sub_issues can be dict[str, list[str]] or dict[str, str] after a later change;
+                    # if it's list[str], this removal is ok; if changed to str, guard it.
+                    if isinstance(sub_issues, list) and iid in sub_issues:
+                        sub_issues.remove(iid)
+
+        logger.debug(
+            "Fetched %d missing issues in parallel (removed %d).",
+            len(fetched_issues),
+            len(issues_for_remove),
+        )
+
         return fetched_issues
 
     def _fetch_repository(self, full_name: str) -> Optional[Repository]:
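
The threaded fetch above follows a submit/as_completed fan-out: workers only read shared state and hand back a result tuple, while every mutation (filling fetched_issues, scheduling removals from parents_sub_issues) happens on the calling thread as futures complete. A standalone sketch of that shape, using hypothetical fetch_one and IDS stand-ins rather than the repository's real types:

from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Optional

IDS = ["org/repo#1", "org/repo#2", "org/repo#3"]  # hypothetical work items

def fetch_one(item_id: str) -> tuple[str, Optional[str], Optional[str]]:
    # Returns (id, result, error); workers never touch shared containers.
    if item_id.endswith("#2"):
        return (item_id, None, "not found")
    return (item_id, f"payload for {item_id}", None)

results: dict[str, str] = {}
errors: list[tuple[str, str]] = []
with ThreadPoolExecutor(max_workers=8, thread_name_prefix="fetch") as ex:
    futures = {ex.submit(fetch_one, i): i for i in IDS}
    for fut in as_completed(futures):
        item_id, result, err = fut.result()
        if err:
            errors.append((item_id, err))  # collected, applied after the pool drains
        elif result is not None:
            results[item_id] = result

print(results)
print(errors)

as_completed yields futures in finish order, so one slow GitHub call does not block consumption of results that are already done.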

release_notes_generator/record/factory/default_record_factory.py

Lines changed: 94 additions & 56 deletions
@@ -19,6 +19,7 @@
 """
 
 import logging
+from concurrent.futures import ThreadPoolExecutor
 from typing import cast, Optional
 
 from github import Github
@@ -70,45 +71,31 @@ def generate(self, data: MinedData) -> dict[str, Record]:
         Returns:
             dict[str, Record]: A dictionary of records indexed by their IDs.
         """
-        logger.debug("Creation of records started...")
+        logger.info("Creation of records started...")
 
-        # Before the loop, compute a flat set of all sub-issue IDs
-        all_sub_issue_ids = {iid for sublist in data.parents_sub_issues.values() for iid in sublist}
-
-        for issue, repo in data.issues.items():
-            iid = get_id(issue, repo)
-
-            if len(data.parents_sub_issues.get(iid, [])) > 0:
-                # issue has sub-issues - it is either hierarchy issue or sub-hierarchy issue
-                self._create_record_for_hierarchy_issue(issue, iid)
-
-            elif iid in all_sub_issue_ids:
-                # issue has no sub-issues - it is sub-issue
-                self._create_record_for_sub_issue(issue, iid)
-
-            else:
-                # issue is not sub-issue and has no sub-issues - it is issue
-                self._create_record_for_issue(issue, iid)
+        built = build_issue_records_parallel(self, data, max_workers=8)
+        self._records.update(built)
+        self.__registered_issues.update(built.keys())
 
         # dev note: Each issue is now in records dict by its issue number - all on same level - no hierarchy
         # --> This is useful for population by PRs and commits
 
-        logger.debug("Registering Commits to Pull Requests and Pull Requests to Issues...")
+        logger.info("Registering Commits to Pull Requests and Pull Requests to Issues...")
         for pull, repo in data.pull_requests.items():
             self._register_pull_and_its_commits_to_issue(pull, get_id(pull, repo), data, target_repository=repo)
 
         if data.pull_requests_of_fetched_cross_issues:
-            logger.debug("Register cross-repo Pull Requests to its issues")
+            logger.info("Register cross-repo Pull Requests to its issues")
             for iid, prs in data.pull_requests_of_fetched_cross_issues.items():
                 self._register_cross_repo_prs_to_issue(iid, prs)
 
-        logger.debug("Registering direct commits to records...")
+        logger.info("Registering direct commits to records...")
         for commit, repo in data.commits.items():
             if commit.sha not in self.__registered_commits:
                 self._records[get_id(commit, repo)] = CommitRecord(commit)
 
         # dev note: now we have all PRs and commits registered to issues or as stand-alone records
-        logger.debug("Building issues hierarchy...")
+        logger.info("Building issues hierarchy...")
 
         sub_i_ids = list({iid for sublist in data.parents_sub_issues.values() for iid in sublist})
         sub_i_prts = {sub_issue: parent for parent, sublist in data.parents_sub_issues.items() for sub_issue in sublist}
@@ -193,26 +180,6 @@ def _register_cross_repo_prs_to_issue(self, iid: str, prs: list[PullRequest]) ->
         for pr in prs:
             cast(IssueRecord, self._records[iid]).register_pull_request(pr)
 
-    def _create_record_for_hierarchy_issue(self, i: Issue, iid: str, issue_labels: Optional[list[str]] = None) -> None:
-        """
-        Create a hierarchy issue record and register sub-issues.
-
-        Parameters:
-            i: The issue to create the record for.
-            issue_labels: The labels of the issue.
-
-        Returns:
-            None
-        """
-        # check for skip labels presence and skip when detected
-        if issue_labels is None:
-            issue_labels = self._get_issue_labels_mix_with_type(i)
-        skip_record = any(item in issue_labels for item in ActionInputs.get_skip_release_notes_labels())
-
-        self._records[iid] = HierarchyIssueRecord(issue=i, skip=skip_record, issue_labels=issue_labels)
-        self.__registered_issues.add(iid)
-        logger.debug("Created record for hierarchy issue %s: %s", iid, i.title)
-
     def _get_issue_labels_mix_with_type(self, issue: Issue) -> list[str]:
         labels: list[str] = [label.name for label in issue.get_labels()]
 
@@ -223,20 +190,6 @@ def _get_issue_labels_mix_with_type(self, issue: Issue) -> list[str]:
 
         return labels
 
-    def _create_record_for_sub_issue(self, issue: Issue, iid: str, issue_labels: Optional[list[str]] = None) -> None:
-        if issue_labels is None:
-            issue_labels = self._get_issue_labels_mix_with_type(issue)
-
-        skip_record = any(item in issue_labels for item in ActionInputs.get_skip_release_notes_labels())
-        logger.debug("Created record for sub issue %s: %s", iid, issue.title)
-        self.__registered_issues.add(iid)
-        self._records[iid] = SubIssueRecord(issue, issue_labels, skip_record)
-
-        if iid.split("#")[0] == self._home_repository.full_name:
-            return
-
-        self._records[iid].is_cross_repo = True
-
     def _re_register_hierarchy_issues(self, sub_issues_ids: list[str], sub_issue_parents: dict[str, str]):
         logger.debug("Re-registering hierarchy issues ...")
         reduced_sub_issue_ids: list[str] = sub_issues_ids[:]
@@ -292,3 +245,88 @@ def order_hierarchy_levels(self, level: int = 0) -> None:
         top_hierarchy_records = [rec for rec in self._records.values() if isinstance(rec, HierarchyIssueRecord)]
         for rec in top_hierarchy_records:
             rec.order_hierarchy_levels(level=level)
+
+    def build_record_for_hierarchy_issue(self, issue: Issue, issue_labels: Optional[list[str]] = None) -> Record:
+        """
+        Build a hierarchy issue record.
+
+        Parameters:
+            issue (Issue): The issue to build.
+            issue_labels (list[str]): The labels to use for this issue.
+        Returns:
+            Record: The built record.
+        """
+        if issue_labels is None:
+            issue_labels = self._get_issue_labels_mix_with_type(issue)
+        skip_record = any(lbl in ActionInputs.get_skip_release_notes_labels() for lbl in issue_labels)
+        return HierarchyIssueRecord(issue=issue, skip=skip_record, issue_labels=issue_labels)
+
+    def build_record_for_sub_issue(self, issue: Issue, iid: str, issue_labels: Optional[list[str]] = None) -> Record:
+        """
+        Build a sub issue record.
+
+        Parameters:
+            issue (Issue): The issue to build.
+            iid (str): The id to use for this issue.
+            issue_labels (list[str]): The labels to use for this issue.
+        Returns:
+            Record: The built record.
+        """
+        if issue_labels is None:
+            issue_labels = self._get_issue_labels_mix_with_type(issue)
+        skip_record = any(lbl in ActionInputs.get_skip_release_notes_labels() for lbl in issue_labels)
+        rec = SubIssueRecord(issue, issue_labels, skip_record)
+        # preserve cross-repo flag behavior
+        if iid.split("#")[0] != self._home_repository.full_name:
+            rec.is_cross_repo = True
+        return rec
+
+    def build_record_for_issue(self, issue: Issue, issue_labels: Optional[list[str]] = None) -> Record:
+        """
+        Build an issue record.
+
+        Parameters:
+            issue (Issue): The issue to build.
+            issue_labels (list[str]): The labels to use for this issue.
+        Returns:
+            Record: The built record.
+        """
+        if issue_labels is None:
+            issue_labels = self._get_issue_labels_mix_with_type(issue)
+        skip_record = any(lbl in ActionInputs.get_skip_release_notes_labels() for lbl in issue_labels)
+        return IssueRecord(issue=issue, skip=skip_record, issue_labels=issue_labels)
+
+
+def build_issue_records_parallel(gen, data, max_workers: int = 8) -> dict[str, "Record"]:
+    """
+    Build issue records in parallel with no side effects on `gen`.
+    Returns: {iid: Record}
+    """
+    parents_sub_issues = data.parents_sub_issues  # read-only snapshot for this phase
+    all_sub_issue_ids = {iid for subs in parents_sub_issues.values() for iid in subs}
+    issues_items = list(data.issues.items())  # snapshot
+
+    def _classify_and_build(issue, repo) -> tuple[str, "Record"]:
+        iid = get_id(issue, repo)
+
+        # classification
+        if len(parents_sub_issues.get(iid, [])) > 0:
+            # hierarchy node (has sub-issues)
+            rec = gen.build_record_for_hierarchy_issue(issue)
+        elif iid in all_sub_issue_ids:
+            # leaf sub-issue
+            rec = gen.build_record_for_sub_issue(issue, iid)
+        else:
+            # plain issue
+            rec = gen.build_record_for_issue(issue)
+        return iid, rec
+
+    results: dict[str, "Record"] = {}
+    if not issues_items:
+        return results
+
+    with ThreadPoolExecutor(max_workers=max_workers, thread_name_prefix="build-issue-rec") as ex:
+        for iid, rec in ex.map(lambda ir: _classify_and_build(*ir), issues_items):
+            results[iid] = rec
+
+    return results
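
build_issue_records_parallel uses the simpler Executor.map form: each (issue, repo) pair is classified and built independently, results come back in submission order, and an exception raised inside a worker surfaces when its result is iterated. A minimal sketch of the same shape (the classify function and ITEMS pairs are illustrative, not from the repository):

from concurrent.futures import ThreadPoolExecutor

ITEMS = [("1", "org/repo-a"), ("2", "org/repo-b")]  # hypothetical (number, repo) pairs

def classify(number: str, repo: str) -> tuple[str, str]:
    # Pure function of its inputs, so it is safe to call from many threads.
    return (f"{repo}#{number}", f"record({number})")

results: dict[str, str] = {}
with ThreadPoolExecutor(max_workers=8, thread_name_prefix="build-rec") as ex:
    # map() preserves input order; exceptions propagate on iteration.
    for key, record in ex.map(lambda pair: classify(*pair), ITEMS):
        results[key] = record

print(results)

Because the build_record_for_* helpers are side-effect free, the single self._records.update(built) call in generate is the only write to shared state, mirroring the miner's collect-then-apply approach.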
