Skip to content

Commit 8300739

Browse files
author
Anselm Kruis
committed
Added sightly modified TestCase classes from Stackless/unittests/support.py from the current 2.7-slp source
1 parent 248116b commit 8300739

File tree

1 file changed

+91
-0
lines changed

1 file changed

+91
-0
lines changed

stackless_testsuite/util.py

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@
2424

2525
import types
2626
import inspect
27+
import sys
28+
import unittest
29+
import stackless
2730

2831
FUNCTION = object()
2932
ROUTINE = object()
@@ -81,3 +84,91 @@ def create_type_tests_for_class(ns, api):
8184
template.__globals__,
8285
test_name,
8386
(getContainer, name, expected_type))
87+
88+
89+
try:
90+
import threading
91+
withThreads = True
92+
except:
93+
withThreads = False
94+
95+
96+
class StacklessTestCase(unittest.TestCase):
97+
__preexisting_threads = None
98+
99+
def setUp(self):
100+
self.assertEqual(stackless.getruncount(), 1, "Leakage from other tests, with %d tasklets still in the scheduler" % (stackless.getruncount() - 1))
101+
if withThreads:
102+
active_count = threading.active_count()
103+
if self.__preexisting_threads is None:
104+
self.__preexisting_threads = frozenset(threading.enumerate())
105+
expected_thread_count = len(self.__preexisting_threads)
106+
self.assertEqual(active_count, expected_thread_count, "Leakage from other threads, with %d threads running (%d expected)" % (active_count, expected_thread_count))
107+
108+
def tearDown(self):
109+
# Tasklets created in pickling tests can be left in the scheduler when they finish. We can feel free to
110+
# clean them up for the tests. Any tests that expect to exit with no leaked tasklets should do explicit
111+
# assertions to check.
112+
mainTasklet = stackless.getmain()
113+
current = mainTasklet.next
114+
while current is not None and current is not mainTasklet:
115+
next_ = current.next
116+
current.kill()
117+
current = next_
118+
self.assertEqual(stackless.getruncount(
119+
), 1, "Leakage from this test, with %d tasklets still in the scheduler" % (stackless.getruncount() - 1))
120+
if withThreads:
121+
expected_thread_count = len(self.__preexisting_threads)
122+
active_count = threading.active_count()
123+
if active_count > expected_thread_count:
124+
activeThreads = set(threading.enumerate())
125+
activeThreads -= self.__preexisting_threads
126+
self.assertNotIn(threading.current_thread(), activeThreads, "tearDown runs on the wrong thread.")
127+
while activeThreads:
128+
activeThreads.pop().join(0.5)
129+
active_count = threading.active_count()
130+
self.assertEqual(active_count, expected_thread_count, "Leakage from other threads, with %d threads running (%d expected)" % (active_count, expected_thread_count))
131+
132+
SAFE_TESTCASE_ATTRIBUTES = unittest.TestCase(
133+
methodName='run').__dict__.keys()
134+
135+
def _addSkip(self, result, reason):
136+
# Remove non standard attributes. They could render the test case object unpickleable.
137+
# This is a hack, but it works fairly well.
138+
for k in self.__dict__.keys():
139+
if k not in self.SAFE_TESTCASE_ATTRIBUTES and \
140+
not isinstance(self.__dict__[k], (types.NoneType, basestring, int, long, float)):
141+
del self.__dict__[k]
142+
super(StacklessTestCase, self)._addSkip(result, reason)
143+
144+
145+
class AsTaskletTestCase(StacklessTestCase):
146+
"""A test case class, that runs tests as tasklets"""
147+
148+
def setUp(self):
149+
self._ran_AsTaskletTestCase_setUp = True
150+
if stackless.enable_softswitch(None):
151+
self.assertEqual(stackless.current.nesting_level, 0) # @UndefinedVariable
152+
153+
# yes, its intended: call setUp on the grand parent class
154+
super(StacklessTestCase, self).setUp()
155+
self.assertEqual(stackless.getruncount(
156+
), 1, "Leakage from other tests, with %d tasklets still in the scheduler" % (stackless.getruncount() - 1))
157+
if withThreads:
158+
self.assertEqual(threading.activeCount(
159+
), 1, "Leakage from other threads, with %d threads running (1 expected)" % (threading.activeCount()))
160+
161+
def run(self, result=None):
162+
c = stackless.channel()
163+
c.preference = 1 # sender priority
164+
self._ran_AsTaskletTestCase_setUp = False
165+
166+
def helper():
167+
try:
168+
c.send(super(AsTaskletTestCase, self).run(result))
169+
except:
170+
c.send_throw(*sys.exc_info())
171+
stackless.tasklet(helper)()
172+
result = c.receive()
173+
assert self._ran_AsTaskletTestCase_setUp
174+
return result

0 commit comments

Comments
 (0)