Skip to content

Commit 0d1dd57

Browse files
authored
Merge pull request #33 from middlewarehq/GROW-1224
Grow 1224: Add MTD Broker
2 parents 4eab3c0 + d087cc9 commit 0d1dd57

File tree

10 files changed

+279
-21
lines changed

10 files changed

+279
-21
lines changed
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
from sync import sync_code_repos
2+
from .integration import get_code_integration_service
3+
from .pr_filter import apply_pr_filter

apiserver/dora/service/code/pr_filter.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,7 @@
1-
from dataclasses import dataclass
21
from typing import List, Dict, Any
32

4-
from sqlalchemy import and_, or_
5-
from sqlalchemy.dialects.postgresql import Any
63
from dora.service.settings.configuration_settings import get_settings_service
74
from dora.service.settings.models import ExcludedPRsSetting
8-
95
from dora.store.models.code import PRFilter
106
from dora.store.models.settings.configuration_settings import SettingType
117
from dora.store.models.settings.enums import EntityType

apiserver/dora/service/code/sync/etl_handler.py

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@
88
CodeProviderETLHandler,
99
CodeETLFactory,
1010
)
11+
from dora.service.merge_to_deploy_broker import (
12+
get_merge_to_deploy_broker_utils_service,
13+
MergeToDeployBrokerUtils,
14+
)
1115
from dora.store.models.code import OrgRepo, BookmarkType, Bookmark, PullRequest
1216
from dora.store.repos.code import CodeRepoService
1317
from dora.utils.log import LOG
@@ -18,9 +22,11 @@ def __init__(
1822
self,
1923
code_repo_service: CodeRepoService,
2024
etl_service: CodeProviderETLHandler,
25+
mtd_broker: MergeToDeployBrokerUtils,
2126
):
2227
self.code_repo_service = code_repo_service
2328
self.etl_service = etl_service
29+
self.mtd_broker = mtd_broker
2430

