-
Notifications
You must be signed in to change notification settings - Fork 167
/
Copy pathtest.py
677 lines (524 loc) · 19.3 KB
/
test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
#!/usr/bin/python3
# -*- coding: utf-8 -*-
# key-mapper - GUI for device specific keyboard mappings
# Copyright (C) 2021 sezanzeb <proxima@sezanzeb.de>
#
# This file is part of key-mapper.
#
# key-mapper is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# key-mapper is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with key-mapper. If not, see <https://www.gnu.org/licenses/>.
"""Sets up key-mapper for the tests and runs them."""
import os
import sys
import shutil
import time
import copy
import unittest
import subprocess
import multiprocessing
import asyncio
import psutil
from pickle import UnpicklingError
from unittest.mock import patch
import evdev
import gi
gi.require_version("Gtk", "3.0")
gi.require_version("GLib", "2.0")
from xmodmap import xmodmap
# refuse to run from inside a directory whose name ends with "tests".
# NOTE(review): presumably the suite has to be started from the repository
# root so that the paths used further down resolve - confirm
assert not os.getcwd().endswith("tests")

# mark this process (and everything it forks) as a test run.
# NOTE(review): presumably read by key-mapper modules - confirm which ones
os.environ["UNITTEST"] = "1"
def grey_log(*msgs):
    """Print the given messages in grey, separated by single spaces.

    Parameters
    ----------
    *msgs : object
        Anything printable. Values that are not strings are converted with
        ``str`` first, so events, tuples or numbers can be passed directly.
    """
    text = " ".join(str(msg) for msg in msgs)
    # \033[90m switches the terminal to grey, \033[0m resets the color
    print(f"\033[90m{text}\033[0m")
def is_service_running():
    """Tell whether a process matching "key-mapper-service" exists.

    Returns
    -------
    bool
        True if `pgrep -f key-mapper-service` exits successfully, False if
        it exits with a non-zero code.
    """
    try:
        subprocess.check_output(["pgrep", "-f", "key-mapper-service"])
    except subprocess.CalledProcessError:
        # pgrep signals "no match" with a non-zero exit code
        return False

    return True
def join_children():
    """Block until all child processes are gone or have become zombies.

    The children are polled in steps of EVENT_READ_TIMEOUT. Once more than
    10 polls have passed, every child that is still listed gets killed on
    each further poll.
    """
    own_process = psutil.Process(os.getpid())
    polls = 0

    time.sleep(EVENT_READ_TIMEOUT)
    children = own_process.children(recursive=True)

    while [child for child in children if child.status() != "zombie"]:
        if polls > 10:
            # they had their chance to finish on their own
            for child in children:
                child.kill()
                grey_log(f"Killed pid {child.pid} because it didn't finish in time")

        children = own_process.children(recursive=True)
        time.sleep(EVENT_READ_TIMEOUT)
        polls += 1
if is_service_running():
    # let the tests control the existence of the daemon
    raise Exception("Expected the service not to be running already.")

# make sure the "tests" module is visible
sys.path.append(os.getcwd())

# seconds that the fake devices sleep between events, which gives tests
# some time to test stuff while the process is still running
EVENT_READ_TIMEOUT = 0.01

# based on experience how much time passes at most until
# the helper starts receiving previously pushed events after a
# call to start_reading
START_READING_DELAY = 0.05

# for joysticks: the value range that the fake devices report for
# their EV_ABS capabilities
MIN_ABS = -(2 ** 15)
MAX_ABS = 2 ** 15

# directory for files created during the tests, it is removed below and
# again by quick_cleanup
tmp = "/tmp/key-mapper-test"

# all events that were written to a fake UInput within this process
uinput_write_history = []
# the same events sent through a pipe, for tests that make the injector
# create its processes
uinput_write_history_pipe = multiprocessing.Pipe()
# group_key -> pair of pipe connections. push_event sends on index 0,
# the fake InputDevice receives on index 1
pending_events = {}

