forked from eWaterCycle/amuse-framework
-
Notifications
You must be signed in to change notification settings - Fork 0
/
monitor.py
174 lines (139 loc) · 4.94 KB
/
monitor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
from __future__ import print_function
import time
try: # Python 2
import urlparse
from StringIO import StringIO
except ImportError: # Python 3
from urllib import parse as urlparse
from io import StringIO
import threading
import traceback
import json
import nose
import sys
import linecache
import inspect
import os.path
from mpi4py import MPI
from nose.plugins.capture import Capture
from nose.plugins.skip import Skip, SkipTest
from nose.core import TestProgram
from multiprocessing import Process, Queue
from optparse import OptionParser
from subprocess import call, Popen, PIPE
class MonitoredFile(object):
    """Track the modification time of a single file.

    ``container`` is the object that owns this element; its
    ``remove(element)`` method is called when the file has disappeared or
    can no longer be stat'ed.
    """

    def __init__(self, path, container):
        self.path = path
        self.container = container
        # Baseline mtime; -1 when the file could not be stat'ed.
        self.timestamp = self.get_last_modification_time()

    def is_file(self):
        """Return True: this element represents a file, not a directory."""
        return True

    def check(self, monitor):
        """Compare the file with its recorded state and notify ``monitor``.

        Exactly one of ``monitor.deleted``, ``monitor.errored``,
        ``monitor.updated`` or ``monitor.unchanged`` is called with this
        element.  On deletion or error the element is first removed from
        its container.
        """
        if not os.path.exists(self.path):
            self.container.remove(self)
            monitor.deleted(self)
            return

        measured_timestamp = self.get_last_modification_time()
        if measured_timestamp < 0:
            # The path exists but stat failed; stop tracking it.
            self.container.remove(self)
            monitor.errored(self)
            return

        if self.timestamp < measured_timestamp:
            self.timestamp = measured_timestamp
            monitor.updated(self)
            return

        monitor.unchanged(self)

    def get_last_modification_time(self):
        """Return the file's ``st_mtime``, or -1 if it cannot be stat'ed.

        Only filesystem failures are mapped to -1.  KeyboardInterrupt,
        SystemExit and programming errors propagate instead of being
        silently reported as an unreadable file.
        """
        try:
            return os.stat(self.path).st_mtime
        except (OSError, ValueError):
            # ValueError: Python 3 rejects paths with an embedded NUL byte.
            return -1

    def walk(self, monitor):
        """Report this element to ``monitor.found``."""
        monitor.found(self)
class MonitoredDirectory(object):
    """Track the entries of a directory tree.

    Every non-symlink entry found in ``path`` is wrapped in a
    ``MonitoredDirectory`` (sub-directories) or ``MonitoredFile`` (anything
    else).  ``container`` is the parent ``MonitoredDirectory``, or None for
    a root directory.
    """

    def __init__(self, path, container=None):
        self.path = path
        self.elements = []
        self.container = container
        # Maps the full path of each tracked entry to its element.
        self.path_to_element = {}
        self.setup_from_filesystem()

    def is_file(self):
        """Return False: this element represents a directory."""
        return False

    def setup_from_filesystem(self):
        """Register every entry currently in the directory, skipping links."""
        for name in os.listdir(self.path):
            path = os.path.join(self.path, name)
            if os.path.islink(path):
                continue
            self._register(self.new_element(path))

    def new_element(self, path):
        """Create the element matching ``path`` (directory or file)."""
        if os.path.isdir(path):
            return MonitoredDirectory(path, self)
        return MonitoredFile(path, self)

    def remove(self, element):
        """Stop tracking ``element``."""
        self.elements.remove(element)
        del self.path_to_element[element.path]

    def check(self, monitor):
        """Check all tracked entries, then report newly appeared ones.

        Calls ``monitor.deleted`` when this directory itself is gone, lets
        each child report its own state, and calls ``monitor.created`` for
        every entry that was not tracked before.
        """
        if not os.path.exists(self.path):
            if self.container is not None:
                self.container.remove(self)
            monitor.deleted(self)
            return

        # Iterate over a snapshot: a child that finds itself deleted calls
        # self.remove(), and mutating the list while iterating it would
        # skip the sibling that follows the removed entry.
        for element in list(self.elements):
            element.check(monitor)

        for name in os.listdir(self.path):
            path = os.path.join(self.path, name)
            # Links are ignored here exactly as in setup_from_filesystem;
            # otherwise every link would be reported as "created" on the
            # first check and directory links would be followed.
            if path in self.path_to_element or os.path.islink(path):
                continue
            element = self.new_element(path)
            monitor.created(element)
            self._register(element)

    def walk(self, monitor):
        """Report this directory and, recursively, all tracked entries."""
        monitor.found(self)
        for element in self.elements:
            element.walk(monitor)

    def _register(self, element):
        """Start tracking ``element`` under its path."""
        self.elements.append(element)
        self.path_to_element[element.path] = element
class MonitorDirectories(object):
    """Watch several directory trees and record changes to Python files.

    An instance is passed as the ``monitor`` to the elements it owns; they
    call back into ``deleted``, ``created``, ``updated``, ``unchanged``,
    ``errored`` and ``found``.  Only elements accepted by
    ``must_monitor_file`` affect ``changed`` and ``updated_elements``.
    """

    def __init__(self, paths):
        self.elements = [MonitoredDirectory(root) for root in paths]
        self.changed = False
        self.updated_elements = []

    def check(self):
        """Reset the change state, then re-check every watched directory."""
        self.changed = False
        self.updated_elements = []
        for directory in self.elements:
            directory.check(self)

    def deleted(self, monitored_element):
        """Flag a change when a relevant element has been deleted."""
        if self.must_monitor_file(monitored_element):
            self.changed = True

    def created(self, monitored_element):
        """Flag a change when a relevant element has appeared."""
        if self.must_monitor_file(monitored_element):
            self.changed = True

    def unchanged(self, monitored_element):
        """Nothing to record for an element that did not change."""
        pass

    def errored(self, monitored_element):
        """Print a message for an element that could not be inspected."""
        print("error while monitoring file: ", monitored_element.path)

    def updated(self, monitored_element):
        """Flag a change and remember a relevant element that was modified."""
        if self.must_monitor_file(monitored_element):
            self.changed = True
            self.updated_elements.append(monitored_element)

    def walk(self, callback_function):
        """Call ``callback_function`` for every relevant tracked element."""
        # Stored on the instance so that found() can reach it during the walk.
        self.callback_function = callback_function
        for directory in self.elements:
            directory.walk(self)

    def found(self, monitored_element):
        """Forward a relevant element to the callback given to ``walk``."""
        if self.must_monitor_file(monitored_element):
            self.callback_function(monitored_element)

    def must_monitor_file(self, monitored_element):
        """Return True for ``.py`` paths whose basename is not hidden."""
        path = monitored_element.path
        if not path.endswith('.py'):
            return False
        return not os.path.basename(path).startswith('.')