#!/usr/bin/env python
"""Functional tests for mechanize; they talk to a real HTTP server."""

# These tests access the network.

# thanks Moof (aka Giles Antonio Radford) for some of these

import errno
import os
import socket
import sys
import tempfile
import urllib

import mechanize
from mechanize import build_opener, install_opener, urlopen, urlretrieve
from mechanize import CookieJar, HTTPCookieProcessor, \
     HTTPHandler, HTTPRefreshProcessor, \
     HTTPEquivProcessor, HTTPRedirectHandler, \
     HTTPRedirectDebugProcessor, HTTPResponseDebugProcessor
from mechanize._rfc3986 import urljoin
from mechanize._util import hide_experimental_warnings, \
     reset_experimental_warnings
import mechanize._sockettimeout
from mechanize._testcase import TestCase

#from cookielib import CookieJar
#from urllib2 import build_opener, install_opener, urlopen
#from urllib2 import HTTPCookieProcessor, HTTPHandler

#from mechanize import CreateBSDDBCookieJar

## import logging
## logger = logging.getLogger("mechanize")
## logger.addHandler(logging.StreamHandler(sys.stdout))
## #logger.setLevel(logging.DEBUG)
## logger.setLevel(logging.INFO)


def sanepathname2url(path):
    """Return *path* converted with urllib.pathname2url.

    On Windows (os.name == "nt") a result starting with "///" has its
    first two slashes dropped.
    """
    urlpath = urllib.pathname2url(path)
    if os.name == "nt" and urlpath.startswith("///"):
        urlpath = urlpath[2:]
    # XXX don't ask me about the mac...
    return urlpath


def read_file(filename):
    """Return the full contents of *filename*, closing the file afterwards."""
    fh = open(filename)
    try:
        return fh.read()
    finally:
        fh.close()


def _remove_if_exists(filename):
    """Delete *filename*, ignoring the case where it does not exist.

    Any OSError other than ENOENT is re-raised.
    """
    try:
        os.remove(filename)
    except OSError:
        # sys.exc_info() instead of "except X, e" / "except X as e" so the
        # module parses on every Python version it may be run with
        exc = sys.exc_info()[1]
        if exc.errno != errno.ENOENT:
            raise


class SocketTimeoutTest(TestCase):
    """Base class offering a socket monkey-patch that records timeouts."""

    # the timeout tests in this module aren't full functional tests: in order
    # to speed things up, don't actually call .settimeout on the socket.  XXX
    # allow running the tests against a slow server with a real timeout

    def _monkey_patch_socket(self):
        """Replace socket.socket with a recording factory.

        Returns a log object whose .verify(timeout) / .verify_default()
        methods assert on the .settimeout() calls made on sockets created
        while the log was monitoring.
        """
        class Delegator(object):
            # forwards every attribute that is not set on the wrapper itself
            def __init__(self, delegate):
                self._delegate = delegate

            def __getattr__(self, name):
                return getattr(self._delegate, name)

        assertEquals = self.assertEquals

        class TimeoutLog(object):
            # sentinel meaning "don't check the timeout value"
            AnyValue = object()

            def __init__(self):
                self._nr_sockets = 0
                self._timeouts = []
                self.start()

            def start(self):
                self._monitoring = True

            def stop(self):
                self._monitoring = False

            def socket_created(self):
                if self._monitoring:
                    self._nr_sockets += 1

            def settimeout_called(self, timeout):
                if self._monitoring:
                    self._timeouts.append(timeout)

            def verify(self, value=AnyValue):
                if sys.version_info[:2] < (2, 6):
                    # per-connection timeout not supported in Python 2.5
                    self.verify_default()
                else:
                    # one .settimeout() call per socket created
                    assertEquals(len(self._timeouts), self._nr_sockets)
                    if value is not self.AnyValue:
                        for timeout in self._timeouts:
                            assertEquals(timeout, value)

            def verify_default(self):
                assertEquals(len(self._timeouts), 0)

        log = TimeoutLog()

        def settimeout(timeout):
            # record the call; the real socket's timeout is never set
            log.settimeout_called(timeout)

        orig_socket = socket.socket

        def make_socket(*args, **kwds):
            sock = Delegator(orig_socket(*args, **kwds))
            log.socket_created()
            sock.settimeout = settimeout
            return sock

        self.monkey_patch(socket, "socket", make_socket)
        return log


