forked from donnemartin/gitsome
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcommands_cache.py
441 lines (390 loc) · 15.1 KB
/
commands_cache.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
# -*- coding: utf-8 -*-
"""Module for caching command & alias names as well as for predicting whether
a command will be able to be run in the background.
A background predictor is a function that accepts a single argument list
and returns whether or not the process can be run in the background (returns
True) or must be run in the foreground (returns False).
"""
import os
import time
import builtins
import argparse
import collections.abc as cabc
from xonsh.platform import ON_WINDOWS, ON_POSIX, pathbasename
from xonsh.tools import executables_in
from xonsh.lazyasd import lazyobject
class CommandsCache(cabc.Mapping):
    """A lazy cache representing the commands available on the file system.

    The keys are the command names (upper-cased on Windows). Each value is a
    two-item tuple:

    * for an executable found on ``$PATH``: ``(full_path, alias)`` where
      ``alias`` is whatever the alias mapping holds for that key, or ``None``
      when there is no such alias;
    * for a name that only exists as an alias: ``(alias_name, True)``.

    Methods prefixed with ``lazy`` only look at the cache as it currently is;
    the other accessors first refresh it through ``all_commands``.
    """

    def __init__(self):
        self._cmds_cache = {}
        # Fingerprints of the state the cache was built from; compared on
        # every ``all_commands`` access to decide whether to rebuild.
        self._path_checksum = None
        self._alias_checksum = None
        self._path_mtime = -1
        self.threadable_predictors = default_threadable_predictors()

    def __contains__(self, key):
        """Refresh the cache if needed, then test whether ``key`` is known."""
        _ = self.all_commands
        return self.lazyin(key)

    def __iter__(self):
        """Refresh the cache if needed and iterate over the command names."""
        for cmd, (path, _) in self.all_commands.items():
            if ON_WINDOWS and path is not None:
                # All command keys are stored in uppercase on Windows.
                # This ensures the original command name is returned.
                cmd = pathbasename(path)
            yield cmd

    def __len__(self):
        """Refresh the cache if needed and return the number of entries."""
        return len(self.all_commands)

    def __getitem__(self, key):
        """Refresh the cache if needed and return the entry for ``key``.

        Unknown names yield ``None`` (the ``lazyget`` default) rather than
        raising ``KeyError``.
        """
        _ = self.all_commands
        return self.lazyget(key)

    def is_empty(self):
        """Returns whether the cache is populated or not."""
        return not self._cmds_cache

    @staticmethod
    def get_possible_names(name):
        """Generates the possible `PATHEXT` extension variants of a given
        executable name on Windows as a list, conserving the ordering in
        `PATHEXT`. The upper-cased bare name comes first.

        Returns a list with `name` being the only item in it on other
        platforms.
        """
        if not ON_WINDOWS:
            return [name]
        pathext = builtins.__xonsh__.env.get("PATHEXT", [])
        name = name.upper()
        return [name + ext for ext in ([""] + pathext)]

    @staticmethod
    def remove_dups(p):
        """Returns the items of ``p`` as a list with duplicates removed,
        keeping the first occurrence of each item and the original order.
        """
        items = list(p)
        try:
            # dict preserves insertion order, giving an O(n) ordered de-dup.
            return list(dict.fromkeys(items))
        except TypeError:
            # Unhashable entries: fall back to the quadratic scan.
            unique = []
            for item in items:
                if item not in unique:
                    unique.append(item)
            return unique

    @property
    def all_commands(self):
        """The command mapping, rebuilt first if it is out of date.

        The cache is considered stale when the set of existing ``$PATH``
        directories changed, when the set of alias names changed, or when the
        newest modification time among the ``$PATH`` directories is later
        than the one recorded on the previous access.
        """
        paths = builtins.__xonsh__.env.get("PATH", [])
        path_immut = tuple(
            x for x in CommandsCache.remove_dups(paths) if os.path.isdir(x)
        )
        # did PATH change?
        path_hash = hash(path_immut)
        cache_valid = path_hash == self._path_checksum
        self._path_checksum = path_hash
        # did aliases change?
        alss = getattr(builtins, "aliases", dict())
        al_hash = hash(frozenset(alss))
        cache_valid = cache_valid and al_hash == self._alias_checksum
        self._alias_checksum = al_hash
        # did the contents of any directory in PATH change?
        max_mtime = 0
        for path in path_immut:
            max_mtime = max(max_mtime, os.stat(path).st_mtime)
        cache_valid = cache_valid and (max_mtime <= self._path_mtime)
        self._path_mtime = max_mtime
        if cache_valid:
            return self._cmds_cache
        allcmds = {}
        for path in reversed(path_immut):
            # iterate backwards so that entries at the front of PATH overwrite
            # entries at the back.
            for cmd in executables_in(path):
                key = cmd.upper() if ON_WINDOWS else cmd
                allcmds[key] = (os.path.join(path, cmd), alss.get(key, None))
        for cmd in alss:
            # Normalise the key *before* the membership test so the check and
            # the store agree on Windows, where keys are upper-cased.
            key = cmd.upper() if ON_WINDOWS else cmd
            if key not in allcmds:
                allcmds[key] = (cmd, True)
        self._cmds_cache = allcmds
        return allcmds

    def cached_name(self, name):
        """Returns the name that would appear in the cache, if it exists.

        ``None`` is passed through unchanged. On Windows the result is the
        first `PATHEXT` variant present in the cache, or ``None`` when no
        variant is cached; elsewhere it is simply the base name.
        """
        if name is None:
            return None
        cached = pathbasename(name)
        if ON_WINDOWS:
            keys = self.get_possible_names(cached)
            cached = next((k for k in keys if k in self._cmds_cache), None)
        return cached

    def lazyin(self, key):
        """Checks if the value is in the current cache without the potential to
        update the cache. It just says whether the value is known *now*. This
        may not reflect precisely what is on the $PATH.
        """
        return self.cached_name(key) in self._cmds_cache

    def lazyiter(self):
        """Returns an iterator over the current cache contents without the
        potential to update the cache. This may not reflect what is on the
        $PATH.
        """
        return iter(self._cmds_cache)

    def lazylen(self):
        """Returns the length of the current cache contents without the
        potential to update the cache. This may not reflect precisely
        what is on the $PATH.
        """
        return len(self._cmds_cache)

    def lazyget(self, key, default=None):
        """A lazy value getter: looks ``key`` up in the current cache contents
        and returns ``default`` when it is not there.
        """
        return self._cmds_cache.get(self.cached_name(key), default)

    def locate_binary(self, name, ignore_alias=False):
        """Locates an executable on the file system using the cache.

        Parameters
        ----------
        name : str
            name of binary to search for
        ignore_alias : bool, optional
            Force return of binary path even if alias of ``name`` exists
            (default ``False``)

        Returns
        -------
        The located path, or ``None`` when nothing was found.
        """
        # make sure the cache is up to date by accessing the property
        _ = self.all_commands
        return self.lazy_locate_binary(name, ignore_alias)

    def lazy_locate_binary(self, name, ignore_alias=False):
        """Locates an executable in the cache, without checking its validity.

        Parameters
        ----------
        name : str
            name of binary to search for
        ignore_alias : bool, optional
            Force return of binary path even if alias of ``name`` exists
            (default ``False``)

        Returns
        -------
        The cached path; ``name`` itself when it is an existing file given
        with a directory component; otherwise ``None``.
        """
        possibilities = self.get_possible_names(name)
        if ON_WINDOWS:
            # Windows users expect to be able to execute files in the same
            # directory without `./`
            local_bin = next((fn for fn in possibilities if os.path.isfile(fn)), None)
            if local_bin:
                return os.path.abspath(local_bin)
        cached = next((cmd for cmd in possibilities if cmd in self._cmds_cache), None)
        if cached:
            (path, alias) = self._cmds_cache[cached]
            # An alias-only entry stores the bare alias name as its "path".
            ispure = path == pathbasename(path)
            if alias and ignore_alias and ispure:
                # pure alias, which we are ignoring
                return None
            return path
        if os.path.isfile(name) and name != pathbasename(name):
            return name
        return None

    def is_only_functional_alias(self, name):
        """Returns whether or not a command is only a functional alias, and has
        no underlying executable. For example, the "cd" command is only available
        as a functional alias.
        """
        _ = self.all_commands
        return self.lazy_is_only_functional_alias(name)

    def lazy_is_only_functional_alias(self, name):
        """Returns whether or not a command is only a functional alias, and has
        no underlying executable. For example, the "cd" command is only available
        as a functional alias. This search is performed lazily, i.e. against
        the current cache contents only.
        """
        val = self._cmds_cache.get(name, None)
        if val is None:
            return False
        # Use the lazy lookup: the eager ``locate_binary`` would refresh the
        # cache and break this method's lazy contract.
        return (
            val == (name, True)
            and self.lazy_locate_binary(name, ignore_alias=True) is None
        )

    def predict_threadable(self, cmd):
        """Predicts whether a command list is able to be run on a background
        thread, rather than the main thread.

        ``cmd[0]`` is the command name and ``cmd[1:]`` are the arguments
        handed to the selected predictor. Predictors resolved for names that
        were not yet known are stored in ``self.threadable_predictors``.
        """
        name = self.cached_name(cmd[0])
        predictors = self.threadable_predictors
        if ON_WINDOWS:
            # On Windows all names (keys) are stored in upper case, so
            # instead we get the original cmd or alias name
            path, _ = self.lazyget(name, (None, None))
            if path is None:
                return True
            name = pathbasename(path)
            if name not in predictors:
                # Fall back to the predictor registered for the name without
                # its file extension, if there is one.
                pre, ext = os.path.splitext(name)
                if pre in predictors:
                    predictors[name] = predictors[pre]
        if name not in predictors:
            predictors[name] = self.default_predictor(name, cmd[0])
        predictor = predictors[name]
        return predictor(cmd[1:])

    #
    # Background Predictors (as methods)
    #
    def default_predictor(self, name, cmd0):
        """Returns the predictor to use for a command without a registered one.

        On POSIX the binary is inspected (0.1 s budget, falling back to
        ``predict_true``); elsewhere ``predict_true`` is returned directly.
        """
        if ON_POSIX:
            return self.default_predictor_readbin(
                name, cmd0, timeout=0.1, failure=predict_true
            )
        return predict_true

    def default_predictor_readbin(self, name, cmd0, timeout, failure):
        """Make a default predictor by analyzing the content of the binary.
        Should only work on POSIX.

        Parameters
        ----------
        name : str
            command name, looked up in the cache when ``cmd0`` carries no
            directory component
        cmd0 : str
            the command as typed; used as the file name when it is absolute
            or contains ``os.sep``
        timeout : float
            time budget, in seconds, for reading the file
        failure : callable
            predictor returned when the analysis fails or times out

        Returns
        -------
        ``predict_false`` as soon as every byte string of one ``search_for``
        group has been seen, ``predict_true`` when the end of the file is
        reached without that, and ``failure`` otherwise.
        """
        if os.path.isabs(cmd0) or os.sep in cmd0:
            fname = cmd0
        else:
            fname = self.lazy_locate_binary(name)
        if fname is None or not os.path.isfile(fname):
            return failure
        try:
            fd = os.open(fname, os.O_RDONLY | os.O_NONBLOCK)
        except Exception:
            return failure  # opening error
        # Each key is a group of byte strings; the parallel list records
        # which members of the group have been seen so far.
        search_for = {
            (b"ncurses",): [False],
            (b"libgpm",): [False],
            (b"isatty", b"tcgetattr", b"tcsetattr"): [False, False, False],
        }
        try:
            # A monotonic clock keeps the budget immune to wall-clock jumps.
            deadline = time.monotonic() + timeout
            block = b""
            while time.monotonic() < deadline:
                previous_block = block
                try:
                    block = os.read(fd, 2048)
                except Exception:
                    # should not occur, except e.g. if a file is deleted and
                    # a dir is created with the same name between
                    # os.path.isfile and os.open
                    return failure
                if not block:
                    return predict_true  # no keys of search_for found
                # Include the previous chunk so that a match straddling two
                # reads is still detected.
                analyzed_block = previous_block + block
                for needles, found in search_for.items():
                    for i, needle in enumerate(needles):
                        if not found[i] and needle in analyzed_block:
                            found[i] = True
                    if all(found):
                        return predict_false  # use one key of search_for
            return failure  # timeout
        finally:
            # Single cleanup point for every exit path above.
            os.close(fd)
