[wwwsearch-commits] r43826 - in wwwsearch/mechanize/trunk: . test-tools

jjlee at codespeak.net jjlee at codespeak.net
Mon May 28 23:06:57 CEST 2007


Author: jjlee
Date: Mon May 28 23:06:56 2007
New Revision: 43826

Added:
   wwwsearch/mechanize/trunk/test-tools/testprogram.py
   wwwsearch/mechanize/trunk/test-tools/twisted-localserver.py
Modified:
   wwwsearch/mechanize/trunk/functional_tests.py
   wwwsearch/mechanize/trunk/test.py
Log:
Add an -l option to run the functional tests against a local twisted.web2-based server.  There are still a few tests that always run against the wwwsearch.sf.net site -- need to extend the local srver a bit to cover these.

Modified: wwwsearch/mechanize/trunk/functional_tests.py
==============================================================================
--- wwwsearch/mechanize/trunk/functional_tests.py	(original)
+++ wwwsearch/mechanize/trunk/functional_tests.py	Mon May 28 23:06:56 2007
@@ -2,7 +2,9 @@
 
 # These tests access the network.
 
-import os
+# thanks Moof (aka Giles Antonio Radford) for some of these
+
+import os, sys
 from unittest import TestCase
 
 import mechanize
@@ -11,6 +13,14 @@
      HTTPHandler, HTTPRefreshProcessor, \
      HTTPEquivProcessor, HTTPRedirectHandler, \
      HTTPRedirectDebugProcessor, HTTPResponseDebugProcessor
+from mechanize._rfc3986 import urljoin
+
+# XXX
+# document twisted.web2 install (I forgot how I did it -- reinstall!)
+# implement remaining stuff used by functional_tests.py
+# in twisted-localserver.py:
+#   - 302 followed by 404 response
+#   - helper cgi script for cookies &c.
 
 #from cookielib import CookieJar
 #from urllib2 import build_opener, install_opener, urlopen
@@ -20,8 +30,9 @@
 
 ## import logging
 ## logger = logging.getLogger("mechanize")
-## logger.addHandler(logging.StreamHandler())
-## logger.setLevel(logging.DEBUG)
+## logger.addHandler(logging.StreamHandler(sys.stdout))
+## #logger.setLevel(logging.DEBUG)
+## logger.setLevel(logging.INFO)
 
 
 def sanepathname2url(path):
@@ -39,7 +50,7 @@
         self.browser = mechanize.Browser()
 
     def test_simple(self):
-        self.browser.open('http://wwwsearch.sourceforge.net/')
+        self.browser.open(self.uri)
         self.assertEqual(self.browser.title(), 'Python bits')
         # relative URL
         self.browser.open('/mechanize/')
@@ -52,7 +63,7 @@
         import urllib2
         self.assertRaises(
             urllib2.HTTPError,
-            self.browser.open, "http://wwwsearch.sf.net/doesnotexist"
+            self.browser.open, "http://wwwsearch.sf.net/doesnotexist",
             )
 
     def test_reread(self):
@@ -60,7 +71,7 @@
         # be true for e.g. mechanize.OpenerDirector when mechanize's own
         # handlers are in use, but is guaranteed to be true for
         # mechanize.Browser)
-        r = self.browser.open('http://wwwsearch.sourceforge.net/')
+        r = self.browser.open(self.uri)
         data = r.read()
         r.close()
         r.seek(0)
@@ -70,12 +81,12 @@
     def test_error_recovery(self):
         self.assertRaises(OSError, self.browser.open,
                           'file:///c|thisnoexistyiufheiurgbueirgbue')
-        self.browser.open('http://wwwsearch.sourceforge.net/')
+        self.browser.open(self.uri)
         self.assertEqual(self.browser.title(), 'Python bits')
 
     def test_redirect(self):
         # 301 redirect due to missing final '/'
-        r = self.browser.open('http://wwwsearch.sourceforge.net/bits')
+        r = self.browser.open(urljoin(self.uri, "bits"))
         self.assertEqual(r.code, 200)
         self.assert_("GeneralFAQ.html" in r.read(2048))
 
@@ -92,7 +103,7 @@
             self.assertRaises(mechanize.BrowserStateError, br.back)
         test_state(self.browser)
         # note this involves a redirect, which should itself be non-visiting
