-
Notifications
You must be signed in to change notification settings - Fork 0
/
docker_events.py
97 lines (73 loc) · 2.79 KB
/
docker_events.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
import json
import os
import subprocess
import threading
import docker
import yaml
from logger import logger
handing_event = False
def load_config():
"""加载配置文件"""
path_search = (
'config.yaml',
'/config/config.yaml'
)
file_path = list(path for path in path_search if os.path.exists(path))
if len(file_path) == 0:
raise '配置文件不存在'
with open(file_path[0], 'r', encoding='utf-8') as file:
config = yaml.safe_load(file)
parse_config(config=config)
def parse_config(config: json):
"""解析配置文件"""
# 所有容器
containers = config['containers']
valid_entity = {}
for container in containers:
name: str = container['name']
events: {str: str} = container['events']
# 过滤脚本文件存在的事件
valid_events = {key: value for key, value in events.items() if os.path.exists(value)}
if len(valid_events) == 0:
logger.debug(f'容器{name}没有有效的事件')
else:
valid_entity[name] = valid_events
logger.debug(f'有效事件 {name}: {valid_events}')
bind_event(names=list(valid_entity.keys()), valid_events=valid_entity)
def bind_event(names: [str], valid_events: {}):
"""绑定事件"""
global handing_event
client = docker.from_env()
events = client.events(filters={"container": names}, decode=True)
try:
for event in events:
# 检查是否正在处理事件,如果是,则跳过后续事件处理
if handing_event:
logger.debug('事件处理中,跳过...')
continue
thread = threading.Thread(target=handle_event, args=(event, names, valid_events))
thread.start()
except KeyboardInterrupt:
logger.info('程序已停止')
def handle_event(event, names, valid_events):
"""处理事件"""
logger.debug(f'事件:{event}')
global handing_event
name = event['Actor']['Attributes']['name']
if name in names:
event_model = valid_events[name]
if event['status'] in list(event_model.keys()):
logger.debug(f'捕获事件:%s, %s, %s', name, event.get('status'), event_model[event["status"]])
handing_event = True
run_command(f'chmod 777 {event_model[event["status"]]}')
run_command(f'sh {event_model[event["status"]]}')
handing_event = False
def run_command(command):
"""运行命令"""
try:
result = subprocess.run(command, shell=True, capture_output=True, text=True)
logger.debug('Command Output: %s', result.stdout)
if result.stderr:
logger.error("Command Error: %s", result.stderr)
except Exception as e:
logger.error("Command Exception: %s", str(e))