
merge tool mvp #3

Draft · wants to merge 5 commits into main
9 changes: 9 additions & 0 deletions merge_tool.py
@@ -0,0 +1,9 @@
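"""Entry point for the lag-fault merge; delegates to utils.merge_lag_faults()."""
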
from logzero import logger

from utils import merge_lag_faults

def main():
    merge_lag_faults()

if __name__ == "__main__":
    main()
77 changes: 76 additions & 1 deletion utils.py
@@ -1,10 +1,12 @@
import json
import os
import re
import pandas as pd
from datetime import datetime as dtime
from datetime import timedelta
from typing import Dict, List
from urllib.parse import urlparse
from sqlalchemy import create_engine

import marko
import pytz
@@ -22,6 +24,7 @@
CLEANR = re.compile("<.*?>")

FAULT_RECORD_API_URL = os.getenv("FAULT_RECORD_API_URL")
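# SQLAlchemy connection URI for the fault_record database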
FAULT_RECORD_DB_URI = os.getenv("FAULT_RECORD_DB_URI")

SHIB_MEMBER = os.getenv("SHIB_MEMBER")
SHIB_FIRST_NAME = os.getenv("SHIB_FIRST_NAME")
@@ -30,6 +33,78 @@

SCRAPER_USER_ID = -1

def merge_lag_faults():
    """Merge scraped lag faults that share a source and last-update date into a single fault record."""

    global SCRAPER_USER_ID

    if SCRAPER_USER_ID == -1:
        SCRAPER_USER_ID = get_or_create_user(SCRAPER_EMAIL, SHIB_FIRST_NAME, SHIB_LAST_NAME, FAULT_RECORD_API_URL)

    FRengine = create_engine(FAULT_RECORD_DB_URI)
    query = """
        SELECT *
        FROM fault_record.records
    """
    df = pd.read_sql(query, FRengine)

    # Pull the source name, lag, last-update date, and message text out of each fault description,
    # which the scraper formats like "*<source>* is <N> days old - (last update: <date>) <message>"
    cols = ['scraped_source', 'lag', 'date', 'scraped_desc']
    regex = r"\*(?P<scraped_source>.*)\* is (?P<lag>\d+) days old - \(last update: (?P<date>.*)\) (?P<scraped_desc>.*)"
    regex_cols = df.desc.str.extract(regex)
    for col in cols:
        df[col] = regex_cols[col]

    # Drop the unused DVC_id column, then drop rows whose description didn't match the pattern
    df = df.drop('DVC_id', axis=1).dropna()
    df['lag'] = df.lag.astype(int)

    # Aggregate the faults we want to merge
    df_merged = df.groupby(['scraped_source', 'date'])\
        .agg({
            'first_occurance': 'min',
            'last_occurance': 'max',
            'record_date': 'min',
            'fault_id': 'min',
            'lag': ['max', pd.Series.nunique]
        }).reset_index()

    df_merged.columns = ['scraped_source', 'date', 'first_occurance', 'last_occurance',
                         'record_date', 'fault_id', 'lag', 'rows']


    # Shibboleth identity headers passed on the fault-record API calls below
    headers = {"Content-type": "application/json", "Accept": "text/plain", "Member": SHIB_MEMBER,
               "givenName": SHIB_FIRST_NAME, "sn": SHIB_LAST_NAME}

    # Update the first fault with the range of data
    for _, row in df_merged.iterrows():
        if row['rows'] > 1:
            desc = df[df['fault_id'] == row['fault_id']]['desc'].iloc[0]
            url = f"{FAULT_RECORD_API_URL}/api/v1/admin/faults/{row['fault_id']}"
            payload = {'published': '', 'user_id': SCRAPER_USER_ID}
            payload['name'] = f"Merged lag faults for {row['scraped_source']}"
            payload['desc'] = f"Merged faults with lag that grew to {row['lag']} with messages similar to \n {desc}"
            payload['first_occurance'] = row['first_occurance'].isoformat()
            payload['last_occurance'] = row['last_occurance'].isoformat()
            payload['record_date'] = row['record_date'].isoformat()

            logger.info(f"Merging {row['rows']} records into fault# {row['fault_id']}.")
            r = requests.put(url, data=json.dumps(payload), headers=headers)

            if not r.ok:
                logger.error(f"Something went wrong when trying to merge fault# {row['fault_id']}: {r.json()}")

    # Delete the faults that were merged into the first
    df_delete = df['fault_id'][~df['fault_id'].isin(df_merged['fault_id'])]
    for fault_id in df_delete:
        url = f"{FAULT_RECORD_API_URL}/api/v1/admin/faults/{fault_id}"

        logger.info(f"Deleting fault# {fault_id}.")
        r = requests.delete(url, headers=headers)

        if not r.ok:
            logger.error(f"Something went wrong when trying to delete fault# {fault_id}: {r.json()}")

def parse_timestamp(ts: float) -> str:
    """Convert Slack message timestamp to date
@@ -254,7 +329,7 @@ def get_signal_ids(message: str) -> List:
    """
    try:
        source, signals = extract_source_signal_pair(message)
        query_signals_url = get_signals_url(source, signals)
        query_signals_url = get_signals_url(source, FAULT_RECORD_API_URL, signals)
        signals = requests.get(query_signals_url)
        signal_ids = [sig.get("signal_id") for sig in signals.json()]
        return signal_ids