class SimpleTests(SocketTimeoutTest):
    # thanks Moof (aka Giles Antonio Radford)

    def setUp(self):
        super(SimpleTests, self).setUp()
        self.browser = mechanize.Browser()

    def test_simple(self):
        self.browser.open(self.uri)
        self.assertEqual(self.browser.title(), 'Python bits')
        # relative URL
        self.browser.open('/mechanize/')
        self.assertEqual(self.browser.title(), 'mechanize')

    def test_basic_auth(self):
        uri = urljoin(self.uri, "basic_auth")
        self.assertRaises(mechanize.URLError, self.browser.open, uri)
        self.browser.add_password(uri, "john", "john")
        self.browser.open(uri)
        self.assertEqual(self.browser.title(), 'Basic Auth Protected Area')

    def test_digest_auth(self):
        uri = urljoin(self.uri, "digest_auth")
        self.assertRaises(mechanize.URLError, self.browser.open, uri)
        self.browser.add_password(uri, "digestuser", "digestuser")
        self.browser.open(uri)
        self.assertEqual(self.browser.title(), 'Digest Auth Protected Area')

    def test_open_with_default_timeout(self):
        timeout_log = self._monkey_patch_socket()
        self.browser.open(self.uri)
        self.assertEqual(self.browser.title(), 'Python bits')
        timeout_log.verify_default()

    def test_open_with_timeout(self):
        timeout_log = self._monkey_patch_socket()
        timeout = 10.
        self.browser.open(self.uri, timeout=timeout)
        self.assertEqual(self.browser.title(), 'Python bits')
        timeout_log.verify(timeout)

    def test_urlopen_with_default_timeout(self):
        timeout_log = self._monkey_patch_socket()
        response = mechanize.urlopen(self.uri)
        self.assert_contains(response.read(), "Python bits")
        timeout_log.verify_default()

    def test_urlopen_with_timeout(self):
        timeout_log = self._monkey_patch_socket()
        timeout = 10.
        response = mechanize.urlopen(self.uri, timeout=timeout)
        self.assert_contains(response.read(), "Python bits")
        timeout_log.verify(timeout)

    def test_302_and_404(self):
        # the combination of 302 and 404 (/redirected is configured to redirect
        # to a non-existent URL /nonexistent) has caused problems in the past
        # due to accidental double-wrapping of the error response
        import urllib2
        self.assertRaises(
            urllib2.HTTPError,
            self.browser.open, urljoin(self.uri, "/redirected"),
            )

    def test_reread(self):
        # closing response shouldn't stop methods working (this happens also to
        # be true for e.g. mechanize.OpenerDirector when mechanize's own
        # handlers are in use, but is guaranteed to be true for
        # mechanize.Browser)
        r = self.browser.open(self.uri)
        data = r.read()
        r.close()
        r.seek(0)
        self.assertEqual(r.read(), data)
        self.assertEqual(self.browser.response().read(), data)

    def test_error_recovery(self):
        self.assertRaises(mechanize.URLError, self.browser.open,
                          'file:///c|thisnoexistyiufheiurgbueirgbue')
        self.browser.open(self.uri)
        self.assertEqual(self.browser.title(), 'Python bits')

    def test_redirect(self):
        # 301 redirect due to missing final '/'
        r = self.browser.open(urljoin(self.uri, "bits"))
        self.assertEqual(r.code, 200)
        self.assert_("GeneralFAQ.html" in r.read(2048))

    def test_refresh(self):
        def refresh_request(seconds):
            # URL of the cookietest CGI asking for a refresh back to self.uri
            uri = urljoin(self.uri, "/cgi-bin/cookietest.cgi")
            val = urllib.quote_plus('%d; url="%s"' % (seconds, self.uri))
            return uri + ("?refresh=%s" % val)

        self.browser.set_handle_refresh(True, honor_time=False)
        r = self.browser.open(refresh_request(5))
        self.assertEqual(r.geturl(), self.uri)

        # Set a maximum refresh time of 30 seconds (these long refreshes tend
        # to be there only because the website owner wants you to see the
        # latest news, or whatever -- they're not essential to the operation of
        # the site, and not really useful or appropriate when scraping).
        refresh_uri = refresh_request(60)
        self.browser.set_handle_refresh(True, max_time=30., honor_time=True)
        r = self.browser.open(refresh_uri)
        self.assertEqual(r.geturl(), refresh_uri)

        # allow long refreshes (but don't actually wait 60 seconds)
        self.browser.set_handle_refresh(True, max_time=None, honor_time=False)
        r = self.browser.open(refresh_request(60))
        self.assertEqual(r.geturl(), self.uri)

    def test_file_url(self):
        url = "file://%s" % sanepathname2url(
            os.path.abspath('functional_tests.py'))
        r = self.browser.open(url)
        self.assert_("this string appears in this file ;-)" in r.read())

    def test_open_local_file(self):
        # Since the file: URL scheme is not well standardised, Browser has a
        # special method to open files by name, for convenience:
        br = mechanize.Browser()
        response = br.open_local_file("mechanize/_mechanize.py")
        self.assert_("def open_local_file(self, filename):" in
                     response.get_data())

    def test_open_novisit(self):
        def test_state(br):
            # the browser must look as if nothing has been visited
            self.assert_(br.request is None)
            self.assert_(br.response() is None)
            self.assertRaises(mechanize.BrowserStateError, br.back)

        test_state(self.browser)
        uri = urljoin(self.uri, "bits")
        # note this involves a redirect, which should itself be non-visiting
        r = self.browser.open_novisit(uri)
        test_state(self.browser)
        self.assert_("GeneralFAQ.html" in r.read(2048))

        # Request argument instead of URL
        r = self.browser.open_novisit(mechanize.Request(uri))
        test_state(self.browser)
        self.assert_("GeneralFAQ.html" in r.read(2048))

    def test_non_seekable(self):
        # check everything still works without response_seek_wrapper and
        # the .seek() method on response objects
        ua = mechanize.UserAgent()
        ua.set_seekable_responses(False)
        ua.set_handle_equiv(False)
        response = ua.open(self.uri)
        self.failIf(hasattr(response, "seek"))
        data = response.read()
        self.assert_("Python bits" in data)