-        r = self.browser.open_novisit("http://wwwsearch.sourceforge.net/bits")
+        r = self.browser.open_novisit(urljoin(self.uri, "bits"))
         test_state(self.browser)
         self.assert_("GeneralFAQ.html" in r.read(2048))
 
@@ -102,7 +113,7 @@
         ua = mechanize.UserAgent()
         ua.set_seekable_responses(False)
         ua.set_handle_equiv(False)
-        response = ua.open('http://wwwsearch.sourceforge.net/')
+        response = ua.open(self.uri)
         self.failIf(hasattr(response, "seek"))
         data = response.read()
         self.assert_("Python bits" in data)
@@ -112,7 +123,7 @@
 
     def test_seek(self):
         br = mechanize.Browser()
-        r = br.open("http://wwwsearch.sourceforge.net/")
+        r = br.open(self.uri)
         html = r.read()
         r.seek(0)
         self.assertEqual(r.read(), html)
@@ -120,7 +131,7 @@
     def test_seekable_response_opener(self):
         opener = mechanize.OpenerFactory(
             mechanize.SeekableResponseOpener).build_opener()
-        r = opener.open("http://wwwsearch.sourceforge.net/bits/cctest2.txt")
+        r = opener.open(urljoin(self.uri, "bits/cctest2.txt"))
         r.read()
         r.seek(0)
         self.assertEqual(r.read(),
@@ -130,11 +141,10 @@
     def test_no_seek(self):
         # should be possible to turn off UserAgent's .seek() functionality
         def check_no_seek(opener):
-            r = opener.open(
-                "http://wwwsearch.sourceforge.net/bits/cctest2.txt")
+            r = opener.open(urljoin(self.uri, "bits/cctest2.txt"))
             self.assert_(not hasattr(r, "seek"))
             try:
-                opener.open("http://wwwsearch.sourceforge.net/nonexistent")
+                opener.open(urljoin(self.uri, "nonexistent"))
             except mechanize.HTTPError, exc:
                 self.assert_(not hasattr(exc, "seek"))
 
@@ -154,13 +164,12 @@
         # .seek() method, then raised HTTPError exceptions should also have the
         # .seek() method
         def check(opener, excs_also):
-            r = opener.open(
-                "http://wwwsearch.sourceforge.net/bits/cctest2.txt")
+            r = opener.open(urljoin(self.uri, "bits/cctest2.txt"))
             data = r.read()
             r.seek(0)
             self.assertEqual(data, r.read(), r.get_data())
             try:
-                opener.open("http://wwwsearch.sourceforge.net/nonexistent")
+                opener.open(urljoin(self.uri, "nonexistent"))
             except mechanize.HTTPError, exc:
                 data = exc.read()
                 if excs_also:
@@ -189,7 +198,7 @@
 
     def test_set_response(self):
         br = mechanize.Browser()
-        r = br.open("http://wwwsearch.sourceforge.net/")
+        r = br.open(self.uri)
         html = r.read()
         self.assertEqual(br.title(), "Python bits")
 
@@ -226,7 +235,7 @@
         import pickle
 
         b = mechanize.Browser()
-        r = b.open("http://wwwsearch.sourceforge.net/bits/cctest2.txt")
+        r = b.open(urljoin(self.uri, "bits/cctest2.txt"))
         r.read()
 
         r.close()
@@ -301,11 +310,11 @@
         plain_opener = mechanize.build_opener(mechanize.HTTPRobotRulesProcessor)
         browser = mechanize.Browser()
         for opener in plain_opener, browser:
-            r = opener.open("http://wwwsearch.sourceforge.net/robots")
+            r = opener.open(urljoin(self.uri, "robots"))
             self.assertEqual(r.code, 200)
             self.assertRaises(
                 mechanize.RobotExclusionError,
-                opener.open, "http://wwwsearch.sourceforge.net/norobots")
+                opener.open, urljoin(self.uri, "norobots"))
 
     def test_urlretrieve(self):
         url = "http://www.python.org/"
@@ -341,11 +350,10 @@
     def test_reload_read_incomplete(self):
         from mechanize import Browser
         browser = Browser()
-        r1 = browser.open(
-            "http://wwwsearch.sourceforge.net/bits/mechanize_reload_test.html")
+        r1 = browser.open(urljoin(self.uri, "bits/mechanize_reload_test.html"))
         # if we don't do anything and go straight to another page, most of the
         # last page's response won't be .read()...
