from __future__ import generators
from collect import collect, concat
import py, random


class Gen:
    """Iterator wrapper that records every value it hands out.

    Each value returned by next() is appended to `seen`, so a test can
    check exactly how far the wrapped sequence has been consumed.
    """

    def __init__(self, sequence, seen=None):
        self.it = iter(sequence)
        if seen is None:
            seen = []
        self.seen = seen

    def __iter__(self):
        return self

    def next(self):
        # Python 2 iterator protocol: forward to the wrapped iterator and
        # log the value before returning it.
        value = self.it.next()
        self.seen.append(value)
        return value


class Deletable:
    """Object that appends True to `chk` when its __del__ runs."""

    def __init__(self, chk):
        self.chk = chk

    def __del__(self):
        self.chk.append(True)


def kill(chk):
    """Yield the pairs (i, Deletable(chk)) for i in range(10)."""
    for i in range(10):
        yield (i, Deletable(chk))


def test_simple():
    """repr() is expected to consume three items, str() the whole input."""
    g = Gen('hello world')
    c = collect(g)
    assert g.seen == []
    repr(c)
    assert g.seen == ['h', 'e', 'l']
    # a second repr() must not consume anything more
    repr(c)
    assert g.seen == ['h', 'e', 'l']
    str(c)
    assert g.seen == list('hello world')


def test_len_and_nonzero():
    """Truth value and len() for a non-empty and an empty input."""
    assert collect('abc')
    assert not collect('')
    assert len(collect('abc')) == 3
    assert len(collect('')) == 0


def test_laziness():
    """Iterating must pull items from the source one at a time."""
    g = Gen('hello world')
    i = 0
    for x in collect(g):
        assert x == 'hello world'[i]
        i += 1
        # exactly the first i characters have been consumed so far
        assert g.seen == list('hello world'[:i])
    assert i == len('hello world')


def test_laziness_2():
    """Same as test_laziness(), but with a named reference to the collect."""
    # the difference with test_laziness() is the extra ref to 'c' that
    # keeps all the items alive
    g = Gen('hello world')
    c = collect(g)
    i = 0
    for x in c:
        assert x == 'hello world'[i]
        i += 1
        assert g.seen == list('hello world'[:i])
    assert i == len('hello world')


def test_init():
    """collect() over another collect, a list and an xrange."""
    g = Gen('hello world')
    c1 = collect(g)
    c2 = collect(c1)
    # wrapping c1 in c2 must not make repr() consume more than three items
    repr(c2)
    assert g.seen == ['h', 'e', 'l']
    assert list(c1) == list(c2) == list('hello world')
    c3 = collect([5,4,3,2,1])
    assert list(c3) == [5,4,3,2,1]
    c4 = collect(xrange(5, 10))
    assert list(c4) == [5,6,7,8,9]


def test_early_deletion():
    """Items already iterated past must have been finalized.

    NOTE(review): this relies on __del__ running as soon as the last
    reference goes away (reference counting) -- confirm before running on
    an interpreter with a different garbage collector.
    """
    n = 0
    chk = []
    for i, x in collect(kill(chk)):
        assert i == n
        # the n previous Deletable objects are expected to be gone by now
        assert chk == n*[True]
        n += 1
        del x
    assert chk == n*[True]


def test_lazy_getitem():
    """Indexing consumes the source only up to the requested position."""
    g = Gen('hello world')
    c = collect(g)
    assert g.seen == []
    c[3]
    assert g.seen == ['h', 'e', 'l', 'l']
    c[2]
    assert g.seen == ['h', 'e', 'l', 'l']
    c[5]
    assert g.seen == ['h', 'e', 'l', 'l', 'o', ' ']
    # indices at or below the furthest one read so far consume nothing new
    c[0]
    assert g.seen == ['h', 'e', 'l', 'l', 'o', ' ']
    c[3]
    assert g.seen == ['h', 'e', 'l', 'l', 'o', ' ']
    c[5]
    assert g.seen == ['h', 'e', 'l', 'l', 'o', ' ']
    # an out-of-range index has to exhaust the source before failing
    py.test.raises(IndexError, "c[21]")
    assert g.seen == ['h', 'e', 'l', 'l', 'o', ' ', 'w', 'o', 'r', 'l', 'd']


