diff options
Diffstat (limited to 'lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/internet/test/test_process.py')
-rwxr-xr-x | lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/internet/test/test_process.py | 711 |
1 files changed, 0 insertions, 711 deletions
diff --git a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/internet/test/test_process.py b/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/internet/test/test_process.py deleted file mode 100755 index c56c225a..00000000 --- a/lib/python2.7/site-packages/Twisted-12.2.0-py2.7-linux-x86_64.egg/twisted/internet/test/test_process.py +++ /dev/null @@ -1,711 +0,0 @@ -# Copyright (c) Twisted Matrix Laboratories. -# See LICENSE for details. - -""" -Tests for implementations of L{IReactorProcess}. -""" - -__metaclass__ = type - -import os, sys, signal, threading - -from twisted.trial.unittest import TestCase, SkipTest -from twisted.internet.test.reactormixins import ReactorBuilder -from twisted.python.compat import set -from twisted.python.log import msg, err -from twisted.python.runtime import platform -from twisted.python.filepath import FilePath -from twisted.internet import utils -from twisted.internet.interfaces import IReactorProcess, IProcessTransport -from twisted.internet.defer import Deferred, succeed -from twisted.internet.protocol import ProcessProtocol -from twisted.internet.error import ProcessDone, ProcessTerminated -from twisted.internet import _signals - - - -class _ShutdownCallbackProcessProtocol(ProcessProtocol): - """ - An L{IProcessProtocol} which fires a Deferred when the process it is - associated with ends. - - @ivar received: A C{dict} mapping file descriptors to lists of bytes - received from the child process on those file descriptors. - """ - def __init__(self, whenFinished): - self.whenFinished = whenFinished - self.received = {} - - - def childDataReceived(self, fd, bytes): - self.received.setdefault(fd, []).append(bytes) - - - def processEnded(self, reason): - self.whenFinished.callback(None) - - - -class ProcessTestsBuilderBase(ReactorBuilder): - """ - Base class for L{IReactorProcess} tests which defines some tests which - can be applied to PTY or non-PTY uses of C{spawnProcess}. - - Subclasses are expected to set the C{usePTY} attribute to C{True} or - C{False}. - """ - requiredInterfaces = [IReactorProcess] - - - def test_processTransportInterface(self): - """ - L{IReactorProcess.spawnProcess} connects the protocol passed to it - to a transport which provides L{IProcessTransport}. - """ - ended = Deferred() - protocol = _ShutdownCallbackProcessProtocol(ended) - - reactor = self.buildReactor() - transport = reactor.spawnProcess( - protocol, sys.executable, [sys.executable, "-c", ""], - usePTY=self.usePTY) - - # The transport is available synchronously, so we can check it right - # away (unlike many transport-based tests). This is convenient even - # though it's probably not how the spawnProcess interface should really - # work. - # We're not using verifyObject here because part of - # IProcessTransport is a lie - there are no getHost or getPeer - # methods. See #1124. - self.assertTrue(IProcessTransport.providedBy(transport)) - - # Let the process run and exit so we don't leave a zombie around. - ended.addCallback(lambda ignored: reactor.stop()) - self.runReactor(reactor) - - - def _writeTest(self, write): - """ - Helper for testing L{IProcessTransport} write functionality. This - method spawns a child process and gives C{write} a chance to write some - bytes to it. It then verifies that the bytes were actually written to - it (by relying on the child process to echo them back). - - @param write: A two-argument callable. This is invoked with a process - transport and some bytes to write to it. - """ - reactor = self.buildReactor() - - ended = Deferred() - protocol = _ShutdownCallbackProcessProtocol(ended) - - bytes = "hello, world" + os.linesep - program = ( - "import sys\n" - "sys.stdout.write(sys.stdin.readline())\n" - ) - - def startup(): - transport = reactor.spawnProcess( - protocol, sys.executable, [sys.executable, "-c", program]) - try: - write(transport, bytes) - except: - err(None, "Unhandled exception while writing") - transport.signalProcess('KILL') - reactor.callWhenRunning(startup) - - ended.addCallback(lambda ignored: reactor.stop()) - - self.runReactor(reactor) - self.assertEqual(bytes, "".join(protocol.received[1])) - - - def test_write(self): - """ - L{IProcessTransport.write} writes the specified C{str} to the standard - input of the child process. - """ - def write(transport, bytes): - transport.write(bytes) - self._writeTest(write) - - - def test_writeSequence(self): - """ - L{IProcessTransport.writeSequence} writes the specified C{list} of - C{str} to the standard input of the child process. - """ - def write(transport, bytes): - transport.writeSequence(list(bytes)) - self._writeTest(write) - - - def test_writeToChild(self): - """ - L{IProcessTransport.writeToChild} writes the specified C{str} to the - specified file descriptor of the child process. - """ - def write(transport, bytes): - transport.writeToChild(0, bytes) - self._writeTest(write) - - - def test_writeToChildBadFileDescriptor(self): - """ - L{IProcessTransport.writeToChild} raises L{KeyError} if passed a file - descriptor which is was not set up by L{IReactorProcess.spawnProcess}. - """ - def write(transport, bytes): - try: - self.assertRaises(KeyError, transport.writeToChild, 13, bytes) - finally: - # Just get the process to exit so the test can complete - transport.write(bytes) - self._writeTest(write) - - - def test_spawnProcessEarlyIsReaped(self): - """ - If, before the reactor is started with L{IReactorCore.run}, a - process is started with L{IReactorProcess.spawnProcess} and - terminates, the process is reaped once the reactor is started. - """ - reactor = self.buildReactor() - - # Create the process with no shared file descriptors, so that there - # are no other events for the reactor to notice and "cheat" with. - # We want to be sure it's really dealing with the process exiting, - # not some associated event. - if self.usePTY: - childFDs = None - else: - childFDs = {} - - # Arrange to notice the SIGCHLD. - signaled = threading.Event() - def handler(*args): - signaled.set() - signal.signal(signal.SIGCHLD, handler) - - # Start a process - before starting the reactor! - ended = Deferred() - reactor.spawnProcess( - _ShutdownCallbackProcessProtocol(ended), sys.executable, - [sys.executable, "-c", ""], usePTY=self.usePTY, childFDs=childFDs) - - # Wait for the SIGCHLD (which might have been delivered before we got - # here, but that's okay because the signal handler was installed above, - # before we could have gotten it). - signaled.wait(120) - if not signaled.isSet(): - self.fail("Timed out waiting for child process to exit.") - - # Capture the processEnded callback. - result = [] - ended.addCallback(result.append) - - if result: - # The synchronous path through spawnProcess / Process.__init__ / - # registerReapProcessHandler was encountered. There's no reason to - # start the reactor, because everything is done already. - return - - # Otherwise, though, start the reactor so it can tell us the process - # exited. - ended.addCallback(lambda ignored: reactor.stop()) - self.runReactor(reactor) - - # Make sure the reactor stopped because the Deferred fired. - self.assertTrue(result) - - if getattr(signal, 'SIGCHLD', None) is None: - test_spawnProcessEarlyIsReaped.skip = ( - "Platform lacks SIGCHLD, early-spawnProcess test can't work.") - - - def test_processExitedWithSignal(self): - """ - The C{reason} argument passed to L{IProcessProtocol.processExited} is a - L{ProcessTerminated} instance if the child process exits with a signal. - """ - sigName = 'TERM' - sigNum = getattr(signal, 'SIG' + sigName) - exited = Deferred() - source = ( - "import sys\n" - # Talk so the parent process knows the process is running. This is - # necessary because ProcessProtocol.makeConnection may be called - # before this process is exec'd. It would be unfortunate if we - # SIGTERM'd the Twisted process while it was on its way to doing - # the exec. - "sys.stdout.write('x')\n" - "sys.stdout.flush()\n" - "sys.stdin.read()\n") - - class Exiter(ProcessProtocol): - def childDataReceived(self, fd, data): - msg('childDataReceived(%d, %r)' % (fd, data)) - self.transport.signalProcess(sigName) - - def childConnectionLost(self, fd): - msg('childConnectionLost(%d)' % (fd,)) - - def processExited(self, reason): - msg('processExited(%r)' % (reason,)) - # Protect the Deferred from the failure so that it follows - # the callback chain. This doesn't use the errback chain - # because it wants to make sure reason is a Failure. An - # Exception would also make an errback-based test pass, and - # that would be wrong. - exited.callback([reason]) - - def processEnded(self, reason): - msg('processEnded(%r)' % (reason,)) - - reactor = self.buildReactor() - reactor.callWhenRunning( - reactor.spawnProcess, Exiter(), sys.executable, - [sys.executable, "-c", source], usePTY=self.usePTY) - - def cbExited((failure,)): - # Trapping implicitly verifies that it's a Failure (rather than - # an exception) and explicitly makes sure it's the right type. - failure.trap(ProcessTerminated) - err = failure.value - if platform.isWindows(): - # Windows can't really /have/ signals, so it certainly can't - # report them as the reason for termination. Maybe there's - # something better we could be doing here, anyway? Hard to - # say. Anyway, this inconsistency between different platforms - # is extremely unfortunate and I would remove it if I - # could. -exarkun - self.assertIdentical(err.signal, None) - self.assertEqual(err.exitCode, 1) - else: - self.assertEqual(err.signal, sigNum) - self.assertIdentical(err.exitCode, None) - - exited.addCallback(cbExited) - exited.addErrback(err) - exited.addCallback(lambda ign: reactor.stop()) - - self.runReactor(reactor) - - - def test_systemCallUninterruptedByChildExit(self): - """ - If a child process exits while a system call is in progress, the system - call should not be interfered with. In particular, it should not fail - with EINTR. - - Older versions of Twisted installed a SIGCHLD handler on POSIX without - using the feature exposed by the SA_RESTART flag to sigaction(2). The - most noticable problem this caused was for blocking reads and writes to - sometimes fail with EINTR. - """ - reactor = self.buildReactor() - - # XXX Since pygobject/pygtk wants to use signal.set_wakeup_fd, - # we aren't actually providing this functionality on the glib2 - # or gtk2 reactors yet. See #4286 for the possibility of - # improving this. - skippedReactors = ["Glib2Reactor", "Gtk2Reactor", "PortableGtkReactor"] - hasSigInterrupt = getattr(signal, "siginterrupt", None) is not None - reactorClassName = reactor.__class__.__name__ - if reactorClassName in skippedReactors and not hasSigInterrupt: - raise SkipTest( - "%s is not supported without siginterrupt" % reactorClassName) - if _signals.installHandler.__name__ == "_installHandlerUsingSignal": - raise SkipTest("_signals._installHandlerUsingSignal doesn't support this feature") - - result = [] - - def f(): - try: - f1 = os.popen('%s -c "import time; time.sleep(0.1)"' % - (sys.executable,)) - f2 = os.popen('%s -c "import time; time.sleep(0.5); print \'Foo\'"' % - (sys.executable,)) - # The read call below will blow up with an EINTR from the - # SIGCHLD from the first process exiting if we install a - # SIGCHLD handler without SA_RESTART. (which we used to do) - result.append(f2.read()) - finally: - reactor.stop() - - reactor.callWhenRunning(f) - self.runReactor(reactor) - self.assertEqual(result, ["Foo\n"]) - - - def test_openFileDescriptors(self): - """ - A spawned process has only stdin, stdout and stderr open - (file descriptor 3 is also reported as open, because of the call to - 'os.listdir()'). - """ - from twisted.python.runtime import platformType - if platformType != "posix": - raise SkipTest("Test only applies to POSIX platforms") - - here = FilePath(__file__) - top = here.parent().parent().parent().parent() - source = ( - "import sys", - "sys.path.insert(0, '%s')" % (top.path,), - "from twisted.internet import process", - "sys.stdout.write(str(process._listOpenFDs()))", - "sys.stdout.flush()") - - def checkOutput(output): - self.assertEqual('[0, 1, 2, 3]', output) - - reactor = self.buildReactor() - - class Protocol(ProcessProtocol): - def __init__(self): - self.output = [] - - def outReceived(self, data): - self.output.append(data) - - def processEnded(self, reason): - try: - checkOutput("".join(self.output)) - finally: - reactor.stop() - - proto = Protocol() - reactor.callWhenRunning( - reactor.spawnProcess, proto, sys.executable, - [sys.executable, "-Wignore", "-c", "\n".join(source)], - usePTY=self.usePTY) - self.runReactor(reactor) - - - def test_timelyProcessExited(self): - """ - If a spawned process exits, C{processExited} will be called in a - timely manner. - """ - reactor = self.buildReactor() - - class ExitingProtocol(ProcessProtocol): - exited = False - - def processExited(protoSelf, reason): - protoSelf.exited = True - reactor.stop() - self.assertEqual(reason.value.exitCode, 0) - - protocol = ExitingProtocol() - reactor.callWhenRunning( - reactor.spawnProcess, protocol, sys.executable, - [sys.executable, "-c", "raise SystemExit(0)"], - usePTY=self.usePTY) - - # This will timeout if processExited isn't called: - self.runReactor(reactor, timeout=30) - self.assertEqual(protocol.exited, True) - - - -class ProcessTestsBuilder(ProcessTestsBuilderBase): - """ - Builder defining tests relating to L{IReactorProcess} for child processes - which do not have a PTY. - """ - usePTY = False - - keepStdioOpenProgram = FilePath(__file__).sibling('process_helper.py').path - if platform.isWindows(): - keepStdioOpenArg = "windows" - else: - # Just a value that doesn't equal "windows" - keepStdioOpenArg = "" - - # Define this test here because PTY-using processes only have stdin and - # stdout and the test would need to be different for that to work. - def test_childConnectionLost(self): - """ - L{IProcessProtocol.childConnectionLost} is called each time a file - descriptor associated with a child process is closed. - """ - connected = Deferred() - lost = {0: Deferred(), 1: Deferred(), 2: Deferred()} - - class Closer(ProcessProtocol): - def makeConnection(self, transport): - connected.callback(transport) - - def childConnectionLost(self, childFD): - lost[childFD].callback(None) - - source = ( - "import os, sys\n" - "while 1:\n" - " line = sys.stdin.readline().strip()\n" - " if not line:\n" - " break\n" - " os.close(int(line))\n") - - reactor = self.buildReactor() - reactor.callWhenRunning( - reactor.spawnProcess, Closer(), sys.executable, - [sys.executable, "-c", source], usePTY=self.usePTY) - - def cbConnected(transport): - transport.write('2\n') - return lost[2].addCallback(lambda ign: transport) - connected.addCallback(cbConnected) - - def lostSecond(transport): - transport.write('1\n') - return lost[1].addCallback(lambda ign: transport) - connected.addCallback(lostSecond) - - def lostFirst(transport): - transport.write('\n') - connected.addCallback(lostFirst) - connected.addErrback(err) - - def cbEnded(ignored): - reactor.stop() - connected.addCallback(cbEnded) - - self.runReactor(reactor) - - - # This test is here because PTYProcess never delivers childConnectionLost. - def test_processEnded(self): - """ - L{IProcessProtocol.processEnded} is called after the child process - exits and L{IProcessProtocol.childConnectionLost} is called for each of - its file descriptors. - """ - ended = Deferred() - lost = [] - - class Ender(ProcessProtocol): - def childDataReceived(self, fd, data): - msg('childDataReceived(%d, %r)' % (fd, data)) - self.transport.loseConnection() - - def childConnectionLost(self, childFD): - msg('childConnectionLost(%d)' % (childFD,)) - lost.append(childFD) - - def processExited(self, reason): - msg('processExited(%r)' % (reason,)) - - def processEnded(self, reason): - msg('processEnded(%r)' % (reason,)) - ended.callback([reason]) - - reactor = self.buildReactor() - reactor.callWhenRunning( - reactor.spawnProcess, Ender(), sys.executable, - [sys.executable, self.keepStdioOpenProgram, "child", - self.keepStdioOpenArg], - usePTY=self.usePTY) - - def cbEnded((failure,)): - failure.trap(ProcessDone) - self.assertEqual(set(lost), set([0, 1, 2])) - ended.addCallback(cbEnded) - - ended.addErrback(err) - ended.addCallback(lambda ign: reactor.stop()) - - self.runReactor(reactor) - - - # This test is here because PTYProcess.loseConnection does not actually - # close the file descriptors to the child process. This test needs to be - # written fairly differently for PTYProcess. - def test_processExited(self): - """ - L{IProcessProtocol.processExited} is called when the child process - exits, even if file descriptors associated with the child are still - open. - """ - exited = Deferred() - allLost = Deferred() - lost = [] - - class Waiter(ProcessProtocol): - def childDataReceived(self, fd, data): - msg('childDataReceived(%d, %r)' % (fd, data)) - - def childConnectionLost(self, childFD): - msg('childConnectionLost(%d)' % (childFD,)) - lost.append(childFD) - if len(lost) == 3: - allLost.callback(None) - - def processExited(self, reason): - msg('processExited(%r)' % (reason,)) - # See test_processExitedWithSignal - exited.callback([reason]) - self.transport.loseConnection() - - reactor = self.buildReactor() - reactor.callWhenRunning( - reactor.spawnProcess, Waiter(), sys.executable, - [sys.executable, self.keepStdioOpenProgram, "child", - self.keepStdioOpenArg], - usePTY=self.usePTY) - - def cbExited((failure,)): - failure.trap(ProcessDone) - msg('cbExited; lost = %s' % (lost,)) - self.assertEqual(lost, []) - return allLost - exited.addCallback(cbExited) - - def cbAllLost(ignored): - self.assertEqual(set(lost), set([0, 1, 2])) - exited.addCallback(cbAllLost) - - exited.addErrback(err) - exited.addCallback(lambda ign: reactor.stop()) - - self.runReactor(reactor) - - - def makeSourceFile(self, sourceLines): - """ - Write the given list of lines to a text file and return the absolute - path to it. - """ - script = self.mktemp() - scriptFile = file(script, 'wt') - scriptFile.write(os.linesep.join(sourceLines) + os.linesep) - scriptFile.close() - return os.path.abspath(script) - - - def test_shebang(self): - """ - Spawning a process with an executable which is a script starting - with an interpreter definition line (#!) uses that interpreter to - evaluate the script. - """ - SHEBANG_OUTPUT = 'this is the shebang output' - - scriptFile = self.makeSourceFile([ - "#!%s" % (sys.executable,), - "import sys", - "sys.stdout.write('%s')" % (SHEBANG_OUTPUT,), - "sys.stdout.flush()"]) - os.chmod(scriptFile, 0700) - - reactor = self.buildReactor() - - def cbProcessExited((out, err, code)): - msg("cbProcessExited((%r, %r, %d))" % (out, err, code)) - self.assertEqual(out, SHEBANG_OUTPUT) - self.assertEqual(err, "") - self.assertEqual(code, 0) - - def shutdown(passthrough): - reactor.stop() - return passthrough - - def start(): - d = utils.getProcessOutputAndValue(scriptFile, reactor=reactor) - d.addBoth(shutdown) - d.addCallback(cbProcessExited) - d.addErrback(err) - - reactor.callWhenRunning(start) - self.runReactor(reactor) - - - def test_processCommandLineArguments(self): - """ - Arguments given to spawnProcess are passed to the child process as - originally intended. - """ - source = ( - # On Windows, stdout is not opened in binary mode by default, - # so newline characters are munged on writing, interfering with - # the tests. - 'import sys, os\n' - 'try:\n' - ' import msvcrt\n' - ' msvcrt.setmode(sys.stdout.fileno(), os.O_BINARY)\n' - 'except ImportError:\n' - ' pass\n' - 'for arg in sys.argv[1:]:\n' - ' sys.stdout.write(arg + chr(0))\n' - ' sys.stdout.flush()') - - args = ['hello', '"', ' \t|<>^&', r'"\\"hello\\"', r'"foo\ bar baz\""'] - # Ensure that all non-NUL characters can be passed too. - args.append(''.join(map(chr, xrange(1, 256)))) - - reactor = self.buildReactor() - - def processFinished(output): - output = output.split('\0') - # Drop the trailing \0. - output.pop() - self.assertEqual(args, output) - - def shutdown(result): - reactor.stop() - return result - - def spawnChild(): - d = succeed(None) - d.addCallback(lambda dummy: utils.getProcessOutput( - sys.executable, ['-c', source] + args, reactor=reactor)) - d.addCallback(processFinished) - d.addBoth(shutdown) - - reactor.callWhenRunning(spawnChild) - self.runReactor(reactor) -globals().update(ProcessTestsBuilder.makeTestCaseClasses()) - - - -class PTYProcessTestsBuilder(ProcessTestsBuilderBase): - """ - Builder defining tests relating to L{IReactorProcess} for child processes - which have a PTY. - """ - usePTY = True - - if platform.isWindows(): - skip = "PTYs are not supported on Windows." - elif platform.isMacOSX(): - skippedReactors = { - "twisted.internet.pollreactor.PollReactor": - "OS X's poll() does not support PTYs"} -globals().update(PTYProcessTestsBuilder.makeTestCaseClasses()) - - - -class PotentialZombieWarningTests(TestCase): - """ - Tests for L{twisted.internet.error.PotentialZombieWarning}. - """ - def test_deprecated(self): - """ - Accessing L{PotentialZombieWarning} via the - I{PotentialZombieWarning} attribute of L{twisted.internet.error} - results in a deprecation warning being emitted. - """ - from twisted.internet import error - error.PotentialZombieWarning - - warnings = self.flushWarnings([self.test_deprecated]) - self.assertEqual(warnings[0]['category'], DeprecationWarning) - self.assertEqual( - warnings[0]['message'], - "twisted.internet.error.PotentialZombieWarning was deprecated in " - "Twisted 10.0.0: There is no longer any potential for zombie " - "process.") - self.assertEqual(len(warnings), 1) |