-        r2 = browser.open("http://wwwsearch.sourceforge.net/mechanize")
+        r2 = browser.open(urljoin(self.uri, "mechanize"))
         self.assert_(len(r1.get_data()) < 4097)  # we only .read() a little bit
         # ...so if we then go back, .follow_link() for a link near the end (a
         # few kb in, past the point that always gets read in HTML files because
@@ -378,5 +386,29 @@
 
 
 if __name__ == "__main__":
-    import unittest
-    unittest.main()
+    import sys
+    sys.path.insert(0, "test-tools")
+    import testprogram
+    USAGE_EXAMPLES = """
+Examples:
+  %(progName)s
+                 - run all tests
+  %(progName)s functional_tests.SimpleTests
+                 - run all 'test*' test methods in class SimpleTests
+  %(progName)s functional_tests.SimpleTests.test_redirect
+                 - run SimpleTests.test_redirect
+
+  %(progName)s -l
+                 - start a local Twisted HTTP server and run the functional
+                   tests against that, rather than against SourceForge
+                   (quicker!)
+                   Note not all the functional tests use the local server yet
+                   -- some currently always access the internet regardless of
+                   this option and the --uri option.
+"""
+    prog = testprogram.TestProgram(
+        ["functional_tests"],
+        localServerProcess=testprogram.TwistedServerProcess(),
+        usageExamples=USAGE_EXAMPLES,
+        )
+    result = prog.runTests()