# don't start with leftovers of a previous run
if os.path.exists(tmp):
    shutil.rmtree(tmp)
def read_write_history_pipe():
    """Drain the write history pipe into a list of plain tuples.

    Returns
    -------
    list
        One (type, code, value) tuple for each event that was waiting in
        uinput_write_history_pipe, in the order they were received.
    """
    receiver = uinput_write_history_pipe[0]
    history = []

    while receiver.poll():
        written = receiver.recv()
        history.append((written.type, written.code, written.value))

    return history
# key-mapper is only interested in devices that have EV_KEY, add some
# random other stuff to test that they are ignored.
# phys prefix and DeviceInfo that the grouped "Foo Device" fixtures share
phys_foo = "usb-0000:03:00.0-1/input2"
info_foo = evdev.device.DeviceInfo(1, 1, 1, 1)

# the first 255 codes of the sorted evdev.ecodes.keys mapping
keyboard_keys = sorted(evdev.ecodes.keys.keys())[:255]

# path -> properties of the fake device at that path. The optional
# "group_key" entry is only read by the test code (see InputDevice), if it
# is missing the name is used instead.
fixtures = {
    "/dev/input/event1": {
        "capabilities": {
            evdev.ecodes.EV_KEY: [evdev.ecodes.KEY_A],
        },
        "phys": "usb-0000:03:00.0-0/input1",
        "info": info_foo,
        "name": "Foo Device",
    },
    # Another "Foo Device", which will get an incremented key.
    # If possible write tests using this one, because name != key here and
    # that would be important to test as well. Otherwise the tests can't
    # see if the group's correct attribute is used in functions and paths.
    "/dev/input/event11": {
        "capabilities": {
            evdev.ecodes.EV_KEY: [evdev.ecodes.BTN_LEFT],
            evdev.ecodes.EV_REL: [
                evdev.ecodes.REL_X,
                evdev.ecodes.REL_Y,
                evdev.ecodes.REL_WHEEL,
                evdev.ecodes.REL_HWHEEL,
            ],
        },
        "phys": f"{phys_foo}/input2",
        "info": info_foo,
        "name": "Foo Device foo",
        "group_key": "Foo Device 2",  # expected key
    },
    "/dev/input/event10": {
        "capabilities": {evdev.ecodes.EV_KEY: keyboard_keys},
        "phys": f"{phys_foo}/input3",
        "info": info_foo,
        "name": "Foo Device",
        "group_key": "Foo Device 2",
    },
    "/dev/input/event13": {
        "capabilities": {evdev.ecodes.EV_KEY: [], evdev.ecodes.EV_SYN: []},
        "phys": f"{phys_foo}/input1",
        "info": info_foo,
        "name": "Foo Device",
        "group_key": "Foo Device 2",
    },
    "/dev/input/event14": {
        "capabilities": {evdev.ecodes.EV_SYN: []},
        "phys": f"{phys_foo}/input0",
        "info": info_foo,
        "name": "Foo Device qux",
        "group_key": "Foo Device 2",
    },
    # Bar Device
    "/dev/input/event20": {
        "capabilities": {evdev.ecodes.EV_KEY: keyboard_keys},
        "phys": "usb-0000:03:00.0-2/input1",
        "info": evdev.device.DeviceInfo(2, 1, 2, 1),
        "name": "Bar Device",
    },
    "/dev/input/event30": {
        "capabilities": {
            evdev.ecodes.EV_SYN: [],
            evdev.ecodes.EV_ABS: [
                evdev.ecodes.ABS_X,
                evdev.ecodes.ABS_Y,
                evdev.ecodes.ABS_RX,
                evdev.ecodes.ABS_RY,
                evdev.ecodes.ABS_Z,
                evdev.ecodes.ABS_RZ,
                evdev.ecodes.ABS_HAT0X,
            ],
            evdev.ecodes.EV_KEY: [evdev.ecodes.BTN_A],
        },
        "phys": "",  # this is empty sometimes
        "info": evdev.device.DeviceInfo(3, 1, 3, 1),
        "name": "gamepad",
    },
    # device that is completely ignored
    "/dev/input/event31": {
        "capabilities": {evdev.ecodes.EV_SYN: []},
        "phys": "usb-0000:03:00.0-4/input1",
        "info": evdev.device.DeviceInfo(4, 1, 4, 1),
        "name": "Power Button",
    },
    # key-mapper devices are not displayed in the ui, some instance
    # of key-mapper started injecting apparently.
    "/dev/input/event40": {
        "capabilities": {evdev.ecodes.EV_KEY: keyboard_keys},
        "phys": "key-mapper/input1",
        "info": evdev.device.DeviceInfo(5, 1, 5, 1),
        "name": "key-mapper Bar Device",
    },
    # denylisted
    "/dev/input/event51": {
        "capabilities": {evdev.ecodes.EV_KEY: keyboard_keys},
        "phys": "usb-0000:03:00.0-5/input1",
        "info": evdev.device.DeviceInfo(6, 1, 6, 1),
        "name": "YuBiCofooYuBiKeYbar",
    },
}
def setup_pipe(group_key):
    """Ensure that `pending_events` holds a pipe for the given group.

    The pipe is used to send events to the helper, which in turn sends them
    to the reader. An existing pipe is kept, a missing entry or an entry
    that is None gets a fresh one.

    Parameters
    ----------
    group_key : string
        For example 'Foo Device'
    """
    if pending_events.get(group_key) is not None:
        return

    pending_events[group_key] = multiprocessing.Pipe()