#
# Background Predictors
#
def predict_true(args):
    """Predictor that reports every invocation as threadable.

    ``args`` is accepted to satisfy the predictor signature and is ignored.
    """
    return True
def predict_false(args):
    """Predictor that reports every invocation as not threadable.

    ``args`` is accepted to satisfy the predictor signature and is ignored.
    """
    return False
@lazyobject
def SHELL_PREDICTOR_PARSER():
    """Lazily built parser recognising a shell's ``-c`` option and an optional
    positional ``filename``; both default to ``None`` when absent.
    """
    parser = argparse.ArgumentParser("shell", add_help=False)
    # Same settings for both; ``-c`` is registered first, then ``filename``.
    for arg_name in ("-c", "filename"):
        parser.add_argument(arg_name, nargs="?", default=None)
    return parser
def predict_shell(args):
    """Predict the backgroundability of the normal shell interface, which
    comes down to whether it is being run in subproc mode.

    Returns ``True`` when either ``-c`` or a file name was parsed from
    ``args``, and ``False`` when neither was.
    """
    ns, _ = SHELL_PREDICTOR_PARSER.parse_known_args(args)
    return not (ns.c is None and ns.filename is None)
@lazyobject
def HELP_VER_PREDICTOR_PARSER():
    """Lazily built parser recognising the help (``-h``/``--help``) and
    version (``-v``/``-V``/``--version``) switches. Each destination stays
    ``None`` unless its switch is present.
    """
    parser = argparse.ArgumentParser("cmd", add_help=False)
    flag_options = {"action": "store_true", "default": None}
    parser.add_argument("-h", "--help", dest="help", **flag_options)
    parser.add_argument("-v", "-V", "--version", dest="version", **flag_options)
    return parser