Added: wwwsearch/mechanize/trunk/test-tools/testprogram.py
==============================================================================
--- (empty file)
+++ wwwsearch/mechanize/trunk/test-tools/testprogram.py	Mon May 28 23:06:56 2007
@@ -0,0 +1,311 @@
+"""Local server and cgitb support."""
+
+import cgitb
+#cgitb.enable(format="text")
+
+import sys, os, traceback, logging, glob, time
+from unittest import defaultTestLoader, TextTestRunner, TestSuite, TestCase, \
+     _TextTestResult
+
+
+class ServerProcess:
+
+    def __init__(self, filename, name=None):
+        if filename is None:
+            raise ValueError('filename arg must be a string')
+        if name is None:
+            name = filename
+        self.name = os.path.basename(name)
+        self.port = None
+        self.report_hook = lambda msg: None
+        self._filename = filename
+
+    def _get_args(self):
+        """Return list of command line arguments.
+
+        Override me.
+        """
+        return []
+
+    def start(self):
+        self.report_hook("starting (%s)" % (
+            [sys.executable, self._filename]+self._get_args()))
+        self._pid = os.spawnv(
+            os.P_NOWAIT,
+            sys.executable,
+            [sys.executable, self._filename]+self._get_args())
+        self.report_hook("waiting for startup")
+        self._wait_for_startup()
+        self.report_hook("running")
+
+    def _wait_for_startup(self):
+        import socket
+        def connect():
+            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+            sock.settimeout(1.0)
+            try:
+                sock.connect(('127.0.0.1', self.port))
+            finally:
+                sock.close()
+        backoff(connect, (socket.error,))
+
+    def stop(self):
+        """Kill process (forcefully if necessary)."""
+        if os.name == 'nt':
+            kill_windows(self._pid, self.report_hook)
+        else:
+            kill_posix(self._pid, self.report_hook)
+
+def backoff(func, errors,
+            initial_timeout=1., hard_timeout=60., factor=1.2):
+    starttime = time.time()
+    timeout = initial_timeout
+    while time.time() < starttime + hard_timeout - 0.01:
+        try:
+            func()
+        except errors, exc:
+            time.sleep(timeout)
+            timeout *= factor
+            hard_limit = hard_timeout - (time.time() - starttime)
+            timeout = min(timeout, hard_limit)
+        else:
+            break
+
+def kill_windows(handle, report_hook):
+    try:
+        import win32api
+    except ImportError:
+        import ctypes
+        ctypes.windll.kernel32.TerminateProcess(int(handle), -1)
+    else:
+        win32api.TerminateProcess(int(handle), -1)
+
+def kill_posix(pid, report_hook):
+    import signal
+    os.kill(pid, signal.SIGTERM)
+
+    timeout = 10.
+    starttime = time.time()
+    report_hook("waiting for exit")
+    def do_nothing(*args):
+        pass
+    old_handler = signal.signal(signal.SIGCHLD, do_nothing)
+    try:
+        while time.time() < starttime + timeout - 0.01:
+            pid, sts = os.waitpid(pid, os.WNOHANG)
+            if pid != 0:
+                # exited, or error
+                break
+            newtimeout = timeout - (time.time() - starttime) - 1.
+            time.sleep(newtimeout)  # wait for signal
+        else:
+            report_hook("forcefully killing")
+            try:
+                os.kill(pid, signal.SIGKILL)
+            except OSError, exc:
+                if exc.errno != errno.ECHILD:
+                    raise
+    finally:
+        signal.signal(signal.SIGCHLD, old_handler)
+
+class TwistedServerProcess(ServerProcess):
+
+    def __init__(self, name=None):
+        top_level_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
+        path = os.path.join(top_level_dir, "test-tools/twisted-localserver.py")
+        ServerProcess.__init__(self, path, name)
+
+    def _get_args(self):
+        return [str(self.port)]
+
+
+class CgitbTextResult(_TextTestResult):
+    def _exc_info_to_string(self, err, test):
+        """Converts a sys.exc_info()-style tuple of values into a string."""
+        exctype, value, tb = err
+        # Skip test runner traceback levels
+        while tb and self._is_relevant_tb_level(tb):
+            tb = tb.tb_next
+        if exctype is test.failureException:
+            # Skip assert*() traceback levels
+            length = self._count_relevant_tb_levels(tb)
+            return cgitb.text((exctype, value, tb))
+        return cgitb.text((exctype, value, tb))
+
+class CgitbTextTestRunner(TextTestRunner):
+    def _makeResult(self):
+        return CgitbTextResult(self.stream, self.descriptions, self.verbosity)
+
+def add_uri_attribute_to_test_cases(suite, uri):
+    for test in suite._tests:
+        if isinstance(test, TestCase):
+            test.uri = uri
+        else:
+            try:
+                add_uri_attribute_to_test_cases(test, uri)
+            except AttributeError:
+                pass
+
+
+class TestProgram:
+    """A command-line program that runs a set of tests; this is primarily
+       for making test modules conveniently executable.
+    """
+    USAGE = """\
+Usage: %(progName)s [options] [test] [...]
+
+Note not all the functional tests take note of the --uri argument yet --
+some currently always access the internet regardless of the --uri and
+--run-local-server options.
+
+Options:
+  -l, --run-local-server
+                   Run a local Twisted HTTP server for the functional
+                   tests.  You need Twisted installed for this to work.
+                   The server is run on the port given in the --uri
+                   option.  If --run-local-server is given but no --uri is
+                   given, http://127.0.0.1:8000 is used as the base URI.
+                   Also, if you're on Windows and don't have pywin32 or
+                   ctypes installed, this option won't work, and you'll
+                   have to start up test-tools/localserver.py manually.
+  --uri=URL        Base URI for functional tests
+                   (test.py does not access the network, unless you tell
+                   it to run module functional_tests;
+                   functional_tests.py does access the network)
+                   e.g. --uri=http://127.0.0.1:8000/
+  -h, --help       Show this message
+  -v, --verbose    Verbose output
+  -q, --quiet      Minimal output
+
+The following options are only available through test.py (you can still run the
+functional tests through test.py, just give 'functional_tests' as the module
+name to run):
+
+  -u               Skip plain (non-doctest) unittests
+  -d               Skip doctests
+  -c               Run coverage (requires coverage.py, seems buggy)
+  -t               Display tracebacks using cgitb's text mode
+
+"""
+    USAGE_EXAMPLES = """
+Examples:
+  %(progName)s
+                 - run all tests
+  %(progName)s test_cookies
+                 - run module 'test_cookies'
+  %(progName)s test_cookies.CookieTests
+                 - run all 'test*' test methods in test_cookies.CookieTests
+  %(progName)s test_cookies.CookieTests.test_expires
+                 - run test_cookies.CookieTests.test_expires
+
+  %(progName)s functional_tests
+                 - run the functional tests
+  %(progName)s -l functional_tests
+                 - start a local Twisted HTTP server and run the functional
+                   tests against that, rather than against SourceForge
+                   (quicker!)
+"""
+    def __init__(self, moduleNames, localServerProcess, defaultTest=None,
+                 argv=None, testRunner=None, testLoader=defaultTestLoader,
+                 defaultUri="http://wwwsearch.sf.net/",
+                 usageExamples=USAGE_EXAMPLES,
+                 ):
+        self.modules = []
+        for moduleName in moduleNames:
+            module = __import__(moduleName)
+            for part in moduleName.split('.')[1:]:
+                module = getattr(module, part)
+            self.modules.append(module)
+        self.uri = None
+        self._defaultUri = defaultUri
+        if argv is None:
+            argv = sys.argv
+        self.verbosity = 1
+        self.defaultTest = defaultTest
+        self.testRunner = testRunner
+        self.testLoader = testLoader
+        self.progName = os.path.basename(argv[0])
+        self.usageExamples = usageExamples
+        self.runLocalServer = False
+        self.parseArgs(argv)
+        if self.runLocalServer:
+            import urllib
+            from mechanize._rfc3986 import urlsplit
+            authority = urlsplit(self.uri)[1]
+            host, port = urllib.splitport(authority)
+            if port is None:
+                port = "80"
+            try:
+                port = int(port)
+            except:
+                self.usageExit("port in --uri value must be an integer "
+                               "(try --uri=http://127.0.0.1:8000/)")
+            self._serverProcess = localServerProcess
+            def report(msg):
+                print "%s: %s" % (localServerProcess.name, msg)
+            localServerProcess.port = port
+            localServerProcess.report_hook = report
+
+    def usageExit(self, msg=None):
+        if msg: print msg
+        print (self.USAGE + self.usageExamples) % self.__dict__
+        sys.exit(2)
+
+    def parseArgs(self, argv):
+        import getopt
+        try:
+            options, args = getopt.getopt(
+                argv[1:],
+                'hHvql',
+                ['help','verbose','quiet', 'uri=', 'run-local-server'],
+                )
+            uri = None
+            for opt, value in options:
+                if opt in ('-h','-H','--help'):
+                    self.usageExit()
+                if opt in ('--uri',):
+                    uri = value
+                if opt in ('-q','--quiet'):
+                    self.verbosity = 0
+                if opt in ('-v','--verbose'):
+                    self.verbosity = 2
+                if opt in ('-l', '--run-local-server'):
+                    self.runLocalServer = True
+            if uri is None:
+                if self.runLocalServer:
+                    uri = "http://127.0.0.1:8000"
+                else:
+                    uri = self._defaultUri
+            self.uri = uri
+            if len(args) == 0 and self.defaultTest is None:
+                suite = TestSuite()
+                for module in self.modules:
+                    test = self.testLoader.loadTestsFromModule(module)
+                    suite.addTest(test)
+                self.test = suite
+                add_uri_attribute_to_test_cases(self.test, self.uri)
+                return
+            if len(args) > 0:
+                self.testNames = args
+            else:
+                self.testNames = (self.defaultTest,)
+            self.createTests()
+            add_uri_attribute_to_test_cases(self.test, self.uri)
+        except getopt.error, msg:
+            self.usageExit(msg)
+
+    def createTests(self):
+        self.test = self.testLoader.loadTestsFromNames(self.testNames)
+
+    def runTests(self):
+        if self.testRunner is None:
+            self.testRunner = TextTestRunner(verbosity=self.verbosity)
+
+        if self.runLocalServer:
+            self._serverProcess.start()
+        try:
+            result = self.testRunner.run(self.test)
+        finally:
+            if self.runLocalServer:
+                self._serverProcess.stop()
+        return result