# make sure those pipes exist before any process (the helper) gets forked,
# so that events can be pushed after the fork.
# Fixtures without an explicit "group_key" are skipped here, their pipe is
# created once setup_pipe is called for their group.
for fixture in fixtures.values():
    if "group_key" in fixture:
        setup_pipe(fixture["group_key"])
def get_events():
    """Get all events written by the injector.

    Returns
    -------
    list
        The module-level `uinput_write_history` list itself, not a copy.
        It is filled by the fake UInput.write of this process.
    """
    return uinput_write_history
def push_event(group_key, event):
    """Queue an event so that the fake devices of a group will read it.

    This is like hitting a key on a keyboard for everything that reads from
    evdev.InputDevice (which is patched in test.py to work that way). The
    pipe of the group is created if it doesn't exist yet.

    Parameters
    ----------
    group_key : string
        For example 'Foo Device'
    event : InputEvent
    """
    setup_pipe(group_key)
    sender, _ = pending_events[group_key]
    sender.send(event)
def push_events(group_key, events):
    """Queue several events for a group, in the order they are given.

    Parameters
    ----------
    group_key : string
        For example 'Foo Device'
    events : iterable of InputEvent
    """
    for pending in events:
        push_event(group_key, pending)
def new_event(type, code, value, timestamp=None, offset=0):
    """Create a new input_event.

    Parameters
    ----------
    type : int
        Event type, for example evdev.ecodes.EV_KEY
    code : int
    value : int
    timestamp : float or None
        Seconds since the epoch. The current time is used if None.
    offset : float
        Seconds added to the current time. Only applied if no timestamp
        is provided.

    Returns
    -------
    evdev.InputEvent
        Whatever evdev.InputEvent refers to when this is called.
    """
    if timestamp is None:
        timestamp = time.time() + offset

    sec = int(timestamp)
    # microseconds are an integer field, just like seconds. Passing the
    # float remainder would create events that real devices never emit.
    usec = int(timestamp % 1 * 1000000)
    return evdev.InputEvent(sec, usec, type, code, value)
def patch_paths():
    """Make key-mapper store its configuration in the test directory.

    Uses the module-level `tmp` instead of repeating its value, so that
    the directory that is written to is always the one that gets wiped
    during cleanup.
    """
    from keymapper import paths

    paths.CONFIG_PATH = tmp
