[wwwsearch-commits] r27777 - in wwwsearch/mechanize/trunk: . mechanize test

jjlee at codespeak.net jjlee at codespeak.net
Sun May 28 00:03:40 CEST 2006


Author: jjlee
Date: Sun May 28 00:03:38 2006
New Revision: 27777

Added:
   wwwsearch/mechanize/trunk/test/test_password_manager.doctest
Modified:
   wwwsearch/mechanize/trunk/mechanize/_auth.py
   wwwsearch/mechanize/trunk/mechanize/_urllib2.py
   wwwsearch/mechanize/trunk/mechanize/_urllib2_support.py
   wwwsearch/mechanize/trunk/mechanize/_useragent.py
   wwwsearch/mechanize/trunk/test.py
   wwwsearch/mechanize/trunk/test/test_mechanize.py
Log:
Add support for client certificates; teach password manager classes about default ports (this avoids having to register passwords & certificates twice, e.g. for a redirect from an explicit :443 to a URL with no explicit port, i.e. the default port); import some auth names from _auth that were accidentally still coming from urllib2

Modified: wwwsearch/mechanize/trunk/mechanize/_auth.py
==============================================================================
--- wwwsearch/mechanize/trunk/mechanize/_auth.py	(original)
+++ wwwsearch/mechanize/trunk/mechanize/_auth.py	Sun May 28 00:03:38 2006
@@ -11,10 +11,11 @@
 
 """
 
-import re, base64, urlparse, posixpath, md5, sha
+import re, base64, urlparse, posixpath, md5, sha, sys
 
 from urllib2 import BaseHandler
-from urllib import getproxies, unquote, splittype, splituser, splitpasswd
+from urllib import getproxies, unquote, splittype, splituser, splitpasswd, \
+     splitport
 
 
 def _parse_proxy(proxy):
@@ -135,32 +136,50 @@
         # uri could be a single URI or a sequence
         if isinstance(uri, basestring):
             uri = [uri]
-        uri = tuple(map(self.reduce_uri, uri))
         if not realm in self.passwd:
             self.passwd[realm] = {}
-        self.passwd[realm][uri] = (user, passwd)
+        for default_port in True, False:
+            reduced_uri = tuple(
+                [self.reduce_uri(u, default_port) for u in uri])
+            self.passwd[realm][reduced_uri] = (user, passwd)
 
     def find_user_password(self, realm, authuri):
         domains = self.passwd.get(realm, {})
-        authuri = self.reduce_uri(authuri)
-        for uris, authinfo in domains.iteritems():
-            for uri in uris:
-                if self.is_suburi(uri, authuri):
-                    return authinfo
+        for default_port in True, False:
+            reduced_authuri = self.reduce_uri(authuri, default_port)
+            for uris, authinfo in domains.iteritems():
+                for uri in uris:
+                    if self.is_suburi(uri, reduced_authuri):
+                        return authinfo
         return None, None
 
-    def reduce_uri(self, uri):
-        """Accept netloc or URI and extract only the netloc and path"""
+    def reduce_uri(self, uri, default_port=True):
+        """Accept authority or URI and extract only the authority and path."""
+        # note HTTP URLs do not have a userinfo component
         parts = urlparse.urlsplit(uri)
         if parts[1]:
             # URI
-            return parts[1], parts[2] or '/'
+            scheme = parts[0]
+            authority = parts[1]
+            path = parts[2] or '/'
         elif parts[0]:
             # host:port
-            return uri, '/'
+            scheme = None
+            authority = uri
+            path = '/'
         else:
             # host
-            return parts[2], '/'
+            scheme = None
+            authority = parts[2]
+            path = '/'
+        host, port = splitport(authority)
+        if default_port and port is None and scheme is not None:
+            dport = {"http": 80,
+                     "https": 443,
+                     }.get(scheme)
+            if dport is not None:
+                authority = "%s:%d" % (host, dport)
+        return authority, path
 
     def is_suburi(self, base, test):
         """Check if test is below base in a URI tree
@@ -425,7 +444,7 @@
         return retry
 
 
