Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions app/api/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import api.app
import api.logging
import api.logging.audit
from api.app_config import AppConfig
from api.util.local import load_local_env_vars

Expand All @@ -17,6 +18,7 @@ def main() -> None:
load_local_env_vars()
app_config = AppConfig()

api.logging.audit.init_security_logging()
api.logging.init(__package__)
logger.info("Running API Application")

Expand Down
72 changes: 72 additions & 0 deletions app/api/logging/audit.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
#
# Application-level audit logging.
#
# See https://docs.python.org/3/library/audit_events.html
# https://docs.python.org/3/library/sys.html#sys.addaudithook
# https://www.python.org/dev/peps/pep-0578/
#

import logging
import sys
from typing import Any

import api.logging
import api.util.collections

logger = api.logging.get_logger(__name__)

AUDIT = 32

logging.addLevelName(AUDIT, "AUDIT")


def init_security_logging() -> None:
sys.addaudithook(audit_hook)


IGNORE_AUDIT_EVENTS = {
"builtins.id",
"code.__new__",
"compile",
"exec",
"import",
"marshal.loads",
"object.__getattr__",
"object.__setattr__",
"os.listdir",
"os.scandir",
"socket.__new__",
"sys._current_frames",
"sys._getframe",
"sys.settrace", # interferes with PyCharm debugger
}


def audit_hook(event_name: str, args: tuple[Any, ...]) -> None:
if event_name in IGNORE_AUDIT_EVENTS:
return
if event_name == "open" and isinstance(args[0], str) and "/__pycache__/" in args[0]:
# Python has a high rate of these events in normal operation.
return
if event_name == "os.chmod" and type(args[0]) is int:
# Gunicorn generates a high volume of these events in normal operation (see workertmp.py)
return
if event_name == "open" and isinstance(args[0], str) and "/pytz/" in args[0]:
# The pytz module generates a high volume of these events in normal operation.
return

audit_log(event_name, args)


def audit_log(event_name: str, args: tuple[Any, ...]) -> None:
"""Log a message but only log recently repeated messages at intervals."""
key = (event_name, repr(args))
count = audit_message_count[key] = audit_message_count[key] + 1
if count <= 10 or (count <= 100 and (count % 10) == 0) or (count % 100) == 0:
extra = {"audit.event_name": event_name, "count": count}
for arg_index, arg in enumerate(args):
extra["audit.args.%i" % arg_index] = arg
logger.log(AUDIT, event_name, extra=extra)


audit_message_count = api.util.collections.LeastRecentlyUsedDict()
5 changes: 5 additions & 0 deletions app/api/util/collections/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#
# Container datatypes and utilities.
#

from .dict import LeastRecentlyUsedDict # noqa: F401
37 changes: 37 additions & 0 deletions app/api/util/collections/dict.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
#
# Dictionaries with additional behaviour.
#

import collections
from typing import Any


class LeastRecentlyUsedDict(collections.OrderedDict):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this file be named least_recently_used_dict.py?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And in app/api/util/collections/__init__.py we can do:

from .least_recently_used_dict import LeastRecentlyUsedDict

And to use it:

from api.util.collections import LeastRecentlyUsedDict

LeastRecentlyUsedDict(maxsize=4)

Which I think is a little cleaner than:

from api.util.collections.dict import dict_util

dict_util.LeastRecentlyUsedDict(maxsize=4)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Used the name dict as this could contain other dict related utilities in future. I added the import to __init__.py.

"""A dict with a maximum size, evicting the least recently written key when full.

Getting a key that is not present returns a default value of 0.

Setting a key marks it as most recently used and removes the oldest key if full.

May be useful for tracking the count of items where limited memory usage is needed even if
the set of items can be unlimited.

Based on the example at
https://docs.python.org/3/library/collections.html#ordereddict-examples-and-recipes
"""

def __init__(self, maxsize: int = 128, *args: Any, **kwargs: Any) -> None:
self.maxsize = maxsize
super().__init__(*args, **kwargs)

def __getitem__(self, key: Any) -> Any:
if key in self:
return super().__getitem__(key)
return 0

def __setitem__(self, key: Any, value: Any) -> None:
if key in self:
self.move_to_end(key)
super().__setitem__(key, value)
if self.maxsize < len(self):
self.popitem(last=False)
Empty file.
64 changes: 64 additions & 0 deletions app/tests/api/logging/test_audit.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
#
# Tests for api.logging.audit.
#

