Skip to content

ci: save job steps, partially pr event fields in webhook receiver server #10255

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Oct 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 73 additions & 24 deletions ydb/ci/gh-webhook-job/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
flask_app = Flask("github-webhook")


logger = logging.getLogger(__name__)
logger = logging.getLogger("wh")


def check_github_signature(f):
Expand All @@ -37,21 +37,8 @@ def decorated_func(*args, **kwargs):
return decorated_func


@flask_app.route("/webhooks", methods=["GET", "POST"])
@check_github_signature
def webhooks():
cfg = current_app.config

job = request.get_json()

if "workflow_job" not in job:
print("No workflow_job, skip")
return jsonify({"status": False, "description": "No workflow_job in the request"})

# noinspection HttpUrlsUsage
ch_url = f"http://{current_app.config['CH_FQDNS'][0]}:8123"

query = "INSERT INTO workflow_jobs FORMAT JSONEachRow"
def ch_execute(cfg, query, data):
ch_url = f"http://{cfg['CH_FQDNS'][0]}:8123"

params = {
"database": cfg["CH_DATABASE"],
Expand All @@ -70,20 +57,84 @@ def webhooks():
}
)

job = job["workflow_job"]
job["steps"] = len(job["steps"])

for i in range(5):
response = None
try:
response = requests.post(ch_url, params=params, data=json.dumps(job), headers=headers)
response = requests.post(ch_url, params=params, data=data, headers=headers)
response.raise_for_status()
break
except Exception as e:
logger.exception("while inserting into clickhouse", exc_info=e)
if response:
logger.warning("Response text %s", response.text)
time.sleep(0.1 * i)
else:
raise Exception("Unable to execute")

return response


def save_workflow_job(cfg, event):
    """Store a ``workflow_job`` webhook event and each of its steps in ClickHouse.

    The job is written to ``workflow_jobs`` with its ``steps`` list replaced by
    the number of steps. Every step is then written to ``workflow_job_steps``
    as its own row, tagged with the parent job's ``id``, ``run_id`` and
    ``started_at`` under the ``wf_`` prefix.

    Note that ``event["workflow_job"]["steps"]`` is overwritten in place with
    the step count.

    Args:
        cfg: Application config mapping, passed through to ``ch_execute``.
        event: Decoded webhook payload containing a ``workflow_job`` object.
    """
    logger.info("save_workflow_job")
    job = event["workflow_job"]
    steps = job["steps"]
    # The jobs table keeps only the step count; the step details go to their own table.
    job["steps"] = len(steps)

    logger.info("save job")
    job_row = json.dumps(job)
    ch_execute(cfg, "INSERT INTO workflow_jobs FORMAT JSONEachRow", job_row)

    for step in steps:
        # Work on a copy so the step objects inside the event stay untouched.
        step_row = dict(step)
        step_row.update((f"wf_{field}", job[field]) for field in ("id", "run_id", "started_at"))
        logger.info("save step")
        ch_execute(cfg, "INSERT INTO workflow_job_steps FORMAT JSONEachRow", json.dumps(step_row))


def save_pullrequest(cfg, event):
    """Store a flattened subset of a ``pull_request`` webhook event in ClickHouse.

    Only the selected fields are kept. Nested values (author login, label
    names, head/base sha and ref) are lifted to top-level keys. One row is
    inserted into the ``pull_request`` table.

    Args:
        cfg: Application config mapping, passed through to ``ch_execute``.
        event: Decoded webhook payload with ``action`` and ``pull_request`` keys.
    """
    action = event["action"]

    logger.info("save_pullrequest")

    pr = event["pull_request"]

    # Keys are added in the same order as the row layout sent to ClickHouse.
    row = {"id": pr["id"], "action": action}
    for key in ("state", "url", "html_url", "number"):
        row[key] = pr[key]
    row["user_login"] = pr["user"]["login"]
    row["labels"] = [label["name"] for label in pr["labels"]]
    row["head_sha"] = pr["head"]["sha"]
    row["head_ref"] = pr["head"]["ref"]
    row["base_sha"] = pr["base"]["sha"]
    row["base_ref"] = pr["base"]["ref"]
    for key in (
        "merge_commit_sha",
        "merged",
        "draft",
        "created_at",
        "updated_at",
        "closed_at",
        "merged_at",
    ):
        row[key] = pr[key]

    payload = json.dumps(row)
    ch_execute(cfg, "INSERT INTO pull_request FORMAT JSONEachRow", payload)