-
+# XXX ugly implementation, should probably not bother deriving
 class HTTPProxyPasswordMgr(HTTPPasswordMgr):
     # has default realm and host/port
     def add_password(self, realm, uri, user, passwd):
@@ -436,32 +455,34 @@
             uris = uri
         passwd_by_domain = self.passwd.setdefault(realm, {})
         for uri in uris:
-            uri = self.reduce_uri(uri)
-            passwd_by_domain[uri] = (user, passwd)
+            for default_port in True, False:
+                reduced_uri = self.reduce_uri(uri, default_port)
+                passwd_by_domain[reduced_uri] = (user, passwd)
 
     def find_user_password(self, realm, authuri):
-        perms = [(realm, authuri), (None, authuri)]
+        attempts = [(realm, authuri), (None, authuri)]
         # bleh, want default realm to take precedence over default
         # URI/authority, hence this outer loop
         for default_uri in False, True:
-            for realm, authuri in perms:
+            for realm, authuri in attempts:
                 authinfo_by_domain = self.passwd.get(realm, {})
-                reduced_authuri = self.reduce_uri(authuri)
-                for uri, authinfo in authinfo_by_domain.iteritems():
-                    if uri is None and not default_uri:
-                        continue
-                    if self.is_suburi(uri, reduced_authuri):
-                        return authinfo
-                user, password = None, None
+                for default_port in True, False:
+                    reduced_authuri = self.reduce_uri(authuri, default_port)
+                    for uri, authinfo in authinfo_by_domain.iteritems():
+                        if uri is None and not default_uri:
+                            continue
+                        if self.is_suburi(uri, reduced_authuri):
+                            return authinfo
+                    user, password = None, None
 
-                if user is not None:
-                    break
+                    if user is not None:
+                        break
         return user, password
 
-    def reduce_uri(self, uri):
+    def reduce_uri(self, uri, default_port=True):
         if uri is None:
             return None
-        return HTTPPasswordMgr.reduce_uri(self, uri)
+        return HTTPPasswordMgr.reduce_uri(self, uri, default_port)
 
     def is_suburi(self, base, test):
         if base is None:
@@ -469,3 +490,11 @@
             hostport, path = test
             base = (hostport, "/")
         return HTTPPasswordMgr.is_suburi(self, base, test)
+
+
+class HTTPSClientCertMgr(HTTPPasswordMgr):
+    # implementation inheritance: this is not a proper subclass
+    def add_key_cert(self, uri, key_file, cert_file):
+        self.add_password(None, uri, key_file, cert_file)
+    def find_key_cert(self, authuri):
+        return HTTPPasswordMgr.find_user_password(self, None, authuri)

Modified: wwwsearch/mechanize/trunk/mechanize/_urllib2.py
==============================================================================
--- wwwsearch/mechanize/trunk/mechanize/_urllib2.py	(original)
+++ wwwsearch/mechanize/trunk/mechanize/_urllib2.py	Sun May 28 00:03:38 2006
@@ -3,20 +3,21 @@
 from urllib2 import \
      URLError, \
      HTTPError, \
-     GopherError, \
-     HTTPPasswordMgr, \
-     HTTPPasswordMgrWithDefaultRealm, \
-     AbstractBasicAuthHandler, \
-     AbstractDigestAuthHandler
+     GopherError
 # ...and from mechanize
 from _opener import OpenerDirector
 from _auth import \
+     HTTPPasswordMgr, \
+     HTTPPasswordMgrWithDefaultRealm, \
+     AbstractBasicAuthHandler, \
+     AbstractDigestAuthHandler, \
      HTTPProxyPasswordMgr, \
      ProxyHandler, \
      ProxyBasicAuthHandler, \
      ProxyDigestAuthHandler, \
      HTTPBasicAuthHandler, \
-     HTTPDigestAuthHandler
+     HTTPDigestAuthHandler, \
+     HTTPSClientCertMgr
 from _urllib2_support import \
      Request, \
      build_opener, install_opener, urlopen, \

