This repository has been archived by the owner on Jan 23, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 110
/
Copy pathtest_process_misc.py
63 lines (52 loc) · 1.65 KB
/
test_process_misc.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
import pytest
import time
import fiber
from fiber import Process
from fiber.queues import Pipe, SimpleQueue
import docker
def get_current_pid(q):
    """Process target: verify this process's pid matches the one sent on ``q``.

    Blocks on ``q.get()`` until a pid arrives, then asserts that
    ``fiber.current_process().pid`` equals it. A mismatch raises
    ``AssertionError`` inside the process running this function.
    """
    import fiber

    expected_pid = q.get()
    actual_pid = fiber.current_process().pid
    assert actual_pid == expected_pid
@pytest.fixture(scope="module")
def client():
    """Module-scoped fixture returning a Docker client from ``docker.from_env()``."""
    docker_client = docker.from_env()
    return docker_client
class TestMisc:
    """Tests for fiber's module-level process helpers."""

    @pytest.fixture(autouse=True)
    def check_leak(self, request, client):
        """Guard every test against leaked children / containers.

        Before the test runs, asserts that fiber reports no active children.
        When fiber's default backend is ``"docker"``, additionally snapshots
        the container list before and after the test and asserts that the
        two snapshots are equal.
        """
        assert fiber.active_children() == []

        if fiber.config.default_backend != "docker":
            # Nothing container-related to compare on other backends.
            yield
            return

        pre_workers = client.containers.list()
        yield
        post_workers = client.containers.list()
        assert pre_workers == post_workers

    def test_active_children(self):
        """Two started processes must both show up in ``active_children()``."""
        # Reset the fiber module so processes from other tests won't
        # interfere with the current run.
        import importlib
        importlib.reload(fiber)
        fiber.process._children = set()

        # Track only processes that were actually started, so cleanup never
        # touches an undefined or unstarted process and never masks the
        # original failure with a secondary error.
        started = []
        try:
            for name in ("test_active_children1", "test_active_children2"):
                proc = Process(target=time.sleep, args=(1,), name=name)
                proc.start()
                started.append(proc)
            assert len(fiber.active_children()) == 2
        finally:
            for proc in started:
                proc.join()

    def test_cpu_count(self):
        """``fiber.cpu_count()`` must report a positive number."""
        c = fiber.cpu_count()
        assert c > 0

    def test_current_process(self):
        """The current process must expose a positive pid."""
        c = fiber.current_process()
        assert c.pid > 0

    def test_current_process_nested(self):
        """A child's ``current_process().pid`` must match the parent's view.

        The parent sends ``p.pid`` through a queue; the child compares it to
        its own ``fiber.current_process().pid`` and exits non-zero on
        mismatch (the assertion in ``get_current_pid`` fails).
        """
        q = SimpleQueue()
        p = Process(
            target=get_current_pid,
            args=(q,),
            name="test_current_process_nested",
        )
        p.start()
        # The child blocks on q.get() until the pid is delivered.
        q.put(p.pid)
        p.join()
        assert p.exitcode == 0