diff --git a/loguru/_datetime.py b/loguru/_datetime.py
index ec77d9f8c..9c83c82c3 100644
--- a/loguru/_datetime.py
+++ b/loguru/_datetime.py
@@ -1,8 +1,8 @@
-from datetime import datetime as datetime_, timezone, timedelta
-from calendar import day_name, day_abbr, month_name, month_abbr
-from time import time, localtime
 import re
-
+from calendar import day_abbr, day_name, month_abbr, month_name
+from datetime import datetime as datetime_
+from datetime import timedelta, timezone
+from time import localtime, time
 
 
 tokens = r"H{1,2}|h{1,2}|m{1,2}|s{1,2}|S{1,6}|YYYY|YY|M{1,4}|D{1,4}|Z{1,2}|zz|A|X|x|E|Q|dddd|ddd|d"
diff --git a/loguru/_defaults.py b/loguru/_defaults.py
index ad8d76871..f26b94c67 100644
--- a/loguru/_defaults.py
+++ b/loguru/_defaults.py
@@ -1,5 +1,6 @@
 from os import environ
 
+
 def env(key, type_, default=None):
     if key not in environ:
         return default
diff --git a/loguru/_file_sink.py b/loguru/_file_sink.py
index 3fc60b973..3a9c273a9 100644
--- a/loguru/_file_sink.py
+++ b/loguru/_file_sink.py
@@ -11,7 +11,6 @@
 
 
 class FileDateFormatter:
-
     def __init__(self):
         self.datetime = now()
 
@@ -22,9 +21,18 @@ def __format__(self, spec):
 
 
 class FileSink:
-
-    def __init__(self, path, *, rotation=None, retention=None, compression=None, delay=False,
-                 mode='a', buffering=1, **kwargs):
+    def __init__(
+        self,
+        path,
+        *,
+        rotation=None,
+        retention=None,
+        compression=None,
+        delay=False,
+        mode="a",
+        buffering=1,
+        **kwargs
+    ):
         self.mode = mode
         self.buffering = buffering
         self.kwargs = kwargs.copy()
@@ -80,45 +88,49 @@ def initialize_file(self, *, rename_existing):
         self.file_path = new_path
 
     def format_path(self):
-        path = self.path.format_map({'time': FileDateFormatter()})
+        path = self.path.format_map({"time": FileDateFormatter()})
         return os.path.abspath(path)
 
     @staticmethod
     def make_glob_pattern(path):
         tokens = string.Formatter().parse(path)
-        parts = (glob.escape(text) + '*' * (name is not None) for text, name, *_ in tokens)
-        root, ext = os.path.splitext(''.join(parts))
+        parts = (glob.escape(text) + "*" * (name is not None) for text, name, *_ in tokens)
+        root, ext = os.path.splitext("".join(parts))
         if ext:
-            pattern = root + '.*'
+            pattern = root + ".*"
         else:
-            pattern = root + '*'
+            pattern = root + "*"
         return pattern
 
     def make_rotation_function(self, rotation):
-
         def make_from_size(size_limit):
             def rotation_function(message, file):
                 file.seek(0, 2)
                 return file.tell() + len(message) >= size_limit
+
             return rotation_function
 
         def make_from_time(step_forward, time_init=None):
             start_time = time_limit = now().replace(tzinfo=None)
             if time_init is not None:
-                time_limit = time_limit.replace(hour=time_init.hour,
-                                                minute=time_init.minute,
-                                                second=time_init.second,
-                                                microsecond=time_init.microsecond)
+                time_limit = time_limit.replace(
+                    hour=time_init.hour,
+                    minute=time_init.minute,
+                    second=time_init.second,
+                    microsecond=time_init.microsecond,
+                )
             if time_limit <= start_time:
                 time_limit = step_forward(time_limit)
+
             def rotation_function(message, file):
                 nonlocal time_limit
-                record_time = message.record['time'].replace(tzinfo=None)
+                record_time = message.record["time"].replace(tzinfo=None)
                 if record_time >= time_limit:
                     while time_limit <= record_time:
                         time_limit = step_forward(time_limit)
                     return True
                 return False
+
             return rotation_function
 
         if rotation is None:
@@ -140,34 +152,42 @@ def rotation_function(message, file):
                     return self.make_rotation_function(time)
                 if time is None:
                     time = datetime.time(0, 0, 0)
+
                 def next_day(t):
                     while True:
-                        t += datetime.timedelta(days=1)
-                        if t.weekday() == day:
+                        t += datetime.timedelta(days=1)
+                        if t.weekday() == day:
                             return t
+
                 return make_from_time(next_day, time_init=time)
             raise ValueError("Cannot parse rotation from: '%s'" % rotation)
         elif isinstance(rotation, (numbers.Real, decimal.Decimal)):
             return make_from_size(rotation)
         elif isinstance(rotation, datetime.time):
+
             def next_day(t):
                 return t + datetime.timedelta(days=1)
+
             return make_from_time(next_day, time_init=rotation)
         elif isinstance(rotation, datetime.timedelta):
