import py, random, sys from hashtable import * udir = py.path.local.make_numbered_dir(prefix='usession-', keep=3) def test_int_hashtable(): fn = str(udir.join('table1')) expected = {} t = HashTable(fn, "i", "ii") for i in range(10000): key = random.randrange(1, 99999) values = ( random.randrange(-sys.maxint, sys.maxint), random.randrange(-sys.maxint, sys.maxint), ) expected[key] = values t[key] = values #assert len(t) == len(expected) for key, values in expected.items(): got = t[key] assert got == values t.close() t = HashTable(fn, "i", "ii") for key, values in expected.items(): got = t[key] assert got == values #assert len(t) == len(expected) t.close() def test_sha_hashtable(): fn = str(udir.join('table2')) expected = {} t = HashTable(fn, "20s", "ii", hashhead) for i in range(10000): key = os.urandom(20) values = ( random.randrange(-sys.maxint, sys.maxint), random.randrange(-sys.maxint, sys.maxint), ) expected[key] = values t[key] = values #assert len(t) == len(expected) for key, values in expected.items(): got = t[key] assert got == values t.close() t = HashTable(fn, "20s", "ii", hashhead) for key, values in expected.items(): got = t[key] assert got == values #assert len(t) == len(expected) t.close() def test_outdated(): d = {} keys = [] removed = {} class MyTable(HashTable): def outdated(self, bucket, index=None): return self.bucketvalues(bucket)[0] in removed def check(): for k, value in d.iteritems(): assert k in t assert t[k] == value for time, k in removed.iteritems(): if k not in d: assert k not in t py.test.raises(KeyError, "t[k]") fn = str(udir.join('table3')) t = MyTable(fn, "i", "i") for time in range(20000): r = random.random() if r < 0.19999 and keys: k = keys.pop(int(r*5.0*len(keys))) removed[d[k][0]] = k del d[k] elif (1 - r) * time > 5.5: k = int(r * 100000) + 1 if k in d: continue values = (time,) keys.append(k) d[k] = values t[k] = values else: print time check() print '.' check() def test_no_growth_with_outdated_items(): class MyTable(HashTable): def outdated(self, bucket, index=None): j = self.bucketvalues(bucket)[0] return j < i-500 # only keep the 500 most recent values fn = str(udir.join('table4')) t = MyTable(fn, "i", "i") for i in range(1, 45000): t[i] = i, t.close() assert os.path.getsize(fn) <= 16384