Added: wwwsearch/mechanize/trunk/test-tools/twisted-localserver.py
==============================================================================
--- (empty file)
+++ wwwsearch/mechanize/trunk/test-tools/twisted-localserver.py	Mon May 28 23:06:56 2007
@@ -0,0 +1,91 @@
+#!/usr/bin/env python
+"""
+%prog port
+
+e.g. %prog 8000
+
+Runs a local server to point the mechanize functional tests at.  Example:
+
+python test-tools/twisted-localserver.py 8042
+python functional_tests.py --uri=http://localhost:8042/
+
+You need Twisted XXX version to run it:
+
+XXX installation instructions
+"""
+
+import os, sys, re, time
+from twisted.web2 import server, http, resource, channel, \
+     static, http_headers, responsecode
+
+from twisted.internet import reactor
+
+def html(title=None):
+    f = open("README.html", "r")
+    html = f.read()
+    if title is not None:
+        html = re.sub("<title>(.*)</title>", "<title>%s</title>" % title, html)
+    return html
+
+MECHANIZE_HTML = html()
+ROOT_HTML = html("Python bits")
+RELOAD_TEST_HTML = """\
+<html>
+<head><title>Title</title></head>
+<body>
+
+<a href="/mechanize">near the start</a>
+
+<p>Now some data to prevent HEAD parsing from reading the link near
+the end.
+
+<pre>
+%s</pre>
+
+<a href="/mechanize">near the end</a>
+
+</body>
+
+</html>""" % (("0123456789ABCDEF"*4+"\n")*61)
+
+
+class Page(resource.Resource):
+
+  addSlash = True
+  content_type = http_headers.MimeType("text", "html")
+
+  def render(self, ctx):
+    return http.Response(
+        responsecode.OK,
+        {"content-type": self.content_type},
+        self.text)
+
+def make_page(root, name, text,
+              content_type="text/html"):
+    page = Page()
+    page.text = text
+    base_type, specific_type = content_type.split("/")
+    page.content_type = http_headers.MimeType(base_type, specific_type)
+    setattr(root, "child_"+name, page)
+    return page
+
+def main():
+    root = Page()
+    root.text = ROOT_HTML
+    make_page(root, "mechanize", MECHANIZE_HTML)
+    bits = make_page(root, "robots.txt",
+                     "User-Agent: *\nDisallow: /norobots",
+                     "text/plain")
+    bits = make_page(root, "robots", "Hello, robots.", "text/plain")
+    bits = make_page(root, "norobots", "Hello, non-robots.", "text/plain")
+    bits = make_page(root, "bits", "GeneralFAQ.html")
+    make_page(bits, "cctest2.txt",
+              "Hello ClientCookie functional test suite.",
+              "text/plain")
+    make_page(bits, "mechanize_reload_test.html", RELOAD_TEST_HTML)
+
+    site = server.Site(root)
+    reactor.listenTCP(int(sys.argv[1]), channel.HTTPFactory(site))
+    reactor.run()
+
+main()