class ResponseTests(TestCase):

    def test_seek(self):
        br = mechanize.Browser()
        r = br.open(self.uri)
        html = r.read()
        r.seek(0)
        self.assertEqual(r.read(), html)

    def test_seekable_response_opener(self):
        opener = mechanize.OpenerFactory(
            mechanize.SeekableResponseOpener).build_opener()
        r = opener.open(urljoin(self.uri, "bits/cctest2.txt"))
        r.read()
        r.seek(0)
        # two separate comparisons: assertEqual's third positional argument is
        # the failure message, so passing the expected text there compared
        # nothing against it
        expected = "Hello ClientCookie functional test suite.\n"
        self.assertEqual(r.read(), expected)
        self.assertEqual(r.get_data(), expected)

    def test_seek_wrapper_class_name(self):
        opener = mechanize.UserAgent()
        opener.set_seekable_responses(True)
        try:
            opener.open(urljoin(self.uri, "nonexistent"))
        except mechanize.HTTPError:
            exc = sys.exc_info()[1]
            self.assert_("HTTPError instance" in repr(exc))
        else:
            # without this the test passed vacuously when nothing was raised
            self.fail("expected HTTPError for nonexistent URL")

    def test_no_seek(self):
        # should be possible to turn off UserAgent's .seek() functionality
        def check_no_seek(opener):
            r = opener.open(urljoin(self.uri, "bits/cctest2.txt"))
            self.assert_(not hasattr(r, "seek"))
            try:
                opener.open(urljoin(self.uri, "nonexistent"))
            except mechanize.HTTPError:
                exc = sys.exc_info()[1]
                self.assert_(not hasattr(exc, "seek"))
            else:
                # without this the check was skipped when nothing was raised
                self.fail("expected HTTPError for nonexistent URL")

        # mechanize.UserAgent
        opener = mechanize.UserAgent()
        opener.set_handle_equiv(False)
        opener.set_seekable_responses(False)
        opener.set_debug_http(False)
        check_no_seek(opener)

        # mechanize.OpenerDirector
        opener = mechanize.build_opener()
        check_no_seek(opener)

    def test_consistent_seek(self):
        # if we explicitly request that returned response objects have the
        # .seek() method, then raised HTTPError exceptions should also have the
        # .seek() method
        def check(opener, excs_also):
            r = opener.open(urljoin(self.uri, "bits/cctest2.txt"))
            data = r.read()
            r.seek(0)
            # compare both values; a third positional argument to assertEqual
            # is only the failure message
            self.assertEqual(data, r.read())
            self.assertEqual(data, r.get_data())
            try:
                opener.open(urljoin(self.uri, "nonexistent"))
            except mechanize.HTTPError:
                exc = sys.exc_info()[1]
                data = exc.read()
                if excs_also:
                    exc.seek(0)
                    self.assertEqual(data, exc.read())
                    self.assertEqual(data, exc.get_data())
            else:
                self.assert_(False)

        opener = mechanize.UserAgent()
        opener.set_debug_http(False)

        # Here, only the .set_handle_equiv() causes .seek() to be present, so
        # exceptions don't necessarily support the .seek() method (and do not,
        # at present).
        opener.set_handle_equiv(True)
        opener.set_seekable_responses(False)
        check(opener, excs_also=False)

        # Here, (only) the explicit .set_seekable_responses() causes .seek() to
        # be present (different mechanism from .set_handle_equiv()).  Since
        # there's an explicit request, ALL responses are seekable, even
        # exception responses (HTTPError instances).
        opener.set_handle_equiv(False)
        opener.set_seekable_responses(True)
        check(opener, excs_also=True)

    def test_set_response(self):
        br = mechanize.Browser()
        r = br.open(self.uri)
        html = r.read()
        self.assertEqual(br.title(), "Python bits")

        # The replacement page must contain a link whose URL is "spam": the
        # final assertion below looks it up through br.links().
        newhtml = """<html><body><a href="spam">click me</a></body></html>"""

        r.set_data(newhtml)
        self.assertEqual(r.read(), newhtml)
        self.assertEqual(br.response().read(), html)
        br.response().set_data(newhtml)
        self.assertEqual(br.response().read(), html)
        self.assertEqual(list(br.links())[0].url, 'http://sourceforge.net')

        br.set_response(r)
        self.assertEqual(br.response().read(), newhtml)
        self.assertEqual(list(br.links())[0].url, "spam")

    def test_new_response(self):
        br = mechanize.Browser()
        data = "Test\n\nHello.\n\n"
        response = mechanize.make_response(
            data,
            [("Content-type", "text/html")],
            "http://example.com/",
            200,
            "OK"
            )
        br.set_response(response)
        self.assertEqual(br.response().get_data(), data)

    def hidden_test_close_pickle_load(self):
        # not collected by the test runner: the name lacks the "test" prefix
        print ("Test test_close_pickle_load is expected to fail unless Python "
               "standard library patch http://python.org/sf/1144636 has been "
               "applied")
        import pickle

        b = mechanize.Browser()
        r = b.open(urljoin(self.uri, "bits/cctest2.txt"))
        r.read()

        r.close()
        r.seek(0)
        self.assertEqual(r.read(),
                         "Hello ClientCookie functional test suite.\n")

        HIGHEST_PROTOCOL = -1
        p = pickle.dumps(b, HIGHEST_PROTOCOL)
        b = pickle.loads(p)
        r = b.response()
        r.seek(0)
        self.assertEqual(r.read(),
                         "Hello ClientCookie functional test suite.\n")