def test_lazy_nonzero():
    """Truth-testing is expected to consume a single item."""
    g = Gen('hello world')
    c = collect(g)
    assert c
    assert g.seen == ['h']


def test_multiple_iterators():
    """Advance three iterators over one collect in random order.

    After every step, the number of finalized items is expected to equal
    the position of the slowest iterator.
    """
    chk = []
    c = collect(kill(chk))
    i1 = iter(c)
    i2 = iter(c)
    i3 = iter(c)
    # drop the collect itself so that only the iterators keep items alive
    del c
    # 15 calls per iterator: 10 items, then 5 calls expecting StopIteration
    order = [i1, i2, i3] * 15
    random.shuffle(order)
    current = {i1: 0, i2: 0, i3: 0}
    for it in order:
        if current[it] < 10:
            assert it.next()[0] == current[it]
            current[it] += 1
        else:
            py.test.raises(StopIteration, it.next)
        released = min(current.values())
        assert chk == released * [True]


def test_random_getitem():
    """Compare c[key] with s[key] for many random indices and slices."""
    s = 'qedx387tn3uixhvt 7fh387fymh3dh238 dwd-wq.dwq9'
    c = collect(s)
    # plain integer keys, including some out of range on both sides
    keys = range(-len(s)-5, len(s)+5)
    choices = keys + [None]*12
    stepchoices = [None, None, None, 1, 1, -1, -1, 2, -2,
                   len(s)-1, len(s), len(s)+1,
                   -len(s)-1, -len(s), -len(s)+1]
    for i in range(200):
        keys.append(slice(random.choice(choices),
                          random.choice(choices),
                          random.choice(stepchoices)))
    random.shuffle(keys)
    for key in keys:
        # now and then restart from a fresh collect
        if random.random() < 0.15:
            c = collect(s)
        try:
            expected = s[key]
        except IndexError:
            py.test.raises(IndexError, "c[key]")
        else:
            assert ''.join(c[key]) == expected


def test_lazy_slice():
    """Forward slices consume only as much input as they need."""
    g = Gen('hello world')
    c = collect(g)
    assert c[:3] == ['h', 'e', 'l']
    assert g.seen == ['h', 'e', 'l']
    assert c[2:5] == ['l', 'l', 'o']
    assert g.seen == ['h', 'e', 'l', 'l', 'o']
    # to know that the last indexing [1] doesn't raise IndexError,
    # the collector must verify that there are more than 1+3+2 items
    assert c[:-2][:-3][1] == 'e'
    assert g.seen == ['h', 'e', 'l', 'l', 'o', ' ', 'w']


def test_lazy_revslice():
    """A reversed slice from index 4 consumes only the first five items."""
    g = Gen('hello world')
    c = collect(g)
    assert c[4::-1] == ['o', 'l', 'l', 'e', 'h']
    assert g.seen == ['h', 'e', 'l', 'l', 'o']


def test_len():
    """len() on a fresh, a partially indexed and a fully iterated collect."""
    assert len(collect('hello world')) == 11
    c = collect('hello world')
    c[5]
    assert len(c) == 11
    c = collect('hello world')
    assert len(list(c)) == 11
    assert len(c) == 11