class InputDevice:
    """Stand-in for evdev.InputDevice that is fed from fixtures and pipes.

    Name, phys, info and capabilities are taken from the entry in `fixtures`
    that belongs to the path. Events are not read from a device node, but
    from the pipe in `pending_events` that belongs to the group of the
    device. Tests fill that pipe with `push_event`.
    """

    # expose as existing attribute, otherwise the patch for
    # evdev < 1.0.0 will crash the test
    path = None

    def __init__(self, path):
        """Create the fake device for a path.

        Parameters
        ----------
        path : string
            A key of `fixtures`, or "justdoit" to get a device without
            fixture, whose name and phys are "unset".

        Raises
        ------
        FileNotFoundError
            If the path is neither "justdoit" nor a key of `fixtures`.
        """
        if path != "justdoit" and path not in fixtures:
            raise FileNotFoundError()

        self.path = path

        fixture = fixtures.get(path, {})

        self.phys = fixture.get("phys", "unset")
        self.info = fixture.get("info", evdev.device.DeviceInfo(None, None, None, None))
        self.name = fixture.get("name", "unset")

        # this property exists only for test purposes and is not part of
        # the original evdev.InputDevice class
        self.group_key = fixture.get("group_key", self.name)

        # ensure a pipe exists to make this object act like
        # it is reading events from a device
        setup_pipe(self.group_key)

        # the receiving end of the pipe, so that waiting on the fd wakes up
        # once an event was pushed
        self.fd = pending_events[self.group_key][1].fileno()

    def push_events(self, events):
        """Push events to the group of this device."""
        push_events(self.group_key, events)

    def fileno(self):
        """Compatibility to select.select."""
        return self.fd

    def log(self, key, msg):
        """Print `msg`, followed by name and path of the device and `key`."""
        grey_log(f'{msg} "{self.name}" "{self.path}" {key}')

    def absinfo(self, *args):
        """Always fails, in order to notice code that relies on .absinfo."""
        raise Exception("Ubuntus version of evdev doesn't support .absinfo")

    def grab(self):
        """Only log, there is nothing to grab."""
        grey_log("grab", self.name, self.path)

    def ungrab(self):
        """Only log, there is nothing to ungrab."""
        grey_log("ungrab", self.name, self.path)

    async def async_read_loop(self):
        """Yield all events that are pending for the group, then stop."""
        if pending_events.get(self.group_key) is None:
            self.log(self.group_key, "no events to read")
            return

        # consume all of them
        while pending_events[self.group_key][1].poll():
            result = pending_events[self.group_key][1].recv()
            self.log(result, "async_read_loop")
            yield result
            await asyncio.sleep(0.01)

        # doesn't loop endlessly in order to run tests for the injector in
        # the main process

    def read(self):
        """Yield all events that are pending for the group, then stop."""
        # the patched fake InputDevice objects read anything pending from
        # that group.
        # To be realistic it would have to check if the provided
        # element is in its capabilities.
        if self.group_key not in pending_events:
            self.log(self.group_key, "no events to read")
            return

        # consume all of them
        while pending_events[self.group_key][1].poll():
            event = pending_events[self.group_key][1].recv()
            self.log(event, "read")
            yield event
            time.sleep(EVENT_READ_TIMEOUT)

    def read_loop(self):
        """Endless loop that yields events."""
        while True:
            # blocks until something is pushed to the group
            event = pending_events[self.group_key][1].recv()
            if event is not None:
                self.log(event, "read_loop")
                yield event
            time.sleep(EVENT_READ_TIMEOUT)

    def read_one(self):
        """Read one event or none if nothing available.

        Returns
        -------
        InputEvent or None
            None if the group has no pipe, if no event is waiting in it or
            if receiving failed with UnpicklingError or EOFError.
        """
        if pending_events.get(self.group_key) is None:
            return None

        time.sleep(EVENT_READ_TIMEOUT)

        receiver = pending_events[self.group_key][1]

        try:
            # recv() blocks as long as the pipe is empty, so ask first in
            # order to return None like the docstring promises
            if not receiver.poll():
                return None

            event = receiver.recv()
        except (UnpicklingError, EOFError):
            # failed in tests sometimes
            return None

        self.log(event, "read_one")
        return event

    def capabilities(self, absinfo=True, verbose=False):
        """Return a copy of the capabilities from the fixture of the path.

        Parameters
        ----------
        absinfo : bool
            If True, each EV_ABS code is returned as a (code, AbsInfo)
            tuple, with MIN_ABS and MAX_ABS as range.
        verbose : bool
            Ignored, only accepted for compatibility.

        Raises
        ------
        KeyError
            If there is no fixture for the path, e.g. for "justdoit".
        """
        result = copy.deepcopy(fixtures[self.path]["capabilities"])

        if absinfo and evdev.ecodes.EV_ABS in result:
            absinfo_obj = evdev.AbsInfo(
                value=None,
                min=MIN_ABS,
                fuzz=None,
                flat=None,
                resolution=None,
                max=MAX_ABS,
            )
            result[evdev.ecodes.EV_ABS] = [
                (stuff, absinfo_obj) for stuff in result[evdev.ecodes.EV_ABS]
            ]

        return result