Modified: wwwsearch/mechanize/trunk/mechanize/_urllib2_support.py
==============================================================================
--- wwwsearch/mechanize/trunk/mechanize/_urllib2_support.py	(original)
+++ wwwsearch/mechanize/trunk/mechanize/_urllib2_support.py	Sun May 28 00:03:38 2006
@@ -623,9 +623,29 @@
     http_request = AbstractHTTPHandler.do_request_
 
 if hasattr(httplib, 'HTTPS'):
+
+    class HTTPSConnectionFactory:
+        def __init__(self, key_file, cert_file):
+            self._key_file = key_file
+            self._cert_file = cert_file
+        def __call__(self, hostport):
+            return httplib.HTTPSConnection(
+                hostport,
+                key_file=self._key_file, cert_file=self._cert_file)
+
     class HTTPSHandler(AbstractHTTPHandler):
+        def __init__(self, client_cert_manager=None):
+            AbstractHTTPHandler.__init__(self)
+            self.client_cert_manager = client_cert_manager
+
         def https_open(self, req):
-            return self.do_open(httplib.HTTPSConnection, req)
+            if self.client_cert_manager is not None:
+                key_file, cert_file = self.client_cert_manager.find_key_cert(
+                    req.get_full_url())
+                conn_factory = HTTPSConnectionFactory(key_file, cert_file)
+            else:
+                conn_factory = httplib.HTTPSConnection
+            return self.do_open(conn_factory, req)
 
         https_request = AbstractHTTPHandler.do_request_
 

Modified: wwwsearch/mechanize/trunk/mechanize/_useragent.py
==============================================================================
--- wwwsearch/mechanize/trunk/mechanize/_useragent.py	(original)
+++ wwwsearch/mechanize/trunk/mechanize/_useragent.py	Sun May 28 00:03:38 2006
@@ -130,6 +130,10 @@
             ppm = _auth.HTTPProxyPasswordMgr()
         self.set_password_manager(pm)
         self.set_proxy_password_manager(ppm)
+        # set default certificate manager
+        if "https" in ua_handlers:
+            cm = _urllib2.HTTPSClientCertMgr()
+            self.set_client_cert_manager(cm)
 
         # special case, requires extra support from mechanize.Browser
         self._handle_referer = True
@@ -200,6 +204,25 @@
         self._proxy_password_manager.add_password(
             realm, hostport, user, password)
 
+    def add_client_certificate(self, url, key_file, cert_file):
+        """Add an SSL client certificate, for HTTPS client auth.
+
+        key_file and cert_file must be filenames of the key and certificate
+        files, in PEM format.  You can use e.g. OpenSSL to convert a p12 (PKCS
+        12) file to PEM format:
+
+        openssl pkcs12 -clcerts -nokeys -in cert.p12 -out cert.pem
+        openssl pkcs12 -nocerts -in cert.p12 -out key.pem
+
+
+        Note that client certificate password input is currently very
+        inflexible: it seems to be console only, which is presumably the
+        default behaviour of libopenssl.  In future mechanize may support
+        third-party libraries that (I assume) allow more options here.
+
+        """
+        self._client_cert_manager.add_key_cert(url, key_file, cert_file)
+
     # the following are rarely useful -- use add_password / add_proxy_password
     # instead
     def set_password_manager(self, password_manager):
@@ -212,6 +235,11 @@
         self._proxy_password_manager = password_manager
         self._set_handler("_proxy_basicauth", obj=password_manager)
         self._set_handler("_proxy_digestauth", obj=password_manager)
+    def set_client_cert_manager(self, cert_manager):
+        """Set a mechanize.HTTPSClientCertMgr, or None."""
+        self._client_cert_manager = cert_manager
+        handler = self._ua_handlers["https"]
+        handler.client_cert_manager = cert_manager
 
     # these methods all take a boolean parameter
     def set_handle_robots(self, handle):

Modified: wwwsearch/mechanize/trunk/test.py
==============================================================================
--- wwwsearch/mechanize/trunk/test.py	(original)
+++ wwwsearch/mechanize/trunk/test.py	Sun May 28 00:03:38 2006
@@ -102,12 +102,34 @@
 
 
 if __name__ == "__main__":