2531
def sync_org_repos(self, org_id: str):
2632
org_repos: List[OrgRepo] = self._sync_org_repos(org_id)
@@ -58,6 +64,9 @@ def _sync_repo_pull_requests_data(self, org_repo: OrgRepo) -> None:
5864
pull_requests[-1].state_changed_at.astimezone(tz=pytz.UTC).isoformat()
5965
)
6066
self.code_repo_service.update_org_repo_bookmark(bookmark)
67+
self.mtd_broker.pushback_merge_to_deploy_bookmark(
68+
str(org_repo.id), pull_requests
69+
)
6170
self.__sync_revert_prs_mapping(org_repo, pull_requests)
6271
except Exception as e:
6372
LOG.error(f"Error syncing pull requests for repo {org_repo.name}: {str(e)}")
@@ -91,13 +100,16 @@ def __get_org_repo_bookmark(self, org_repo: OrgRepo, default_sync_days: int = 31
91100

92101

93102
def sync_code_repos(org_id: str):
94-
code_integration_service = get_code_integration_service()
95-
code_repo_service = CodeRepoService()
103+
code_providers: List[str] = get_code_integration_service().get_org_providers(org_id)
96104
etl_factory = CodeETLFactory(org_id)
97-
for provider in code_integration_service.get_org_providers(org_id):
105+
106+
for provider in code_providers:
98107
try:
99-
etl_handler = etl_factory(provider)
100-
code_etl_handler = CodeETLHandler(code_repo_service, etl_handler)
108+
code_etl_handler = CodeETLHandler(
109+
CodeRepoService(),
110+
etl_factory(provider),
111+
get_merge_to_deploy_broker_utils_service(),
112+
)
101113
code_etl_handler.sync_org_repos(org_id)
102114
LOG.info(f"Synced org repos for provider {provider}")
103115
except Exception as e:
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
from .deployment_pr_mapper import DeploymentPRMapperService
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
from .mtd_handler import process_merge_to_deploy_cache
2+
from .utils import get_merge_to_deploy_broker_utils_service, MergeToDeployBrokerUtils
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
from datetime import datetime
2+
from typing import List
3+
4+
from dora.service.deployments import DeploymentPRMapperService
5+
from dora.store.models.code import (
6+
PullRequest,
7+
OrgRepo,
8+
RepoWorkflow,
9+
BookmarkMergeToDeployBroker,
10+
RepoWorkflowRuns,
11+
RepoWorkflowRunsStatus,
12+
)
13+
from dora.store.repos.code import CodeRepoService
14+
from dora.store.repos.workflows import WorkflowRepoService
15+
from dora.utils.lock import RedisLockService, get_redis_lock_service
16+
17+
DEPLOYMENTS_TO_PROCESS = 500
18+
19+
20+
class MergeToDeployCacheHandler:
21+
def __init__(
22+
self,
23+
org_id: str,
24+
code_repo_service: CodeRepoService,
25+
workflow_repo_service: WorkflowRepoService,
26+
deployment_pr_mapper_service: DeploymentPRMapperService,
27+
redis_lock_service: RedisLockService,
28+
):
29+
self.org_id = org_id
30+
self.code_repo_service = code_repo_service
31+
self.workflow_repo_service = workflow_repo_service
32+
self.deployment_pr_mapper_service = deployment_pr_mapper_service
33+
self.redis_lock_service = redis_lock_service
34+
35+
def process_org_mtd(self):
36+
org_repos: List[OrgRepo] = self.code_repo_service.get_active_org_repos(
37+
self.org_id
38+
)
39+
for org_repo in org_repos:
40+
try:
41+
with self.redis_lock_service.acquire_lock(
42+
"{org_repo}:" + f"{str(org_repo.id)}:merge_to_deploy_broker"
43+
):
44+
self._process_deployments_for_merge_to_deploy_caching(
45+
str(org_repo.id)
46+
)
47+
except Exception as e:
48+
print(f"Error syncing workflow for repo {str(org_repo.id)}: {str(e)}")
49+
continue
50+
51+
def _process_deployments_for_merge_to_deploy_caching(self, repo_id: str):
52+
org_repo: OrgRepo = self.code_repo_service.get_repo_by_id(repo_id)
53+
if not org_repo:
54+
Exception(f"Repo with {repo_id} not found")
55+
56+
repo_workflows: List[
57+
RepoWorkflow
58+
] = self.workflow_repo_service.get_repo_workflows_by_repo_id(repo_id)
59+
if not repo_workflows:
60+
return
61+
62+
broker_bookmark: BookmarkMergeToDeployBroker = (
63+
self.code_repo_service.get_merge_to_deploy_broker_bookmark(repo_id)
64+
)
65+
if not broker_bookmark:
66+
broker_bookmark = BookmarkMergeToDeployBroker(repo_id=repo_id)
67+
68+
bookmark_time: datetime = broker_bookmark.bookmark_date
69+
70+
repo_workflow_runs: List[
71+
RepoWorkflowRuns
72+
] = self.workflow_repo_service.get_repo_workflow_runs_conducted_after_time(
73+
repo_id, bookmark_time, DEPLOYMENTS_TO_PROCESS
74+
)
75+
76+
if not repo_workflow_runs:
77+
return
78+
79+
for repo_workflow_run in repo_workflow_runs:
80+
try:
81+
self.code_repo_service.get_merge_to_deploy_broker_bookmark(repo_id)
82+
self._cache_prs_merge_to_deploy_for_repo_workflow_run(
83+
repo_id, repo_workflow_run
84+
)
85+
conducted_at: datetime = repo_workflow_run.conducted_at
86+
broker_bookmark.bookmark = conducted_at.isoformat()
87+
self.code_repo_service.update_merge_to_deploy_broker_bookmark(
88+
broker_bookmark
89+
)
90+
except Exception as e:
91+
raise Exception(f"Error caching prs for repo {repo_id}: {str(e)}")
92+
93+
def _cache_prs_merge_to_deploy_for_repo_workflow_run(
94+
self, repo_id: str, repo_workflow_run: RepoWorkflowRuns
95+
):
96+
if repo_workflow_run.status != RepoWorkflowRunsStatus.SUCCESS:
97+
return
98+
99+
conducted_at: datetime = repo_workflow_run.conducted_at
100+
relevant_prs: List[
101+
PullRequest
102+
] = self.code_repo_service.get_prs_in_repo_merged_before_given_date_with_merge_to_deploy_as_null(
103+
repo_id, conducted_at
104+
)
105+
prs_to_update: List[
106+
PullRequest
107+
] = self.deployment_pr_mapper_service.get_all_prs_deployed(
108+
relevant_prs, repo_workflow_run
109+
)
110+
111+
for pr in prs_to_update:
112+
pr.merge_to_deploy = int(
113+
(conducted_at - pr.state_changed_at).total_seconds()
114+
)
115+
self.code_repo_service.update_prs(prs_to_update)
116+
117+
118+
def process_merge_to_deploy_cache(org_id: str):
119+
merge_to_deploy_cache_handler = MergeToDeployCacheHandler(
120+
org_id,
121+
CodeRepoService(),
122+
WorkflowRepoService(),
123+
DeploymentPRMapperService(),
124+
get_redis_lock_service(),
125+
)
126+
merge_to_deploy_cache_handler.process_org_mtd()
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
from datetime import datetime
2+
from typing import List
3+
4+
from dora.store.models.code import (
5+
PullRequest,
6+
PullRequestState,
7+
BookmarkMergeToDeployBroker,
8+
)
9+
from dora.store.repos.code import CodeRepoService
10+
from dora.utils.lock import get_redis_lock_service, RedisLockService
11+
12+
13+
class MergeToDeployBrokerUtils:
14+
def __init__(
15+
self, code_repo_service: CodeRepoService, redis_lock_service: RedisLockService
16+
):
17+
self.code_repo_service = code_repo_service
18+
self.redis_lock_service = redis_lock_service
19+
20+
def pushback_merge_to_deploy_bookmark(self, repo_id: str, prs: List[PullRequest]):
21+
with self.redis_lock_service.acquire_lock(
22+
"{org_repo}:" + f"{repo_id}:merge_to_deploy_broker"
23+
):
24+
self._pushback_merge_to_deploy_bookmark(repo_id, prs)
25+
26+
def _pushback_merge_to_deploy_bookmark(self, repo_id: str, prs: List[PullRequest]):
27+
merged_prs = [pr for pr in prs if pr.state == PullRequestState.MERGED]
28+
if not merged_prs:
29+
return
30+
31+
min_merged_time: datetime = min([pr.state_changed_at for pr in merged_prs])
32+
33+
merge_to_deploy_broker_bookmark: BookmarkMergeToDeployBroker = (
34+
self.code_repo_service.get_merge_to_deploy_broker_bookmark(repo_id)
35+
)
36+
if not merge_to_deploy_broker_bookmark:
37+
merge_to_deploy_broker_bookmark = BookmarkMergeToDeployBroker(
38+
repo_id=repo_id, bookmark=min_merged_time.isoformat()
39+
)
40+
41+
self.code_repo_service.update_merge_to_deploy_broker_bookmark(
42+
merge_to_deploy_broker_bookmark
43+
)
44+
45+
46+
def get_merge_to_deploy_broker_utils_service():
47+
return MergeToDeployBrokerUtils(
48+
CodeRepoService(), redis_lock_service=get_redis_lock_service()
49+
)

apiserver/dora/service/workflows/sync/etl_handler.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
from typing import List, Tuple
33
from uuid import uuid4
44

5-
from dora.service.code.integration import get_code_integration_service
5+
from dora.service.code import get_code_integration_service
66
from dora.service.workflows.integration import get_workflows_integrations_service
77
from dora.service.workflows.sync.etl_provider_handler import WorkflowProviderETLHandler
88
from dora.store.models.code import (

apiserver/dora/store/repos/code.py

Lines changed: 44 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
from datetime import datetime
12
from operator import and_
23
from typing import Optional, List
34

@@ -16,6 +17,7 @@
1617
TeamRepos,
1718
PullRequestState,
1819
PRFilter,
20+
BookmarkMergeToDeployBroker,
1921
)
2022

2123

@@ -51,6 +53,11 @@ def save_pull_requests_data(
5153
]
5254
session.commit()
5355

56+
@rollback_on_exc
57+
def update_prs(self, prs: List[PullRequest]):
58+
[session.merge(pr) for pr in prs]
59+
session.commit()
60+
5461
@rollback_on_exc
5562
def save_revert_pr_mappings(
5663
self, revert_pr_mappings: List[PullRequestRevertPRMapping]
@@ -84,7 +91,7 @@ def get_repo_by_id(self, repo_id: str) -> Optional[OrgRepo]:
8491
def get_repo_pr_by_number(self, repo_id: str, pr_number) -> Optional[PullRequest]:
8592
return (
8693
session.query(PullRequest)
87-
.options(defer("data"))
94+
.options(defer(PullRequest.data))
8895
.filter(
8996
and_(
9097
PullRequest.repo_id == repo_id, PullRequest.number == str(pr_number)
@@ -100,7 +107,7 @@ def get_pr_events(self, pr_model: PullRequest):
100107

101108
pr_events = (
102109
session.query(PullRequestEvent)
103-
.options(defer("data"))
110+
.options(defer(PullRequestEvent.data))
104111
.filter(PullRequestEvent.pull_request_id == pr_model.id)
105112
.all()
106113
)
@@ -121,7 +128,7 @@ def get_prs_by_head_branch_match_strings(
121128
) -> List[PullRequest]:
122129
query = (
123130
session.query(PullRequest)
124-
.options(defer("data"))
131+
.options(defer(PullRequest.data))
125132
.filter(
126133
and_(
127134
PullRequest.repo_id.in_(repo_ids),
@@ -144,7 +151,7 @@ def get_reverted_prs_by_numbers(
144151
) -> List[PullRequest]:
145152
query = (
146153
session.query(PullRequest)
147-
.options(defer("data"))
154+
.options(defer(PullRequest.data))
148155
.filter(
149156
and_(
150157
PullRequest.repo_id.in_(repo_ids),
@@ -156,13 +163,6 @@ def get_reverted_prs_by_numbers(
156163

157164
return query.all()
158165

159-
@rollback_on_exc
160-
def save_revert_pr_mappings(
161-
self, revert_pr_mappings: List[PullRequestRevertPRMapping]
162-
):
163-
[session.merge(revert_pr_map) for revert_pr_map in revert_pr_mappings]
164-
session.commit()
165-
166166
@rollback_on_exc
167167
def get_active_team_repos_by_team_id(self, team_id: str) -> List[TeamRepos]:
168168
return (
@@ -255,6 +255,39 @@ def get_team_repos(self, team_id) -> List[OrgRepo]:
255255
team_repo_ids = [tr.org_repo_id for tr in team_repos]
256256
return self.get_repos_by_ids(team_repo_ids)
257257

258+
@rollback_on_exc
259+
def get_merge_to_deploy_broker_bookmark(
260+
self, repo_id: str
261+
) -> BookmarkMergeToDeployBroker:
262+
return (
263+
session.query(BookmarkMergeToDeployBroker)
264+
.filter(BookmarkMergeToDeployBroker.repo_id == repo_id)
265+
.one_or_none()
266+
)
267+
268+
@rollback_on_exc
269+
def update_merge_to_deploy_broker_bookmark(
270+
self, bookmark: BookmarkMergeToDeployBroker
271+
):
272+
session.merge(bookmark)
273+
session.commit()
274+
275+
@rollback_on_exc
276+
def get_prs_in_repo_merged_before_given_date_with_merge_to_deploy_as_null(
277+
self, repo_id: str, to_time: datetime
278+
):
279+
return (
280+
session.query(PullRequest)
281+
.options(defer(PullRequest.data))
282+
.filter(
283+
PullRequest.repo_id == repo_id,
284+
PullRequest.state == PullRequestState.MERGED,
285+
PullRequest.state_changed_at <= to_time,
286+
PullRequest.merge_to_deploy.is_(None),
287+
)
288+
.all()
289+
)
290+
258291
def _filter_prs_by_repo_ids(self, query, repo_ids: List[str]):
259292
return query.filter(PullRequest.repo_id.in_(repo_ids))
260293

0 commit comments

Comments
 (0)