Skip to content

Commit 28071cd

Browse files
author
Anselm Kruis
committed
Test some functions of the module stackless
1 parent cfdfbf3 commit 28071cd

File tree

1 file changed

+191
-0
lines changed

1 file changed

+191
-0
lines changed
Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
#!/usr/bin/env python
2+
# -*- coding: utf-8 -*-
3+
#
4+
# Copyright (c) 2014 by Anselm Kruis
5+
#
6+
# Licensed under the Apache License, Version 2.0 (the "License");
7+
# you may not use this file except in compliance with the License.
8+
# You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
#
18+
19+
from __future__ import absolute_import, print_function, division
20+
21+
import unittest
22+
import stackless
23+
import sys
24+
import thread
25+
import threading
26+
import subprocess
27+
from stackless_testsuite.util import StacklessTestCase
28+
29+
if __name__ == '__main__':
30+
import stackless_testsuite.v3_1 # @NoMove @UnusedImport
31+
__package__ = "stackless_testsuite.v3_1" # @ReservedAssignment
32+
33+
if hasattr(threading, "main_thread"):
34+
# Python 3.4 and later
35+
def main_thread_id():
36+
return threading.main_thread().ident # @UndefinedVariable
37+
else:
38+
def main_thread_id():
39+
return threading._shutdown.__self__.ident # @UndefinedVariable
40+
41+
42+
class AtomicTest(StacklessTestCase):
43+
"""Test the atomic context manager
44+
"""
45+
46+
def testAtomicCtxt(self):
47+
old = stackless.getcurrent().atomic
48+
stackless.getcurrent().set_atomic(False)
49+
try:
50+
with stackless.atomic():
51+
self.assertTrue(stackless.getcurrent().atomic)
52+
finally:
53+
stackless.getcurrent().set_atomic(old)
54+
55+
def testAtomicNopCtxt(self):
56+
old = stackless.getcurrent().atomic
57+
stackless.getcurrent().set_atomic(True)
58+
try:
59+
with stackless.atomic():
60+
self.assertTrue(stackless.getcurrent().atomic)
61+
finally:
62+
stackless.getcurrent().set_atomic(old)
63+
64+
65+
class OtherTestCases(StacklessTestCase):
66+
"""Test various functions and computed attributes of the module stackless"""
67+
68+
def testStackless(self):
69+
# test for the reference to itself
70+
self.assertIs(stackless, stackless.stackless)
71+
72+
def testMain(self):
73+
"""test stackless.main"""
74+
main1 = stackless.main # @UndefinedVariable
75+
main2 = stackless.getmain()
76+
self.assertIs(main1, main2)
77+
self.assertIsInstance(main1, stackless.tasklet)
78+
79+
def testCurrent(self):
80+
"""test stackless.current - part 1"""
81+
current1 = stackless.current # @UndefinedVariable
82+
current2 = stackless.getcurrent()
83+
self.assertIs(current1, current2)
84+
self.assertIsInstance(current1, stackless.tasklet)
85+
86+
def testCurrent2(self):
87+
"""test stackless.current - part 2"""
88+
current = []
89+
90+
def f():
91+
current.append(stackless.current) # @UndefinedVariable
92+
93+
task = stackless.tasklet().bind(f, ())
94+
task.run()
95+
self.assertIs(current[0], task)
96+
self.assertIsNot(task, stackless.current) # @UndefinedVariable
97+
98+
def testMainIsCurrentInThread(self):
99+
"""test, that stackless.main is stackless.current in a new thread"""
100+
def f():
101+
self.assertIs(stackless.main, stackless.current) # @UndefinedVariable
102+
t = threading.Thread(target=f)
103+
t.start()
104+
t.join()
105+
106+
def testMainIsCurrentInMainThread(self):
107+
"""test, that stackless.main is stackless.current in a new interpreter process"""
108+
output = subprocess.check_output([sys.executable, "-E", "-c", "import stackless; print(stackless.main is stackless.current)"])
109+
self.assertEqual(output[:4], b"True")
110+
111+
def testRuncount(self):
112+
"""Test stackless.runcount. It is a per thread value"""
113+
rc1 = stackless.runcount # @UndefinedVariable
114+
rc2 = stackless.getruncount()
115+
self.assertIsInstance(rc1, int)
116+
self.assertIsInstance(rc2, int)
117+
self.assertEqual(rc1, rc2)
118+
self.assertGreaterEqual(rc1, 1)
119+
120+
c = []
121+
122+
def f():
123+
c.append(stackless.runcount) # @UndefinedVariable
124+
c.append(stackless.getruncount())
125+
126+
tlet = stackless.tasklet(f)()
127+
128+
# runcount is now at least 2
129+
self.assertEqual(stackless.runcount, rc1 + 1) # @UndefinedVariable
130+
self.assertEqual(stackless.getruncount(), rc1 + 1)
131+
132+
t = threading.Thread(target=f)
133+
t.start()
134+
t.join()
135+
136+
self.assertListEqual(c, [1, 1])
137+
138+
tlet.run()
139+
140+
self.assertListEqual(c, [1, 1, rc1 + 1, rc1 + 1])
141+
self.assertEqual(stackless.runcount, rc1) # @UndefinedVariable
142+
self.assertEqual(stackless.getruncount(), rc1)
143+
144+
145+
class GetThreadInfoTest(StacklessTestCase):
146+
147+
def testWithTasklets(self):
148+
threadid = thread.get_ident()
149+
info = []
150+
151+
def f():
152+
info.extend(stackless.get_thread_info(threadid))
153+
154+
task = stackless.tasklet().bind(f, ())
155+
task.run()
156+
self.assertIs(info[0], stackless.main) # @UndefinedVariable
157+
self.assertIs(info[1], task)
158+
self.assertEqual(info[2], 2)
159+
160+
def testAllThreads(self):
161+
for threadid in sys._current_frames():
162+
info = stackless.get_thread_info(threadid)
163+
self.assertIsInstance(info, tuple)
164+
self.assertEqual(len(info), 3)
165+
self.assertIsInstance(info[0], stackless.tasklet)
166+
self.assertIs(info[0], info[1])
167+
self.assertEqual(info[2], 1)
168+
169+
def testThreads_main(self):
170+
mti = main_thread_id()
171+
st = stackless.threads # @UndefinedVariable
172+
self.assertIsInstance(st, list)
173+
# must contain at least the main thread id at index 0
174+
self.assertGreaterEqual(len(st), 1)
175+
self.assertEqual(st[0], mti)
176+
self.assertSetEqual(
177+
frozenset(st), frozenset(sys._current_frames().keys()))
178+
179+
180+
class SchedulerTest(StacklessTestCase):
181+
def testRun_args(self):
182+
"""Test, that run accepts the correct arguments
183+
184+
timeout=0, threadblock=False, soft=False, ignore_nesting=False, totaltimeout=False
185+
186+
Fortunately, we can call stackless.run with an empty queue for this test.
187+
"""
188+
stackless.run()
189+
stackless.run(**dict(timeout=0, threadblock=False, soft=False, ignore_nesting=False, totaltimeout=False))
190+
stackless.run(0, False, False, False, False)
191+
self.assertRaisesRegexp(TypeError, r"takes at most 5 arguments", stackless.run, 0, False, False, False, False, None)

0 commit comments

Comments
 (0)