def predict_help_ver(args):
    """Predict the backgroundability of commands that have help & version
    switches: -h, --help, -v, -V, --version.

    If either of these options is present, the command is assumed to print to
    stdout normally and is therefore threadable. Otherwise, the command is
    assumed to not be threadable. This is useful for commands, like top, that
    normally enter alternate mode but may not in certain circumstances.
    """
    ns, _ = HELP_VER_PREDICTOR_PARSER.parse_known_args(args)
    return not (ns.help is None and ns.version is None)
@lazyobject
def HG_PREDICTOR_PARSER():
    """Lazily built parser for mercurial invocations.

    Recognises the sub-command as an optional positional (``None`` when the
    argument list has none) and the ``-i``/``--interactive`` flag.
    """
    p = argparse.ArgumentParser("hg", add_help=False)
    # The positional must be optional: with a required positional, parsing an
    # empty argument list (a bare ``hg``) makes argparse call ``error()``,
    # which raises SystemExit instead of letting the predictor answer.
    p.add_argument("command", nargs="?", default=None)
    p.add_argument(
        "-i", "--interactive", action="store_true", default=False, dest="interactive"
    )
    return p
def predict_hg(args):
    """Predict if mercurial is about to be run in interactive mode.

    Returns ``False`` when the parsed sub-command is ``split`` or when the
    interactive flag is set, and ``True`` otherwise.
    """
    ns, _ = HG_PREDICTOR_PARSER.parse_known_args(args)
    return ns.command != "split" and not ns.interactive