class FunctionalTests(SocketTimeoutTest):

    def test_referer(self):
        br = mechanize.Browser()
        br.set_handle_refresh(True, honor_time=False)
        referer = urljoin(self.uri, "bits/referertest.html")
        info = urljoin(self.uri, "/cgi-bin/cookietest.cgi")
        r = br.open(info)
        self.assert_(referer not in r.get_data())

        br.open(referer)
        r = br.follow_link(text="Here")
        self.assert_(referer in r.get_data())

    def test_cookies(self):
        # this test page depends on cookies, and an http-equiv refresh
        #cj = CreateBSDDBCookieJar("/home/john/db.db")
        cj = CookieJar()
        handlers = [
            HTTPCookieProcessor(cj),
            HTTPRefreshProcessor(max_time=None, honor_time=False),
            HTTPEquivProcessor(),

            HTTPRedirectHandler(),  # needed for Refresh handling in 2.4.0
#            HTTPHandler(True),
#            HTTPRedirectDebugProcessor(),
#            HTTPResponseDebugProcessor(),
            ]

        o = build_opener(*handlers)
        try:
            install_opener(o)
            r = urlopen(urljoin(self.uri, "/cgi-bin/cookietest.cgi"))
            data = r.read()
            self.assert_(
                data.find("Your browser supports cookies!") >= 0)
            self.assert_(len(cj) == 1)

            # test response.seek() (added by HTTPEquivProcessor)
            r.seek(0)
            samedata = r.read()
            r.close()
            self.assert_(samedata == data)
        finally:
            # always undo the global opener installation
            o.close()
            install_opener(None)

    def test_robots(self):
        plain_opener = mechanize.build_opener(
            mechanize.HTTPRobotRulesProcessor)
        browser = mechanize.Browser()
        for opener in plain_opener, browser:
            r = opener.open(urljoin(self.uri, "robots"))
            self.assertEqual(r.code, 200)
            self.assertRaises(
                mechanize.RobotExclusionError,
                opener.open, urljoin(self.uri, "norobots"))

    def _check_retrieve(self, url, filename, headers):
        # compare the retrieved file against a fresh download made with the
        # standard library's urllib.urlopen (not mechanize's urlopen)
        from urllib import urlopen
        self.assertEqual(headers.get('Content-Type'), 'text/html')
        self.assertEqual(read_file(filename), urlopen(url).read())

    def test_retrieve_to_named_file(self):
        url = urljoin(self.uri, "/mechanize/")
        test_filename = os.path.join(self.make_temp_dir(), "python.html")
        opener = mechanize.build_opener()
        verif = CallbackVerifier(self)
        filename, headers = opener.retrieve(url, test_filename, verif.callback)
        self.assertEqual(filename, test_filename)
        self._check_retrieve(url, filename, headers)
        self.assert_(os.path.isfile(filename))

    def test_retrieve(self):
        # not passing an explicit filename downloads to a temporary file
        # using a Request object instead of a URL works
        url = urljoin(self.uri, "/mechanize/")
        opener = mechanize.build_opener()
        verif = CallbackVerifier(self)
        request = mechanize.Request(url)
        filename, headers = opener.retrieve(request, reporthook=verif.callback)
        self.assertEquals(request.visit, False)
        self._check_retrieve(url, filename, headers)
        opener.close()
        # closing the opener removed the temporary file
        self.failIf(os.path.isfile(filename))

    def test_urlretrieve(self):
        timeout_log = self._monkey_patch_socket()
        timeout = 10.
        url = urljoin(self.uri, "/mechanize/")
        verif = CallbackVerifier(self)
        filename, headers = mechanize.urlretrieve(url,
                                                  reporthook=verif.callback,
                                                  timeout=timeout)
        # stop logging so the verification download in _check_retrieve is
        # not counted
        timeout_log.stop()
        self._check_retrieve(url, filename, headers)
        timeout_log.verify(timeout)

    def test_reload_read_incomplete(self):
        from mechanize import Browser
        browser = Browser()
        r1 = browser.open(urljoin(self.uri,
                                  "bits/mechanize_reload_test.html"))
        # if we don't do anything and go straight to another page, most of the
        # last page's response won't be .read()...
        browser.open(urljoin(self.uri, "mechanize"))
        self.assert_(len(r1.get_data()) < 4097)  # we only .read() a little bit
        # ...so if we then go back, .follow_link() for a link near the end (a
        # few kb in, past the point that always gets read in HTML files because
        # of HEAD parsing) will only work if it causes a .reload()...
        r3 = browser.back()
        browser.follow_link(text="near the end")
        # ... good, no LinkNotFoundError, so we did reload.
        # we have .read() the whole file
        self.assertEqual(len(r3._seek_wrapper__cache.getvalue()), 4202)

