
Commit

Refactor imports using "isort"
Delgan committed Dec 6, 2018
1 parent bfce0fd · commit d0aa354
Showing 9 changed files with 124 additions and 55 deletions.
8 changes: 4 additions & 4 deletions loguru/_datetime.py
@@ -1,8 +1,8 @@
from datetime import datetime as datetime_, timezone, timedelta
from calendar import day_name, day_abbr, month_name, month_abbr
from time import time, localtime
import re

from calendar import day_abbr, day_name, month_abbr, month_name
from datetime import datetime as datetime_
from datetime import timedelta, timezone
from time import localtime, time

tokens = r"H{1,2}|h{1,2}|m{1,2}|s{1,2}|S{1,6}|YYYY|YY|M{1,4}|D{1,4}|Z{1,2}|zz|A|X|x|E|Q|dddd|ddd|d"

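For context on what isort actually did here: in each file it places plain "import x" statements before "from x import y" statements, sorts modules and imported names alphabetically, and keeps one module per from-import, as the removed and added blocks of loguru/_datetime.py above show. Restating that hunk side by side (old block first, new block second; nothing here beyond what the diff already contains):

    # Before: hand-ordered imports, several names per line.
    from datetime import datetime as datetime_, timezone, timedelta
    from calendar import day_name, day_abbr, month_name, month_abbr
    from time import time, localtime
    import re

    # After isort: plain imports first, then from-imports,
    # modules and names sorted alphabetically, one module per statement.
    import re

    from calendar import day_abbr, day_name, month_abbr, month_name
    from datetime import datetime as datetime_
    from datetime import timedelta, timezone
    from time import localtime, time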
1 change: 1 addition & 0 deletions loguru/_defaults.py
@@ -1,5 +1,6 @@
from os import environ


def env(key, type_, default=None):
if key not in environ:
return default
126 changes: 83 additions & 43 deletions loguru/_file_sink.py
@@ -11,7 +11,6 @@


class FileDateFormatter:

def __init__(self):
self.datetime = now()

@@ -22,9 +21,18 @@ def __format__(self, spec):


class FileSink:

def __init__(self, path, *, rotation=None, retention=None, compression=None, delay=False,
mode='a', buffering=1, **kwargs):
def __init__(
self,
path,
*,
rotation=None,
retention=None,
compression=None,
delay=False,
mode="a",
buffering=1,
**kwargs
):
self.mode = mode
self.buffering = buffering
self.kwargs = kwargs.copy()
@@ -80,45 +88,49 @@ def initialize_file(self, *, rename_existing):
self.file_path = new_path

def format_path(self):
path = self.path.format_map({'time': FileDateFormatter()})
path = self.path.format_map({"time": FileDateFormatter()})
return os.path.abspath(path)

@staticmethod
def make_glob_pattern(path):
tokens = string.Formatter().parse(path)
parts = (glob.escape(text) + '*' * (name is not None) for text, name, *_ in tokens)
root, ext = os.path.splitext(''.join(parts))
parts = (glob.escape(text) + "*" * (name is not None) for text, name, *_ in tokens)
root, ext = os.path.splitext("".join(parts))
if ext:
pattern = root + '.*'
pattern = root + ".*"
else:
pattern = root + '*'
pattern = root + "*"
return pattern

def make_rotation_function(self, rotation):

def make_from_size(size_limit):
def rotation_function(message, file):
file.seek(0, 2)
return file.tell() + len(message) >= size_limit

return rotation_function

def make_from_time(step_forward, time_init=None):
start_time = time_limit = now().replace(tzinfo=None)
if time_init is not None:
time_limit = time_limit.replace(hour=time_init.hour,
minute=time_init.minute,
second=time_init.second,
microsecond=time_init.microsecond)
time_limit = time_limit.replace(
hour=time_init.hour,
minute=time_init.minute,
second=time_init.second,
microsecond=time_init.microsecond,
)
if time_limit <= start_time:
time_limit = step_forward(time_limit)

def rotation_function(message, file):
nonlocal time_limit
record_time = message.record['time'].replace(tzinfo=None)
record_time = message.record["time"].replace(tzinfo=None)
if record_time >= time_limit:
while time_limit <= record_time:
time_limit = step_forward(time_limit)
return True
return False

return rotation_function

if rotation is None:
@@ -140,34 +152,42 @@ def rotation_function(message, file):
return self.make_rotation_function(time)
if time is None:
time = datetime.time(0, 0, 0)

def next_day(t):
while True:
t += datetime.timedelta(days=1)
if t.weekday() == day:
t += datetime.timedelta(days=1)
if t.weekday() == day:
return t

return make_from_time(next_day, time_init=time)
raise ValueError("Cannot parse rotation from: '%s'" % rotation)
elif isinstance(rotation, (numbers.Real, decimal.Decimal)):
return make_from_size(rotation)
elif isinstance(rotation, datetime.time):

def next_day(t):
return t + datetime.timedelta(days=1)

return make_from_time(next_day, time_init=rotation)
elif isinstance(rotation, datetime.timedelta):

def add_interval(t):
return t + rotation

return make_from_time(add_interval)
elif callable(rotation):
return rotation
else:
raise ValueError("Cannot infer rotation for objects of type: '%s'" % type(rotation).__name__)
raise ValueError(
"Cannot infer rotation for objects of type: '%s'" % type(rotation).__name__
)

def make_retention_function(self, retention):

def make_from_filter(filter_logs):
def retention_function(logs):
for log in filter_logs(logs):
os.remove(log)

return retention_function

if retention is None:
@@ -178,74 +198,92 @@ def retention_function(logs):
raise ValueError("Cannot parse retention from: '%s'" % retention)
return self.make_retention_function(interval)
elif isinstance(retention, int):

def key_log(log):
return (-os.stat(log).st_mtime, log)

def filter_logs(logs):
return sorted(logs, key=key_log)[retention:]

return make_from_filter(filter_logs)
elif isinstance(retention, datetime.timedelta):
seconds = retention.total_seconds()

def filter_logs(logs):
t = now().timestamp()
return [log for log in logs if os.stat(log).st_mtime <= t - seconds]

return make_from_filter(filter_logs)
elif callable(retention):
return retention
else:
raise ValueError("Cannot infer retention for objects of type: '%s'" % type(retention).__name__)
raise ValueError(
"Cannot infer retention for objects of type: '%s'" % type(retention).__name__
)

def make_compression_function(self, compression):

def make_compress_generic(opener, **kwargs):
def compress(path_in, path_out):
with open(path_in, 'rb') as f_in:
with opener(path_out, 'wb', **kwargs) as f_out:
with open(path_in, "rb") as f_in:
with opener(path_out, "wb", **kwargs) as f_out:
shutil.copyfileobj(f_in, f_out)

return compress

def make_compress_archive(mode):
import tarfile

def compress(path_in, path_out):
with tarfile.open(path_out, 'w:' + mode) as f_comp:
with tarfile.open(path_out, "w:" + mode) as f_comp:
f_comp.add(path_in, os.path.basename(path_in))

return compress

def make_compress_zipped():
import zlib, zipfile
import zipfile

def compress(path_in, path_out):
with zipfile.ZipFile(path_out, 'w', compression=zipfile.ZIP_DEFLATED) as f_comp:
with zipfile.ZipFile(path_out, "w", compression=zipfile.ZIP_DEFLATED) as f_comp:
f_comp.write(path_in, os.path.basename(path_in))

return compress

if compression is None:
return None
elif isinstance(compression, str):
ext = compression.strip().lstrip('.')
ext = compression.strip().lstrip(".")

if ext == "gz":
import gzip

if ext == 'gz':
import zlib, gzip
compress = make_compress_generic(gzip.open)
elif ext == 'bz2':
elif ext == "bz2":
import bz2

compress = make_compress_generic(bz2.open)
elif ext == 'xz':
elif ext == "xz":
import lzma

compress = make_compress_generic(lzma.open, format=lzma.FORMAT_XZ)
elif ext == 'lzma':
elif ext == "lzma":
import lzma

compress = make_compress_generic(lzma.open, format=lzma.FORMAT_ALONE)
elif ext == 'tar':
compress = make_compress_archive('')
elif ext == 'tar.gz':
import zlib, gzip
compress = make_compress_archive('gz')
elif ext == 'tar.bz2':
elif ext == "tar":
compress = make_compress_archive("")
elif ext == "tar.gz":
import gzip

compress = make_compress_archive("gz")
elif ext == "tar.bz2":
import bz2
compress = make_compress_archive('bz2')
elif ext == 'tar.xz':

compress = make_compress_archive("bz2")
elif ext == "tar.xz":
import lzma
compress = make_compress_archive('xz')
elif ext == 'zip':

compress = make_compress_archive("xz")
elif ext == "zip":
compress = make_compress_zipped()
else:
raise ValueError("Invalid compression format: '%s'" % ext)
@@ -265,7 +303,9 @@ def compression_function(path_in):
elif callable(compression):
return compression
else:
raise ValueError("Cannot infer compression for objects of type: '%s'" % type(compression).__name__)
raise ValueError(
"Cannot infer compression for objects of type: '%s'" % type(compression).__name__
)

def stop(self):
self.terminate(teardown=self.rotation_function is None)
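A side note on the rotation machinery rewritten above: make_rotation_function builds a small predicate closure for each supported rotation spec. The size-based variant, for example, seeks to the end of the file and rotates when appending the next message would reach the limit. A self-contained sketch of that closure (same logic as in the diff; the FileSink plumbing is omitted and the usage values are illustration only):

    import io

    def make_from_size(size_limit):
        def rotation_function(message, file):
            # Seek to the end and check whether writing this message
            # would reach or exceed the configured size limit.
            file.seek(0, 2)
            return file.tell() + len(message) >= size_limit
        return rotation_function

    # Hypothetical usage: rotate once the file would grow to 100 characters.
    should_rotate = make_from_size(100)
    log_file = io.StringIO("x" * 90)
    print(should_rotate("a" * 20, log_file))  # True: 90 + 20 >= 100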
2 changes: 1 addition & 1 deletion loguru/_handler.py
@@ -1,10 +1,10 @@
import functools
import json
import multiprocessing
import string
import sys
import threading
import traceback
import string

import ansimarkup

6 changes: 3 additions & 3 deletions loguru/_logger.py
@@ -1,7 +1,6 @@
import functools
import itertools
import logging
import os
import threading
from collections import namedtuple
from datetime import timedelta
@@ -18,7 +17,8 @@
from ._file_sink import FileSink
from ._get_frame import get_frame
from ._handler import Handler
from ._recattrs import LevelRecattr, FileRecattr, ThreadRecattr, ProcessRecattr, ExceptionRecattr
from ._recattrs import (ExceptionRecattr, FileRecattr, LevelRecattr,
ProcessRecattr, ThreadRecattr)

Level = namedtuple('Level', ['no', 'color', 'icon'])

@@ -228,7 +228,7 @@ def start(self, sink, *, level=_defaults.LOGURU_LEVEL, format=_defaults.LOGURU_F
elif hasattr(sink, 'write') and callable(sink.write):
try:
converter = AnsiToWin32(sink, convert=None, strip=False)
except:
except Exception:
if colorize is None:
colorize = False
stream = sink
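The _logger.py hunk also shows how isort handles a from-import that no longer fits on one line: the imported names are alphabetized and wrapped in parentheses with a hanging indent. The same style applied to a standard-library module, so the snippet runs anywhere (module and names chosen purely for illustration):

    from collections.abc import (AsyncGenerator, Callable, Coroutine,
                                 Generator, Iterable)

    # The wrapped form is equivalent to the single-line import;
    # only the layout changes.
    print(Callable, Iterable)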
2 changes: 0 additions & 2 deletions loguru/_notifier.py
@@ -1,5 +1,3 @@
import textwrap

import notifiers


2 changes: 0 additions & 2 deletions loguru/_recattrs.py
@@ -1,12 +1,10 @@
import random
import re
import sys
import traceback
from collections import namedtuple

from better_exceptions_fork import ExceptionFormatter


loguru_traceback = namedtuple('loguru_traceback', ('tb_frame', 'tb_lasti', 'tb_lineno', 'tb_next'))


1 change: 1 addition & 0 deletions setup.py
@@ -32,6 +32,7 @@
extras_require={
'dev': [
'coveralls>=1.3.0',
'isort>=4.3.4',
'pytest>=3.5.0',
'pytest-cov>=2.5.1',
],
31 changes: 31 additions & 0 deletions tests/test_filesink_compression.py
@@ -1,5 +1,6 @@
import pytest
import os
import sys
from loguru import logger

@pytest.mark.parametrize('compression', [
@@ -79,3 +80,33 @@ def test_rename_existing_before_compression(monkeypatch_date, tmpdir):
def test_invalid_compression(compression):
with pytest.raises(ValueError):
logger.start('test.log', compression=compression)

@pytest.mark.parametrize('ext', ['gz', 'tar.gz'])
def test_gzip_module_unavailable(ext, monkeypatch):
monkeypatch.setitem(sys.modules, 'gzip', None)
with pytest.raises(ImportError):
logger.start("test.log", compression=ext)

@pytest.mark.parametrize('ext', ['bz2', 'tar.bz2'])
def test_bz2_module_unavailable(ext, monkeypatch):
monkeypatch.setitem(sys.modules, 'bz2', None)
with pytest.raises(ImportError):
logger.start("test.log", compression=ext)

@pytest.mark.parametrize('ext', ['xz', 'lzma', 'tar.xz'])
def test_lzma_module_unavailable(ext, monkeypatch):
monkeypatch.setitem(sys.modules, 'lzma', None)
with pytest.raises(ImportError):
logger.start("test.log", compression=ext)

@pytest.mark.parametrize('ext', ['tar', 'tar.gz', 'tar.bz2', 'tar.xz'])
def test_tarfile_module_unavailable(ext, monkeypatch):
monkeypatch.setitem(sys.modules, 'tarfile', None)
with pytest.raises(ImportError):
logger.start("test.log", compression=ext)

@pytest.mark.parametrize('ext', ['zip'])
def test_zipfile_module_unavailable(ext, monkeypatch):
monkeypatch.setitem(sys.modules, 'zipfile', None)
with pytest.raises(ImportError):
logger.start("test.log", compression=ext)
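The new tests above simulate a missing compression backend by pointing the module's entry in sys.modules at None: the import system then raises ImportError for any later import of that name instead of loading it. A standalone sketch of the mechanism (pytest's monkeypatch.setitem performs the same assignment but restores the entry automatically after the test):

    import sys

    # With the entry set to None, "import gzip" fails even though the
    # module is installed -- exactly what the tests rely on.
    sys.modules["gzip"] = None
    try:
        import gzip  # noqa: F401
    except ImportError as error:
        print("import blocked:", error)
    finally:
        del sys.modules["gzip"]  # restore normal importing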
