Skip to content

Webhook API to receive and process Workflow payload #673

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 11 commits into
base: deployments-webhook
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions backend/analytics_server/app.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from os import getenv

from flask import Flask
from flask import Flask, jsonify

from env import load_app_env

Expand All @@ -16,8 +16,9 @@
from mhq.api.teams import app as teams_api
from mhq.api.bookmark import app as bookmark_api
from mhq.api.ai.dora_ai import app as ai_api

from mhq.api.webhooks import app as webhook_api
from mhq.store.initialise_db import initialize_database
from mhq.exceptions.webhook import WebhookException

ANALYTICS_SERVER_PORT = getenv("ANALYTICS_SERVER_PORT")

Expand All @@ -32,9 +33,22 @@
app.register_blueprint(teams_api)
app.register_blueprint(bookmark_api)
app.register_blueprint(ai_api)
app.register_blueprint(webhook_api)

configure_db_with_app(app)
initialize_database(app)


# Webhook Error handler
@app.errorhandler(WebhookException)
def handle_webhook_exception(e):
error_details = {
"error": e.message,
"resolution": e.resolution,
"exception_type": e.__class__.__name__,
}
return jsonify(error_details), 200


if __name__ == "__main__":
app.run(port=ANALYTICS_SERVER_PORT)
30 changes: 30 additions & 0 deletions backend/analytics_server/mhq/api/webhooks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from flask import Blueprint, request
from typing import Any, Dict, List
from mhq.service.query_validator import get_query_validator
from mhq.store.models.webhooks.enums import WebhookEventRequestType
from mhq.service.webhooks.factory import WebhookEventFactory
from mhq.service.queue.tasks import WebhookQueue

app = Blueprint("webhooks", __name__)


@app.route("/public/webhook/<event_type>", methods={"POST"})
def receive_webhook_workflows(event_type: str):
webhook_event_type = WebhookEventRequestType(event_type)
secret_key = request.headers.get("X-API-KEY")

query_validator = get_query_validator()
default_org = query_validator.get_default_org()
org_id = str(default_org.id)
query_validator.api_key_validator(secret_key, org_id)

webhook_event_factory = WebhookEventFactory()
webhook_service = webhook_event_factory(webhook_event_type)

payload: Dict[str, List[Any]] = request.get_json()
webhook_service.validate_payload(payload)
webhook_event_id = webhook_service.save_webhook_event(org_id, payload)

job_id = WebhookQueue.enqueue_webhook.defer(webhook_event_id=str(webhook_event_id))

return {"message": "Job enqueued successfully", "job_id": job_id}, 200
44 changes: 44 additions & 0 deletions backend/analytics_server/mhq/exceptions/webhook.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Base exception class for webhook-related errors
class WebhookException(Exception):
def __init__(self, message: str, resolution: str):
self.message = message
self.resolution = resolution
super().__init__(self.message)


class InvalidApiKeyError(WebhookException):
def __init__(self):
super().__init__(
message="Invalid or missing API key",
resolution=(
"Ensure you are passing the correct API key in the `X-API-KEY` header. "
"To generate a new key, navigate to Manage Integrations → Webhook → Setup."
),
)


class PayloadLimitExceededError(WebhookException):
def __init__(self):
super().__init__(
message="Payload exceeds the allowed size limit.",
resolution="Only a maximum of 500 records is allowed per request.",
)

Comment on lines +9 to +26
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice


class InvalidPayloadError(WebhookException):
def __init__(self, message: str = "Invalid JSON payload received."):
super().__init__(
message=message,
resolution=(
"Please ensure your payload follows the correct format. "
"You can review the expected structure under Manage Integrations → Webhook → Setup."
),
)


class InvalidEventTypeError(WebhookException):
def __init__(self, event_type: str):
super().__init__(
message=f"Invalid webhook event type received: '{event_type}'",
resolution="Ensure the URL path ends with either `/workflow` or `/incident`",
)
27 changes: 26 additions & 1 deletion backend/analytics_server/mhq/service/code/integration.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import List
from typing import Dict, List

from mhq.store.models import UserIdentityProvider, Integration
from mhq.store.repos.core import CoreRepoService
Expand All @@ -8,6 +8,11 @@
UserIdentityProvider.GITLAB.value,
]

PROVIDER_TO_DEFAULT_DOMAIN_URL_MAP: Dict[str, str] = {
UserIdentityProvider.GITHUB.value: "https://github.com",
UserIdentityProvider.GITLAB.value: "https://gitlab.com",
}