+##     sys.path.insert(0, '/home/john/comp/dev/rl/jjlee/lib/python')
+##     import jjl
+##     import __builtin__
+##     __builtin__.jjl = jjl
+
     # XXX temporary stop-gap to run doctests
     assert os.path.isdir('test')
     sys.path.insert(0, 'test')
+    # needed for recent doctest / linecache -- these copies are only for
+    # testing purposes and don't get installed.  They are
+    # doctest.py revision 45701 and linecache.py revision 45940.  Since
+    # linecache is used by Python itself, linecache.py is renamed
+    # linecache_copy.py, and this copy of doctest is modified (only) to use
+    # that renamed module.
+    sys.path.insert(0, 'test-tools')
     import doctest
-    import test_mechanize
-    doctest.testmod(test_mechanize)
+    import mechanize
+    common_globs = {"mechanize": mechanize}
+    for globs in [
+        {"mgr_class": mechanize.HTTPPasswordMgr},
+        {"mgr_class": mechanize.HTTPProxyPasswordMgr},
+        ]:
+        globs.update(common_globs)
+        doctest.testfile(
+            os.path.join('test', 'test_password_manager.doctest'),
+            #os.path.join('test', 'test_scratch.doctest'),
+            globs=globs,
+            )
     from mechanize import _headersutil, _auth, _clientcookie, _pullparser
     doctest.testmod(_headersutil)
     doctest.testmod(_auth)

Modified: wwwsearch/mechanize/trunk/test/test_mechanize.py
==============================================================================
--- wwwsearch/mechanize/trunk/test/test_mechanize.py	(original)
+++ wwwsearch/mechanize/trunk/test/test_mechanize.py	Sun May 28 00:03:38 2006
@@ -15,104 +15,6 @@
     FACTORY_CLASSES.append(mechanize.RobustFactory)
 
 
-def test_password_manager(self):
-    """
-    >>> mgr = mechanize.HTTPProxyPasswordMgr()
-    >>> add = mgr.add_password
-
-    >>> add("Some Realm", "http://example.com/", "joe", "password")
-    >>> add("Some Realm", "http://example.com/ni", "ni", "ni")
-    >>> add("c", "http://example.com/foo", "foo", "ni")
-    >>> add("c", "http://example.com/bar", "bar", "nini")
-    >>> add("b", "http://example.com/", "first", "blah")
-    >>> add("b", "http://example.com/", "second", "spam")
-    >>> add("a", "http://example.com", "1", "a")
-    >>> add("Some Realm", "http://c.example.com:3128", "3", "c")
-    >>> add("Some Realm", "d.example.com", "4", "d")
-    >>> add("Some Realm", "e.example.com:3128", "5", "e")
-
-    >>> mgr.find_user_password("Some Realm", "example.com")
-    ('joe', 'password')
-    >>> mgr.find_user_password("Some Realm", "http://example.com")
-    ('joe', 'password')
-    >>> mgr.find_user_password("Some Realm", "http://example.com/")
-    ('joe', 'password')
-    >>> mgr.find_user_password("Some Realm", "http://example.com/spam")
-    ('joe', 'password')
-    >>> mgr.find_user_password("Some Realm", "http://example.com/spam/spam")
-    ('joe', 'password')
-    >>> mgr.find_user_password("c", "http://example.com/foo")
-    ('foo', 'ni')
-    >>> mgr.find_user_password("c", "http://example.com/bar")
-    ('bar', 'nini')
-
-    Currently, we use the highest-level path where more than one match:
-
-    >>> mgr.find_user_password("Some Realm", "http://example.com/ni")
-    ('joe', 'password')
-
-    Use latest add_password() in case of conflict:
-
-    >>> mgr.find_user_password("b", "http://example.com/")
-    ('second', 'spam')
-
-    No special relationship between a.example.com and example.com:
-
-    >>> mgr.find_user_password("a", "http://example.com/")
-    ('1', 'a')
-    >>> mgr.find_user_password("a", "http://a.example.com/")
-    (None, None)
-
-    Ports:
-
-    >>> mgr.find_user_password("Some Realm", "c.example.com")
-    (None, None)
-    >>> mgr.find_user_password("Some Realm", "c.example.com:3128")
-    ('3', 'c')
-    >>> mgr.find_user_password("Some Realm", "http://c.example.com:3128")
-    ('3', 'c')
-    >>> mgr.find_user_password("Some Realm", "d.example.com")
-    ('4', 'd')
-    >>> mgr.find_user_password("Some Realm", "e.example.com:3128")
-    ('5', 'e')
-
-
-    Now features specific to HTTPProxyPasswordMgr.
-
-    Default realm:
-
-    >>> mgr.find_user_password("d", "f.example.com")
-    (None, None)
-    >>> add(None, "f.example.com", "6", "f")
-    >>> mgr.find_user_password("d", "f.example.com")
-    ('6', 'f')
-
-    Default host/port:
-
-    >>> mgr.find_user_password("e", "g.example.com")
-    (None, None)
-    >>> add("e", None, "7", "g")
-    >>> mgr.find_user_password("e", "g.example.com")
-    ('7', 'g')
-
-    Default realm and host/port:
-
-    >>> mgr.find_user_password("f", "h.example.com")
-    (None, None)
-    >>> add(None, None, "8", "h")
-    >>> mgr.find_user_password("f", "h.example.com")
-    ('8', 'h')
-
-    Default realm beats default host/port:
-
-    >>> add("d", None, "9", "i")
-    >>> mgr.find_user_password("d", "f.example.com")
-    ('6', 'f')
-
-    """
-    pass
-
-
 class CachingGeneratorFunctionTests(TestCase):
 
     def _get_simple_cgenf(self, log):
