
Commit fde604e

Fix the deadlock caused by calling the inter-process lock multiple times on Windows
1 parent 1b02fde commit fde604e
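The fix targets cloghandler's ConcurrentRotatingFileHandler.acquire(): on Windows, exclusive file locks do not stack, so if the handler issues lock() on a handle it already holds, the second call waits forever. A minimal sketch of that failure mode, assuming portalocker's module-level lock/unlock API that this commit also uses; the file name is illustrative:

    # Hypothetical repro, not part of this commit. On Windows the second
    # lock() blocks forever, because an exclusive region lock cannot be
    # granted twice for the same file; under POSIX flock semantics the
    # repeated call on the same handle simply succeeds.
    from portalocker import lock, unlock, LOCK_EX

    f = open("test.lock", "w")
    lock(f, LOCK_EX)   # first acquisition succeeds
    lock(f, LOCK_EX)   # on Windows: never returns (the deadlock)
    unlock(f)

The patched acquire() below sidesteps this by calling unlock() immediately before lock(), making the acquisition effectively idempotent for a process that already holds the lock.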

File tree

8 files changed: +110 additions, -4 deletions


log_to_kafka/__init__.py

Lines changed: 5 additions & 1 deletion

@@ -8,4 +8,8 @@
 
 HOME_PAGE = "https://www.github.com/ShichaoMa/log_to_kafka"
 
-from .logger import LogFactory, Logger, LogObject, KafkaHandler
+from .logger import LogFactory, Logger, LogObject, KafkaHandler, FixedConcurrentRotatingFileHandler, ConcurrentRotatingFileHandler
+
+__all__ = ["LogFactory", "Logger", "LogObject", "KafkaHandler",
+           "FixedConcurrentRotatingFileHandler", "ConcurrentRotatingFileHandler",
+           "HOME_PAGE", "AUTHOR_EMAIL", "VERSION", "AUTHOR"]

log_to_kafka/logger.py

Lines changed: 37 additions & 2 deletions

@@ -7,16 +7,47 @@
 import sys
 import time
 import traceback
+
 from functools import wraps
+from portalocker import lock, unlock, LOCK_EX
 
-from cloghandler import ConcurrentRotatingFileHandler
+from cloghandler import ConcurrentRotatingFileHandler, NullLogRecord
 from kafka import KafkaClient, SimpleProducer
 from kafka.common import FailedPayloadsError
 from pythonjsonlogger import jsonlogger
 
 import default_settings
 from settings_wrapper import SettingsWrapper
 
+class FixedConcurrentRotatingFileHandler(ConcurrentRotatingFileHandler):
+    """
+    Fix the deadlock caused by calling the inter-process lock multiple times on Windows.
+    """
+    def acquire(self):
+        """ Acquire thread and file locks. Re-opening log for 'degraded' mode.
+        """
+        # handle thread lock
+        logging.Handler.acquire(self)
+        # Issue a file lock. (This is inefficient for multiple active threads
+        # within a single process. But if you're worried about high-performance,
+        # you probably aren't using this log handler.)
+        if self.stream_lock:
+            # If stream_lock=None, then assume close() was called or something
+            # else weird and ignore all file-level locks.
+            if self.stream_lock.closed:
+                # Daemonization can close all open file descriptors, see
+                # https://bugzilla.redhat.com/show_bug.cgi?id=952929
+                # Try opening the lock file again. Should we warn() here?!?
+                try:
+                    self._open_lockfile()
+                except Exception:
+                    self.handleError(NullLogRecord())
+                    # Don't try to open the stream lock again
+                    self.stream_lock = None
+                    return
+            unlock(self.stream_lock)
+            lock(self.stream_lock, LOCK_EX)
+
 
 def failedpayloads_wrapper(max_iter_times, _raise=False):
 

@@ -188,8 +219,12 @@ def set_logger(self, logger=None):
         except OSError as exception:
             if exception.errno != errno.EEXIST:
                 raise
+        if os.name == "nt":
+            handler = FixedConcurrentRotatingFileHandler
+        else:
+            handler = ConcurrentRotatingFileHandler
         self.logger.set_handler(
-            ConcurrentRotatingFileHandler(
+            handler(
                 os.path.join(my_dir, my_file),
                 backupCount=my_backups,
                 maxBytes=my_bytes))
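A quick way to exercise the new code path from several processes at once; a sketch under stated assumptions (the logger name, message counts, and file name are illustrative), not part of this commit:

    # Hypothetical multi-process smoke test. Each child process attaches its
    # own FixedConcurrentRotatingFileHandler to the same file; the file lock
    # taken in acquire() serializes writes and rollover between processes.
    import errno
    import logging
    import multiprocessing
    import os

    from log_to_kafka.logger import FixedConcurrentRotatingFileHandler


    def worker(n):
        logger = logging.getLogger("concurrent-smoke-%d" % n)
        logger.setLevel(logging.DEBUG)
        logger.addHandler(FixedConcurrentRotatingFileHandler(
            os.path.join("logs", "concurrent.log"),
            maxBytes=1024 * 1024, backupCount=3))
        for i in range(100):
            logger.info("process %s message %s", n, i)


    if __name__ == "__main__":
        try:
            os.makedirs("logs")
        except OSError as exception:
            if exception.errno != errno.EEXIST:
                raise
        procs = [multiprocessing.Process(target=worker, args=(n,))
                 for n in range(4)]
        for p in procs:
            p.start()
        for p in procs:
            p.join()

Before this commit, a run like this on Windows could hang as soon as a handler re-acquired a lock it already held; with the unlock-before-lock acquire(), every process makes progress.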

setup.py

Lines changed: 1 addition & 1 deletion

@@ -6,7 +6,7 @@
 except:
     from distutils.core import setup
 
-VERSION = '1.0.2'
+VERSION = '1.0.3'
 
 AUTHOR = "cn"
 

test/default_settings.py

Lines changed: 21 additions & 0 deletions

@@ -0,0 +1,21 @@
+# -*- coding:utf-8 -*-
+
+
+# log level
+LOG_LEVEL = 'DEBUG'
+# whether to also log to stdout
+LOG_STDOUT = False
+# whether to emit JSON-formatted records
+LOG_JSON = False
+# log file directory
+LOG_DIR = "logs"
+# maximum size of each log file
+LOG_MAX_BYTES = '10MB'
+# number of log backups to keep
+LOG_BACKUPS = 5
+# whether to send logs to kafka
+TO_KAFKA = False
+# kafka address
+KAFKA_HOSTS = "192.168.200.90:9092"
+# kafka topic
+TOPIC = "jay-cluster-logs"
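These keys are presumably read by Logger through SettingsWrapper (the pattern test/test_logger.py below uses): LOG_MAX_BYTES and LOG_BACKUPS would feed the maxBytes/backupCount of the rotating handler chosen in set_logger(), while TO_KAFKA, KAFKA_HOSTS, and TOPIC drive the KafkaHandler path.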

test/test_concurrent.py

Lines changed: 20 additions & 0 deletions

@@ -0,0 +1,20 @@
+# -*- coding:utf-8 -*-
+import os
+import sys
+sys.path.insert(0, "..")
+import errno
+import logging
+from log_to_kafka.logger import FixedConcurrentRotatingFileHandler
+my_dir = "logs"
+try:
+    os.makedirs(my_dir)
+except OSError as exception:
+    if exception.errno != errno.EEXIST:
+        raise
+logger = logging.getLogger()
+logger.setLevel(logging.DEBUG)
+logger.addHandler(
+    FixedConcurrentRotatingFileHandler(
+        os.path.join(my_dir, "test.log"),
+        backupCount=5))
+logger.info("this is a log. ")
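Running this script from several terminals at once exercises the inter-process case: every instance appends to logs/test.log through its own handler, and the unlock-before-lock acquire() keeps repeated acquisitions from hanging on Windows.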

test/test_lock.py

Lines changed: 9 additions & 0 deletions

@@ -0,0 +1,9 @@
+from portalocker import lock, unlock, LOCK_EX, LOCK_NB, LockException
+
+a = open("logs/test.lock", "w")
+unlock(a)
+unlock(a)
+print 11111111
+lock(a, LOCK_EX)
+print 222222
+unlock(a)
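This script pins down the locking semantics the fix relies on: unlock() on a handle that holds no lock is harmless (it runs twice in a row here), after which lock() and the final unlock() complete normally. Without an unlock() first, a second exclusive lock() on an already-locked handle would block indefinitely on Windows, which is exactly the deadlock the new acquire() avoids.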

test/test_logger.py

Lines changed: 11 additions & 0 deletions

@@ -0,0 +1,11 @@
+from log_to_kafka import Logger
+
+class MyClass(Logger):
+    name = "log_name"
+    def __init__(self, settings_file):
+        super(MyClass, self).__init__(settings_file)
+
+
+MC = MyClass("default_settings.py")
+MC.set_logger()
+MC.logger.debug("....")
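This is the intended public API in miniature: subclass Logger, name the log, point the constructor at a settings module, and call set_logger(); with this commit, set_logger() transparently selects FixedConcurrentRotatingFileHandler when os.name == "nt".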

test/test_stream.py

Lines changed: 6 additions & 0 deletions

@@ -0,0 +1,6 @@
+import logging
+import sys
+logger = logging.getLogger()
+logger.setLevel(logging.DEBUG)
+logger.addHandler(logging.StreamHandler(sys.stdout))
+logger.info('aaaaa')