class CodeIntegrationService:
def __init__(self, core_repo_service: CoreRepoService):
Expand All @@ -23,6 +28,26 @@ def get_org_providers(self, org_id: str) -> List[str]:
return []
return [integration.name for integration in integrations]

def get_domain_url_to_provider_map(self, org_id: str) -> Dict[str, str]:
domain_url_to_provider_map: Dict[str, str] = {}
integrations: List[Integration] = (
self.core_repo_service.get_org_integrations_for_names(
org_id, CODE_INTEGRATION_BUCKET
)
)

for integration in integrations:
web_url = (
integration.provider_meta.get("custom_domain")
if integration.provider_meta
else None
)
if not web_url:
web_url = PROVIDER_TO_DEFAULT_DOMAIN_URL_MAP.get(integration.name)
domain_url_to_provider_map[web_url] = integration.name

return domain_url_to_provider_map


def get_code_integration_service():
return CodeIntegrationService(core_repo_service=CoreRepoService())
17 changes: 17 additions & 0 deletions backend/analytics_server/mhq/service/code/repository_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from mhq.store.models.incidents import OrgIncidentService, IncidentSource
from mhq.utils.time import time_now
from mhq.utils.string import uuid4_str
from mhq.utils.repo import get_tuple_to_repo_url_map
from mhq.service.code.models.org_repo import RawTeamOrgRepo
from mhq.store.models.code import OrgRepo
from mhq.store.models.core import Team
Expand Down Expand Up @@ -38,6 +39,22 @@ def get_repo_id_team_repos_map(

return {str(repo.org_repo_id): repo for repo in team_repos}

def get_repo_url_to_repo_id_map(
self, org_id: str, repo_urls: List[str]
) -> Dict[str, str]:
repo_url_to_repo_id_map: Dict[str, str] = {}
tuple_to_repo_url_map = get_tuple_to_repo_url_map(org_id, repo_urls)
org_repos = self._code_repo_service.get_org_repos_by_org_repo_tuples(
org_id=org_id, tuples=list(tuple_to_repo_url_map.keys())
)

for repo in org_repos:
tuple = (repo.provider, repo.org_name, repo.name)
repo_url = tuple_to_repo_url_map[tuple]
repo_url_to_repo_id_map[repo_url] = str(repo.id)

return repo_url_to_repo_id_map

def update_team_repos(
self, team: Team, raw_org_repos: List[RawTeamOrgRepo]
) -> List[OrgRepo]:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
from typing import List, Tuple
from mhq.store.models.code.workflows import RepoWorkflowType, RepoWorkflow
from typing import Dict, List, Optional, Tuple
from mhq.store.models.code.workflows import (
RepoWorkflowType,
RepoWorkflow,
RepoWorkflowRuns,
RepoWorkflowProviders,
)

from .factory import get_deployments_factory
from .deployments_factory_service import DeploymentsFactoryService
from mhq.store.models.code.filter import PRFilter
from mhq.store.models.code.repository import TeamRepos
from mhq.store.models.code.workflows.filter import WorkflowFilter
from mhq.service.deployments.models.models import Deployment, DeploymentType

from mhq.store.repos.code import CodeRepoService
from mhq.store.repos.workflows import WorkflowRepoService
from mhq.utils.time import Interval
Expand Down Expand Up @@ -161,6 +165,35 @@ def _sort_deployments_by_date(
) -> List[Deployment]:
return sorted(deployments, key=lambda deployment: deployment.conducted_at)

def get_repo_workflow_by_provider_workflow_id(
self, repo_id: str, provider: RepoWorkflowProviders, provider_workflow_id: str
) -> Optional[RepoWorkflow]:
return self.workflow_repo_service.get_repo_workflow_by_provider_workflow_id(
repo_id=repo_id,
provider=provider,
provider_workflow_id=provider_workflow_id,
)

def get_workflow_run_id_to_workflow_map(
self, repo_workflow_id: str
) -> Dict[str, RepoWorkflowRuns]:
repo_workflow_runs = self.workflow_repo_service.get_repo_workflow_runs(
repo_workflow_id
)
return {
workflow_run.provider_workflow_run_id: workflow_run
for workflow_run in repo_workflow_runs
}

def save_repo_workflow_and_workflow_runs(
self,
repo_workflows: List[RepoWorkflow],
repo_workflow_runs: List[RepoWorkflowRuns],
):
return self.workflow_repo_service.save_repo_workflow_and_workflow_runs(
repo_workflows, repo_workflow_runs
)


def get_deployments_service() -> DeploymentsService:
return DeploymentsService(
Expand Down
12 changes: 12 additions & 0 deletions backend/analytics_server/mhq/service/query_validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@
from typing import List

from werkzeug.exceptions import NotFound, BadRequest
from mhq.exceptions.webhook import InvalidApiKeyError

from mhq.store.models.core import Organization, Team, Users
from mhq.store.repos.core import CoreRepoService
from mhq.store.models import UserIdentityProvider
from mhq.utils.time import Interval

DEFAULT_ORG_NAME = "default"
Expand Down Expand Up @@ -72,6 +74,16 @@ def users_validator(self, user_ids: List[str]) -> List[Users]:

return users

def api_key_validator(self, secret_key: str | None, org_id: str) -> str:
api_key = self.repo_service.get_access_token(
org_id, UserIdentityProvider.WEBHOOK
)

if not api_key or api_key != secret_key:
raise InvalidApiKeyError()

return api_key


def get_query_validator():
return QueryValidator(CoreRepoService())
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from mhq.service.webhooks.factory import WebhookEventFactory
from mhq.service.webhooks.webhook_event_service import (
get_webhook_service,
WebhookEventService,
)
import traceback


class WebhookQueueHandler:
def __init__(self, webhooks_service: WebhookEventService):
self.webhooks_service = webhooks_service

def webhook_receiver_handler(self, webhook_event_id: str):
webhook_event = None
try:
webhook_event = self.webhooks_service.get_webhook_event(webhook_event_id)
if not webhook_event:
raise Exception("Webhook payload not found in database.")
webhook_event_factory = WebhookEventFactory()
webhook_event_handler = webhook_event_factory(webhook_event.request_type)
webhook_event_handler.process_webhook_event(webhook_event)

webhook_event.error = None
self.webhooks_service.update_webhook_event(webhook_event)
except Exception as e:
if not webhook_event:
raise e
webhook_event.error = {
"type": e.__class__.__name__,
"message": str(e),
"args": e.args,
"traceback": traceback.format_exc(),
}
self.webhooks_service.update_webhook_event(webhook_event)
raise e


def get_webhook_queue_handler():
return WebhookQueueHandler(webhooks_service=get_webhook_service())
13 changes: 13 additions & 0 deletions backend/analytics_server/mhq/service/queue/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from procrastinate_worker import app, flask_app
from mhq.service.queue.task_handlers.webhook_queue_handler import (
get_webhook_queue_handler,
)


class WebhookQueue:
@staticmethod
@app.task(queue="webhookQueue", name="WebhookQueue.enqueue_webhook")
def enqueue_webhook(webhook_event_id: str):
with flask_app.app_context():
webhook_queue_handler = get_webhook_queue_handler()
webhook_queue_handler.webhook_receiver_handler(webhook_event_id)
11 changes: 11 additions & 0 deletions backend/analytics_server/mhq/service/webhooks/factory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from mhq.service.webhooks.factory_abstract import WebhookEventHandler
from mhq.store.models.webhooks.enums import WebhookEventRequestType
from mhq.service.webhooks.webhook_workflow_handler import get_webhook_workflow_handler


class WebhookEventFactory:
def __call__(self, request_type: WebhookEventRequestType) -> WebhookEventHandler:
if request_type == WebhookEventRequestType.WORKFLOW:
return get_webhook_workflow_handler()

raise NotImplementedError(f"Unknown request type - {request_type}")
29 changes: 29 additions & 0 deletions backend/analytics_server/mhq/service/webhooks/factory_abstract.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from abc import ABC, abstractmethod
from typing import Any, Dict
from mhq.store.models.webhooks.webhooks import WebhookEvent


class WebhookEventHandler(ABC):
@abstractmethod
def validate_payload(self, payload: Dict[str, Any]):
"""
Validates the incoming webhook event data before processing.
"""

@abstractmethod
def save_webhook_event(self, org_id: str, payload: Dict[str, Any]) -> str:
"""
Saves the webhook event in database.
"""

@abstractmethod
def process_webhook_event(self, webhook_event: WebhookEvent):
"""
Executes the main business logic for processing the webhook event.
"""

@abstractmethod
def prune_synced_data(self, webhook_event: WebhookEvent):
"""
Prunes the synced data based on Interval.
"""
Loading