def test_repr():
    """repr() shows the items read so far, ending in '...' or in ']'."""
    c = collect(xrange(10))
    assert repr(c) == 'collect([0, 1, 2...)'
    assert c[2] == 2
    assert repr(c) == 'collect([0, 1, 2...)'
    assert c[3] == 3
    assert repr(c) == 'collect([0, 1, 2, 3...)'
    assert c[9] == 9
    # all ten items were read, but the '...' is still expected here
    assert repr(c) == 'collect([0, 1, 2, 3, 4, 5, 6, 7, 8, 9...)'
    assert c[-1] == 9
    assert repr(c) == 'collect([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])'


def test_str():
    """str() shows every item."""
    assert str(collect(range(5))) == 'collect([0, 1, 2, 3, 4])'


def test_add():
    """'+' between two collects, and with a list on either side."""
    assert list(collect('ab') + collect('cd')) == ['a', 'b', 'c', 'd']
    assert list(collect('hello') + [1,2]) == ['h', 'e', 'l', 'l', 'o', 1, 2]
    assert list([3,4] + collect('hello')) == [3, 4, 'h', 'e', 'l', 'l', 'o']


def test_concat():
    """concat() over every pairing of list, tuple, dict, collect, xrange."""
    # exercise the various code paths in concat()
    expr1 = [[1,2,3,4], (1,2,3,4), {1:0,2:0,3:0,4:0},
             collect([1,2,3,4]), xrange(1,5)]
    expr2 = [[5,6,7,8], (5,6,7,8), {5:0,6:0,7:0,8:0},
             collect([5,6,7,8]), xrange(5,9)]
    for e1 in expr1:
        for e2 in expr2:
            e3 = collect(concat(e1, e2))
            if isinstance(e1, dict) or isinstance(e2, dict):
                e3.sort()   # order is not preserved
            assert e3 == [1,2,3,4,5,6,7,8]


def test_mul():
    """'*' with an integer on either side, including n <= 0 and a huge n."""
    for n in range(-2, 5):
        assert list(collect('ab') * n) == ['a', 'b'] * n
        assert list(n * collect('ab')) == ['a', 'b'] * n
        assert list(collect('') * n) == []
        assert list(n * collect('')) == []
    # Note: CPython lists give an OverflowError in the following test.
    # Here we mainly check that collect('')*n can be computed in time < n.
    assert list(collect('') * 1000000000000000000000) == []
    assert list(1000000000000000000000 * collect('')) == []


def test_compare():
    """Rich comparisons between collects and against plain lists."""
    # comparisons expected to be true
    assert collect('abcd') == collect('abcd')
    assert collect('abcd') == list('abcd')
    assert list('abcd') == collect('abcd')
    assert collect('') == []
    assert collect('abc') < collect('abcd')
    assert collect('abc') != collect('abcd')
    assert collect('abc') <= collect('abcd')
    assert collect('abd') > collect('abcd')
    assert collect('abd') >= collect('abcd')
    assert collect('') < collect('abcd')
    assert collect('abcd') <= collect('abcd')
    assert list('abc') <= collect('abc')
    # comparisons expected to be false
    assert not (collect('abcd') != collect('abcd'))
    assert not (collect('abcd') != list('abcd'))
    assert not (list('abcd') != collect('abcd'))
    assert not (collect('') != [])
    assert not (collect('abc') >= collect('abcd'))
    assert not (collect('abc') == collect('abcd'))
    assert not (collect('abc') > collect('abcd'))
    assert not (collect('abd') <= collect('abcd'))
    assert not (collect('abd') < collect('abcd'))
    assert not (collect('') >= collect('abcd'))
    assert not (collect('abcd') > collect('abcd'))
    assert not (list('abc') > collect('abc'))


def test_hash():
    """hash() of a collect is expected to raise TypeError."""
    py.test.raises(TypeError, hash, collect(''))

# ____________________________________________________________

