[wwwsearch-commits] r27434 - in wwwsearch/mechanize/trunk: mechanize test
jjlee at codespeak.net
jjlee at codespeak.net
Fri May 19 02:04:54 CEST 2006
Author: jjlee
Date: Fri May 19 02:04:51 2006
New Revision: 27434
Added:
wwwsearch/mechanize/trunk/mechanize/_auth.py
wwwsearch/mechanize/trunk/mechanize/_urllib2.py
Modified:
wwwsearch/mechanize/trunk/mechanize/__init__.py
wwwsearch/mechanize/trunk/mechanize/_useragent.py
wwwsearch/mechanize/trunk/test/test_urllib2.py
Log:
Make use of fact that we now export urllib2 interface; Pull in bugfixed auth/proxy support code from Python 2.5
Modified: wwwsearch/mechanize/trunk/mechanize/__init__.py
==============================================================================
--- wwwsearch/mechanize/trunk/mechanize/__init__.py (original)
+++ wwwsearch/mechanize/trunk/mechanize/__init__.py Fri May 19 02:04:51 2006
@@ -13,59 +13,8 @@
FormsFactory, LinksFactory, TitleFactory, \
RobustFormsFactory, RobustLinksFactory, RobustTitleFactory
-# urllib2 work-alike interface
-# ...from urllib2...
-from urllib2 import \
- URLError, \
- HTTPError, \
- GopherError, \
- HTTPPasswordMgr, \
- HTTPPasswordMgrWithDefaultRealm, \
- AbstractBasicAuthHandler, \
- AbstractDigestAuthHandler
-# ...and from mechanize
-from _Opener import OpenerDirector
-from _useragent import HTTPProxyPasswordMgr
-from _urllib2_support import \
- Request, \
- build_opener, install_opener, urlopen, \
- OpenerFactory, urlretrieve, \
- RobotExclusionError
-
-# handlers...
-# ...from urllib2...
-from urllib2 import \
- BaseHandler, \
- ProxyHandler, \
- ProxyBasicAuthHandler, \
- ProxyDigestAuthHandler, \
- HTTPBasicAuthHandler, \
- HTTPDigestAuthHandler, \
- HTTPDefaultErrorHandler, \
- UnknownHandler, \
- FTPHandler, \
- CacheFTPHandler, \
- FileHandler, \
- GopherHandler
-# ...and from mechanize
-from _urllib2_support import \
- HTTPHandler, \
- HTTPRedirectHandler, \
- HTTPRequestUpgradeProcessor, \
- HTTPEquivProcessor, \
- SeekableProcessor, \
- HTTPCookieProcessor, \
- HTTPRefererProcessor, \
- HTTPRefreshProcessor, \
- HTTPErrorProcessor, \
- HTTPResponseDebugProcessor, \
- HTTPRedirectDebugProcessor, \
- HTTPRobotRulesProcessor
-import httplib
-if hasattr(httplib, 'HTTPS'):
- from _urllib2_support import HTTPSHandler
-del httplib
-#from _gzip import HTTPGzipProcessor
+# urllib2 work-alike interface (part from mechanize, part from urllib2)
+from _urllib2 import *
# misc
from _Util import http2time as str2time
@@ -76,6 +25,8 @@
from _urllib2_support import XHTMLCompatibleHeadParser
except ImportError:
pass
+#from _gzip import HTTPGzipProcessor # crap ATM
+
# cookies
from _ClientCookie import Cookie, CookiePolicy, DefaultCookiePolicy, \
Added: wwwsearch/mechanize/trunk/mechanize/_auth.py
==============================================================================
--- (empty file)
+++ wwwsearch/mechanize/trunk/mechanize/_auth.py Fri May 19 02:04:51 2006
@@ -0,0 +1,471 @@
+"""HTTP Authentication and Proxy support.
+
+All but HTTPProxyPasswordMgr come from Python 2.5.
+
+
+Copyright 2006 John J. Lee <jjl at pobox.com>
+
+This code is free software; you can redistribute it and/or modify it under
+the terms of the BSD or ZPL 2.1 licenses (see the file COPYING.txt
+included with the distribution).
+
+"""
+
+import re, base64, urlparse, posixpath, md5, sha
+
+from urllib2 import BaseHandler
+from urllib import getproxies, unquote, splittype, splituser, splitpasswd
+
+
+def _parse_proxy(proxy):
+ """Return (scheme, user, password, host/port) given a URL or an authority.
+
+ If a URL is supplied, it must have an authority (host:port) component.
+ According to RFC 3986, having an authority component means the URL must
+ have two slashes after the scheme:
+
+ >>> _parse_proxy('file:/ftp.example.com/')
+ Traceback (most recent call last):
+ ValueError: proxy URL with no authority: 'file:/ftp.example.com/'
+
+ The first three items of the returned tuple may be None.
+
+ Examples of authority parsing:
+
+ >>> _parse_proxy('proxy.example.com')
+ (None, None, None, 'proxy.example.com')
+ >>> _parse_proxy('proxy.example.com:3128')
+ (None, None, None, 'proxy.example.com:3128')
+
+ The authority component may optionally include userinfo (assumed to be
+ username:password):
+
+    >>> _parse_proxy('joe:password@proxy.example.com')
+    (None, 'joe', 'password', 'proxy.example.com')
+    >>> _parse_proxy('joe:password@proxy.example.com:3128')
+    (None, 'joe', 'password', 'proxy.example.com:3128')
+
+ Same examples, but with URLs instead:
+
+ >>> _parse_proxy('http://proxy.example.com/')
+ ('http', None, None, 'proxy.example.com')
+ >>> _parse_proxy('http://proxy.example.com:3128/')
+ ('http', None, None, 'proxy.example.com:3128')
+ >>> _parse_proxy('http://joe:password@proxy.example.com/')
+ ('http', 'joe', 'password', 'proxy.example.com')
+ >>> _parse_proxy('http://joe:password@proxy.example.com:3128')
+ ('http', 'joe', 'password', 'proxy.example.com:3128')
+
+ Everything after the authority is ignored:
+
+ >>> _parse_proxy('ftp://joe:password@proxy.example.com/rubbish:3128')
+ ('ftp', 'joe', 'password', 'proxy.example.com')
+
+ Test for no trailing '/' case:
+
+ >>> _parse_proxy('http://joe:password@proxy.example.com')
+ ('http', 'joe', 'password', 'proxy.example.com')
+
+ """
+ scheme, r_scheme = splittype(proxy)
+ if not r_scheme.startswith("/"):
+ # authority
+ scheme = None
+ authority = proxy
+ else:
+ # URL
+ if not r_scheme.startswith("//"):
+ raise ValueError("proxy URL with no authority: %r" % proxy)
+ # We have an authority, so for RFC 3986-compliant URLs (by ss 3.
+ # and 3.3.), path is empty or starts with '/'
+ end = r_scheme.find("/", 2)
+ if end == -1:
+ end = None
+ authority = r_scheme[2:end]
+ userinfo, hostport = splituser(authority)
+ if userinfo is not None:
+ user, password = splitpasswd(userinfo)
+ else:
+ user = password = None
+ return scheme, user, password, hostport
+
+class ProxyHandler(BaseHandler):
+ # Proxies must be in front
+ handler_order = 100
+
+ def __init__(self, proxies=None):
+ if proxies is None:
+ proxies = getproxies()
+ assert hasattr(proxies, 'has_key'), "proxies must be a mapping"
+ self.proxies = proxies
+ for type, url in proxies.items():
+ setattr(self, '%s_open' % type,
+ lambda r, proxy=url, type=type, meth=self.proxy_open: \
+ meth(r, proxy, type))
+
+ def proxy_open(self, req, proxy, type):
+ orig_type = req.get_type()
+ proxy_type, user, password, hostport = _parse_proxy(proxy)
+ if proxy_type is None:
+ proxy_type = orig_type
+ if user and password:
+ user_pass = '%s:%s' % (unquote(user), unquote(password))
+ creds = base64.encodestring(user_pass).strip()
+ req.add_header('Proxy-authorization', 'Basic ' + creds)
+ hostport = unquote(hostport)
+ req.set_proxy(hostport, proxy_type)
+ if orig_type == proxy_type:
+ # let other handlers take care of it
+ return None
+ else:
+ # need to start over, because the other handlers don't
+ # grok the proxy's URL type
+ # e.g. if we have a constructor arg proxies like so:
+ # {'http': 'ftp://proxy.example.com'}, we may end up turning
+ # a request for http://acme.example.com/a into one for
+ # ftp://proxy.example.com/a
+ return self.parent.open(req)
+
+class HTTPPasswordMgr:
+
+ def __init__(self):
+ self.passwd = {}
+
+ def add_password(self, realm, uri, user, passwd):
+ # uri could be a single URI or a sequence
+ if isinstance(uri, basestring):
+ uri = [uri]
+ uri = tuple(map(self.reduce_uri, uri))
+ if not realm in self.passwd:
+ self.passwd[realm] = {}
+ self.passwd[realm][uri] = (user, passwd)
+
+ def find_user_password(self, realm, authuri):
+ domains = self.passwd.get(realm, {})
+ authuri = self.reduce_uri(authuri)
+ for uris, authinfo in domains.iteritems():
+ for uri in uris:
+ if self.is_suburi(uri, authuri):
+ return authinfo
+ return None, None
+
+ def reduce_uri(self, uri):
+ """Accept netloc or URI and extract only the netloc and path"""
+ parts = urlparse.urlsplit(uri)
+ if parts[1]:
+ # URI
+ return parts[1], parts[2] or '/'
+ elif parts[0]:
+ # host:port
+ return uri, '/'
+ else:
+ # host
+ return parts[2], '/'
+
+ def is_suburi(self, base, test):
+ """Check if test is below base in a URI tree
+
+ Both args must be URIs in reduced form.
+ """
+ if base == test:
+ return True
+ if base[0] != test[0]:
+ return False
+ common = posixpath.commonprefix((base[1], test[1]))
+ if len(common) == len(base[1]):
+ return True
+ return False
+
+
+class HTTPPasswordMgrWithDefaultRealm(HTTPPasswordMgr):
+
+ def find_user_password(self, realm, authuri):
+ user, password = HTTPPasswordMgr.find_user_password(self, realm,
+ authuri)
+ if user is not None:
+ return user, password
+ return HTTPPasswordMgr.find_user_password(self, None, authuri)
+
+
+class AbstractBasicAuthHandler:
+
+ rx = re.compile('[ \t]*([^ \t]+)[ \t]+realm="([^"]*)"', re.I)
+
+ # XXX there can actually be multiple auth-schemes in a
+ # www-authenticate header. should probably be a lot more careful
+ # in parsing them to extract multiple alternatives
+
+ def __init__(self, password_mgr=None):
+ if password_mgr is None:
+ password_mgr = HTTPPasswordMgr()
+ self.passwd = password_mgr
+ self.add_password = self.passwd.add_password
+
+ def http_error_auth_reqed(self, authreq, host, req, headers):
+ # host may be an authority (without userinfo) or a URL with an
+ # authority
+ # XXX could be multiple headers
+ authreq = headers.get(authreq, None)
+ if authreq:
+ mo = AbstractBasicAuthHandler.rx.search(authreq)
+ if mo:
+ scheme, realm = mo.groups()
+ if scheme.lower() == 'basic':
+ return self.retry_http_basic_auth(host, req, realm)
+
+ def retry_http_basic_auth(self, host, req, realm):
+ user, pw = self.passwd.find_user_password(realm, host)
+ if pw is not None:
+ raw = "%s:%s" % (user, pw)
+ auth = 'Basic %s' % base64.encodestring(raw).strip()
+ if req.headers.get(self.auth_header, None) == auth:
+ return None
+ req.add_header(self.auth_header, auth)
+ return self.parent.open(req)
+ else:
+ return None
+
+
+class HTTPBasicAuthHandler(AbstractBasicAuthHandler, BaseHandler):
+
+ auth_header = 'Authorization'
+
+ def http_error_401(self, req, fp, code, msg, headers):
+ url = req.get_full_url()
+ return self.http_error_auth_reqed('www-authenticate',
+ url, req, headers)
+
+
+class ProxyBasicAuthHandler(AbstractBasicAuthHandler, BaseHandler):
+
+ auth_header = 'Proxy-authorization'
+
+ def http_error_407(self, req, fp, code, msg, headers):
+ # http_error_auth_reqed requires that there is no userinfo component in
+ # authority. Assume there isn't one, since urllib2 does not (and
+ # should not, RFC 3986 s. 3.2.1) support requests for URLs containing
+ # userinfo.
+ authority = req.get_host()
+ return self.http_error_auth_reqed('proxy-authenticate',
+ authority, req, headers)
+
+
+def randombytes(n):
+ """Return n random bytes."""
+ # Use /dev/urandom if it is available. Fall back to random module
+ # if not. It might be worthwhile to extend this function to use
+ # other platform-specific mechanisms for getting random bytes.
+ if os.path.exists("/dev/urandom"):
+ f = open("/dev/urandom")
+ s = f.read(n)
+ f.close()
+ return s
+ else:
+ L = [chr(random.randrange(0, 256)) for i in range(n)]
+ return "".join(L)
+
+class AbstractDigestAuthHandler:
+ # Digest authentication is specified in RFC 2617.
+
+ # XXX The client does not inspect the Authentication-Info header
+ # in a successful response.
+
+ # XXX It should be possible to test this implementation against
+ # a mock server that just generates a static set of challenges.
+
+    # XXX qop="auth-int" support is shaky
+
+ def __init__(self, passwd=None):
+ if passwd is None:
+ passwd = HTTPPasswordMgr()
+ self.passwd = passwd
+ self.add_password = self.passwd.add_password
+ self.retried = 0
+ self.nonce_count = 0
+
+ def reset_retry_count(self):
+ self.retried = 0
+
+ def http_error_auth_reqed(self, auth_header, host, req, headers):
+ authreq = headers.get(auth_header, None)
+ if self.retried > 5:
+ # Don't fail endlessly - if we failed once, we'll probably
+ # fail a second time. Hm. Unless the Password Manager is
+ # prompting for the information. Crap. This isn't great
+ # but it's better than the current 'repeat until recursion
+ # depth exceeded' approach <wink>
+ raise HTTPError(req.get_full_url(), 401, "digest auth failed",
+ headers, None)
+ else:
+ self.retried += 1
+ if authreq:
+ scheme = authreq.split()[0]
+ if scheme.lower() == 'digest':
+ return self.retry_http_digest_auth(req, authreq)
+
+ def retry_http_digest_auth(self, req, auth):
+ token, challenge = auth.split(' ', 1)
+ chal = parse_keqv_list(parse_http_list(challenge))
+ auth = self.get_authorization(req, chal)
+ if auth:
+ auth_val = 'Digest %s' % auth
+ if req.headers.get(self.auth_header, None) == auth_val:
+ return None
+ req.add_unredirected_header(self.auth_header, auth_val)
+ resp = self.parent.open(req)
+ return resp
+
+ def get_cnonce(self, nonce):
+ # The cnonce-value is an opaque
+ # quoted string value provided by the client and used by both client
+ # and server to avoid chosen plaintext attacks, to provide mutual
+ # authentication, and to provide some message integrity protection.
+ # This isn't a fabulous effort, but it's probably Good Enough.
+ dig = sha.new("%s:%s:%s:%s" % (self.nonce_count, nonce, time.ctime(),
+ randombytes(8))).hexdigest()
+ return dig[:16]
+
+ def get_authorization(self, req, chal):
+ try:
+ realm = chal['realm']
+ nonce = chal['nonce']
+ qop = chal.get('qop')
+ algorithm = chal.get('algorithm', 'MD5')
+ # mod_digest doesn't send an opaque, even though it isn't
+ # supposed to be optional
+ opaque = chal.get('opaque', None)
+ except KeyError:
+ return None
+
+ H, KD = self.get_algorithm_impls(algorithm)
+ if H is None:
+ return None
+
+ user, pw = self.passwd.find_user_password(realm, req.get_full_url())
+ if user is None:
+ return None
+
+ # XXX not implemented yet
+ if req.has_data():
+ entdig = self.get_entity_digest(req.get_data(), chal)
+ else:
+ entdig = None
+
+ A1 = "%s:%s:%s" % (user, realm, pw)
+ A2 = "%s:%s" % (req.get_method(),
+ # XXX selector: what about proxies and full urls
+ req.get_selector())
+ if qop == 'auth':
+ self.nonce_count += 1
+ ncvalue = '%08x' % self.nonce_count
+ cnonce = self.get_cnonce(nonce)
+ noncebit = "%s:%s:%s:%s:%s" % (nonce, ncvalue, cnonce, qop, H(A2))
+ respdig = KD(H(A1), noncebit)
+ elif qop is None:
+ respdig = KD(H(A1), "%s:%s" % (nonce, H(A2)))
+ else:
+ # XXX handle auth-int.
+ pass
+
+ # XXX should the partial digests be encoded too?
+
+ base = 'username="%s", realm="%s", nonce="%s", uri="%s", ' \
+ 'response="%s"' % (user, realm, nonce, req.get_selector(),
+ respdig)
+ if opaque:
+ base += ', opaque="%s"' % opaque
+ if entdig:
+ base += ', digest="%s"' % entdig
+ base += ', algorithm="%s"' % algorithm
+ if qop:
+ base += ', qop=auth, nc=%s, cnonce="%s"' % (ncvalue, cnonce)
+ return base
+
+ def get_algorithm_impls(self, algorithm):
+ # lambdas assume digest modules are imported at the top level
+ if algorithm == 'MD5':
+ H = lambda x: md5.new(x).hexdigest()
+ elif algorithm == 'SHA':
+ H = lambda x: sha.new(x).hexdigest()
+ # XXX MD5-sess
+ KD = lambda s, d: H("%s:%s" % (s, d))
+ return H, KD
+
+ def get_entity_digest(self, data, chal):
+ # XXX not implemented yet
+ return None
+
+
+class HTTPDigestAuthHandler(BaseHandler, AbstractDigestAuthHandler):
+ """An authentication protocol defined by RFC 2069
+
+ Digest authentication improves on basic authentication because it
+ does not transmit passwords in the clear.
+ """
+
+ auth_header = 'Authorization'
+
+ def http_error_401(self, req, fp, code, msg, headers):
+ host = urlparse.urlparse(req.get_full_url())[1]
+ retry = self.http_error_auth_reqed('www-authenticate',
+ host, req, headers)
+ self.reset_retry_count()
+ return retry
+
+
+class ProxyDigestAuthHandler(BaseHandler, AbstractDigestAuthHandler):
+
+ auth_header = 'Proxy-Authorization'
+
+ def http_error_407(self, req, fp, code, msg, headers):
+ host = req.get_host()
+ retry = self.http_error_auth_reqed('proxy-authenticate',
+ host, req, headers)
+ self.reset_retry_count()
+ return retry
+
+
+
+class HTTPProxyPasswordMgr(HTTPPasswordMgr):
+ # has default realm and host/port
+ def add_password(self, realm, uri, user, passwd):
+ # uri could be a single URI or a sequence
+ if uri is None or isinstance(uri, basestring):
+ uris = [uri]
+ else:
+ uris = uri
+ passwd_by_domain = self.passwd.setdefault(realm, {})
+ for uri in uris:
+ uri = self.reduce_uri(uri)
+ passwd_by_domain[uri] = (user, passwd)
+
+ def find_user_password(self, realm, authuri):
+ perms = [(realm, authuri), (None, authuri)]
+ # bleh, want default realm to take precedence over default
+ # URI/authority, hence this outer loop
+ for default_uri in False, True:
+ for realm, authuri in perms:
+ authinfo_by_domain = self.passwd.get(realm, {})
+ reduced_authuri = self.reduce_uri(authuri)
+ for uri, authinfo in authinfo_by_domain.iteritems():
+ if uri is None and not default_uri:
+ continue
+ if self.is_suburi(uri, reduced_authuri):
+ return authinfo
+ user, password = None, None
+
+ if user is not None:
+ break
+ return user, password
+
+ def reduce_uri(self, uri):
+ if uri is None:
+ return None
+ return HTTPPasswordMgr.reduce_uri(self, uri)
+
+ def is_suburi(self, base, test):
+ if base is None:
+ # default to the proxy's host/port
+ hostport, path = test
+ base = (hostport, "/")
+ return HTTPPasswordMgr.is_suburi(self, base, test)
Added: wwwsearch/mechanize/trunk/mechanize/_urllib2.py
==============================================================================
--- (empty file)
+++ wwwsearch/mechanize/trunk/mechanize/_urllib2.py Fri May 19 02:04:51 2006
@@ -0,0 +1,53 @@
+# urllib2 work-alike interface
+# ...from urllib2...
+from urllib2 import \
+ URLError, \
+ HTTPError, \
+ GopherError, \
+ HTTPPasswordMgr, \
+ HTTPPasswordMgrWithDefaultRealm, \
+ AbstractBasicAuthHandler, \
+ AbstractDigestAuthHandler
+# ...and from mechanize
+from _Opener import OpenerDirector
+from _auth import \
+ HTTPProxyPasswordMgr, \
+ ProxyHandler, \
+ ProxyBasicAuthHandler, \
+ ProxyDigestAuthHandler, \
+ HTTPBasicAuthHandler, \
+ HTTPDigestAuthHandler
+from _urllib2_support import \
+ Request, \
+ build_opener, install_opener, urlopen, \
+ OpenerFactory, urlretrieve, \
+ RobotExclusionError
+
+# handlers...
+# ...from urllib2...
+from urllib2 import \
+ BaseHandler, \
+ HTTPDefaultErrorHandler, \
+ UnknownHandler, \
+ FTPHandler, \
+ CacheFTPHandler, \
+ FileHandler, \
+ GopherHandler
+# ...and from mechanize
+from _urllib2_support import \
+ HTTPHandler, \
+ HTTPRedirectHandler, \
+ HTTPRequestUpgradeProcessor, \
+ HTTPEquivProcessor, \
+ SeekableProcessor, \
+ HTTPCookieProcessor, \
+ HTTPRefererProcessor, \
+ HTTPRefreshProcessor, \
+ HTTPErrorProcessor, \
+ HTTPResponseDebugProcessor, \
+ HTTPRedirectDebugProcessor, \
+ HTTPRobotRulesProcessor
+import httplib
+if hasattr(httplib, 'HTTPS'):
+ from _urllib2_support import HTTPSHandler
+del httplib
Modified: wwwsearch/mechanize/trunk/mechanize/_useragent.py
==============================================================================
--- wwwsearch/mechanize/trunk/mechanize/_useragent.py (original)
+++ wwwsearch/mechanize/trunk/mechanize/_useragent.py Fri May 19 02:04:51 2006
@@ -14,16 +14,13 @@
import sys, warnings, urllib2
from _Opener import OpenerDirector
-if sys.version_info[:2] >= (2, 4):
- from urllib2 import BaseHandler, HTTPErrorProcessor
-else:
- from _urllib2_support import BaseHandler, HTTPErrorProcessor
-import _urllib2_support
+import _urllib2
+import _auth
import _gzip
-class HTTPRefererProcessor(BaseHandler):
+class HTTPRefererProcessor(_urllib2.BaseHandler):
def http_request(self, request):
# See RFC 2616 14.36. The only times we know the source of the
# request URI has a URI associated with it are redirect, and
@@ -38,51 +35,6 @@
https_request = http_request
-class HTTPProxyPasswordMgr(urllib2.HTTPPasswordMgr):
- # has default realm and host/port
- def add_password(self, realm, uri, user, passwd):
- # uri could be a single URI or a sequence
- if uri is None or isinstance(uri, basestring):
- uris = [uri]
- else:
- uris = uri
- passwd_by_domain = self.passwd.setdefault(realm, {})
- for uri in uris:
- uri = self.reduce_uri(uri)
- passwd_by_domain[uri] = (user, passwd)
-
- def find_user_password(self, realm, authuri):
- perms = [(realm, authuri), (None, authuri)]
- # bleh, want default realm to take precedence over default
- # URI/authority, hence this outer loop
- for default_uri in False, True:
- for realm, authuri in perms:
- authinfo_by_domain = self.passwd.get(realm, {})
- reduced_authuri = self.reduce_uri(authuri)
- for uri, authinfo in authinfo_by_domain.iteritems():
- if uri is None and not default_uri:
- continue
- if self.is_suburi(uri, reduced_authuri):
- return authinfo
- user, password = None, None
-
- if user is not None:
- break
- return user, password
-
- def reduce_uri(self, uri):
- if uri is None:
- return None
- return urllib2.HTTPPasswordMgr.reduce_uri(self, uri)
-
- def is_suburi(self, base, test):
- if base is None:
- # default to the proxy's host/port
- hostport, path = test
- base = (hostport, "/")
- return urllib2.HTTPPasswordMgr.is_suburi(self, base, test)
-
-
class UserAgent(OpenerDirector):
"""Convenient user-agent class.
@@ -103,36 +55,37 @@
handler_classes = {
# scheme handlers
- "http": _urllib2_support.HTTPHandler,
- "ftp": urllib2.FTPHandler, # CacheFTPHandler is buggy in 2.3
- "file": urllib2.FileHandler,
- "gopher": urllib2.GopherHandler,
+ "http": _urllib2.HTTPHandler,
+ # CacheFTPHandler is buggy, at least in 2.3, so we don't use it
+ "ftp": _urllib2.FTPHandler,
+ "file": _urllib2.FileHandler,
+ "gopher": _urllib2.GopherHandler,
# other handlers
- "_unknown": urllib2.UnknownHandler,
+ "_unknown": _urllib2.UnknownHandler,
# HTTP{S,}Handler depend on HTTPErrorProcessor too
- "_http_error": HTTPErrorProcessor,
- "_http_request_upgrade": _urllib2_support.HTTPRequestUpgradeProcessor,
- "_http_default_error": urllib2.HTTPDefaultErrorHandler,
+ "_http_error": _urllib2.HTTPErrorProcessor,
+ "_http_request_upgrade": _urllib2.HTTPRequestUpgradeProcessor,
+ "_http_default_error": _urllib2.HTTPDefaultErrorHandler,
# feature handlers
- "_basicauth": urllib2.HTTPBasicAuthHandler,
- "_digestauth": urllib2.HTTPDigestAuthHandler,
- "_redirect": _urllib2_support.HTTPRedirectHandler,
- "_cookies": _urllib2_support.HTTPCookieProcessor,
- "_refresh": _urllib2_support.HTTPRefreshProcessor,
+ "_basicauth": _urllib2.HTTPBasicAuthHandler,
+ "_digestauth": _urllib2.HTTPDigestAuthHandler,
+ "_redirect": _urllib2.HTTPRedirectHandler,
+ "_cookies": _urllib2.HTTPCookieProcessor,
+ "_refresh": _urllib2.HTTPRefreshProcessor,
"_referer": HTTPRefererProcessor, # from this module, note
- "_equiv": _urllib2_support.HTTPEquivProcessor,
- "_seek": _urllib2_support.SeekableProcessor,
- "_proxy": urllib2.ProxyHandler,
- "_proxy_basicauth": urllib2.ProxyBasicAuthHandler,
- "_proxy_digestauth": urllib2.ProxyDigestAuthHandler,
- "_robots": _urllib2_support.HTTPRobotRulesProcessor,
+ "_equiv": _urllib2.HTTPEquivProcessor,
+ "_seek": _urllib2.SeekableProcessor,
+ "_proxy": _urllib2.ProxyHandler,
+ "_proxy_basicauth": _urllib2.ProxyBasicAuthHandler,
+ "_proxy_digestauth": _urllib2.ProxyDigestAuthHandler,
+ "_robots": _urllib2.HTTPRobotRulesProcessor,
"_gzip": _gzip.HTTPGzipProcessor, # experimental!
# debug handlers
- "_debug_redirect": _urllib2_support.HTTPRedirectDebugProcessor,
- "_debug_response_body": _urllib2_support.HTTPResponseDebugProcessor,
+ "_debug_redirect": _urllib2.HTTPRedirectDebugProcessor,
+ "_debug_response_body": _urllib2.HTTPResponseDebugProcessor,
}
default_schemes = ["http", "ftp", "file", "gopher"]
@@ -145,8 +98,8 @@
"_proxy", "_proxy_basicauth", "_proxy_digestauth",
"_seek", "_robots",
]
- if hasattr(_urllib2_support, 'HTTPSHandler'):
- handler_classes["https"] = _urllib2_support.HTTPSHandler
+ if hasattr(_urllib2, 'HTTPSHandler'):
+ handler_classes["https"] = _urllib2.HTTPSHandler
default_schemes.append("https")
def __init__(self):
@@ -171,10 +124,10 @@
# Ensure default password managers are installed.
pm = ppm = None
if "_basicauth" in ua_handlers or "_digestauth" in ua_handlers:
- pm = urllib2.HTTPPasswordMgrWithDefaultRealm()
+ pm = _urllib2.HTTPPasswordMgrWithDefaultRealm()
if ("_proxy_basicauth" in ua_handlers or
"_proxy_digestauth" in ua_handlers):
- ppm = HTTPProxyPasswordMgr()
+ ppm = _auth.HTTPProxyPasswordMgr()
self.set_password_manager(pm)
self.set_proxy_password_manager(ppm)
@@ -250,7 +203,7 @@
# the following are rarely useful -- use add_password / add_proxy_password
# instead
def set_password_manager(self, password_manager):
- """Set a urllib2.HTTPPasswordMgrWithDefaultRealm, or None."""
+ """Set a mechanize.HTTPPasswordMgrWithDefaultRealm, or None."""
self._password_manager = password_manager
self._set_handler("_basicauth", obj=password_manager)
self._set_handler("_digestauth", obj=password_manager)
Modified: wwwsearch/mechanize/trunk/test/test_urllib2.py
==============================================================================
--- wwwsearch/mechanize/trunk/test/test_urllib2.py (original)
+++ wwwsearch/mechanize/trunk/test/test_urllib2.py Fri May 19 02:04:51 2006
@@ -11,7 +11,8 @@
import unittest, StringIO, os, sys, UserDict
-import urllib2
+import mechanize
+
from mechanize._urllib2_support import Request, AbstractHTTPHandler, \
build_opener, parse_head, urlopen
from mechanize._Util import startswith
@@ -21,8 +22,8 @@
HTTPErrorProcessor, HTTPHandler
from mechanize import OpenerDirector
-## from mechanize import getLogger, DEBUG
-## l = getLogger("ClientCookie")
+## from logging import getLogger, DEBUG
+## l = getLogger("mechanize")
## l.setLevel(DEBUG)
class MockOpener:
@@ -94,7 +95,7 @@
res = MockResponse(200, "OK", {}, "")
return self.parent.error("http", args[0], res, code, "", {})
elif action == "raise":
- raise urllib2.URLError("blah")
+ raise mechanize.URLError("blah")
assert False
def close(self): pass
def add_parent(self, parent):
@@ -115,13 +116,14 @@
for meths in meth_spec:
class MockHandlerSubclass(MockHandler): pass
h = MockHandlerSubclass(meths)
- h.handler_order = h.processor_order = count
+ h.handler_order = h.processor_order = 101+count
h.add_parent(opener)
count = count + 1
handlers.append(h)
opener.add_handler(h)
return handlers
+
class OpenerDirectorTests(unittest.TestCase):
def test_handled(self):
@@ -209,7 +211,7 @@
handlers = add_ordered_mock_handlers(o, meth_spec)
req = Request("http://example.com/")
- self.assertRaises(urllib2.URLError, o.open, req)
+ self.assertRaises(mechanize.URLError, o.open, req)
self.assert_(o.calls == [(handlers[0], "http_open", (req,), {})])
## def test_error(self):
@@ -361,7 +363,7 @@
self.filename, self.filetype = filename, filetype
return StringIO.StringIO(self.data), len(self.data)
-class NullFTPHandler(urllib2.FTPHandler):
+class NullFTPHandler(mechanize.FTPHandler):
def __init__(self, data): self.data = data
def connect_ftp(self, user, passwd, host, port, dirs):
self.user, self.passwd = user, passwd
@@ -395,6 +397,17 @@
self.calls.append(("can_fetch", ua, url))
return self._can_fetch
+class MockPasswordManager:
+ def add_password(self, realm, uri, user, password):
+ self.realm = realm
+ self.url = uri
+ self.user = user
+ self.password = password
+ def find_user_password(self, realm, authuri):
+ self.target_realm = realm
+ self.target_url = authuri
+ return self.user, self.password
+
class HandlerTests(unittest.TestCase):
if hasattr(sys, "version_info") and sys.version_info > (2, 1, 3, "final", 0):
@@ -434,7 +447,7 @@
def test_file(self):
import time, rfc822, socket
- h = urllib2.FileHandler()
+ h = mechanize.FileHandler()
o = h.parent = MockOpener()
#TESTFN = test_support.TESTFN
@@ -489,12 +502,12 @@
finally:
f.close()
- self.assertRaises(urllib2.URLError,
+ self.assertRaises(mechanize.URLError,
h.file_open, Request(url))
finally:
os.remove(TESTFN)
- h = urllib2.FileHandler()
+ h = mechanize.FileHandler()
o = h.parent = MockOpener()
# XXXX why does // mean ftp (and /// mean not ftp!), and where
# is file: scheme specified? I think this is really a bug, and
@@ -513,7 +526,7 @@
try:
h.file_open(req)
# XXXX remove OSError when bug fixed
- except (urllib2.URLError, OSError):
+ except (mechanize.URLError, OSError):
self.assert_(not ftp)
else:
self.assert_(o.req is req)
@@ -550,7 +563,7 @@
# check socket.error converted to URLError
http.raise_on_endheaders = True
- self.assertRaises(urllib2.URLError, h.do_open, http, req)
+ self.assertRaises(mechanize.URLError, h.do_open, http, req)
# check adding of standard headers
o.addheaders = [("Spam", "eggs")]
@@ -583,6 +596,7 @@
self.assert_(req.unredirected_hdrs["Spam"] == "foo")
def test_request_upgrade(self):
+ import urllib2
new_req_class = hasattr(urllib2.Request, "has_header")
h = HTTPRequestUpgradeProcessor()
@@ -688,7 +702,7 @@
req = Request(url)
try:
h.http_request(req)
- except urllib2.HTTPError, e:
+ except mechanize.HTTPError, e:
self.assert_(e.request == req)
self.assert_(e.code == 403)
# new host: reload robots.txt (even though the host and port are
@@ -737,7 +751,7 @@
h = SeekableProcessor()
o = h.parent = MockOpener()
- req = urllib2.Request("http://example.com/")
+ req = mechanize.Request("http://example.com/")
class MockUnseekableResponse:
code = 200
msg = "OK"
@@ -795,7 +809,7 @@
try:
method(req, MockFile(), code, "Blah",
MockHeaders({"location": to_url}))
- except urllib2.HTTPError:
+ except mechanize.HTTPError:
# 307 in response to POST requires user OK
self.assert_(code == 307 and data is not None)
self.assert_(o.req.get_full_url() == to_url)
@@ -822,7 +836,7 @@
while 1:
redirect(h, req, "http://example.com/")
count = count + 1
- except urllib2.HTTPError:
+ except mechanize.HTTPError:
# don't stop until max_repeats, because cookies may introduce state
self.assert_(count == HTTPRedirectHandler.max_repeats)
@@ -834,9 +848,133 @@
while 1:
redirect(h, req, "http://example.com/%d" % count)
count = count + 1
- except urllib2.HTTPError:
+ except mechanize.HTTPError:
self.assert_(count == HTTPRedirectHandler.max_redirections)
+ def test_cookie_redirect(self):
+ # cookies shouldn't leak into redirected requests
+ import mechanize
+ from mechanize import CookieJar, build_opener, HTTPHandler, \
+ HTTPCookieProcessor, HTTPError, HTTPDefaultErrorHandler, \
+ HTTPRedirectHandler
+
+ from test_cookies import interact_netscape
+
+ cj = CookieJar()
+ interact_netscape(cj, "http://www.example.com/", "spam=eggs")
+ hh = MockHTTPHandler(302, "Location: http://www.cracker.com/\r\n\r\n")
+ hdeh = HTTPDefaultErrorHandler()
+ hrh = HTTPRedirectHandler()
+ cp = HTTPCookieProcessor(cj)
+ o = build_test_opener(hh, hdeh, hrh, cp)
+ o.open("http://www.example.com/")
+ self.assert_(not hh.req.has_header("Cookie"))
+
+ def test_proxy(self):
+ o = OpenerDirector()
+ ph = mechanize.ProxyHandler(dict(http="proxy.example.com:3128"))
+ o.add_handler(ph)
+ meth_spec = [
+ [("http_open", "return response")]
+ ]
+ handlers = add_ordered_mock_handlers(o, meth_spec)
+
+ o._maybe_reindex_handlers()
+
+ req = Request("http://acme.example.com/")
+ self.assertEqual(req.get_host(), "acme.example.com")
+ r = o.open(req)
+ self.assertEqual(req.get_host(), "proxy.example.com:3128")
+
+ self.assertEqual([(handlers[0], "http_open")],
+ [tup[0:2] for tup in o.calls])
+
+ def test_basic_auth(self):
+ opener = OpenerDirector()
+ password_manager = MockPasswordManager()
+ auth_handler = mechanize.HTTPBasicAuthHandler(password_manager)
+ realm = "ACME Widget Store"
+ http_handler = MockHTTPHandler(
+ 401, 'WWW-Authenticate: Basic realm="%s"\r\n\r\n' % realm)
+ self._test_basic_auth(opener, auth_handler, "Authorization",
+ realm, http_handler, password_manager,
+ "http://acme.example.com/protected",
+ "http://acme.example.com/protected",
+ )
+
+ def test_proxy_basic_auth(self):
+ opener = OpenerDirector()
+ ph = mechanize.ProxyHandler(dict(http="proxy.example.com:3128"))
+ opener.add_handler(ph)
+ password_manager = MockPasswordManager()
+ auth_handler = mechanize.ProxyBasicAuthHandler(password_manager)
+ realm = "ACME Networks"
+ http_handler = MockHTTPHandler(
+ 407, 'Proxy-Authenticate: Basic realm="%s"\r\n\r\n' % realm)
+ self._test_basic_auth(opener, auth_handler, "Proxy-authorization",
+ realm, http_handler, password_manager,
+ "http://acme.example.com:3128/protected",
+ "proxy.example.com:3128",
+ )
+
+ def test_basic_and_digest_auth_handlers(self):
+ # HTTPDigestAuthHandler threw an exception if it couldn't handle a 40*
+ # response (http://python.org/sf/1479302), where it should instead
+ # return None to allow another handler (especially
+ # HTTPBasicAuthHandler) to handle the response.
+ class TestDigestAuthHandler(mechanize.HTTPDigestAuthHandler):
+ handler_order = 400 # strictly before HTTPBasicAuthHandler
+ opener = OpenerDirector()
+ password_manager = MockPasswordManager()
+ digest_handler = TestDigestAuthHandler(password_manager)
+ basic_handler = mechanize.HTTPBasicAuthHandler(password_manager)
+ opener.add_handler(digest_handler)
+ realm = "ACME Networks"
+ http_handler = MockHTTPHandler(
+ 401, 'WWW-Authenticate: Basic realm="%s"\r\n\r\n' % realm)
+ self._test_basic_auth(opener, basic_handler, "Authorization",
+ realm, http_handler, password_manager,
+ "http://acme.example.com/protected",
+ "http://acme.example.com/protected",
+ )
+
+ def _test_basic_auth(self, opener, auth_handler, auth_header,
+ realm, http_handler, password_manager,
+ request_url, protected_url):
+ import base64, httplib
+ user, password = "wile", "coyote"
+ opener.add_handler(auth_handler)
+ opener.add_handler(http_handler)
+
+ # .add_password() fed through to password manager
+ auth_handler.add_password(realm, request_url, user, password)
+ self.assertEqual(realm, password_manager.realm)
+ self.assertEqual(request_url, password_manager.url)
+ self.assertEqual(user, password_manager.user)
+ self.assertEqual(password, password_manager.password)
+
+ r = opener.open(request_url)
+
+ # should have asked the password manager for the username/password
+ self.assertEqual(password_manager.target_realm, realm)
+ self.assertEqual(password_manager.target_url, protected_url)
+
+ # expect one request without authorization, then one with
+ self.assertEqual(len(http_handler.requests), 2)
+ self.failIf(http_handler.requests[0].has_header(auth_header))
+ userpass = '%s:%s' % (user, password)
+ auth_hdr_value = 'Basic '+base64.encodestring(userpass).strip()
+ self.assertEqual(http_handler.requests[1].get_header(auth_header),
+ auth_hdr_value)
+
+ # if the password manager can't find a password, the handler won't
+ # handle the HTTP auth error
+ password_manager.user = password_manager.password = None
+ http_handler.reset()
+ r = opener.open(request_url)
+ self.assertEqual(len(http_handler.requests), 1)
+ self.failIf(http_handler.requests[0].has_header(auth_header))
+
class HeadParserTests(unittest.TestCase):
@@ -863,45 +1001,42 @@
self.assertEqual(parse_head(StringIO.StringIO(html), HeadParser()), result)
-class MockHTTPHandler(HTTPHandler):
- def __init__(self): self._count = 0
+def build_test_opener(*handler_instances):
+ opener = OpenerDirector()
+ for h in handler_instances:
+ opener.add_handler(h)
+ return opener
+
+class MockHTTPHandler(mechanize.BaseHandler):
+ # useful for testing redirections and auth
+ # sends supplied headers and code as first response
+ # sends 200 OK as second response
+ def __init__(self, code, headers):
+ self.code = code
+ self.headers = headers
+ self.reset()
+ def reset(self):
+ self._count = 0
+ self.requests = []
def http_open(self, req):
- import mimetools
+ import mimetools, httplib, copy
from StringIO import StringIO
+ self.requests.append(copy.deepcopy(req))
if self._count == 0:
self._count = self._count + 1
- msg = mimetools.Message(
- StringIO("Location: http://www.cracker.com/\r\n\r\n"))
+ msg = mimetools.Message(StringIO(self.headers))
return self.parent.error(
- "http", req, MockFile(), 302, "Found", msg)
+ "http", req, MockFile(), self.code, "Blah", msg)
else:
self.req = req
msg = mimetools.Message(StringIO("\r\n\r\n"))
return MockResponse(200, "OK", msg, "", req.get_full_url())
-class MiscTests(unittest.TestCase):
-
- def test_cookie_redirect(self):
- # cookies shouldn't leak into redirected requests
- from mechanize import CookieJar, build_opener, HTTPHandler, \
- HTTPCookieProcessor
- from urllib2 import HTTPError
-
- from test_cookies import interact_netscape
-
- cj = CookieJar()
- interact_netscape(cj, "http://www.example.com/", "spam=eggs")
- hh = MockHTTPHandler()
- cp = HTTPCookieProcessor(cj)
- o = build_opener(hh, cp)
- o.open("http://www.example.com/")
- self.assert_(not hh.req.has_header("Cookie"))
-
class MyHTTPHandler(HTTPHandler): pass
-class FooHandler(urllib2.BaseHandler):
+class FooHandler(mechanize.BaseHandler):
def foo_open(self): pass
-class BarHandler(urllib2.BaseHandler):
+class BarHandler(mechanize.BaseHandler):
def bar_open(self): pass
class A:
More information about the wwwsearch-commits
mailing list