path: root/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/test/test_threads.py
diff options
Diffstat (limited to 'lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/test/test_threads.py')
1 files changed, 0 insertions, 412 deletions
diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/test/test_threads.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/test/test_threads.py
deleted file mode 100755
index 0b218a39..00000000
--- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/test/test_threads.py
+++ /dev/null
@@ -1,412 +0,0 @@
-# Copyright (c) Twisted Matrix Laboratories.
-# See LICENSE for details.
-Test methods in twisted.internet.threads and reactor thread APIs.
-import sys, os, time
-from twisted.trial import unittest
-from twisted.internet import reactor, defer, interfaces, threads, protocol, error
-from twisted.python import failure, threadable, log, threadpool
-class ReactorThreadsTestCase(unittest.TestCase):
- """
- Tests for the reactor threading API.
- """
- def test_suggestThreadPoolSize(self):
- """
- Try to change maximum number of threads.
- """
- reactor.suggestThreadPoolSize(34)
- self.assertEqual(reactor.threadpool.max, 34)
- reactor.suggestThreadPoolSize(4)
- self.assertEqual(reactor.threadpool.max, 4)
- def _waitForThread(self):
- """
- The reactor's threadpool is only available when the reactor is running,
- so to have a sane behavior during the tests we make a dummy
- L{threads.deferToThread} call.
- """
- return threads.deferToThread(time.sleep, 0)
- def test_callInThread(self):
- """
- Test callInThread functionality: set a C{threading.Event}, and check
- that it's not in the main thread.
- """
- def cb(ign):
- waiter = threading.Event()
- result = []
- def threadedFunc():
- result.append(threadable.isInIOThread())
- waiter.set()
- reactor.callInThread(threadedFunc)
- waiter.wait(120)
- if not waiter.isSet():
- self.fail("Timed out waiting for event.")
- else:
- self.assertEqual(result, [False])
- return self._waitForThread().addCallback(cb)
- def test_callFromThread(self):
- """
- Test callFromThread functionality: from the main thread, and from
- another thread.
- """
- def cb(ign):
- firedByReactorThread = defer.Deferred()
- firedByOtherThread = defer.Deferred()
- def threadedFunc():
- reactor.callFromThread(firedByOtherThread.callback, None)
- reactor.callInThread(threadedFunc)
- reactor.callFromThread(firedByReactorThread.callback, None)
- return defer.DeferredList(
- [firedByReactorThread, firedByOtherThread],
- fireOnOneErrback=True)
- return self._waitForThread().addCallback(cb)
- def test_wakerOverflow(self):
- """
- Try to make an overflow on the reactor waker using callFromThread.
- """
- def cb(ign):
- self.failure = None
- waiter = threading.Event()
- def threadedFunction():
- # Hopefully a hundred thousand queued calls is enough to
- # trigger the error condition
- for i in xrange(100000):
- try:
- reactor.callFromThread(lambda: None)
- except:
- self.failure = failure.Failure()
- break
- waiter.set()
- reactor.callInThread(threadedFunction)
- waiter.wait(120)
- if not waiter.isSet():
- self.fail("Timed out waiting for event")
- if self.failure is not None:
- return defer.fail(self.failure)
- return self._waitForThread().addCallback(cb)
- def _testBlockingCallFromThread(self, reactorFunc):
- """
- Utility method to test L{threads.blockingCallFromThread}.
- """
- waiter = threading.Event()
- results = []
- errors = []
- def cb1(ign):
- def threadedFunc():
- try:
- r = threads.blockingCallFromThread(reactor, reactorFunc)
- except Exception, e:
- errors.append(e)
- else:
- results.append(r)
- waiter.set()
- reactor.callInThread(threadedFunc)
- return threads.deferToThread(waiter.wait, self.getTimeout())
- def cb2(ign):
- if not waiter.isSet():
- self.fail("Timed out waiting for event")
- return results, errors
- return self._waitForThread().addCallback(cb1).addBoth(cb2)
- def test_blockingCallFromThread(self):
- """
- Test blockingCallFromThread facility: create a thread, call a function
- in the reactor using L{threads.blockingCallFromThread}, and verify the
- result returned.
- """
- def reactorFunc():
- return defer.succeed("foo")
- def cb(res):
- self.assertEqual(res[0][0], "foo")
- return self._testBlockingCallFromThread(reactorFunc).addCallback(cb)
- def test_asyncBlockingCallFromThread(self):
- """
- Test blockingCallFromThread as above, but be sure the resulting
- Deferred is not already fired.
- """
- def reactorFunc():
- d = defer.Deferred()
- reactor.callLater(0.1, d.callback, "egg")
- return d
- def cb(res):
- self.assertEqual(res[0][0], "egg")
- return self._testBlockingCallFromThread(reactorFunc).addCallback(cb)
- def test_errorBlockingCallFromThread(self):
- """
- Test error report for blockingCallFromThread.
- """
- def reactorFunc():
- return defer.fail(RuntimeError("bar"))
- def cb(res):
- self.assert_(isinstance(res[1][0], RuntimeError))
- self.assertEqual(res[1][0].args[0], "bar")
- return self._testBlockingCallFromThread(reactorFunc).addCallback(cb)
- def test_asyncErrorBlockingCallFromThread(self):
- """
- Test error report for blockingCallFromThread as above, but be sure the
- resulting Deferred is not already fired.
- """
- def reactorFunc():
- d = defer.Deferred()
- reactor.callLater(0.1, d.errback, RuntimeError("spam"))
- return d
- def cb(res):
- self.assert_(isinstance(res[1][0], RuntimeError))
- self.assertEqual(res[1][0].args[0], "spam")
- return self._testBlockingCallFromThread(reactorFunc).addCallback(cb)
-class Counter:
- index = 0
- problem = 0
- def add(self):
- """A non thread-safe method."""
- next = self.index + 1
- # another thread could jump in here and increment self.index on us
- if next != self.index + 1:
- self.problem = 1
- raise ValueError
- # or here, same issue but we wouldn't catch it. We'd overwrite
- # their results, and the index will have lost a count. If
- # several threads get in here, we will actually make the count
- # go backwards when we overwrite it.
- self.index = next
-class DeferredResultTestCase(unittest.TestCase):
- """
- Test twisted.internet.threads.
- """
- def setUp(self):
- reactor.suggestThreadPoolSize(8)
- def tearDown(self):
- reactor.suggestThreadPoolSize(0)
- def testCallMultiple(self):
- L = []
- N = 10
- d = defer.Deferred()
- def finished():
- self.assertEqual(L, range(N))
- d.callback(None)
- threads.callMultipleInThread([
- (L.append, (i,), {}) for i in xrange(N)
- ] + [(reactor.callFromThread, (finished,), {})])
- return d
- def test_deferredResult(self):
- """
- L{threads.deferToThread} executes the function passed, and correctly
- handles the positional and keyword arguments given.
- """
- d = threads.deferToThread(lambda x, y=5: x + y, 3, y=4)
- d.addCallback(self.assertEqual, 7)
- return d
- def test_deferredFailure(self):
- """
- Check that L{threads.deferToThread} return a failure object
- with an appropriate exception instance when the called
- function raises an exception.
- """
- class NewError(Exception):
- pass
- def raiseError():
- raise NewError()
- d = threads.deferToThread(raiseError)
- return self.assertFailure(d, NewError)
- def test_deferredFailureAfterSuccess(self):
- """
- Check that a successfull L{threads.deferToThread} followed by a one
- that raises an exception correctly result as a failure.
- """
- # set up a condition that causes cReactor to hang. These conditions
- # can also be set by other tests when the full test suite is run in
- # alphabetical order (test_flow.FlowTest.testThreaded followed by
- # test_internet.ReactorCoreTestCase.testStop, to be precise). By
- # setting them up explicitly here, we can reproduce the hang in a
- # single precise test case instead of depending upon side effects of
- # other tests.
- #
- # alas, this test appears to flunk the default reactor too
- d = threads.deferToThread(lambda: None)
- d.addCallback(lambda ign: threads.deferToThread(lambda: 1//0))
- return self.assertFailure(d, ZeroDivisionError)
-class DeferToThreadPoolTestCase(unittest.TestCase):
- """
- Test L{twisted.internet.threads.deferToThreadPool}.
- """
- def setUp(self):
- self.tp = threadpool.ThreadPool(0, 8)
- self.tp.start()
- def tearDown(self):
- self.tp.stop()
- def test_deferredResult(self):
- """
- L{threads.deferToThreadPool} executes the function passed, and
- correctly handles the positional and keyword arguments given.
- """
- d = threads.deferToThreadPool(reactor, self.tp,
- lambda x, y=5: x + y, 3, y=4)
- d.addCallback(self.assertEqual, 7)
- return d
- def test_deferredFailure(self):
- """
- Check that L{threads.deferToThreadPool} return a failure object with an
- appropriate exception instance when the called function raises an
- exception.
- """
- class NewError(Exception):
- pass
- def raiseError():
- raise NewError()
- d = threads.deferToThreadPool(reactor, self.tp, raiseError)
- return self.assertFailure(d, NewError)
-_callBeforeStartupProgram = """
-import time
-import %(reactor)s
-from twisted.internet import reactor
-def threadedCall():
- print 'threaded call'
-# Spin very briefly to try to give the thread a chance to run, if it
-# is going to. Is there a better way to achieve this behavior?
-for i in xrange(100):
- time.sleep(0.0)
-class ThreadStartupProcessProtocol(protocol.ProcessProtocol):
- def __init__(self, finished):
- self.finished = finished
- self.out = []
- self.err = []
- def outReceived(self, out):
- self.out.append(out)
- def errReceived(self, err):
- self.err.append(err)
- def processEnded(self, reason):
- self.finished.callback((self.out, self.err, reason))
-class StartupBehaviorTestCase(unittest.TestCase):
- """
- Test cases for the behavior of the reactor threadpool near startup
- boundary conditions.
- In particular, this asserts that no threaded calls are attempted
- until the reactor starts up, that calls attempted before it starts
- are in fact executed once it has started, and that in both cases,
- the reactor properly cleans itself up (which is tested for
- somewhat implicitly, by requiring a child process be able to exit,
- something it cannot do unless the threadpool has been properly
- torn down).
- """
- def testCallBeforeStartupUnexecuted(self):
- progname = self.mktemp()
- progfile = file(progname, 'w')
- progfile.write(_callBeforeStartupProgram % {'reactor': reactor.__module__})
- progfile.close()
- def programFinished((out, err, reason)):
- if reason.check(error.ProcessTerminated):
- self.fail("Process did not exit cleanly (out: %s err: %s)" % (out, err))
- if err:
- log.msg("Unexpected output on standard error: %s" % (err,))
- self.failIf(out, "Expected no output, instead received:\n%s" % (out,))
- def programTimeout(err):
- err.trap(error.TimeoutError)
- proto.signalProcess('KILL')
- return err
- env = os.environ.copy()
- env['PYTHONPATH'] = os.pathsep.join(sys.path)
- d = defer.Deferred().addCallbacks(programFinished, programTimeout)
- proto = ThreadStartupProcessProtocol(d)
- reactor.spawnProcess(proto, sys.executable, ('python', progname), env)
- return d
-if interfaces.IReactorThreads(reactor, None) is None:
- for cls in (ReactorThreadsTestCase,
- DeferredResultTestCase,
- StartupBehaviorTestCase):
- cls.skip = "No thread support, nothing to test here."
- import threading
-if interfaces.IReactorProcess(reactor, None) is None:
- for cls in (StartupBehaviorTestCase,):
- cls.skip = "No process support, cannot run subprocess thread tests."