def test_setitem():
    """Item and slice assignment/deletion, plus out-of-range errors."""
    c = collect('hello world')
    c[3] = 'X'
    assert ''.join(c) == 'helXo world'
    c[-2] = 'Y'
    assert ''.join(c) == 'helXo worYd'
    # slice assignment that shrinks the sequence
    c[:3] = '12'
    assert ''.join(c) == '12Xo worYd'
    # extended-slice assignment
    c[3:7:2] = '34'
    assert ''.join(c) == '12X3 4orYd'
    del c[-2:]
    assert ''.join(c) == '12X3 4or'
    del c[::-3]
    assert ''.join(c) == '1X34o'
    # empty slice (start > stop): expected to insert at position 4
    c[4:2] = 'Z'
    assert ''.join(c) == '1X34Zo'
    # assigning the collect into a slice of itself
    c[2:3] = c
    assert ''.join(c) == '1X1X34Zo4Zo'
    del c[1]
    assert ''.join(c) == '11X34Zo4Zo'
    del c[-1]
    assert ''.join(c) == '11X34Zo4Z'
    # c now has 9 items: valid indices are -9..8
    py.test.raises(IndexError, "c[9] = '?'")
    py.test.raises(IndexError, "c[999] = '?'")
    py.test.raises(IndexError, "c[-10] = '?'")
    py.test.raises(IndexError, "del c[9]")
    py.test.raises(IndexError, "del c[999]")
    py.test.raises(IndexError, "del c[-10]")
def test_lazy_setitem():
    """Assigning to index 2 should read only the first three source items."""
    source = Gen('hello world')
    lazy = collect(source)
    lazy[2] = 'X'
    assert source.seen == ['h', 'e', 'l']
    lazy[-1] = 'Y'
    # XXX disabled in the original: after the negative-index assignment
    # the source is not guaranteed to still be at ['h', 'e', 'l'].
    #assert source.seen == ['h', 'e', 'l']
    joined = ''.join(lazy)
    assert joined == 'heXlo worlY'


def test_lazy_delitem():
    """Deleting index 2 should read only the first three source items."""
    source = Gen('hello world')
    lazy = collect(source)
    del lazy[2]
    assert source.seen == ['h', 'e', 'l']
    del lazy[-1]
    # XXX disabled in the original: after the negative-index deletion
    # the source is not guaranteed to still be at ['h', 'e', 'l'].
    #assert source.seen == ['h', 'e', 'l']
    joined = ''.join(lazy)
    assert joined == 'helo worl'


def test_random_setitem():
    """Apply random item/slice stores and deletes to a collect and a list.

    The same operation is performed on both; they must agree after each
    step.  The names 'c' and 'key' are referenced from the string passed
    to py.test.raises() and therefore must not be renamed.
    """
    base = range(39)
    size = len(base)
    c = collect(base)
    expected = list(base)
    # integer keys first, including out-of-range ones on both sides
    keys = range(-size-5, size+5)
    choices = keys + [None]*12
    stepchoices = [None, None, None, 1, 1, -1, -1, 2, -2,
                   size-1, size, size+1,
                   -size-1, -size, -size+1]
    # then 200 random slices (start, stop, step drawn in that order)
    keys += [slice(random.choice(choices),
                   random.choice(choices),
                   random.choice(stepchoices))
             for _ in range(200)]
    random.shuffle(keys)
    fresh = size          # next value not yet stored anywhere
    for key in keys:
        if random.random() < 0.15:
            # restart from a reshuffled input
            random.shuffle(base)
            c = collect(base)
            expected = list(base)
        try:
            current = expected[key]
        except IndexError:
            py.test.raises(IndexError, "c[key] = 42")
        else:
            if not isinstance(current, int):
                # 'key' is a slice
                action = random.choice(['samesize', 'resize', 'delete'])
                if action == 'delete':
                    del c[key]
                    del expected[key]
                else:
                    if action == 'samesize':
                        replacement = range(fresh, fresh+len(current))
                    elif key.step is None:
                        # 'resize' is only attempted on plain slices
                        replacement = range(fresh,
                                            fresh+random.randrange(0, 20))
                    else:
                        replacement = None
                    if replacement is not None:
                        c[key] = replacement
                        expected[key] = replacement
                        fresh += len(replacement)
            else:
                # 'key' is a plain integer index
                c[key] = fresh
                expected[key] = fresh
                fresh += 1
        assert list(c) == expected


