Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 37 additions & 5 deletions compliance-monitor/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
db_get_keys, db_insert_report, db_get_recent_results2, db_patch_approval2, db_get_report,
db_ensure_schema, db_get_apikeys, db_update_apikey, db_filter_apikeys, db_clear_delegates,
db_find_subjects, db_insert_result2, db_get_relevant_results2, db_add_delegate, db_get_group,
db_get_relevant_compliance_results, db_insert_compliance_result, db_insert_event,
)


Expand Down Expand Up @@ -463,11 +464,12 @@ async def post_report(
if not documents:
raise HTTPException(status_code=200, detail="empty reports")

allowed_subjects = {auth_subject} | set(delegation_subjects)
for document in documents:
check_role(account, document['subject'], ROLES['append_any'])
if document['subject'] not in allowed_subjects:
raise HTTPException(status_code=401, detail="delegation problem?")
reported_subjects = {document['subject'] for document in documents}
for subj in reported_subjects:
check_role(account, subj, ROLES['append_any'])
extra_subjects = reported_subjects - {auth_subject} - set(delegation_subjects)
if extra_subjects:
raise HTTPException(status_code=401, detail="delegation problem?")

with conn.cursor() as cur:
for document, json_text in zip(documents, json_texts):
Expand All @@ -493,6 +495,36 @@ async def post_report(
result = rdata['result']
approval = 1 == result # pre-approve good result
db_insert_result2(cur, checked_at, subject, scopeuuid, version, check, result, approval, reportid)

checked_at = datetime.now()
# add new compliance result if existing compliance result is not newer than `threshold`
threshold = checked_at - timedelta(hours=12)
for approved_only in (False, True):
# fetch latest compliance results before new report
rows = db_get_relevant_compliance_results(cur, approved_only=approved_only)
results0 = defaultdict(lambda: defaultdict(dict))
for row in rows:
subj, scope_uuid, version, result, _, ch_at = row
if subj not in reported_subjects:
continue
results0[subj][scope_uuid][version] = (result, ch_at)
# compute latest compliance results after new report
rows2 = db_get_relevant_results2(cur, approved_only=approved_only)
results = convert_result_rows_to_dict2(rows2, get_scopes())
# update compliance table and report changes
for subj, subj_results in results.items():
if subj not in reported_subjects:
continue
for scope_uuid, scope_results in subj_results.items():
for version, version_results in scope_results['versions'].items():
result, ch_at = results0[subj][scope_uuid].get(version, (0, threshold))
new_result = version_results['result']
if new_result != result or ch_at <= threshold:
print(ch_at, threshold)
db_insert_compliance_result(cur, checked_at, subj, scope_uuid, version, new_result, approved_only)
if new_result != result:
db_insert_event(cur, checked_at, subj, scope_uuid, version, result, new_result, approved_only)
print(f"{subj} {scope_uuid} {version}: {result} -> {new_result}")
conn.commit()


Expand Down
100 changes: 99 additions & 1 deletion compliance-monitor/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

# list schema versions in ascending order
SCHEMA_VERSION_KEY = 'version'
SCHEMA_VERSIONS = ['v1', 'v2', 'v3', 'v4']
SCHEMA_VERSIONS = ['v1', 'v2', 'v3', 'v4', 'v5']
# use ... (Ellipsis) here to indicate that no default value exists (will lead to error if no value is given)
ACCOUNT_DEFAULTS = {'subject': ..., 'api_key': ..., 'roles': ..., 'group': None}
PUBLIC_KEY_DEFAULTS = {'public_key': ..., 'public_key_type': ..., 'public_key_name': ...}
Expand Down Expand Up @@ -143,6 +143,33 @@ def db_ensure_schema_v4(cur: cursor):
''')


def db_ensure_schema_v5(cur: cursor):
# v5 mainly extends v4
db_ensure_schema_v4(cur)
# introduce tables compliance that track compliance over time
cur.execute('''
CREATE TABLE IF NOT EXISTS compliance (
resultid SERIAL PRIMARY KEY,
checked_at timestamp NOT NULL,
subject text NOT NULL,
scopeuuid text NOT NULL,
version text NOT NULL,
result int,
approval boolean
);
CREATE TABLE IF NOT EXISTS event (
eventid SERIAL PRIMARY KEY,
eventdate timestamp NOT NULL,
subject text NOT NULL,
scopeuuid text NOT NULL,
version text NOT NULL,
old_result int,
new_result int,
approval boolean
);
''')


def db_upgrade_data_v1_v2(cur):
# we are going to drop table result, but use delete anyway to have the transaction safety
cur.execute('''
Expand Down Expand Up @@ -219,6 +246,10 @@ def db_upgrade_schema(conn: connection, cur: cursor):
db_ensure_schema_v4(cur)
db_set_schema_version(cur, 'v4')
conn.commit()
elif current == 'v4':
db_ensure_schema_v5(cur)
db_set_schema_version(cur, 'v5')
conn.commit()


def db_ensure_schema(conn: connection):
Expand Down Expand Up @@ -424,3 +455,70 @@ def db_patch_approval2(cur: cursor, record):
RETURNING resultid;''', record)
resultid, = cur.fetchone()
return resultid


def db_insert_compliance_result(
cur: cursor, checked_at, subject, scopeuuid, version, result, approval
):
# this is an exception in that we don't use a record parameter (it's just not as practical here)
cur.execute('''
INSERT INTO compliance (checked_at, subject, scopeuuid, version, result, approval)
VALUES (%s, %s, %s, %s, %s, %s)
RETURNING resultid;''', (checked_at, subject, scopeuuid, version, result, approval))
resultid, = cur.fetchone()
return resultid


def db_get_relevant_compliance_results(
cur: cursor,
subject=None, scopeuuid=None, version=None, approved_only=True,
):
"""for each combination of scope/version/check, get the most recent test result that is still valid"""
# find the latest result per subject/scopeuuid/version/checkid for this subject
# DISTINCT ON is a Postgres-specific construct that comes in very handy here :)
cur.execute(sql.SQL('''
SELECT DISTINCT ON (subject, scopeuuid, version)
subject, scopeuuid, version, result, approval, checked_at
FROM compliance
{filter_condition}
ORDER BY subject, scopeuuid, version, checked_at DESC;
''').format(
filter_condition=make_where_clause(
sql.SQL('approval') if approved_only else None,
None if scopeuuid is None else sql.SQL('scopeuuid = %(scopeuuid)s'),
None if version is None else sql.SQL('version = %(version)s'),
None if subject is None else sql.SQL('subject = %(subject)s'),
),
), {"subject": subject, "scopeuuid": scopeuuid, "version": version})
return cur.fetchall()


def db_insert_event(
cur: cursor, eventdate, subject, scopeuuid, version, old_result, new_result, approval
):
# this is an exception in that we don't use a record parameter (it's just not as practical here)
cur.execute('''
INSERT INTO event (eventdate, subject, scopeuuid, version, old_result, new_result, approval)
VALUES (%s, %s, %s, %s, %s, %s, %s)
RETURNING eventid;''', (eventdate, subject, scopeuuid, version, old_result, new_result, approval))
resultid, = cur.fetchone()
return resultid


def db_get_recent_events(cur: cursor, approved, limit, skip, max_age_days=None):
"""list recent events without grouping by scope/version/check"""
columns = ('date', 'subject', 'scopeuuid', 'version', 'old_result', 'new_result', 'approval')
cur.execute(sql.SQL('''
SELECT eventdate, subject, scopeuuid, version, old_result, new_result, approval
FROM event
{where_clause}
ORDER BY eventdate
LIMIT %(limit)s OFFSET %(skip)s;''').format(
where_clause=make_where_clause(
None if max_age_days is None else sql.SQL(
f"eventdate > NOW() - interval '{max_age_days:d} days'"
),
None if approved is None else sql.SQL('approval = %(approved)s'),
),
), {"limit": limit, "skip": skip, "approved": approved})
return [{col: val for col, val in zip(columns, row)} for row in cur.fetchall()]