##     def test_cacheftp(self):
##         from urllib2 import CacheFTPHandler, build_opener
##         o = build_opener(CacheFTPHandler())
##         r = o.open("ftp://ftp.python.org/pub/www.python.org/robots.txt")
##         data1 = r.read()
##         r.close()
##         r = o.open("ftp://ftp.python.org/pub/www.python.org/2.3.2/announce.txt")
##         data2 = r.read()
##         r.close()
##         self.assert_(data1 != data2)


class CookieJarTests(TestCase):

    def test_mozilla_cookiejar(self):
        # NOTE: tempfile.mktemp() only picks a name (it is race-prone); it is
        # kept because get_cookiejar() relies on the file not existing at first
        filename = tempfile.mktemp()
        try:
            def get_cookiejar():
                cj = mechanize.MozillaCookieJar(filename=filename)
                try:
                    cj.revert()
                except IOError:
                    exc = sys.exc_info()[1]
                    # a missing file just means nothing has been saved yet
                    if exc.errno != errno.ENOENT:
                        raise
                return cj

            def commit(cj):
                cj.save()

            self._test_cookiejar(get_cookiejar, commit)
        finally:
            _remove_if_exists(filename)

    def test_firefox3_cookiejar(self):
        try:
            mechanize.Firefox3CookieJar
        except AttributeError:
            # firefox 3 cookiejar is only supported in Python 2.5 and later;
            # also, sqlite3 must be available
            return

        filename = tempfile.mktemp()
        try:
            def get_cookiejar():
                hide_experimental_warnings()
                try:
                    cj = mechanize.Firefox3CookieJar(filename=filename)
                finally:
                    reset_experimental_warnings()
                cj.connect()
                return cj

            def commit(cj):
                pass

            self._test_cookiejar(get_cookiejar, commit)
        finally:
            # tolerate a missing file so that a failure before the database
            # was created is not masked by an OSError from the cleanup
            _remove_if_exists(filename)

    def _test_cookiejar(self, get_cookiejar, commit):
        # get_cookiejar() returns a jar bound to the test's file; commit(jar)
        # does whatever is needed for a later get_cookiejar() to see the
        # cookies
        cookiejar = get_cookiejar()
        br = mechanize.Browser()
        br.set_cookiejar(cookiejar)
        br.set_handle_refresh(False)
        url = urljoin(self.uri, "/cgi-bin/cookietest.cgi")
        # no cookie was set on the first request
        html = br.open(url).read()
        self.assertEquals(html.find("Your browser supports cookies!"), -1)
        self.assertEquals(len(cookiejar), 1)
        # ... but now we have the cookie
        html = br.open(url).read()
        self.assert_("Your browser supports cookies!" in html)
        commit(cookiejar)

        # should still have the cookie when we load afresh
        cookiejar = get_cookiejar()
        br.set_cookiejar(cookiejar)
        html = br.open(url).read()
        self.assert_("Your browser supports cookies!" in html)