def test_append():
    """append() adds items after the ones coming from the input."""
    text = collect('hello world')
    text.append('X')
    assert ''.join(text) == 'hello worldX'
    for extra in ('Y', 'Z'):
        text.append(extra)
    assert ''.join(text) == 'hello worldXYZ'


def test_iadd():
    """'+=' with a string, with another collect and with itself."""
    text = collect('hello ')
    text += 'world'
    assert ''.join(text) == 'hello world'
    suffix = collect('!')
    text += suffix
    assert ''.join(text) == 'hello world!'
    text += text
    assert ''.join(text) == 'hello world!hello world!'
def test_imul():
    """'*=' repeats the content in place."""
    c = collect('hello')
    c *= 3
    assert ''.join(c) == 'hellohellohello'


def test_extend():
    """extend() with a string, with another collect and with itself."""
    c = collect('hello ')
    c.extend('world')
    assert ''.join(c) == 'hello world'
    c.extend(collect('!'))
    assert ''.join(c) == 'hello world!'
    c.extend(c)
    assert ''.join(c) == 'hello world!hello world!'


def test_count():
    """count() for items occurring twice, once and never."""
    c = collect('hello')
    assert c.count('l') == 2
    assert c.count('h') == 1
    assert c.count('w') == 0


def test_index():
    """index() with optional start/stop, including negative bounds."""
    c = collect('hello world')
    assert c.index('l') == 2
    py.test.raises(ValueError, "c.index('!')")
    assert c.index('l', 3) == 3
    assert c.index('l', 4) == 9
    py.test.raises(ValueError, "c.index('l', 10)")
    # negative start values
    assert c.index('l', -5) == 9
    assert c.index('l', -25) == 2
    assert c.index('o', 1, 5) == 4
    py.test.raises(ValueError, "c.index('o', 1, 4)")
    # same two checks with the stop given as a negative number
    assert c.index('o', 1, 5-11) == 4
    py.test.raises(ValueError, "c.index('o', 1, 4-11)")


def test_lazy_index():
    """index() stops consuming the source at the first match."""
    g = Gen('hello world')
    c = collect(g)
    assert c.index('l', 4) == 9
    assert g.seen == ['h', 'e', 'l', 'l', 'o', ' ', 'w', 'o', 'r', 'l']


def test_insert():
    """insert() at the front, in the middle and at a negative index."""
    c = collect('hello world')
    c.insert(0, 'X')
    assert c[:4] == ['X', 'h', 'e', 'l']
    c.insert(2, 'Y')
    c.insert(-2, 'Z')
    assert ''.join(c) == 'XhYello worZld'


def test_lazy_insert():
    """insert() at index 2 is expected to consume only two source items."""
    g = Gen('hello world')
    c = collect(g)
    c.insert(2, 'Y')
    assert g.seen == ['h', 'e']


def test_pop():
    """pop() returns items from the end, then raises IndexError when empty."""
    c = collect('hello world')
    s = ''
    for i in range(11):
        s += c.pop()
    assert s == 'dlrow olleh'
    py.test.raises(IndexError, c.pop)
    assert len(c) == 0


def test_lazy_pop():
    """pop(4) is expected to consume only the first five source items."""
    g = Gen('hello world')
    c = collect(g)
    assert c.pop(4) == 'o'
    assert g.seen == ['h', 'e', 'l', 'l', 'o']
    assert ''.join(c) == 'hell world'