# name -> the fake UInput that was created with that name most recently
uinputs = {}
class UInput:
    """Stand-in for evdev.UInput that records writes instead of injecting."""

    def __init__(self, events=None, name="unnamed", *args, **kwargs):
        """Create the fake and register it in `uinputs` under its name.

        Parameters
        ----------
        events : dict or None
            Returned as is by `capabilities`.
        name : string
        """
        self.name = name
        self.events = events
        self.fd = 0
        self.write_count = 0
        self.write_history = []
        self.device = InputDevice("justdoit")

        # a later instance with the same name replaces this one
        uinputs[name] = self

    def capabilities(self, *args, **kwargs):
        """Return the events that were passed to the constructor."""
        return self.events

    def write(self, type, code, value):
        """Remember the event in all histories instead of injecting it."""
        self.write_count += 1
        written = new_event(type, code, value)

        # visible to this process only
        uinput_write_history.append(written)
        # visible to the process that reads the other end of the pipe
        uinput_write_history_pipe[1].send(written)
        # visible through this very object
        self.write_history.append(written)

        grey_log(f"{(type, code, value)} written")

    def syn(self):
        """Do nothing."""
class InputEvent(evdev.InputEvent):
    """evdev.InputEvent extended by a `t` attribute and a `copy` method."""

    def __init__(self, sec, usec, type, code, value):
        # has to go through super(): once patch_evdev ran, evdev.InputEvent
        # is this very class
        super().__init__(sec, usec, type, code, value)
        # type, code and value bundled into a single tuple
        self.t = (type, code, value)

    def copy(self):
        """Return a new InputEvent with the current values of this one."""
        fields = (self.sec, self.usec, self.type, self.code, self.value)
        return InputEvent(*fields)
def patch_evdev():
    """Replace the parts of evdev that talk to devices with the fakes.

    Afterwards evdev.list_devices returns the paths of `fixtures`, and
    evdev.InputDevice, evdev.UInput and evdev.InputEvent are the classes
    defined in this file.
    """

    def list_devices():
        # a list like the original returns, not a live view of the dict.
        # The result stays usable when fixtures are added or removed while
        # it is being iterated.
        return list(fixtures.keys())

    evdev.list_devices = list_devices
    evdev.InputDevice = InputDevice
    evdev.UInput = UInput
    evdev.InputEvent = InputEvent
def patch_events():
    """Give evdev.InputEvent a compact string representation for logs.

    Patches the class that evdev.InputEvent refers to at the time of the
    call.
    """

    def describe(self):
        return f"InputEvent{(self.type, self.code, self.value)}"

    # improve logging of stuff
    evdev.InputEvent.__str__ = describe
