""" A block storage format. Stores blocks of 4096 bytes in a single file that also contains an SHA-based index. Using the index, reading a block from its SHA hash is fast, and duplicate blocks are never stored twice. """ import os, struct, mmap, sys, md5, sha def readall(fd, size): buffer = '' while len(buffer) < size: data = os.read(fd, size - len(buffer)) if not data: raise EOFError buffer += data return buffer def writeall(fd, data): while data: count = os.write(fd, data) if not count: raise EOFError data = data[count:] class BlockFS(object): MAGIC = 'BlkFs\x01\x0c' blockshift = 12 # 4096 bytes mmap = None fd = None digest_size = sha.digest_size def __init__(self, path): self.fd = os.open(path, os.O_RDWR) magic = readall(self.fd, 8) assert magic[:7] == self.MAGIC hash_bits = ord(magic[7]) assert 9 <= hash_bits < 32 self.open_map(hash_bits) def get_init_string(cls): return cls.MAGIC + '\x09' + '\x00' * (4096-8) get_init_string = classmethod(get_init_string) def open_map(self, hash_bits): self.blocksize = 1 << self.blockshift self.blockmask = self.blocksize - 1 hashcount = 1 << hash_bits self.hash_bits = hash_bits self.hash_mask = hashcount - 1 self.hash_table_size = hashcount * 8 self.mmap = mmap.mmap(self.fd, self.hash_table_size) def close_map(self): self.mmap.close() self.mmap = None def close(self): if self.mmap is not None: self.close_map() if self.fd is not None: os.close(self.fd) self.fd = None def __del__(self): self.close() def digest(self, blockdata): "Can be overridden to use another hashing function." return sha.new(blockdata).digest() def readblock(self, n): os.lseek(self.fd, n << self.blockshift, 0) return readall(self.fd, self.blocksize) def locate(self, key): fullhash, = struct.unpack("!i", key[:4]) h = fullhash step = 1 while 1: h &= self.hash_mask h1, n1 = struct.unpack("!iI", self.mmap[h<<3:(h+1)<<3]) if n1 == 0: # empty bucket return False, h, fullhash if h1 == fullhash: # filled bucket with a matching hash try: block = self.readblock(n1) except EOFError: pass else: if self.digest(block) == key: return True, h, block h += step step += 1 def __getitem__(self, key): found, h, block = self.locate(key) #print 'Reading %s in bucket 0x%x: %r' % (key.encode('hex'), h, found) if found: return block else: raise KeyError def getitems(self, keys): return map(self.__getitem__, keys) def get_block_count(self): try: used = self.buckets_used except AttributeError: used = 0 zero = '\x00' * 8 for h in range(0, self.hash_table_size, 8): used += self.mmap[h:h+8] != zero self.buckets_used = used return used - 1 def set_buckets_used(self, delta=0): self.get_block_count() self.buckets_used += delta def write_new_block(self, blockdata): position = os.lseek(self.fd, 0, 2) if position & self.blockmask: # padding needed? position = (position & ~self.blockmask) + 1 os.lseek(self.fd, position, 0) writeall(self.fd, blockdata) return position >> self.blockshift ## def next_position(self): ## position = os.lseek(self.fd, 0, 2) ## if position & self.blockmask: # padding needed? ## position = (position & ~self.blockmask) + 1 ## return position ## def write_blocks(self, frm, to): ## n = self.next_position() ## for p in xrange(frm << self.blockshift, to << self.blockshift, ## self.blocksize): ## self.write_new_block(self.mmap[p:p+self.blocksize]) ## return n def reserve_new_space(self, blockcount): position = os.lseek(self.fd, 0, 2) if position & self.blockmask: # padding needed? position = (position & ~self.blockmask) + 1 os.ftruncate(self.fd, position + (blockcount << self.blockshift)) return position >> self.blockshift def expand_hash_table(self): print >> sys.stderr, "Rehashing...", assert (self.hash_table_size & self.blockmask) == 0 oldhashblocks = self.hash_table_size >> self.blockshift newhashblocks = oldhashblocks * 2 # rehash newtable = self.reserve_new_space(newhashblocks) newtableofs = newtable << self.blockshift fd = self.fd os.lseek(fd, newtableofs, 0) writeall(fd, self.MAGIC + chr(self.hash_bits + 1)) new_hash_mask = (self.hash_mask << 1) | 1 relocation_delta = newtable - oldhashblocks for srch in xrange(8, self.hash_table_size, 8): fullhash, bucket_n = struct.unpack("!iI", self.mmap[srch:srch+8]) if bucket_n == 0: pass # empty bucket in the old hash table else: h = fullhash step = 1 while 1: h &= new_hash_mask os.lseek(fd, newtableofs + (h<<3), 0) h1, n1 = struct.unpack("!iI", readall(fd, 8)) if n1 == 0: break # found an empty bucket in the new table h += step step += 1 # data blocks below newhashblocks will be relocated if bucket_n < newhashblocks: bucket_n += relocation_delta os.lseek(fd, newtableofs + (h<<3), 0) writeall(fd, struct.pack("!iI", fullhash, bucket_n)) self.close_map() # exchange the new table and the data blocks to relocate bs = self.blocksize olddataofs = oldhashblocks << self.blockshift newtablemiddleofs = newtableofs + olddataofs for pos in xrange(0, oldhashblocks << self.blockshift, bs): os.lseek(fd, olddataofs + pos, 0); data1 = readall(fd, bs) os.lseek(fd, newtableofs + pos, 0); data2 = readall(fd, bs) os.lseek(fd, newtablemiddleofs + pos, 0); data3 = readall(fd, bs) os.lseek(fd, newtableofs + pos, 0); writeall(fd, data1) os.lseek(fd, olddataofs + pos, 0); writeall(fd, data3) os.lseek(fd, pos, 0); writeall(fd, data2) del data3, data2, data1 os.ftruncate(fd, newtablemiddleofs) # done self.open_map(self.hash_bits + 1) print >> sys.stderr, "done, table has now %d bits" % (self.hash_bits,) def store(self, blockdata): assert len(blockdata) == self.blocksize key = self.digest(blockdata) found, h, block = self.locate(key) if found: assert blockdata == block else: fullhash = block n = self.write_new_block(blockdata) #print 'Adding %s -> position 0x%x' % (key.encode('hex'), n) self.set_buckets_used(+1) used = self.buckets_used self.mmap[h<<3:(h+1)<<3] = struct.pack("!iI", fullhash, n) if used * 3 > self.hash_mask * 2: # more than 2/3rd full? self.expand_hash_table() return key class MD5BlockFS(BlockFS): MAGIC = 'BlkFs\x00\x0c' digest_size = md5.digest_size def digest(self, blockdata): return md5.new(blockdata).digest()