def test_remove():
    """remove() drops the first match and raises ValueError when none is left."""
    c = collect('hello world')
    c.remove('l')
    assert ''.join(c) == 'helo world'
    c.remove('l')
    assert ''.join(c) == 'heo world'
    c.remove('l')
    assert ''.join(c) == 'heo word'
    py.test.raises(ValueError, "c.remove('l')")
    # the failed remove() must leave the content unchanged
    assert ''.join(c) == 'heo word'


def test_lazy_remove():
    """remove() stops consuming the source at the first match."""
    g = Gen('hello world')
    c = collect(g)
    c.remove('o')
    assert g.seen == ['h', 'e', 'l', 'l', 'o']


def test_reverse():
    """reverse() reverses the content in place."""
    c = collect('hello world')
    c.reverse()
    assert ''.join(c) == 'dlrow olleh'


def test_sort():
    """sort() on a string and on shuffled ranges of random size."""
    c = collect('hello world')
    c.sort()
    assert ''.join(c) == ' dehllloorw'
    for i in range(50):
        # sizes are drawn from an exponential distribution, capped at 1000
        size = int(min(1000.0, random.expovariate(0.05)))
        s = range(size)
        random.shuffle(s)
        c = collect(s)
        c.sort()
        assert c == range(size)


def test_stable_sort():
    """Sort a shuffled mix of 0 and 0.0 and compare it with the input.

    NOTE(review): 0 == 0.0, so the final equality holds for any ordering
    of the items; as written it cannot detect an unstable sort.
    """
    s = [0] * 20 + [0.0] * 20
    random.shuffle(s)
    c = collect(s)
    c.sort()
    assert c == s


def test_sort_key():
    """sort() with a key function."""
    c = collect(range(10))
    c.sort(key=lambda n: n % 3.9)
    assert c == [0, 4, 8, 1, 5, 9, 2, 6, 3, 7]


def test_sort_reverse():
    """sort() with reverse=True."""
    c = collect('hello world')
    c.sort(reverse=True)
    assert ''.join(c) == 'wroolllhed '


def test_stable_sort_reverse():
    """Reverse-sort a shuffled mix of 0 and 0.0 and compare with the input.

    NOTE(review): 0 == 0.0, so the final equality holds for any ordering
    of the items; as written it cannot detect an unstable sort.
    """
    s = [0] * 20 + [0.0] * 20
    random.shuffle(s)
    c = collect(s)
    c.sort(reverse=True)
    s.reverse()
    assert c == s


def test_sort_cmp():
    """sort() with a cmp function."""
    c = collect(range(10))
    c.sort(cmp=lambda x, y: cmp(y % 3.9, x % 3.9))
    assert c == [7, 3, 6, 2, 9, 5, 1, 8, 4, 0]


def test_sort_key_cmp():
    """sort() with both key and cmp; cmp receives the computed keys."""
    c = collect(range(10))
    c.sort(key=lambda n: n % 3.9, cmp=lambda x, y: cmp(y, x))
    assert c == [7, 3, 6, 2, 9, 5, 1, 8, 4, 0]

# ____________________________________________________________