class CallbackVerifier:
    # for .test_urlretrieve()
    # checks that the reporthook is called with block numbers 0, 1, 2, ...

    def __init__(self, testcase):
        self._count = 0
        self._testcase = testcase

    def callback(self, block_nr, block_size, total_size):
        self._testcase.assertEqual(block_nr, self._count)
        self._count += 1


if __name__ == "__main__":
    sys.path.insert(0, "test-tools")
    test_path = os.path.join(os.path.dirname(sys.argv[0]), "test")
    sys.path.insert(0, test_path)
    import testprogram
    USAGE_EXAMPLES = """
Examples:
  %(progName)s
                 - run all tests
  %(progName)s functional_tests.SimpleTests
                 - run all 'test*' test methods in class SimpleTests
  %(progName)s functional_tests.SimpleTests.test_redirect
                 - run SimpleTests.test_redirect

  %(progName)s -l
                 - start a local Twisted HTTP server and run the functional
                   tests against that, rather than against SourceForge
                   (quicker!)
                   If this option doesn't work on Windows/Mac, somebody please
                   tell me about it, or I'll never find out...
"""
    prog = testprogram.TestProgram(
        ["functional_tests"],
        localServerProcess=testprogram.TwistedServerProcess(),
        usageExamples=USAGE_EXAMPLES,
        )
    result = prog.runTests()