+
             def add_interval(t):
                 return t + rotation
+
             return make_from_time(add_interval)
         elif callable(rotation):
             return rotation
         else:
-            raise ValueError("Cannot infer rotation for objects of type: '%s'" % type(rotation).__name__)
+            raise ValueError(
+                "Cannot infer rotation for objects of type: '%s'" % type(rotation).__name__
+            )
 
     def make_retention_function(self, retention):
-
         def make_from_filter(filter_logs):
             def retention_function(logs):
                 for log in filter_logs(logs):
                     os.remove(log)
+
             return retention_function
 
         if retention is None:
@@ -178,74 +198,92 @@ def retention_function(logs):
                 raise ValueError("Cannot parse retention from: '%s'" % retention)
             return self.make_retention_function(interval)
         elif isinstance(retention, int):
+
            def key_log(log):
                 return (-os.stat(log).st_mtime, log)
+
            def filter_logs(logs):
                 return sorted(logs, key=key_log)[retention:]
+
            return make_from_filter(filter_logs)
         elif isinstance(retention, datetime.timedelta):
             seconds = retention.total_seconds()
+
            def filter_logs(logs):
                 t = now().timestamp()
                 return [log for log in logs if os.stat(log).st_mtime <= t - seconds]
+
            return make_from_filter(filter_logs)
         elif callable(retention):
             return retention
         else:
-            raise ValueError("Cannot infer retention for objects of type: '%s'" % type(retention).__name__)
+            raise ValueError(
+                "Cannot infer retention for objects of type: '%s'" % type(retention).__name__
+            )
 
     def make_compression_function(self, compression):
-
         def make_compress_generic(opener, **kwargs):
             def compress(path_in, path_out):
-                with open(path_in, 'rb') as f_in:
-                    with opener(path_out, 'wb', **kwargs) as f_out:
+                with open(path_in, "rb") as f_in:
+                    with opener(path_out, "wb", **kwargs) as f_out:
                         shutil.copyfileobj(f_in, f_out)
+
             return compress
 
         def make_compress_archive(mode):
             import tarfile
+
             def compress(path_in, path_out):
-                with tarfile.open(path_out, 'w:' + mode) as f_comp:
+                with tarfile.open(path_out, "w:" + mode) as f_comp:
                     f_comp.add(path_in, os.path.basename(path_in))
+
             return compress
 
         def make_compress_zipped():
-            import zlib, zipfile
+            import zipfile
+
             def compress(path_in, path_out):
-                with zipfile.ZipFile(path_out, 'w', compression=zipfile.ZIP_DEFLATED) as f_comp:
+                with zipfile.ZipFile(path_out, "w", compression=zipfile.ZIP_DEFLATED) as f_comp:
                     f_comp.write(path_in, os.path.basename(path_in))
+
             return compress
 
         if compression is None:
             return None
         elif isinstance(compression, str):
-            ext = compression.strip().lstrip('.')
+            ext = compression.strip().lstrip(".")
+
+            if ext == "gz":
+                import gzip
 
-            if ext == 'gz':
-                import zlib, gzip
                 compress = make_compress_generic(gzip.open)
-            elif ext == 'bz2':
+            elif ext == "bz2":
                 import bz2
+
                 compress = make_compress_generic(bz2.open)
-            elif ext == 'xz':
+            elif ext == "xz":
                 import lzma
+
                 compress = make_compress_generic(lzma.open, format=lzma.FORMAT_XZ)
-            elif ext == 'lzma':
+            elif ext == "lzma":
                 import lzma
+
                 compress = make_compress_generic(lzma.open, format=lzma.FORMAT_ALONE)
-            elif ext == 'tar':
-                compress = make_compress_archive('')
-            elif ext == 'tar.gz':
-                import zlib, gzip
-                compress = make_compress_archive('gz')
-            elif ext == 'tar.bz2':
+            elif ext == "tar":
+                compress = make_compress_archive("")
+            elif ext == "tar.gz":
+                import gzip
+
+                compress = make_compress_archive("gz")
+            elif ext == "tar.bz2":
                 import bz2
-                compress = make_compress_archive('bz2')
-            elif ext == 'tar.xz':
+
+                compress = make_compress_archive("bz2")
+            elif ext == "tar.xz":
                 import lzma
-                compress = make_compress_archive('xz')
-            elif ext == 'zip':
+
+                compress = make_compress_archive("xz")
+            elif ext == "zip":
                 compress = make_compress_zipped()
             else:
                 raise ValueError("Invalid compression format: '%s'" % ext)
@@ -265,7 +303,9 @@ def compression_function(path_in):
         elif callable(compression):
             return compression
         else:
-            raise ValueError("Cannot infer compression for objects of type: '%s'" % type(compression).__name__)
+            raise ValueError(
+                "Cannot infer compression for objects of type: '%s'" % type(compression).__name__
+            )
 
     def stop(self):
         self.terminate(teardown=self.rotation_function is None)