class TestLaziness:
    """Tests that track both consumption and finalization of items.

    `seen` collects the (item, Deletable) pairs produced by gen() and
    `deleted` receives one True per finalized Deletable.

    NOTE(review): the class defines __init__; current pytest releases do
    not collect test classes that have one -- confirm which runner this
    file is meant for.
    """

    def __init__(self):
        self.seen = []
        self.deleted = []

    def gen(self, seq):
        """Yield (item, Deletable) pairs for `seq`, logging each in `seen`."""
        for item in seq:
            result = [(item, Deletable(self.deleted))]
            self.seen += result
            # pop() empties 'result', so no local variable of this
            # generator keeps a reference to the pair
            yield result.pop()

    def verify(self, newlyseen, newlydeleted=0):
        """Check what was produced and finalized since the last call.

        Both logs are cleared afterwards, so every call only looks at the
        activity since the previous one.
        """
        assert [pair[0] for pair in self.seen] == newlyseen
        # in Python 2 the list-comprehension variable stays bound after
        # the comprehension; reset it so it does not keep the last pair
        pair = None
        del self.seen[:]
        assert len(self.deleted) == newlydeleted
        del self.deleted[:]

    def consume(self, it, result, newlyseen, newlydeleted=0):
        """Pull one pair from `it` and check its item and the side effects."""
        # nothing may have happened since the previous check
        self.verify([])
        assert it.next()[0] == result
        self.verify(newlyseen, newlydeleted)

    def test_add_lazy(self):
        """'+' of two collects: each step produces and finalizes one item."""
        g1 = self.gen('ab')
        g2 = self.gen('cd')
        it = iter(collect(g1) + collect(g2))
        self.consume(it, 'a', ['a'], 1)
        self.consume(it, 'b', ['b'], 1)
        self.consume(it, 'c', ['c'], 1)
        self.consume(it, 'd', ['d'], 1)
        py.test.raises(StopIteration, it.next)

    def test_mul_lazy(self):
        """'* 3': items are produced once and finalized on the last pass."""
        g1 = self.gen('ab')
        it = iter(collect(g1) * 3)
        # first pass: items are produced but cannot be finalized yet
        self.consume(it, 'a', ['a'], 0)
        self.consume(it, 'b', ['b'], 0)
        # second pass: nothing new is produced
        self.consume(it, 'a', [], 0)
        self.consume(it, 'b', [], 0)
        # third pass: each item is finalized after its last use
        self.consume(it, 'a', [], 1)
        self.consume(it, 'b', [], 1)
        py.test.raises(StopIteration, it.next)
        self.verify([])

    def test_slice_lazy(self):
        """[3:8:2]: skipped items are produced and finalized along the way."""
        g1 = self.gen('0123456789')
        it = iter(collect(g1)[3:8:2])
        # taking the slice is expected to already skip (and finalize)
        # the three items before index 3
        self.verify(['0','1','2'], 3)
        self.consume(it, '3', ['3'], 1)
        self.consume(it, '5', ['4','5'], 2)
        self.consume(it, '7', ['6','7'], 2)
        py.test.raises(StopIteration, it.next)
        # one more item ('8') is expected to be read to reach the end
        self.verify(['8'], 1)

    def test_append_lazy(self):
        """append() before iterating does not force the input to be read."""
        g1 = self.gen('abc')
        c = collect(g1)
        c.append(('d', Deletable(self.deleted)))
        c.append(('e', Deletable(self.deleted)))
        it = iter(c)
        # keep only the iterator, so that consumed items can be finalized
        del c
        self.consume(it, 'a', ['a'], 1)
        self.consume(it, 'b', ['b'], 1)
        self.consume(it, 'c', ['c'], 1)
        # the appended pairs did not come from gen(): nothing newly seen
        self.consume(it, 'd', [], 1)
        self.consume(it, 'e', [], 1)
        py.test.raises(StopIteration, it.next)
        self.verify([])

    def test_sort_lazy(self):
        """sort() must do few comparisons up front and few per item read."""
        chk = []
        def my_cmp(x, y):
            # log every comparison so that they can be counted
            chk.append((x, y))
            return cmp(x, y)
        s = range(1000)
        random.shuffle(s)
        c = collect(s)
        c.sort(my_cmp)
        assert len(chk) < 2000   # a full sort requires around 8600 compares
        del chk[:]
        for x in c:
            # fewer than 20 comparisons per item pulled out of the result
            assert len(chk) < 20
            del chk[:]

# ____________________________________________________________

def test_accesstime():
    """Negative-index reads and writes and pop() must stay fast."""
    # the following loops take ages if the operations are not eventually
    # constant-time.
    c = collect(xrange(10000))
    for i in range(5000):
        c[-i]
    for i in range(5000):
        c[-i] = i
    for i in range(1000):
        c.pop()
        assert len(c) == 9999-i