@flask_app.route("/webhooks", methods=["GET", "POST"])
@check_github_signature
def webhooks():
    """Receive a GitHub webhook delivery and persist the supported events.

    Payloads containing ``workflow_job`` go to ``save_workflow_job`` and
    payloads containing ``pull_request`` go to ``save_pullrequest``. Anything
    else is logged and acknowledged with ``status: False`` instead of failing.

    Returns:
        A JSON response: ``{"status": True}`` when the event was stored,
        otherwise ``{"status": False, "description": ...}``.
    """
    cfg = current_app.config

    event = request.get_json()

    # A body that is not a JSON object cannot carry any event we know about;
    # answer explicitly rather than failing on the membership tests below.
    if not isinstance(event, dict):
        logger.error("Unexpected payload type %s, skip", type(event).__name__)
        return jsonify({"status": False, "description": "Request body is not a JSON object"})

    if "workflow_job" in event:
        save_workflow_job(cfg, event)
    elif "pull_request" in event:
        save_pullrequest(cfg, event)
    else:
        # Not every delivery has an "action" key, so use .get() to keep the
        # skip path from raising KeyError.
        logger.error("Unknown event action=%s, keys=%r, skip", event.get("action"), list(event.keys()))
        return jsonify(
            {
                "status": False,
                "description": "No workflow_job or pull_request in the request",
            }
        )

    return jsonify({"status": True})

Expand Down Expand Up @@ -120,12 +171,11 @@ def prepare_logger():
},
},
"loggers": {
"": {
#
"wh": {
"handlers": ["default"],
"level": "INFO",
"propagate": False,
},
}
},
}
)
Expand All @@ -143,7 +193,6 @@ def main():
"GH_WEBHOOK_SECRET": os.environ["GH_WEBHOOK_SECRET"].encode("utf8"),
}
)

# https://docs.gunicorn.org/en/stable/settings.html
config = {
"bind": f"[::]:{port}",
Expand Down
45 changes: 44 additions & 1 deletion ydb/ci/gh-webhook-job/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,47 @@ CREATE TABLE gh.workflow_jobs (
)
ENGINE = MergeTree
PARTITION BY toStartOfMonth(started_at)
ORDER BY (id, updated_at)
ORDER BY (id, updated_at);


-- One row per step of a workflow job, written by save_workflow_job in app.py.
-- The wf_* columns are copied from the parent job (its id, run_id and
-- started_at); the remaining columns come from the step object itself.
CREATE TABLE gh.workflow_job_steps (
wf_id UInt64,
wf_run_id UInt64,
wf_started_at DateTime,
name LowCardinality(String),
status LowCardinality(String),
conclusion LowCardinality(String),
number UInt16,
started_at DateTime,
-- Falls back to the insert time when the row carries no completed_at value.
completed_at DateTime DEFAULT now()
)
ENGINE = MergeTree
-- Partitioned by the month the parent job started, not by the step's own start.
PARTITION BY toStartOfMonth(wf_started_at)
ORDER BY (wf_id, number);


-- One row per stored pull_request webhook event, written by save_pullrequest
-- in app.py. `action` is the event's top-level action; the other columns are
-- a flattened subset of the event's pull_request object.
-- NOTE(review): merge_commit_sha, closed_at and merged_at are not Nullable.
-- Presumably GitHub sends null for them on open/unmerged pull requests, in
-- which case the insert relies on ClickHouse substituting column defaults for
-- NULL input - confirm the insert settings used by ch_execute.
CREATE TABLE gh.pull_request (
id UInt64,
action LowCardinality(String),
state LowCardinality(String),
number UInt64,
url String,
html_url String,
-- Login of the pull request author (pull_request.user.login).
user_login LowCardinality(String),
-- Label names only; other label attributes are not stored.
labels Array(LowCardinality(String)),
head_sha String,
head_ref String,
base_sha String,
base_ref String,
merge_commit_sha String,
draft bool,
merged bool,
created_at DateTime,
updated_at DateTime,
closed_at DateTime,
merged_at DateTime

)
ENGINE = MergeTree
PARTITION BY toStartOfMonth(created_at)
ORDER BY (number, action);
2 changes: 1 addition & 1 deletion ydb/ci/ydb-ci-cloud/terraform/ydb-ci-cloud/variables.tf
Original file line number Diff line number Diff line change
Expand Up @@ -66,5 +66,5 @@ variable "ssh-keys-lockbox-version-id" {

variable "webhook_container_image" {
type = string
default = "cr.yandex/crp2lrlsrs36odlvd8dv/github-runner-scale-webhook:1"
default = "cr.yandex/crp2lrlsrs36odlvd8dv/github-runner-scale-webhook:2"
}