[wwwsearch-commits] r32687 - in wwwsearch/mechanize/trunk: . mechanize test
jjlee at codespeak.net
jjlee at codespeak.net
Thu Sep 28 03:25:25 CEST 2006
Author: jjlee
Date: Thu Sep 28 03:25:08 2006
New Revision: 32687
Modified:
wwwsearch/mechanize/trunk/functional_tests.py
wwwsearch/mechanize/trunk/mechanize/_mechanize.py
wwwsearch/mechanize/trunk/mechanize/_response.py
wwwsearch/mechanize/trunk/test/test_mechanize.py
Log:
.back() now calls .reload() if necessary (necessary iff the response was not fully .read() when it was first .open()ed)
Modified: wwwsearch/mechanize/trunk/functional_tests.py
==============================================================================
--- wwwsearch/mechanize/trunk/functional_tests.py (original)
+++ wwwsearch/mechanize/trunk/functional_tests.py Thu Sep 28 03:25:08 2006
@@ -226,6 +226,14 @@
# closing the opener removed the temporary file
self.failIf(os.path.isfile(filename))
+ def test_reload_read_incomplete(self):
+ from mechanize import Browser
+ browser = Browser()
+ browser.open("http://plone.org")
+ browser.open("http://plone.org/products")
+ browser.back()
+ browser.follow_link(text="About")
+
## def test_cacheftp(self):
## from urllib2 import CacheFTPHandler, build_opener
## o = build_opener(CacheFTPHandler())
Modified: wwwsearch/mechanize/trunk/mechanize/_mechanize.py
==============================================================================
--- wwwsearch/mechanize/trunk/mechanize/_mechanize.py (original)
+++ wwwsearch/mechanize/trunk/mechanize/_mechanize.py Thu Sep 28 03:25:08 2006
@@ -207,14 +207,6 @@
self.form = None
self._response = _upgrade.upgrade_response(response)
-
- # XXX
- # Temporary hack to eagerly read data (otherwise, History can contain
- # closed and partially-read responses). Proper fix is for responses to
- # know if they're partially read or not; .back() should then .reload()
- # if required.
- self._response.get_data()
-
self._factory.set_response(self._response)
def geturl(self):
@@ -241,6 +233,8 @@
self._response.close()
self.request, response = self._history.back(n, self._response)
self.set_response(response)
+ if not response.read_complete:
+ self.reload()
return response
def clear_history(self):
Modified: wwwsearch/mechanize/trunk/mechanize/_response.py
==============================================================================
--- wwwsearch/mechanize/trunk/mechanize/_response.py (original)
+++ wwwsearch/mechanize/trunk/mechanize/_response.py Thu Sep 28 03:25:08 2006
@@ -50,6 +50,7 @@
def __init__(self, wrapped):
self.wrapped = wrapped
+ self.__read_complete_state = [False]
self.__have_readline = hasattr(self.wrapped, "readline")
self.__cache = StringIO()
self.__pos = 0 # seek position
@@ -60,11 +61,21 @@
return self.wrapped.tell() == len(self.__cache.getvalue())
def __getattr__(self, name):
+ if name == "read_complete":
+ return self.__read_complete_state[0]
+
wrapped = self.__dict__.get("wrapped")
if wrapped:
return getattr(wrapped, name)
+
return getattr(self.__class__, name)
+ def __setattr__(self, name, value):
+ if name == "read_complete":
+ self.__read_complete_state[0] = bool(value)
+ else:
+ self.__dict__[name] = value
+
def seek(self, offset, whence=0):
assert whence in [0,1,2]
@@ -93,9 +104,14 @@
if to_read is None:
assert whence == 2
self.__cache.write(self.wrapped.read())
+ self.read_complete = True
self.__pos = self.__cache.tell() - offset
else:
- self.__cache.write(self.wrapped.read(to_read))
+ data = self.wrapped.read(to_read)
+ if not data:
+ self.read_complete = True
+ else:
+ self.__cache.write(data)
# Don't raise an exception even if we've seek()ed past the end
# of .wrapped, since fseek() doesn't complain in that case.
# Also like fseek(), pretend we have seek()ed past the end,
@@ -112,6 +128,7 @@
def __copy__(self):
cpy = self.__class__(self.wrapped)
cpy.__cache = self.__cache
+ cpy.__read_complete_state = self.__read_complete_state
return cpy
def get_data(self):
@@ -137,10 +154,15 @@
self.__cache.seek(0, 2)
if size == -1:
self.__cache.write(self.wrapped.read())
+ self.read_complete = True
else:
to_read = size - available
assert to_read > 0
- self.__cache.write(self.wrapped.read(to_read))
+ data = self.wrapped.read(to_read)
+ if not data:
+ self.read_complete = True
+ else:
+ self.__cache.write(data)
self.__cache.seek(pos)
data = self.__cache.read(size)
@@ -156,7 +178,11 @@
# read another line first
pos = self.__pos
self.__cache.seek(0, 2)
- self.__cache.write(self.wrapped.readline())
+ data = self.wrapped.readline()
+ if not data:
+ self.read_complete = True
+ else:
+ self.__cache.write(data)
self.__cache.seek(pos)
data = self.__cache.readline()
@@ -172,6 +198,7 @@
pos = self.__pos
self.__cache.seek(0, 2)
self.__cache.write(self.wrapped.read())
+ self.read_complete = True
self.__cache.seek(pos)
data = self.__cache.readlines(sizehint)
self.__pos = self.__cache.tell()
Modified: wwwsearch/mechanize/trunk/test/test_mechanize.py
==============================================================================
--- wwwsearch/mechanize/trunk/test/test_mechanize.py (original)
+++ wwwsearch/mechanize/trunk/test/test_mechanize.py Thu Sep 28 03:25:08 2006
@@ -130,6 +130,8 @@
if info is None: info = {}
self._info = MockHeaders(info)
self.source = "%d%d" % (id(self), random.randint(0, sys.maxint-1))
+ # otherwise we can't test for "same_response" in test_history
+ self.read_complete = True
def info(self): return self._info
def geturl(self): return self.url
def read(self, size=-1): return self.fp.read(size)
@@ -465,7 +467,8 @@
self.assertRaises(mechanize.BrowserStateError, b.back, 2)
r8 = b.open("/spam")
- # even if we get a HTTPError, history and .response() should still get updated
+ # even if we get a HTTPError, history and .response() should still get
+ # updated
error = urllib2.HTTPError("http://example.com/bad", 503, "Oops",
MockHeaders(), StringIO.StringIO())
b.add_handler(make_mock_handler()([("https_open", error)]))
@@ -476,6 +479,38 @@
b.close()
# XXX assert BrowserStateError
+ def test_reload_read_incomplete(self):
+ import mechanize
+ from mechanize._response import test_response
+ class Browser(TestBrowser):
+ def __init__(self):
+ TestBrowser.__init__(self)
+ self.reloaded = False
+ def reload(self):
+ self.reloaded = True
+ TestBrowser.reload(self)
+ br = Browser()
+ data = "<html><head><title></title></head><body>%s</body></html>"
+ data = data % ("The quick brown fox jumps over the lazy dog."*100)
+ r = test_response(data, [("content-type", "text/html")])
+ br.add_handler(make_mock_handler()([("http_open", r)]))
+
+ # .reload() on .back() if the whole response hasn't already been read
+ # (.read_complete is False)
+ r = br.open(r.geturl())
+ r.read(10)
+ br.open('http://www.example.com/blah')
+ self.failIf(br.reloaded)
+ br.back()
+ self.assert_(br.reloaded)
+
+ # don't reload if already read
+ br.reloaded = False
+ br.response().read()
+ br.open('http://www.example.com/blah')
+ br.back()
+ self.failIf(br.reloaded)
+
def test_viewing_html(self):
# XXX not testing multiple Content-Type headers
import mechanize
More information about the wwwsearch-commits
mailing list