def patch_os_system():
    """Replace os.system with a wrapper that refuses to run pkexec.

    Commands that don't contain "pkexec" are forwarded to the os.system
    that was in place when this function was called.
    """
    unpatched_system = os.system

    def system(command):
        if "pkexec" not in command:
            return unpatched_system(command)

        # because it
        # - will open a window for user input
        # - has no knowledge of the fixtures and patches
        raise Exception("Write patches to avoid running pkexec stuff")

    os.system = system
def patch_check_output():
    """Make `xmodmap -pke` return a fixed set of symbols.

    On some installations the `xmodmap` command might be missing
    completely, which would break the tests. Every other command is
    forwarded to the subprocess.check_output that was in place when this
    function was called.
    """
    unpatched_check_output = subprocess.check_output

    def check_output(command, *args, **kwargs):
        asks_for_keymap = "xmodmap" in command and "-pke" in command
        if not asks_for_keymap:
            return unpatched_check_output(command, *args, **kwargs)

        # the canned output imported from the xmodmap test module
        return xmodmap

    subprocess.check_output = check_output
def clear_write_history():
    """Forget all written events in preparation for the next test.

    Empties the `uinput_write_history` list in place and discards
    everything that is waiting in `uinput_write_history_pipe`.
    """
    uinput_write_history.clear()

    receiver = uinput_write_history_pipe[0]
    while receiver.poll():
        receiver.recv()
# quickly fake some stuff before any other file gets a chance to import
# the original versions
patch_paths()
patch_evdev()
patch_events()
patch_os_system()
patch_check_output()

# the key-mapper modules are imported down here on purpose, after the
# patches above have been applied
from keymapper.logger import update_verbosity

update_verbosity(True)

from keymapper.injection.injector import Injector
from keymapper.config import config
from keymapper.gui.reader import reader
from keymapper.groups import groups
from keymapper.state import system_mapping, custom_mapping
from keymapper.paths import get_config_path
from keymapper.injection.macros import macro_variables
from keymapper.injection.consumers.keycode_mapper import active_macros, unreleased

# no need for a high number in tests
Injector.regrab_timeout = 0.05

# snapshots of the initial fixtures and environment, quick_cleanup uses
# them to undo modifications
_fixture_copy = copy.deepcopy(fixtures)
environ_copy = copy.deepcopy(os.environ)
def send_event_to_reader(event):
    """Act like the helper and hand an input event to the reader.

    Appends a message of type "event" directly to the unread results of
    the reader.

    Parameters
    ----------
    event : InputEvent
    """
    message = (event.sec, event.usec, event.type, event.code, event.value)
    reader._results._unread.append({"type": "event", "message": message})
def quick_cleanup(log=True):
    """Reset the applications state.

    Parameters
    ----------
    log : bool
        If True, print "quick cleanup" first.

    Raises
    ------
    AssertionError
        If the process of macro_variables is not alive at the start, if
        one of the event pipes still has pending data at the end or if
        macro_variables.is_alive(1) is falsy at the end.
    """
    if log:
        print("quick cleanup")

    # throw away events that no test consumed
    for device in list(pending_events.keys()):
        try:
            while pending_events[device][1].poll():
                pending_events[device][1].recv()
        except (UnpicklingError, EOFError):
            pass

        # setup new pipes for the next test.
        # setup_pipe only creates a pipe if the entry is None or missing
        pending_events[device] = None
        setup_pipe(device)

    try:
        reader.terminate()
    except (BrokenPipeError, OSError):
        pass

    try:
        if asyncio.get_event_loop().is_running():
            for task in asyncio.all_tasks():
                task.cancel()
    except RuntimeError:
        # happens when the event loop disappears for magical reasons
        # create a fresh event loop
        asyncio.set_event_loop(asyncio.new_event_loop())

    if not macro_variables.process.is_alive():
        raise AssertionError("the SharedDict manager is not running anymore")

    # the manager is stopped before waiting for the child processes and
    # started again afterwards.
    # NOTE(review): presumably because it is a child process itself, which
    # join_children would otherwise wait for and kill - confirm
    macro_variables._stop()

    join_children()

    macro_variables._start()

    if os.path.exists(tmp):
        shutil.rmtree(tmp)

    config.path = os.path.join(get_config_path(), "config.json")
    config.clear_config()
    config.save_config()

    system_mapping.populate()
    custom_mapping.empty()
    custom_mapping.clear_config()
    custom_mapping.changed = False

    clear_write_history()

    # the containers are emptied in place, because other modules hold
    # references to the same objects
    for name in list(uinputs.keys()):
        del uinputs[name]

    for device in list(active_macros.keys()):
        del active_macros[device]
    for device in list(unreleased.keys()):
        del unreleased[device]

    # restore the fixtures: remove those that a test added, then overwrite
    # the remaining ones with copies of their initial state
    for path in list(fixtures.keys()):
        if path not in _fixture_copy:
            del fixtures[path]
    for path in list(_fixture_copy.keys()):
        fixtures[path] = copy.deepcopy(_fixture_copy[path])

    # restore the environment: reset the initial variables, then remove
    # those that a test added. `device` is the name of a variable here.
    os.environ.update(environ_copy)
    for device in list(os.environ.keys()):
        if device not in environ_copy:
            del os.environ[device]

    reader.clear()

    # the new pipes have to be empty
    for _, pipe in pending_events.values():
        assert not pipe.poll()

    assert macro_variables.is_alive(1)