@@ -843,8 +745,5 @@
         ua._set_handler("_blah", True)
 
 if __name__ == "__main__":
-    import test_mechanize
-    import doctest
-    doctest.testmod(test_mechanize)
     import unittest
     unittest.main()

Added: wwwsearch/mechanize/trunk/test/test_password_manager.doctest
==============================================================================
--- (empty file)
+++ wwwsearch/mechanize/trunk/test/test_password_manager.doctest	Sun May 28 00:03:38 2006
@@ -0,0 +1,148 @@
+Features common to HTTPPasswordMgr and HTTPProxyPasswordMgr
+===========================================================
+
+(mgr_class gets here through globs argument)
+
+>>> mgr = mgr_class()
+>>> add = mgr.add_password
+
+>>> add("Some Realm", "http://example.com/", "joe", "password")
+>>> add("Some Realm", "http://example.com/ni", "ni", "ni")
+>>> add("c", "http://example.com/foo", "foo", "ni")
+>>> add("c", "http://example.com/bar", "bar", "nini")
+>>> add("b", "http://example.com/", "first", "blah")
+>>> add("b", "http://example.com/", "second", "spam")
+>>> add("a", "http://example.com", "1", "a")
+>>> add("Some Realm", "http://c.example.com:3128", "3", "c")
+>>> add("Some Realm", "d.example.com", "4", "d")
+>>> add("Some Realm", "e.example.com:3128", "5", "e")
+
+>>> mgr.find_user_password("Some Realm", "example.com")
+('joe', 'password')
+>>> mgr.find_user_password("Some Realm", "http://example.com")
+('joe', 'password')
+>>> mgr.find_user_password("Some Realm", "http://example.com/")
+('joe', 'password')
+>>> mgr.find_user_password("Some Realm", "http://example.com/spam")
+('joe', 'password')
+>>> mgr.find_user_password("Some Realm", "http://example.com/spam/spam")
+('joe', 'password')
+>>> mgr.find_user_password("c", "http://example.com/foo")
+('foo', 'ni')
+>>> mgr.find_user_password("c", "http://example.com/bar")
+('bar', 'nini')
+
+Which entry wins when more than one matches is currently undefined, so this test is disabled:
+#Currently, we use the highest-level path where more than one match:
+#
+#>>> mgr.find_user_password("Some Realm", "http://example.com/ni")
+#('joe', 'password')
+
+Use latest add_password() in case of conflict:
+
+>>> mgr.find_user_password("b", "http://example.com/")
+('second', 'spam')
+
+No special relationship between a.example.com and example.com:
+
+>>> mgr.find_user_password("a", "http://example.com/")
+('1', 'a')
+>>> mgr.find_user_password("a", "http://a.example.com/")
+(None, None)
+
+Ports:
+
+>>> mgr.find_user_password("Some Realm", "c.example.com")
+(None, None)
+>>> mgr.find_user_password("Some Realm", "c.example.com:3128")
+('3', 'c')
+>>> mgr.find_user_password("Some Realm", "http://c.example.com:3128")
+('3', 'c')
+>>> mgr.find_user_password("Some Realm", "d.example.com")
+('4', 'd')
+>>> mgr.find_user_password("Some Realm", "e.example.com:3128")
+('5', 'e')
+
+
+Default port tests
+------------------
+
+>>> mgr = mgr_class()
+>>> add = mgr.add_password
+
+The point to note here is that we can't guess the default port if there's
+no scheme.  This applies to both add_password and find_user_password.
+
+>>> add("f", "http://g.example.com:80", "10", "j")
+>>> add("g", "http://h.example.com", "11", "k")
+>>> add("h", "i.example.com:80", "12", "l")
+>>> add("i", "j.example.com", "13", "m")
+>>> mgr.find_user_password("f", "g.example.com:100")
+(None, None)
+>>> mgr.find_user_password("f", "g.example.com:80")
+('10', 'j')
+>>> mgr.find_user_password("f", "g.example.com")
+(None, None)
+>>> mgr.find_user_password("f", "http://g.example.com:100")
+(None, None)
+>>> mgr.find_user_password("f", "http://g.example.com:80")
+('10', 'j')
+>>> mgr.find_user_password("f", "http://g.example.com")
+('10', 'j')
+>>> mgr.find_user_password("g", "h.example.com")
+('11', 'k')
+>>> mgr.find_user_password("g", "h.example.com:80")
+('11', 'k')
+>>> mgr.find_user_password("g", "http://h.example.com:80")
+('11', 'k')
+>>> mgr.find_user_password("h", "i.example.com")
+(None, None)
+>>> mgr.find_user_password("h", "i.example.com:80")
+('12', 'l')
+>>> mgr.find_user_password("h", "http://i.example.com:80")
+('12', 'l')
+>>> mgr.find_user_password("i", "j.example.com")
+('13', 'm')
+>>> mgr.find_user_password("i", "j.example.com:80")
+(None, None)
+>>> mgr.find_user_password("i", "http://j.example.com")
+('13', 'm')
+>>> mgr.find_user_password("i", "http://j.example.com:80")
+(None, None)
+
+
+Features specific to HTTPProxyPasswordMgr
+=========================================
+
+Default realm:
+
+>>> mgr = mechanize.HTTPProxyPasswordMgr()
+>>> add = mgr.add_password
+
+>>> mgr.find_user_password("d", "f.example.com")
+(None, None)
+>>> add(None, "f.example.com", "6", "f")
+>>> mgr.find_user_password("d", "f.example.com")
+('6', 'f')
+
+Default host/port:
+
+>>> mgr.find_user_password("e", "g.example.com")
+(None, None)
+>>> add("e", None, "7", "g")
+>>> mgr.find_user_password("e", "g.example.com")
+('7', 'g')
+
+Default realm and host/port:
+
+>>> mgr.find_user_password("f", "h.example.com")
+(None, None)
+>>> add(None, None, "8", "h")
+>>> mgr.find_user_password("f", "h.example.com")
+('8', 'h')
+
+Default realm beats default host/port:
+
+>>> add("d", None, "9", "i")
+>>> mgr.find_user_password("d", "f.example.com")
+('6', 'f')


More information about the wwwsearch-commits mailing list