From 709247e694e87f95d494e1dc4676877995ed2aad Mon Sep 17 00:00:00 2001 From: "Jason M. Gates" Date: Tue, 10 Aug 2021 11:28:07 -0600 Subject: [PATCH] WIP COMMIT --- src/shelllogger/ShellLogger.py | 118 ++++--- src/shelllogger/classes.py | 154 +++++---- src/shelllogger/util.py | 607 +++++++++++++++++++++++++-------- 3 files changed, 631 insertions(+), 248 deletions(-) diff --git a/src/shelllogger/ShellLogger.py b/src/shelllogger/ShellLogger.py index 202ba00..c7ddc42 100644 --- a/src/shelllogger/ShellLogger.py +++ b/src/shelllogger/ShellLogger.py @@ -3,7 +3,7 @@ from __future__ import annotations from .classes import (Shell, Trace, StatsCollector, trace_collector, stats_collectors) -from .util import (nested_SimpleNamespace_to_dict, opening_html_text, +from .util import (nested_simplenamespace_to_dict, opening_html_text, closing_html_text, append_html, html_message_card, message_card, command_card, child_logger_card, parent_logger_card_html) @@ -29,8 +29,8 @@ class ShellLoggerEncoder(json.JSONEncoder): """ This is a helper class to make the :class:`ShellLogger` class JSON - serializable. This particular class is used in the process of - saving :class:`ShellLogger` objects to JSON. + serializable. It is used in the process of saving + :class:`ShellLogger` objects to JSON. Usage:: @@ -80,8 +80,8 @@ def default(self, obj: object) -> object: class ShellLoggerDecoder(json.JSONDecoder): """ This is a helper class to make the :class:`ShellLogger` class JSON - serializable. This particular class is used in the process of - retrieving :class:`ShellLogger` objects from JSON. + serializable. It is used in the process of retrieving + :class:`ShellLogger` objects from JSON. Usage:: @@ -100,8 +100,7 @@ def __init__(self): def dict_to_object(obj: dict) -> object: """ This converts data dictionaries given by the JSONDecoder into - objects of type :class:`ShellLogger`, - :class:`datetime`, etc. + objects of type :class:`ShellLogger`, :class:`datetime`, etc. Args: obj: The JSON-serialized representation of an object. @@ -167,7 +166,7 @@ class ShellLogger: stream_dir (Path): Path to directory where ``stdout``/``stderr`` stream logs are stored. html_file (Path): Path to main HTML file for the parent and - children :class:`ShellLogger` objects. + child :class:`ShellLogger` objects. indent (int): The indentation level of this :class:`ShellLogger` object. The parent has a level 0. Each successive child's indent is increased by 1. @@ -177,7 +176,7 @@ class ShellLogger: was created. done_time (datetime): The time this :class:`ShellLogger` object is done with its commands/messages. - duration (str): String formatted duration of this + duration (str): The string-formatted duration of this :class:`ShellLogger`, updated when the :func:`finalize` method is called. shell (Shell): The :class:`Shell` in which all commands will be @@ -401,7 +400,7 @@ def strfdelta(tdelta: timedelta, fmt: str) -> str: Parameters: tdelta: The time delta object. - fmt (str): The delta format string. + fmt: The delta format string. Returns: A string representation of the time delta. @@ -461,7 +460,7 @@ def to_html(self) -> List[object]: the :attr:`stream_dir`. Returns: - Todo: Figure this out. + Todo: Figure this out. List[str] or List[List[str]] or... """ html = [] for log in self.log_book: @@ -526,7 +525,7 @@ def finalize(self) -> None: def log( self, msg: str, - cmd: Union[str, List[str]], + cmd: str, cwd: Optional[Path] = None, live_stdout: bool = False, live_stderr: bool = False, @@ -536,9 +535,7 @@ def log( **kwargs ) -> dict: """ - Add something to the log. To conserve memory, ``stdout`` and - ``stderr`` will be written to the files as they are being - generated. + Execute a command, and log the corresponding information. Parameters: msg: A message to be recorded with the command. This could @@ -563,7 +560,7 @@ def log( causes problems, and we need the flexibility to revert back to standard behavior. **kwargs: Any other keyword arguments to pass on to - :func:`run`. + :func:`_run`. Returns: A dictionary containing ``stdout``, ``stderr``, ``trace``, @@ -574,16 +571,11 @@ def log( dictionary will contain the output of the specified trace; otherwise, it will be ``None``. - Todo: - Deprecate the ability to pass in a ``List[str]`` for the - ``cmd``. We automatically convert the list to a string - anyway, and there's no need to maintain a similar interface - to ``subprocess.Popen()``. + Note: + To conserve memory, ``stdout`` and ``stderr`` will be + written to the files as they are being generated. """ start_time = datetime.now() - if isinstance(cmd, list): - cmd = ' '.join("'" + str(x).replace("'", "'\"'\"'") + "'" - for x in cmd) # Create a unique command ID that will be used to find the # location of the `stdout`/`stderr` files in the temporary @@ -591,7 +583,7 @@ def log( cmd_id = 'cmd_' + ''.join(random.choice(string.ascii_lowercase) for _ in range(9)) - # Create & open files for `stdout` and `stderr`. + # Create & open files for `stdout`, `stderr`, and trace data. time_str = start_time.strftime("%Y-%m-%d_%H%M%S") stdout_path = self.stream_dir / f"{time_str}_{cmd_id}_stdout" stderr_path = self.stream_dir / f"{time_str}_{cmd_id}_stderr" @@ -613,18 +605,18 @@ def log( 'return_code': 0} # Execute the command. - result = self.run(cmd, - quiet_stdout=not live_stdout, - quiet_stderr=not live_stderr, - stdout_str=return_info, - stderr_str=return_info, - trace_str=return_info, - stdout_path=stdout_path, - stderr_path=stderr_path, - trace_path=trace_path, - devnull_stdin=stdin_redirect, - pwd=cwd, - **kwargs) + result = self._run(cmd, + quiet_stdout=not live_stdout, + quiet_stderr=not live_stderr, + stdout_str=return_info, + stderr_str=return_info, + trace_str=return_info, + stdout_path=stdout_path, + stderr_path=stderr_path, + trace_path=trace_path, + devnull_stdin=stdin_redirect, + pwd=cwd, + **kwargs) # Update the log information and save it to the `log_book`. h = int(result.wall / 3600000) @@ -632,24 +624,25 @@ def log( s = int(result.wall / 1000) % 60 log["duration"] = f"{h}h {m}m {s}s" log["return_code"] = result.returncode - log = {**log, **nested_SimpleNamespace_to_dict(result)} + log = {**log, **nested_simplenamespace_to_dict(result)} self.log_book.append(log) return {'return_code': log['return_code'], 'stdout': result.stdout, 'stderr': result.stderr} - def run(self, command: str, **kwargs) -> SimpleNamespace: + def _run(self, command: str, **kwargs) -> SimpleNamespace: """ - Todo: Fill this out. + Execute a command, capturing various information as you go. Parameters: - command: + command: The command to execute. **kwargs: Returns: Todo: - Finish commenting this function. Figure out how it works - first. + * Finish commenting this function. Figure out how it works + first. + * Replace `**kwargs` with actual parameters. """ completed_process, trace_output = None, None for key in ["stdout_str", "stderr_str", "trace_str"]: @@ -672,6 +665,8 @@ def run(self, command: str, **kwargs) -> SimpleNamespace: # /usr/include/linux/un.h`. old_tmpdir = os.environ.get("TMPDIR") os.environ["TMPDIR"] = "/tmp" + + # Start up any stats or trace collectors the user has requested. collectors = stats_collectors(**kwargs) stats = {} if len(collectors) > 0 else None for collector in collectors: @@ -685,6 +680,8 @@ def run(self, command: str, **kwargs) -> SimpleNamespace: trace = trace_collector(**kwargs) command = trace.command(command, **kwargs) trace_output = trace.output_path + + # Run the command, and stop any collectors that were started. completed_process = self.shell.run(command, **kwargs) for collector in collectors: stats[collector.stat_name] = collector.finish() @@ -704,10 +701,12 @@ def run(self, command: str, **kwargs) -> SimpleNamespace: def auxiliary_information(self) -> SimpleNamespace: """ - Todo: Insert docstring. + Capture all sorts of auxiliary information before running a + command. Returns: - + The working directory, environment, umask, hostname, user, + group, shell, and ulimit. """ pwd, _ = self.shell.auxiliary_command(posix="pwd", nt="cd", strip=True) environment, _ = self.shell.auxiliary_command(posix="env", nt="set") @@ -735,11 +734,12 @@ def auxiliary_information(self) -> SimpleNamespace: @Trace.subclass class Strace(Trace): """ - Todo: Insert docstring. + An interface between :class:`ShellLogger` and the ``strace`` + command. """ trace_name = "strace" - def __init__(self, **kwargs): + def __init__(self, **kwargs) -> None: """ Todo: Insert docstring. """ @@ -748,9 +748,9 @@ def __init__(self, **kwargs): self.expression = kwargs.get("expression") @property - def trace_args(self): + def trace_args(self) -> str: """ - Todo: Insert docstring. + Wraps a command in a ``strace`` command. """ args = f"strace -f -o {self.output_path}" if self.summary: @@ -796,9 +796,13 @@ class DiskStatsCollector(StatsCollector): """ stat_name = "disk" - def __init__(self, interval, manager): + def __init__(self, interval: float, manager: object) -> None: """ Todo: Insert docstring. + + Args: + interval: + manager: """ super().__init__(interval, manager) self.stats = manager.dict() @@ -814,17 +818,21 @@ def __init__(self, interval, manager): for m in self.mountpoints: self.stats[m] = manager.list() - def collect(self): + def collect(self) -> None: """ - Todo: Insert docstring. + Poll the disks to determine how much free space you have. """ timestamp = round(time.time() * 1000) for m in self.mountpoints: self.stats[m].append((timestamp, psutil.disk_usage(m).percent)) - def unproxied_stats(self): + def unproxied_stats(self) -> dict: """ - Todo: Insert docstring. + Todo: FOO + + Returns: + A mapping from the disk mount points to tuples of + timestamps and percent of disk space free. """ return {k: list(v) for k, v in self.stats.items()} @@ -881,6 +889,8 @@ def unproxied_stats(self): Todo: Insert docstring. """ return list(self.stats) + +# If we don't have `psutil`, return null objects. else: @StatsCollector.subclass class DiskStatsCollector(StatsCollector): diff --git a/src/shelllogger/classes.py b/src/shelllogger/classes.py index fb5b862..bf5e825 100644 --- a/src/shelllogger/classes.py +++ b/src/shelllogger/classes.py @@ -14,18 +14,20 @@ from threading import Thread from time import sleep, time from types import SimpleNamespace -from typing import List, Tuple +from typing import List, TextIO, Tuple def trace_collector(**kwargs) -> object: """ - Todo: Insert docstring. + Todo: Insert docstring. Factory. Returns any subclass of Trace + that has the @Trace.subclass annotation. Parameters: - **kwargs: + **kwargs: Any supported arguments of the :class:`Trace` + subclass. Returns: - Todo: Figure this out. + Todo: Figure this out. Single Trace subclass instance. """ trace_name = kwargs["trace"] collectors = [c for c in Trace.subclasses if c.trace_name == trace_name] @@ -40,13 +42,13 @@ def trace_collector(**kwargs) -> object: def stats_collectors(**kwargs) -> List[object]: """ - Todo: Insert docstring. + Todo: Insert docstring. See above. Parameters: **kwargs: Returns: - Todo: Figure this out. + Todo: Figure this out. List[StatsCollector subclasses] """ collectors = [] if "measure" in kwargs: @@ -60,7 +62,8 @@ def stats_collectors(**kwargs) -> List[object]: class Shell: """ - Todo: Insert docstring. + Spawns a shell subprocess that inherits five unnamed pipes (stdout, + stderr, stdin, aux_stdout, aux_stderr). """ def __init__(self, pwd: Path = Path.cwd()) -> None: @@ -71,23 +74,29 @@ def __init__(self, pwd: Path = Path.cwd()) -> None: pwd: The directory to change to when starting the :class:`Shell`. """ + + # Corresponds to 0,1,2 file descriptors of the shell we're going + # to spawn. self.aux_stdin_rfd, self.aux_stdin_wfd = os.pipe() self.aux_stdout_rfd, self.aux_stdout_wfd = os.pipe() self.aux_stderr_rfd, self.aux_stderr_wfd = os.pipe() - # Todo: What's happening here? + # Get the current flags of the file descriptors. aux_stdout_write_flags = fcntl.fcntl(self.aux_stdout_wfd, fcntl.F_GETFL) + aux_stderr_write_flags = fcntl.fcntl(self.aux_stderr_wfd, + fcntl.F_GETFL) + + # Make writes non-blocking. fcntl.fcntl(self.aux_stdout_wfd, fcntl.F_SETFL, aux_stdout_write_flags | os.O_NONBLOCK) - aux_stderr_write_flags = fcntl.fcntl(self.aux_stderr_wfd, - fcntl.F_GETFL) fcntl.fcntl(self.aux_stderr_wfd, fcntl.F_SETFL, aux_stderr_write_flags | os.O_NONBLOCK) - # Todo: What's happening here? + # Ensure the file descriptors are inheritable by the shell + # subprocess. os.set_inheritable(self.aux_stdout_wfd, True) os.set_inheritable(self.aux_stderr_wfd, True) self.shell = subprocess.Popen(os.environ.get("SHELL") or "/bin/sh", @@ -97,6 +106,8 @@ def __init__(self, pwd: Path = Path.cwd()) -> None: close_fds=False) os.set_inheritable(self.aux_stdout_wfd, False) os.set_inheritable(self.aux_stderr_wfd, False) + + # Change to the directory FOOBAR self.cd(pwd) def __del__(self) -> None: @@ -141,18 +152,17 @@ def cd(self, path: Path) -> None: Parameters: path: The directory to change to. - - Todo: Figure out if ``path`` is a ``str`` or ``Path``. """ os.chdir(path) self.auxiliary_command(posix=f"cd {path}", nt=f"cd {path}") def run(self, command: str, **kwargs) -> SimpleNamespace: """ - Todo: Insert docstring. + Write a ``command`` to the :class:`Shell` class' shell + subprocess' ``stdin``, and pull the ``stdout`` and ``stderr``. Parameters: - command: The command to run in the :class:`Shell`. + command: The command to run in the shell subprocess. **kwargs: Returns: @@ -160,23 +170,34 @@ def run(self, command: str, **kwargs) -> SimpleNamespace: """ start = round(time() * 1000) - # Todo: Why are the enclosing braces necessary? + # Wrap the `command` in {braces} to support newlines and + # heredocs to tell the shell "this is one giant statement". + # Then run the command. if kwargs.get("devnull_stdin"): os.write(self.aux_stdin_wfd, f"{{\n{command}\n}} &2\n".encode()) + + # Tee the output to multiple sinks (files, strings, + # stdout/stderr). try: output = self.tee(self.shell.stdout, self.shell.stderr, **kwargs) + + # Note: If something goes wrong in `tee()`, the only way to reliably + # propagate an exception from a thread that's spawned is to raise a + # KeyboardInterrupt. except KeyboardInterrupt: os.close(self.aux_stdin_wfd) - - # Todo: Should this error message be changed? raise RuntimeError( f"There was a problem running the command `{command}`. " "This is a fatal error and we cannot continue. Ensure that " @@ -184,7 +205,9 @@ def run(self, command: str, **kwargs) -> SimpleNamespace: ) finish = round(time() * 1000) - # Todo: Why is there no `nt="foo"`? + # Pull the return code and return the results. Note that if the + # command executed spawns a sub-shell, you won't really have a + # return code. aux_out, _ = self.auxiliary_command(posix="echo $RET_CODE") try: returncode = int(aux_out) @@ -201,7 +224,11 @@ def run(self, command: str, **kwargs) -> SimpleNamespace: ) @staticmethod - def tee(stdout, stderr, **kwargs) -> SimpleNamespace: + def tee( + stdout: TextIO, + stderr: TextIO, + **kwargs + ) -> SimpleNamespace: """ Todo: Insert docstring. @@ -224,24 +251,27 @@ def tee(stdout, stderr, **kwargs) -> SimpleNamespace: stdout_tee = [sys_stdout, stdout_io, stdout_path] stderr_tee = [sys_stderr, stderr_io, stderr_path] - def write(input_file, output_files) -> None: + def write(input_file: TextIO, output_files: List[TextIO]) -> None: """ Todo: Insert docstring. Parameters: input_file: output_files: - - Todo: Determine types for inputs. """ - chunk = os.read(input_file.fileno(), 4096) + chunk_size = 4096 # 4 KB + chunk = os.read(input_file.fileno(), chunk_size) while chunk and chunk[-1] != 4: for output_file in output_files: if output_file is not None: output_file.write(chunk.decode(errors="ignore")) chunk = os.read(input_file.fileno(), 4096) if not chunk: + + # If something goes wrong in the `tee()`, see the note elsewhere. _thread.interrupt_main() + + # Remove the EOT character, and write the last chunk. chunk = chunk[:-1] for output_file in output_files: if output_file is not None: @@ -269,7 +299,15 @@ def write(input_file, output_files) -> None: def auxiliary_command(self, **kwargs) -> Tuple[str, str]: """ - Todo: Insert docstring. + Todo: Insert docstring. The same as the `run` command, but: + 1. stdout/stderr get redirected to the aux fds + 2. you don't tee any out/err + + Purpose is to run aux commands like umask, pwd, env, etc. + + Could maybe combine this with `run` with extra flags. + + Todo: Rip out Windows support. Parameters: **kwargs: @@ -290,17 +328,17 @@ def auxiliary_command(self, **kwargs) -> Tuple[str, str]: stdout = "" stderr = "" - # Todo: What's with the magic numbers? - aux = os.read(self.aux_stdout_rfd, 65536) + magic_number = 65536 # Max amount of info you can write to an unnamed pipe without flushing it. https://unix.stackexchange.com/questions/343302/anonymous-pipe-kernel-buffer-size + aux = os.read(self.aux_stdout_rfd, magic_number) while aux[-1] != 4: stdout += aux.decode() - aux = os.read(self.aux_stdout_rfd, 65536) + aux = os.read(self.aux_stdout_rfd, magic_number) aux = aux[:-1] stdout += aux.decode() - aux = os.read(self.aux_stderr_rfd, 65536) + aux = os.read(self.aux_stderr_rfd, magic_number) while aux[-1] != 4: stderr += aux.decode() - aux = os.read(self.aux_stderr_rfd, 65536) + aux = os.read(self.aux_stderr_rfd, magic_number) aux = aux[:-1] stderr += aux.decode() if kwargs.get("strip"): @@ -313,23 +351,27 @@ def auxiliary_command(self, **kwargs) -> Tuple[str, str]: class Trace: """ - Todo: Insert docstring. + Provides an interface for the :class:`ShellLogger` to run commands + with a certain trace (e.g., ``strace`` or ``ltrace``). """ - trace_name = "undefined" + trace_name = "undefined" # Should be defined by subclasses. subclasses = [] - def subclass(TraceSubclass): + @staticmethod # Or is there some @decorator annotation? + def subclass(tracesubclass: type): """ - Todo: Insert docstring. + Todo: Insert docstring. Decorator. Adds to a list of supported Trace classes for the trace_collector factory method. """ - if issubclass(TraceSubclass, Trace): - Trace.subclasses.append(TraceSubclass) - return TraceSubclass + if issubclass(tracesubclass, Trace): + Trace.subclasses.append(tracesubclass) + return tracesubclass def __init__(self, **kwargs): """ Todo: Insert docstring. """ + + # Set up the output file where you'll write the trace info. if kwargs.get("trace_path"): self.output_path = Path(kwargs["trace_path"]) else: @@ -339,13 +381,13 @@ def __init__(self, **kwargs): @abstractmethod def trace_args(self): """ - Todo: Insert docstring. + Todo: Insert docstring. The trace command + the arguments you pass to it (but not the command you're tracing). Needs to be overridden in subclasses. E.g. return `strace -f -c -e "open"`. """ raise AbstractMethod() def command(self, command, **kwargs): """ - Todo: Insert docstring. + Return a command that runs a trace on a command. E.g. "ls -l" -> "strace -f -c -e 'open' -- ls -l" """ return f"{self.trace_args} -- {command}" @@ -357,13 +399,13 @@ class StatsCollector: stat_name = "undefined" subclasses = [] - def subclass(StatsCollectorSubclass): + def subclass(statscollectorsubclass: type): """ Todo: Insert docstring. """ - if issubclass(StatsCollectorSubclass, StatsCollector): - StatsCollector.subclasses.append(StatsCollectorSubclass) - return StatsCollectorSubclass + if issubclass(statscollectorsubclass, StatsCollector): + StatsCollector.subclasses.append(statscollectorsubclass) + return statscollectorsubclass def __init__(self, interval, manager): """ @@ -374,13 +416,15 @@ def __init__(self, interval, manager): def start(self): """ - Todo: Insert docstring. + Start a subprocess to poll at a certain interval for certain + statistics. """ self.process.start() def loop(self): """ - Todo: Insert docstring. + Infinitely loop, collecting stats, until the subprocess is + terminated. """ while True: self.collect() @@ -389,34 +433,28 @@ def loop(self): @abstractmethod def collect(self): """ - Todo: Insert docstring. + Meant to be overridden. Called at an interval. Instantaneous + collection of a stat. """ raise AbstractMethod() @abstractmethod def unproxied_stats(self): """ - Todo: Insert docstring. + Convert from Python's Manager's datastrcutrues to base Python + datastructures. """ raise AbstractMethod() def finish(self): """ - Todo: Insert docstring. + Terminate the infinite loop that's collecting the stats, and + then return the unproxied stats. """ self.process.terminate() return self.unproxied_stats() -class FileAlreadyExists(RuntimeError): - def __init__(self, file): - """ - Todo: Insert docstring. - """ - super().__init__(f"{file.resolve()} already exists! " - "Delete or rename and try rerunning this.") - - class AbstractMethod(NotImplementedError): def __init__(self): """ diff --git a/src/shelllogger/util.py b/src/shelllogger/util.py index 89451c8..70fc16f 100644 --- a/src/shelllogger/util.py +++ b/src/shelllogger/util.py @@ -1,80 +1,147 @@ #!/usr/bin/env python3 from collections.abc import Iterable, Mapping -import datetime +from datetime import datetime import pkgutil -import os from pathlib import Path import re import textwrap -from types import SimpleNamespace, GeneratorType - -def nested_SimpleNamespace_to_dict(object): - if "_asdict" in dir(object): - return nested_SimpleNamespace_to_dict(object._asdict()) - elif isinstance(object, (str, bytes, tuple)): - return object - elif isinstance(object, Mapping): - return {k:nested_SimpleNamespace_to_dict(v) for k,v in object.items()} - elif isinstance(object, Iterable): - return [nested_SimpleNamespace_to_dict(x) for x in object] - elif isinstance(object, SimpleNamespace): - return nested_SimpleNamespace_to_dict(object.__dict__) +from types import SimpleNamespace +from typing import TextIO, Tuple + + +def nested_simplenamespace_to_dict(namespace: object) -> object: + """ + Todo: Figure this out. + + Parameters: + namespace: + + Returns: + + """ + if "_asdict" in dir(namespace): + return nested_simplenamespace_to_dict(namespace._asdict()) + elif isinstance(namespace, (str, bytes, tuple)): + return namespace + elif isinstance(namespace, Mapping): + return {k: nested_simplenamespace_to_dict(v) for k, v in + namespace.items()} + elif isinstance(namespace, Iterable): + return [nested_simplenamespace_to_dict(x) for x in namespace] + elif isinstance(namespace, SimpleNamespace): + return nested_simplenamespace_to_dict(namespace.__dict__) else: - return object - -def filter_junk_from_env(env, junk_list): - filtered_env = "" - for line in env.split('\n'): - is_junk = any([line[:len(junk)+1] == f"{junk}=" for junk in junk_list]) - if not is_junk: - filtered_env += line + '\n' - return filtered_env - -def miliseconds_to_datetime(miliseconds): - return datetime.datetime.fromtimestamp(miliseconds / 1000.0) - -def miliseconds_to_human_time(miliseconds): - return miliseconds_to_datetime(miliseconds).strftime('%Y-%m-%d %H:%M:%S.%f') - -def opening_html_text(): - return ( - "" + - "" + - html_header() + return namespace + + +def get_human_time(milliseconds: float) -> str: + """ + Get a human-readable date/time. + + Parameters: + milliseconds: The number of milliseconds since epoch. + + Returns: + A string representation of the date and time. + """ + return datetime.fromtimestamp(milliseconds / 1000.0).strftime( + '%Y-%m-%d %H:%M:%S.%f' ) -def closing_html_text(): + +def opening_html_text() -> str: + """ + Get the opening HTML text. + + Returns: + A string containing the first line of the HTML document through + `` ``. + """ + return ("" + + "" + + html_header()) + + +def closing_html_text() -> str: + """ + Get the closing HTML tag. + + Returns: + A string with the closing HTML tag in it. + """ return "" -def append_html(*args, output=None): - def _append_html(file, *args): - for arg in args: + +def append_html(*args: object, output: Path) -> None: + """ + Todo: Figure this out. + + Parameters: + *args: + output: The HTML file to append to. + """ + def _append_html(f: TextIO, *inner_args: object) -> None: + """ + Todo: Figure this out. + + Parameters: + f: The HTML file to write to. + *inner_args: + """ + for arg in inner_args: if isinstance(arg, str): - file.write(arg) + f.write(arg) elif isinstance(arg, bytes): - file.write(arg.decode()) + f.write(arg.decode()) elif isinstance(arg, Iterable): - _append_html(file, *element) + _append_html(f, *arg) else: raise RuntimeError(f"Unsupported type: {type(arg)}") - with open(output, "a") as file: - _append_html(file, *args) -def fixed_width(text): + with open(output, "a") as output_file: + _append_html(output_file, *args) + + +def fixed_width(text: str) -> str: + """ + Wrap the given ``text`` in a ``...`` block such that it + displays in a fixed-width font. + + Parameters: + text: The text to wrap. + + Returns: + The ``...`` block. + """ return f"{html_encode(text)}" -def flatten(element): + +def flatten(element: object) -> object: + """ + Takes a tree of lists and turns it into a flat iterables. Ish. + + Parameters: + element: + """ if isinstance(element, str): yield element elif isinstance(element, bytes): - file.write(element.decode()) + yield element.decode() elif isinstance(element, Iterable): for _element in element: yield from flatten(_element) else: yield element -def parent_logger_card_html(name, *args): + +def parent_logger_card_html(name: object, *args: object) -> object: + """ + Todo: Figure this out. + + Parameters: + name: + *args: + """ header, indent, footer = split_template(parent_logger_template, "parent_body", name=name) @@ -83,11 +150,30 @@ def parent_logger_card_html(name, *args): yield textwrap.indent(arg, indent) yield footer -def child_logger_card(log): + +def child_logger_card(log: object) -> object: + """ + Todo: Figure this out. + + Parameters: + log: + + Returns: + + """ child_html = log.to_html() return child_logger_card_html(log.name, log.duration, *child_html) -def child_logger_card_html(name, duration, *args): + +def child_logger_card_html(name: object, duration: object, *args: object) -> object: + """ + Todo: Figure this out. + + Parameters: + name: + duration: + *args: + """ header, indent, footer = split_template(child_logger_template, "child_body", name=name, @@ -101,7 +187,15 @@ def child_logger_card_html(name, duration, *args): yield textwrap.indent(_arg, indent) yield footer -def command_card_html(log, *args): + +def command_card_html(log: object, *args: object) -> object: + """ + Todo: Figure this out. + + Parameters: + log: + *args: + """ header, indent, footer = split_template(command_template, "more_info", cmd_id=log["cmd_id"], @@ -118,7 +212,14 @@ def command_card_html(log, *args): yield textwrap.indent(_arg, indent) yield footer -def html_message_card(log): + +def html_message_card(log: object) -> object: + """ + Todo: Figure this out. + + Parameters: + log: + """ timestamp = log["timestamp"] timestamp = timestamp.replace(' ', '_') timestamp = timestamp.replace(':', '-') @@ -134,7 +235,14 @@ def html_message_card(log): yield textwrap.indent(text, indent) + '\n' yield footer -def message_card(log): + +def message_card(log: object) -> object: + """ + Todo: Figure this out. + + Parameters: + log: + """ header, indent, footer = split_template(message_template, "message") text = html_encode(log["msg"]) text = "
" + text.replace('\n', "
") + "
" @@ -142,7 +250,15 @@ def message_card(log): yield textwrap.indent(text, indent) + '\n' yield footer -def command_detail_list(cmd_id, *args): + +def command_detail_list(cmd_id: object, *args: object) -> object: + """ + Todo: Figure this out. + + Parameters: + cmd_id: + *args: + """ header, indent, footer = split_template(command_detail_list_template, "details", cmd_id=cmd_id) @@ -152,7 +268,20 @@ def command_detail_list(cmd_id, *args): yield textwrap.indent(arg, indent) yield footer -def command_detail(cmd_id, name, value, hidden=False): + +def command_detail(cmd_id: object, name: object, value: object, hidden: object = False) -> object: + """ + Todo: Figure this out. + + Parameters: + cmd_id: + name: + value: + hidden: + + Returns: + + """ if hidden: return hidden_command_detail_template.format(cmd_id=cmd_id, name=name, @@ -160,7 +289,18 @@ def command_detail(cmd_id, name, value, hidden=False): else: return command_detail_template.format(name=name, value=value) -def command_card(log, strm_dir): + +def command_card(log: object, strm_dir: object) -> object: + """ + Todo: Figure this out. + + Parameters: + log: + strm_dir: + + Returns: + + """ cmd_id = log["cmd_id"] stdout_path = strm_dir / f"{log['timestamp']}_{cmd_id}_stdout" stderr_path = strm_dir / f"{log['timestamp']}_{cmd_id}_stderr" @@ -199,8 +339,8 @@ def command_card(log, strm_dir): if log["stats"].get("disk"): uninteresting_disks = ["/var", "/var/log", "/var/log/audit", "/boot", "/boot/efi"] - disk_stats = { x:y for x, y in log["stats"]["disk"].items() - if x not in uninteresting_disks } + disk_stats = {x: y for x, y in log["stats"]["disk"].items() + if x not in uninteresting_disks} # We sort because JSON deserialization may change # the ordering of the map. for disk, data in sorted(disk_stats.items()): @@ -209,26 +349,70 @@ def command_card(log, strm_dir): return command_card_html(log, *info) -def timeseries_plot(cmd_id, data_tuples, series_title): - labels = [miliseconds_to_human_time(x) for x, _ in data_tuples] + +def timeseries_plot(cmd_id: object, data_tuples: object, series_title: object) -> object: + """ + Todo: Figure this out. + + Parameters: + cmd_id: + data_tuples: + series_title: + + Returns: + + """ + labels = [get_human_time(x) for x, _ in data_tuples] values = [y for _, y in data_tuples] id = f"{cmd_id}-{series_title.lower().replace(' ', '-')}-chart" return stat_chart_card(labels, values, series_title, id) -def disk_timeseries_plot(cmd_id, data_tuples, volume_name): - labels = [miliseconds_to_human_time(x) for x, _ in data_tuples] + +def disk_timeseries_plot(cmd_id: object, data_tuples: object, volume_name: object) -> object: + """ + Todo: Figure this out. + + Parameters: + cmd_id: + data_tuples: + volume_name: + + Returns: + + """ + labels = [get_human_time(x) for x, _ in data_tuples] values = [y for _, y in data_tuples] id = f"{cmd_id}-volume{volume_name.replace('/', '_')}-usage" stat_title = f"Used Space on {volume_name}" return stat_chart_card(labels, values, stat_title, id) -def stat_chart_card(labels, data, title, id): + +def stat_chart_card(labels: object, data: object, title: object, id: object) -> object: + """ + Todo: Figure this out. + + Parameters: + labels: + data: + title: + id: + """ yield stat_chart_template.format(labels=labels, data=data, title=title, id=id) -def output_block_card(title, string, cmd_id, collapsed=True): + +def output_block_card(title: object, string: object, cmd_id: object, collapsed: object = True) -> object: + """ + Todo: Figure this out. + + Parameters: + title: + string: + cmd_id: + collapsed: + """ name = title.replace(' ', '_').lower() if collapsed: template = output_card_collapsed_template @@ -244,16 +428,33 @@ def output_block_card(title, string, cmd_id, collapsed=True): yield textwrap.indent(line, indent) yield footer -def output_block(input, name, cmd_id): - if isinstance(input, Path): - with open(input) as f: + +def output_block(input_file_or_str: object, name: object, cmd_id: object) -> object: + """ + Todo: Figure this out. + + Parameters: + input_file_or_str: + name: + cmd_id: + """ + if isinstance(input_file_or_str, Path): + with open(input_file_or_str) as f: for string in output_block_html(f, name, cmd_id): yield string - if isinstance(input, str): - for string in output_block_html(input, name, cmd_id): + if isinstance(input_file_or_str, str): + for string in output_block_html(input_file_or_str, name, cmd_id): yield string -def diagnostics_card(cmd_id, *args): + +def diagnostics_card(cmd_id: object, *args: object) -> object: + """ + Todo: Figure this out. + + Parameters: + cmd_id: + *args: + """ header, indent, footer = split_template(diagnostics_template, "diagnostics", cmd_id=cmd_id) @@ -266,7 +467,16 @@ def diagnostics_card(cmd_id, *args): yield textwrap.indent(_arg, indent) yield footer -def output_block_html(lines, name, cmd_id): + +def output_block_html(lines: object, name: object, cmd_id: object) -> object: + """ + Todo: Figure this out. + + Parameters: + lines: + name: + cmd_id: + """ if isinstance(lines, str): lines = lines.split('\n') header, indent, footer = split_template(output_block_template, @@ -280,26 +490,74 @@ def output_block_html(lines, name, cmd_id): yield textwrap.indent(output_line_html(line, lineno), indent) yield footer -def split_template(template, split_at, **kwargs): - format = { k:v for k, v in kwargs.items() if k != split_at } + +def split_template( + template: str, + split_at: str, + **kwargs: object +) -> Tuple[str, str, str]: + """ + Todo: Figure this out. + + Parameters: + template: A templated HTML snippet. + split_at: A substring used to split the ``template`` into + before and after chunks. + **kwargs: + + Returns: + + """ + fmt = {k: v for k, v in kwargs.items() if k != split_at} pattern = re.compile(f"(.*\\n)(\\s*)\\{{{split_at}\\}}\\n(.*)", flags=re.DOTALL) before, indent, after = pattern.search(template).groups() - return before.format(**format), indent, after.format(**format) + return before.format(**fmt), indent, after.format(**fmt) + + +def output_line_html(line: object, lineno: object) -> object: + """ + Todo: Figure this out. -def output_line_html(line, lineno): + Parameters: + line: + lineno: + + Returns: + + """ encoded_line = html_encode(line).rstrip() return output_line_template.format(line=encoded_line, lineno=lineno) -def html_encode(text): + +def html_encode(text: str) -> str: + """ + Replace special characters with their HTML encodings. + + Parameters: + text: The text to encode. + + Returns: + The encoded text. + """ text = text.replace('&', "&") text = text.replace('<', "<") text = text.replace('>', ">") - text = text.replace('-', "-⁠") # non breaking dashes + text = text.replace('-', "-⁠") # Non-breaking dashes. text = sgr_to_html(text) return text -def sgr_to_html(text): + +def sgr_to_html(text: object) -> object: + """ + Todo: Figure this out. + + Parameters: + text: + + Returns: + + """ span_count = 0 while text.find("\x1b[") >= 0: start = text.find("\x1b[") @@ -330,7 +588,17 @@ def sgr_to_html(text): text = text[:start] + span_string + text[finish+1:] return text -def sgr_4bit_color_and_style_to_html(sgr): + +def sgr_4bit_color_and_style_to_html(sgr: object) -> object: + """ + Todo: Figure this out. + + Parameters: + sgr: + + Returns: + + """ sgr_to_css = { "1": "font-weight: bold;", "2": "font-weight: lighter;", @@ -357,11 +625,21 @@ def sgr_4bit_color_and_style_to_html(sgr): } return f'' -def sgr_8bit_color_to_html(sgr_params): + +def sgr_8bit_color_to_html(sgr_params: object) -> object: + """ + Todo: Figure this out. + + Parameters: + sgr_params: + + Returns: + + """ sgr_256 = int(sgr_params[2]) if len(sgr_params) > 2 else 0 if sgr_256 < 0 or sgr_256 > 255 or not sgr_params: '' - if sgr_256 > 15 and sgr_256 < 232: + if 15 < sgr_256 < 232: red_6cube = (sgr_256 - 16) // 36 green_6cube = (sgr_256 - (16 + red_6cube * 36)) // 6 blue_6cube = (sgr_256 - 16) % 6 @@ -369,7 +647,7 @@ def sgr_8bit_color_to_html(sgr_params): green = str(51 * green_6cube) blue = str(51 * blue_6cube) return sgr_24bit_color_to_html([sgr_params[0], "2", red, green, blue]) - elif sgr_256 < 256 and sgr_256 > 231: + elif 231 < sgr_256 < 256: gray = str(8 + (sgr_256 - 232) * 10) return sgr_24bit_color_to_html([sgr_params[0], "2", gray, gray, gray]) elif sgr_params[0] == "38": @@ -383,71 +661,128 @@ def sgr_8bit_color_to_html(sgr_params): elif sgr_256 < 16: return sgr_4bit_color_and_style_to_html(str(92+sgr_256)) -def sgr_24bit_color_to_html(sgr_params): + +def sgr_24bit_color_to_html(sgr_params: object) -> object: + """ + Todo: Figure this out. + + Parameters: + sgr_params: + + Returns: + + """ r, g, b = sgr_params[2:5] if len(sgr_params) == 5 else ("0", "0", "0") - if len(sgr_params) > 1 and sgr_params[:2] == ["38","2"]: + if len(sgr_params) > 1 and sgr_params[:2] == ["38", "2"]: return f'' - elif len(sgr_params) > 1 and sgr_params[:2] == ["48","2"]: + elif len(sgr_params) > 1 and sgr_params[:2] == ["48", "2"]: return f'' else: return '' -def html_header(): - return ( - "" + - embed_style("bootstrap.min.css") + - embed_style("Chart.min.css") + - embed_style("top_level_style_adjustments.css") + - embed_style("parent_logger_style.css") + - embed_style("child_logger_style.css") + - embed_style("command_style.css") + - embed_style("message_style.css") + - embed_style("detail_list_style.css") + - embed_style("code_block_style.css") + - embed_style("output_style.css") + - embed_style("diagnostics_style.css") + - embed_style("search_controls.css") + - embed_script("jquery.slim.min.js") + - embed_script("bootstrap.bundle.min.js") + - embed_script("Chart.bundle.min.js") + - embed_script("search_output.js") + - embed_html("search_icon.svg") + - " " - ) -def embed_style(resource): - return ( - "\n" - ) +def html_header() -> str: + """ + Get the HTML header, complete with embedded styles and scripts. + + Returns: + A string with the ``... `` contents. + """ + return ("" + + embed_style("bootstrap.min.css") + + embed_style("Chart.min.css") + + embed_style("top_level_style_adjustments.css") + + embed_style("parent_logger_style.css") + + embed_style("child_logger_style.css") + + embed_style("command_style.css") + + embed_style("message_style.css") + + embed_style("detail_list_style.css") + + embed_style("code_block_style.css") + + embed_style("output_style.css") + + embed_style("diagnostics_style.css") + + embed_style("search_controls.css") + + embed_script("jquery.slim.min.js") + + embed_script("bootstrap.bundle.min.js") + + embed_script("Chart.bundle.min.js") + + embed_script("search_output.js") + + embed_html("search_icon.svg") + + " ") + + +def embed_style(resource: str) -> str: + """ + Wrap the given ``resource`` in an appropriate ```` + block for embedding in the HTML header. + + Parameters: + resource: The name of a style file to embed. + + Returns: + A string containing the ```` block. + """ + return ("\n") + + +def embed_script(resource: str) -> str: + """ + Wrap the given ``resource`` in an appropriate + ```` block for embedding in the HTML header. + + Parameters: + resource: The name of a script file to embed. + + Returns: + A string containing the ```` block. + """ + return ("\n") + + +def embed_html(resource: str) -> str: + """ + Get a HTML ``resource`` froma file for the sake of embedding it into + the HTML header. + + Parameters: + resource: The name of a HTML file to embed. + + Returns: + The contents of the file. + + Todo: Why do we use ``pkgutil.get_data()`` instead of a simple + ``read()``. + """ + return pkgutil.get_data(__name__, f"resources/{resource}").decode() -def embed_script(resource): - return ( - "\n" - ) -def embed_html(resource): - return pkgutil.get_data(__name__, f"resources/{resource}").decode() +def load_template(template: str) -> str: + """ + Load a template HTML file. + + Parameters: + template: The file name to load. -def load_template(template): + Returns: + A string containing the contents of the file. + """ template_file = f"resources/templates/{template}" return pkgutil.get_data(__name__, template_file).decode() -command_detail_list_template = load_template("command_detail_list.html") -command_detail_template = load_template("command_detail.html") + +command_detail_list_template = load_template("command_detail_list.html") +command_detail_template = load_template("command_detail.html") hidden_command_detail_template = load_template("hidden_command_detail.html") -stat_chart_template = load_template("stat_chart.html") -diagnostics_template = load_template("diagnostics.html") -output_card_template = load_template("output_card.html") +stat_chart_template = load_template("stat_chart.html") +diagnostics_template = load_template("diagnostics.html") +output_card_template = load_template("output_card.html") output_card_collapsed_template = load_template("output_card_collapsed.html") -output_block_template = load_template("output_block.html") -output_line_template = load_template("output_line.html") -message_template = load_template("message.html") -html_message_template = load_template("html_message.html") -command_template = load_template("command.html") -child_logger_template = load_template("child_logger.html") -parent_logger_template = load_template("parent_logger.html") - +output_block_template = load_template("output_block.html") +output_line_template = load_template("output_line.html") +message_template = load_template("message.html") +html_message_template = load_template("html_message.html") +command_template = load_template("command.html") +child_logger_template = load_template("child_logger.html") +parent_logger_template = load_template("parent_logger.html")