Modified: wwwsearch/mechanize/trunk/test.py
==============================================================================
--- wwwsearch/mechanize/trunk/test.py	(original)
+++ wwwsearch/mechanize/trunk/test.py	Mon May 28 23:06:56 2007
@@ -8,9 +8,6 @@
 
 """
 
-import cgitb
-#cgitb.enable(format="text")
-
 # Modules containing tests to run -- a test is anything named *Tests, which
 # should be classes deriving from unittest.TestCase.
 MODULE_NAMES = ["test_date", "test_browser", "test_response", "test_cookies",
@@ -18,9 +15,7 @@
                 "test_useragent", "test_html", "test_opener",
                 ]
 
-import sys, os, traceback, logging, glob
-from unittest import defaultTestLoader, TextTestRunner, TestSuite, TestCase, \
-     _TextTestResult
+import sys, os, logging, glob
 
 #level = logging.DEBUG
 #level = logging.INFO
@@ -30,109 +25,12 @@
 #logging.getLogger("mechanize").addHandler(logging.StreamHandler(sys.stdout))
 
 
-class CgitbTextResult(_TextTestResult):
-    def _exc_info_to_string(self, err, test):
-        """Converts a sys.exc_info()-style tuple of values into a string."""
-        exctype, value, tb = err
-        # Skip test runner traceback levels
-        while tb and self._is_relevant_tb_level(tb):
-            tb = tb.tb_next
-        if exctype is test.failureException:
-            # Skip assert*() traceback levels
-            length = self._count_relevant_tb_levels(tb)
-            return cgitb.text((exctype, value, tb))
-        return cgitb.text((exctype, value, tb))
-
-class CgitbTextTestRunner(TextTestRunner):
-    def _makeResult(self):
-        return CgitbTextResult(self.stream, self.descriptions, self.verbosity)
-
-
-class TestProgram:
-    """A command-line program that runs a set of tests; this is primarily
-       for making test modules conveniently executable.
-    """
-    USAGE = """\
-Usage: %(progName)s [options] [test] [...]
-
-Options:
-  -h, --help       Show this message
-  -v, --verbose    Verbose output
-  -q, --quiet      Minimal output
-
-Examples:
-  %(progName)s                               - run default set of tests
-  %(progName)s MyTestSuite                   - run suite 'MyTestSuite'
-  %(progName)s MyTestCase.testSomething      - run MyTestCase.testSomething
-  %(progName)s MyTestCase                    - run all 'test*' test methods
-                                               in MyTestCase
-"""
-    def __init__(self, moduleNames, defaultTest=None,
-                 argv=None, testRunner=None, testLoader=defaultTestLoader):
-        self.modules = []
-        for moduleName in moduleNames:
-            module = __import__(moduleName)
-            for part in moduleName.split('.')[1:]:
-                module = getattr(module, part)
-            self.modules.append(module)
-        if argv is None:
-            argv = sys.argv
-        self.verbosity = 1
-        self.defaultTest = defaultTest
-        self.testRunner = testRunner
-        self.testLoader = testLoader
-        self.progName = os.path.basename(argv[0])
-        self.parseArgs(argv)
-
-    def usageExit(self, msg=None):
-        if msg: print msg
-        print self.USAGE % self.__dict__
-        sys.exit(2)
-
-    def parseArgs(self, argv):
-        import getopt
-        try:
-            options, args = getopt.getopt(argv[1:], 'hHvq',
-                                          ['help','verbose','quiet'])
-            for opt, value in options:
-                if opt in ('-h','-H','--help'):
-                    self.usageExit()
-                if opt in ('-q','--quiet'):
-                    self.verbosity = 0
-                if opt in ('-v','--verbose'):
-                    self.verbosity = 2
-            if len(args) == 0 and self.defaultTest is None:
-                suite = TestSuite()
-                for module in self.modules:
-                    test = self.testLoader.loadTestsFromModule(module)
-                    suite.addTest(test)
-                self.test = suite
-                return
-            if len(args) > 0:
-                self.testNames = args
-            else:
-                self.testNames = (self.defaultTest,)
-            self.createTests()
-        except getopt.error, msg:
-            self.usageExit(msg)
-
-    def createTests(self):
-        self.test = self.testLoader.loadTestsFromNames(self.testNames)
-
-    def runTests(self):
-        if self.testRunner is None:
-            self.testRunner = TextTestRunner(verbosity=self.verbosity)
-        result = self.testRunner.run(self.test)
-        return result
-
-
 if __name__ == "__main__":
-##     sys.path.insert(0, '/home/john/comp/dev/rl/jjlee/lib/python')
-##     import jjl
-##     import __builtin__
-##     __builtin__.jjl = jjl
+    # XXX
+    # temporary stop-gap to run doctests &c.
+    # should switch to nose or something
 
-    # XXX temporary stop-gap to run doctests
+    top_level_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
 
     # XXXX coverage output seems incorrect ATM
     run_coverage = "-c" in sys.argv
@@ -159,6 +57,7 @@
     # that renamed module.
     sys.path.insert(0, "test-tools")
     import doctest
+    import testprogram
 
     import coverage
     if run_coverage:
@@ -176,7 +75,8 @@
     if run_doctests:
         # run .doctest files needing special support
         common_globs = {"mechanize": mechanize}
-        pm_doctest_filename = os.path.join("test", "test_password_manager.doctest")
+        pm_doctest_filename = os.path.join(
+            "test", "test_password_manager.doctest")
         for globs in [
             {"mgr_class": mechanize.HTTPPasswordMgr},
             {"mgr_class": mechanize.HTTPProxyPasswordMgr},
@@ -217,8 +117,12 @@
         sys.path.insert(0, test_path)
         test_runner = None
         if use_cgitb:
-            test_runner = CgitbTextTestRunner()
-        prog = TestProgram(MODULE_NAMES, testRunner=test_runner)
+            test_runner = testprogram.CgitbTextTestRunner()
+        prog = testprogram.TestProgram(
+            MODULE_NAMES,
+            testRunner=test_runner,
+            localServerProcess=testprogram.TwistedServerProcess(),
+            )
         result = prog.runTests()
 
     if run_coverage:


More information about the wwwsearch-commits mailing list