"""Tests for mechanize._response.seek_wrapper and friends."""

import copy
import cStringIO
from unittest import TestCase


class TestUnSeekable:
    """File-like test double that logs reads and refuses to seek.

    Each ``read``/``readline``/``readlines`` call is appended to ``log`` as a
    ``(method_name, size)`` tuple, so a test can check exactly which calls a
    wrapper forwarded to the underlying file.
    """

    def __init__(self, text):
        self._file = cStringIO.StringIO(text)
        self.log = []

    def tell(self):
        """Return the current position of the underlying buffer."""
        return self._file.tell()

    def seek(self, offset, whence=0):
        """Always fail: a wrapper under test must never seek this file."""
        # Raised explicitly (instead of ``assert False``) so the guard still
        # fires when Python runs with -O, which strips assert statements.
        raise AssertionError("seek() called on an unseekable file")

    def read(self, size=-1):
        """Log the call, then read from the underlying buffer."""
        self.log.append(("read", size))
        return self._file.read(size)

    def readline(self, size=-1):
        """Log the call, then read one line from the underlying buffer."""
        self.log.append(("readline", size))
        return self._file.readline(size)

    def readlines(self, sizehint=-1):
        """Log the call, then read the remaining lines."""
        self.log.append(("readlines", sizehint))
        return self._file.readlines(sizehint)


class TestUnSeekableResponse(TestUnSeekable):
    """Unseekable file double carrying response-style attributes.

    Adds ``code``, ``msg``, ``headers`` and ``url`` plus the ``geturl``,
    ``info`` and ``close`` methods on top of ``TestUnSeekable``.
    """

    def __init__(self, text, headers):
        TestUnSeekable.__init__(self, text)
        self.code = 200
        self.msg = "OK"
        self.headers = headers
        self.url = "http://example.com/"

    def geturl(self):
        """Return the fixed URL of this fake response."""
        return self.url

    def info(self):
        """Return the headers object passed to the constructor."""
        return self.headers

    def close(self):
        """Do nothing; present so wrappers can call ``close()``."""
        pass