diff --git a/loguru/_handler.py b/loguru/_handler.py
index 5098a7990..5b946b1cf 100644
--- a/loguru/_handler.py
+++ b/loguru/_handler.py
@@ -1,10 +1,10 @@
 import functools
 import json
 import multiprocessing
+import string
 import sys
 import threading
 import traceback
-import string
 
 import ansimarkup
 
diff --git a/loguru/_logger.py b/loguru/_logger.py
index 66b5c9f03..0b89b78d0 100644
--- a/loguru/_logger.py
+++ b/loguru/_logger.py
@@ -1,7 +1,6 @@
 import functools
 import itertools
 import logging
-import os
 import threading
 from collections import namedtuple
 from datetime import timedelta
@@ -18,7 +17,8 @@
 from ._file_sink import FileSink
 from ._get_frame import get_frame
 from ._handler import Handler
-from ._recattrs import LevelRecattr, FileRecattr, ThreadRecattr, ProcessRecattr, ExceptionRecattr
+from ._recattrs import (ExceptionRecattr, FileRecattr, LevelRecattr,
+                        ProcessRecattr, ThreadRecattr)
 
 
 Level = namedtuple('Level', ['no', 'color', 'icon'])
@@ -228,7 +228,7 @@ def start(self, sink, *, level=_defaults.LOGURU_LEVEL, format=_defaults.LOGURU_F
         elif hasattr(sink, 'write') and callable(sink.write):
             try:
                 converter = AnsiToWin32(sink, convert=None, strip=False)
-            except:
+            except Exception:
                 if colorize is None:
                     colorize = False
                 stream = sink
diff --git a/loguru/_notifier.py b/loguru/_notifier.py
index 6db6d7bb8..0f38ac0cc 100644
--- a/loguru/_notifier.py
+++ b/loguru/_notifier.py
@@ -1,5 +1,3 @@
-import textwrap
-
 import notifiers
 
 
diff --git a/loguru/_recattrs.py b/loguru/_recattrs.py
index f801ccc4e..26abca623 100644
--- a/loguru/_recattrs.py
+++ b/loguru/_recattrs.py
@@ -1,4 +1,3 @@
-import random
 import re
 import sys
 import traceback
@@ -6,7 +5,6 @@
 
 from better_exceptions_fork import ExceptionFormatter
 
-
 loguru_traceback = namedtuple('loguru_traceback', ('tb_frame', 'tb_lasti', 'tb_lineno', 'tb_next'))
 
 
diff --git a/setup.py b/setup.py
index ab0042fe5..cf66372e2 100644
--- a/setup.py
+++ b/setup.py
@@ -32,6 +32,7 @@
     extras_require={
         'dev': [
            'coveralls>=1.3.0',
+            'isort>=4.3.4',
            'pytest>=3.5.0',
            'pytest-cov>=2.5.1',
        ],
diff --git a/tests/test_filesink_compression.py b/tests/test_filesink_compression.py
index da4795945..e4bf6193e 100644
--- a/tests/test_filesink_compression.py
+++ b/tests/test_filesink_compression.py
@@ -1,5 +1,6 @@
 import pytest
 import os
+import sys
 from loguru import logger
 
 @pytest.mark.parametrize('compression', [
@@ -79,3 +80,33 @@ def test_rename_existing_before_compression(monkeypatch_date, tmpdir):
 def test_invalid_compression(compression):
     with pytest.raises(ValueError):
         logger.start('test.log', compression=compression)
+
+@pytest.mark.parametrize('ext', ['gz', 'tar.gz'])
+def test_gzip_module_unavailable(ext, monkeypatch):
+    monkeypatch.setitem(sys.modules, 'gzip', None)
+    with pytest.raises(ImportError):
+        logger.start("test.log", compression=ext)
+
+@pytest.mark.parametrize('ext', ['bz2', 'tar.bz2'])
+def test_bz2_module_unavailable(ext, monkeypatch):
+    monkeypatch.setitem(sys.modules, 'bz2', None)
+    with pytest.raises(ImportError):
+        logger.start("test.log", compression=ext)
+
+@pytest.mark.parametrize('ext', ['xz', 'lzma', 'tar.xz'])
+def test_lzma_module_unavailable(ext, monkeypatch):
+    monkeypatch.setitem(sys.modules, 'lzma', None)
+    with pytest.raises(ImportError):
+        logger.start("test.log", compression=ext)
+
+@pytest.mark.parametrize('ext', ['tar', 'tar.gz', 'tar.bz2', 'tar.xz'])
+def test_tarfile_module_unavailable(ext, monkeypatch):
+    monkeypatch.setitem(sys.modules, 'tarfile', None)
+    with pytest.raises(ImportError):
+        logger.start("test.log", compression=ext)
+
+@pytest.mark.parametrize('ext', ['zip'])
+def test_zipfile_module_unavailable(ext, monkeypatch):
+    monkeypatch.setitem(sys.modules, 'zipfile', None)
+    with pytest.raises(ImportError):
+        logger.start("test.log", compression=ext)