import logging

from api.logging import audit


def test_audit_hook(caplog):
caplog.set_level(logging.INFO)

# Should appear in audit log.
audit.audit_hook("open", ("/dev/null", None, 123000))

# Various common cases that should not appear in audit log (normal behaviour & too noisy).
audit.audit_hook("compile", (b"def _(): pass", "<unknown>"))
audit.audit_hook("open", ("/srv/api/__pycache__/status.cpython-310.pyc", "r", 500010))
audit.audit_hook("os.chmod", (7, 1, -1))
audit.audit_hook(
"open", ("/app/.venv/lib/python3.10/site-packages/pytz/zoneinfo/US/Eastern", "r", 524288)
)

assert [(r.funcName, r.levelname, r.message) for r in caplog.records] == [
("audit_log", "AUDIT", "open")
]
assert caplog.records[0].__dict__["audit.event_name"] == "open"
assert caplog.records[0].__dict__["audit.args.0"] == "/dev/null"
assert caplog.records[0].__dict__["audit.args.1"] is None
assert caplog.records[0].__dict__["audit.args.2"] == 123000
assert "audit.args.3" not in caplog.records[0].__dict__


def test_audit_log_repeated_message(caplog):
caplog.set_level(logging.INFO)

for _i in range(500):
audit.audit_log("abc", (1, 2, 3))

assert [(r.funcName, r.levelname, r.message, r.count) for r in caplog.records] == [
("audit_log", "AUDIT", "abc", 1),
("audit_log", "AUDIT", "abc", 2),
("audit_log", "AUDIT", "abc", 3),
("audit_log", "AUDIT", "abc", 4),
("audit_log", "AUDIT", "abc", 5),
("audit_log", "AUDIT", "abc", 6),
("audit_log", "AUDIT", "abc", 7),
("audit_log", "AUDIT", "abc", 8),
("audit_log", "AUDIT", "abc", 9),
("audit_log", "AUDIT", "abc", 10),
("audit_log", "AUDIT", "abc", 20),
("audit_log", "AUDIT", "abc", 30),
("audit_log", "AUDIT", "abc", 40),
("audit_log", "AUDIT", "abc", 50),
("audit_log", "AUDIT", "abc", 60),
("audit_log", "AUDIT", "abc", 70),
("audit_log", "AUDIT", "abc", 80),
("audit_log", "AUDIT", "abc", 90),
("audit_log", "AUDIT", "abc", 100),
("audit_log", "AUDIT", "abc", 200),
("audit_log", "AUDIT", "abc", 300),
("audit_log", "AUDIT", "abc", 400),
("audit_log", "AUDIT", "abc", 500),
]
Empty file added app/tests/api/util/__init__.py
Empty file.
Empty file.
36 changes: 36 additions & 0 deletions app/tests/api/util/collections/test_dict.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
#
# Unit tests for api.util.collections.dict.
#

import api.util.collections.dict as dict_util


def test_least_recently_used_dict():
lru_dict = dict_util.LeastRecentlyUsedDict(maxsize=4)

assert lru_dict["a"] == 0
assert len(lru_dict) == 0

lru_dict["a"] = 10
lru_dict["b"] = 20
lru_dict["c"] = 30
lru_dict["d"] = 40

assert len(lru_dict) == 4
assert tuple(lru_dict.items()) == (("a", 10), ("b", 20), ("c", 30), ("d", 40))
assert lru_dict["a"] == 10
assert lru_dict["b"] == 20
assert lru_dict["c"] == 30
assert lru_dict["d"] == 40
assert lru_dict["e"] == 0
assert len(lru_dict) == 4

lru_dict["a"] += 1 # Write existing a, move to end
assert len(lru_dict) == 4
assert tuple(lru_dict.items()) == (("b", 20), ("c", 30), ("d", 40), ("a", 11))

lru_dict["f"] = 50 # Write new key f, and evict oldest b
lru_dict["c"] += 1 # Write existing c, move to end, and evict oldest d
lru_dict["g"] = 60 # Write new key g, and evict oldest d
assert len(lru_dict) == 4
assert tuple(lru_dict.items()) == (("a", 11), ("f", 50), ("c", 31), ("g", 60))