def cleanup():
    """Reset the applications state, including running daemons.

    Sends pkill to the key-mapper service and control processes, runs
    quick_cleanup() and refreshes the groups.

    Using this is slower, usually quick_cleanup() is sufficient.
    """
    print("cleanup")

    for process_name in ("key-mapper-service", "key-mapper-control"):
        os.system(f"pkill -f {process_name}")

    # wait a moment after sending the signals before resetting the state
    time.sleep(0.05)

    quick_cleanup(log=False)
    groups.refresh()
def spy(obj, name):
    """Convenient wrapper for patch.object(..., ..., wraps=...).

    The attribute is replaced by a mock that forwards all calls to the
    original, so that calls can be asserted without changing behavior.

    Parameters
    ----------
    obj : object
        Instance, module or class that owns the attribute.
    name : string
        Name of the attribute to spy on.

    Returns
    -------
    The patcher created by patch.object, usable as context manager or
    decorator.
    """
    # getattr instead of obj.__getattribute__(name): the latter raises a
    # TypeError if obj is a class and ignores attributes that are provided
    # via __getattr__
    return patch.object(obj, name, wraps=getattr(obj, name))
def main():
    """Reset the state, collect the requested tests and run them.

    The command line arguments select the tests, each of them is looked up
    within the "testcases" package. Without arguments all tests are
    discovered.

    Returns
    -------
    unittest.TestResult
        The result of the run, so that the caller can derive an exit code
        from it.
    """
    cleanup()

    modules = sys.argv[1:]
    # discoverer is really convenient, but it can't find a specific test
    # in all of the available tests like unittest.main() does...,
    # so provide both options.
    if len(modules) > 0:
        # for example
        # `tests/test.py test_integration.TestIntegration.test_can_start`
        # or `tests/test.py test_integration test_daemon`
        testsuite = unittest.defaultTestLoader.loadTestsFromNames(
            [f"testcases.{module}" for module in modules]
        )
    else:
        # run all tests by default
        testsuite = unittest.defaultTestLoader.discover("testcases", pattern="*.py")

    # add a newline to each "qux (foo.bar)..." output before each test,
    # because the first log will be on the same line otherwise
    original_start_test = unittest.TextTestResult.startTest

    def start_test(self, test):
        original_start_test(self, test)
        print()

    unittest.TextTestResult.startTest = start_test

    return unittest.TextTestRunner(verbosity=2).run(testsuite)


if __name__ == "__main__":
    # failures and errors have to be reflected in the exit code, otherwise
    # scripts and CI that call this file can't notice them
    sys.exit(0 if main().wasSuccessful() else 1)