class SeekableTests(TestCase):
    """Exercise seek_wrapper / response_seek_wrapper over unseekable files."""

    # NOTE(review): the line breaks of this literal were reconstructed. The
    # assertions below need exactly five lines, the first two as shown --
    # confirm the position of the blank lines against the original file.
    text = """\
The quick brown fox
jumps over the lazy

dog.

"""
    # The lines of ``text``, each keeping its trailing newline.
    text_lines = text.splitlines(True)

    def testSeekable(self):
        """Run every ``_testN`` scenario, then the copy scenario."""
        from mechanize._response import seek_wrapper

        # Each scenario gets a fresh wrapper so cached data and the call log
        # start out empty.
        for ii in range(1, 6):
            fh = TestUnSeekable(self.text)
            sfh = seek_wrapper(fh)
            test = getattr(self, "_test%d" % ii)
            test(sfh)

        # copies have independent seek positions
        fh = TestUnSeekable(self.text)
        sfh = seek_wrapper(fh)
        self._testCopy(sfh)

    def _testCopy(self, sfh):
        """Check a copy of ``sfh`` seeks independently; return the copy.

        Both wrappers are rewound to offset 0 before returning.
        """
        text = self.text
        sfh2 = copy.copy(sfh)
        sfh.read(10)
        self.assertEqual(sfh2.read(10), text[:10])
        sfh2.seek(5)
        self.assertEqual(sfh.read(10), text[10:20])
        self.assertEqual(sfh2.read(10), text[5:15])
        sfh.seek(0)
        sfh2.seek(0)
        return sfh2

    def _test1(self, sfh):
        """Check which calls reach the wrapped file, using its call log."""
        text = self.text
        text_lines = self.text_lines
        self.assertEqual(sfh.read(10), text[:10])  # calls fh.read
        self.assertEqual(sfh.log[-1], ("read", 10))  # .log delegated to fh
        sfh.seek(0)  # doesn't call fh.seek
        self.assertEqual(sfh.read(10), text[:10])  # doesn't call fh.read
        self.assertEqual(len(sfh.log), 1)
        sfh.seek(0)
        self.assertEqual(sfh.read(5), text[:5])  # read only part of cached data
        self.assertEqual(len(sfh.log), 1)
        sfh.seek(0)
        self.assertEqual(sfh.read(25), text[:25])  # calls fh.read
        self.assertEqual(sfh.log[1], ("read", 15))

        # Step back one character and read line by line until EOF ("").
        sfh.seek(-1, 1)
        lines = list(iter(sfh.readline, ""))
        self.assertEqual(lines, ["s over the lazy\n"] + text_lines[2:])
        self.assertEqual(sfh.log[2:], [("readline", -1)] * 5)

        sfh.seek(0)
        lines = list(iter(sfh.readline, ""))
        self.assertEqual(lines, text_lines)

    def _test2(self, sfh):
        """Check unbounded ``read()`` and size-limited ``readline()``."""
        text = self.text
        sfh.read(5)
        sfh.seek(0)
        self.assertEqual(sfh.read(), text)
        self.assertEqual(sfh.read(), "")
        sfh.seek(0)
        self.assertEqual(sfh.read(), text)
        sfh.seek(0)
        self.assertEqual(sfh.readline(5), "The q")
        self.assertEqual(sfh.read(), text[5:])
        sfh.seek(0)
        self.assertEqual(sfh.readline(5), "The q")
        self.assertEqual(sfh.readline(), "uick brown fox\n")

    def _test3(self, sfh):
        """Check ``readlines()`` from mid-line and from the start."""
        text_lines = self.text_lines
        sfh.read(25)
        sfh.seek(-1, 1)
        self.assertEqual(sfh.readlines(),
                         ["s over the lazy\n"] + text_lines[2:])
        sfh.seek(0)
        self.assertEqual(sfh.readlines(), text_lines)

    def _test4(self, sfh):
        """Check ``next()`` yields every line, then raises StopIteration."""
        # The original compared ``sfh.next()`` with the expected line but
        # discarded the result; assert it so a wrong line fails the test.
        for expected in self.text_lines:
            self.assertEqual(sfh.next(), expected)
        self.assertRaises(StopIteration, sfh.next)

    def _test5(self, sfh):
        """Check ``invariant()`` holds after absolute and end-relative seeks."""
        sfh.read(10)
        sfh.seek(5)
        self.assertTrue(sfh.invariant())
        sfh.seek(0, 2)
        self.assertTrue(sfh.invariant())
        sfh.seek(0)
        self.assertEqual(sfh.read(), self.text)

    def testResponseSeekWrapper(self):
        """Check copying and repeated closing of a response_seek_wrapper."""
        from mechanize import response_seek_wrapper
        hdrs = {"Content-type": "text/html"}
        r = TestUnSeekableResponse(self.text, hdrs)
        rsw = response_seek_wrapper(r)
        rsw2 = self._testCopy(rsw)
        self.assertTrue(rsw is not rsw2)
        # The copy's headers compare equal but are a distinct object.
        self.assertEqual(rsw.info(), rsw2.info())
        self.assertTrue(rsw.info() is not rsw2.info())

        # should be able to close already-closed object
        rsw2.close()
        rsw2.close()

    def testSetResponseData(self):
        """Check reads return the replacement body after ``set_data()``."""
        from mechanize import response_seek_wrapper
        r = TestUnSeekableResponse(self.text, {'blah': 'yawn'})
        rsw = response_seek_wrapper(r)
        # NOTE(review): line breaks and the two-space indent of this literal
        # were reconstructed; read(7) below can only equal "\n  That" with a
        # two-space indent -- confirm against the original file.
        rsw.set_data("""\
A Seeming somwhat more than View;
  That doth instruct the Mind
  In Things that ly behind,
""")
        self.assertEqual(rsw.read(9), "A Seeming")
        self.assertEqual(rsw.read(13), " somwhat more")
        rsw.seek(0)
        self.assertEqual(rsw.read(9), "A Seeming")
        self.assertEqual(rsw.readline(), " somwhat more than View;\n")
        rsw.seek(0)
        self.assertEqual(rsw.readline(), "A Seeming somwhat more than View;\n")
        rsw.seek(-1, 1)
        self.assertEqual(rsw.read(7), "\n  That")

        r = TestUnSeekableResponse(self.text, {'blah': 'yawn'})
        rsw = response_seek_wrapper(r)
        rsw.set_data(self.text)
        self._test2(rsw)
        rsw.seek(0)
        self._test4(rsw)

    def testGetResponseData(self):
        """Check ``get_data()`` returns the body and leaves it readable."""
        from mechanize import response_seek_wrapper
        r = TestUnSeekableResponse(self.text, {'blah': 'yawn'})
        rsw = response_seek_wrapper(r)

        self.assertEqual(rsw.get_data(), self.text)
        self._test2(rsw)
        rsw.seek(0)
        self._test4(rsw)


if __name__ == "__main__":
    import unittest
    unittest.main()