def default_threadable_predictors():
    """Builds a fresh dict mapping known command names to their threadable
    predictor functions.

    A new plain ``dict`` is created on every call, so callers may add entries
    to it freely. No default is supplied here for names that are missing.
    """
    # alphabetical, for what it is worth.
    return {
        "aurman": predict_false,
        "bash": predict_shell,
        "csh": predict_shell,
        "clear": predict_false,
        "cls": predict_false,
        "cmd": predict_shell,
        "cryptop": predict_false,
        "curl": predict_true,
        "ex": predict_false,
        "emacsclient": predict_false,
        "fish": predict_shell,
        "gvim": predict_help_ver,
        "hg": predict_hg,
        "htop": predict_help_ver,
        "ipython": predict_shell,
        "ksh": predict_shell,
        "less": predict_help_ver,
        "ls": predict_true,
        "man": predict_help_ver,
        "more": predict_help_ver,
        "mvim": predict_help_ver,
        "mutt": predict_help_ver,
        "nano": predict_help_ver,
        "nvim": predict_false,
        "ponysay": predict_help_ver,
        "psql": predict_false,
        "python": predict_shell,
        "python2": predict_shell,
        "python3": predict_shell,
        "repo": predict_help_ver,
        "ranger": predict_help_ver,
        "rview": predict_false,
        "rvim": predict_false,
        "scp": predict_false,
        "sh": predict_shell,
        "ssh": predict_false,
        "startx": predict_false,
        "sudo": predict_help_ver,
        "tcsh": predict_shell,
        "telnet": predict_false,
        "top": predict_help_ver,
        "vi": predict_false,
        "view": predict_false,
        "vim": predict_false,
        "vimpager": predict_help_ver,
        "weechat": predict_help_ver,
        "xclip": predict_help_ver,
        "xo": predict_help_ver,
        "xonsh": predict_shell,
        "xon.sh": predict_shell,
        "zsh": predict_shell,
    }