#! /usr/bin/env python
"""Serve files from a remote ssh server, downloading them lazily.

Suppose you want to play video files from sshhostname:remotedir/file.avi.

Step 1: generate a local file with the '.redir' extension, which contains
a list of block checksums of the remote file.  Example:

    lazyconnect.py file.avi [file2.avi..] sshhostname:remotedir

Step 2: expose the '.redir' files in a Fuse mount:

    lazyconnect.py mountpoint
    mplayer mountpoint/file.avi

This looks for '$CWD/*.redir' files ($CWD is the current working dir).
When applications try to read the files from the mountpoint directory,
they are downloaded piecewise and stored in '$CWD/*' files.  Once a file
has been completely downloaded, its '$CWD/*' file is a normal, exact copy
of the remote file and the corresponding .redir file is removed; e.g. you
get a complete local 'file.avi' after you have played it completely.
All other files in $CWD are exposed in the mount as symlinks.
"""

import ast
import os, sha, sys

import py

from handler import Handler
from objectfs import ObjectFs, SymLink


# Directory containing the 'client' module of the sha/offload server; it is
# put on sys.path on demand by sha_client_connect().
OFFLOAD_CLIENT_PATH = '/home/arigo/svn/my/python/hacks/offload'

# make_redir() treats a local symlink whose target starts with this prefix
# as a reference to a file on the sha server; the rest of the target is the
# hex-encoded checksum of that file.
SHA_LINK_PREFIX = '/mnt/sha/'


def _write_redir(path, redir):
    """Write repr(redir) followed by a newline to *path*, replacing it.

    The data is written to a temporary file first and then renamed over
    *path* (rename is atomic on POSIX filesystems), so an interruption
    while writing never leaves a truncated .redir file behind.
    """
    tmppath = path + '.tmp'
    f = open(tmppath, 'w')
    try:
        f.write(repr(redir))
        f.write('\n')
    finally:
        f.close()
    os.rename(tmppath, path)


class RedirFile(object):
    """A lazily downloaded file, described by a '.redir' file.

    ``self.redir`` is the dict stored in the .redir file (see build_redir()
    and build_sha_redir()).  Its 'checksums' entry maps block numbers to
    the SHA-1 digest of the corresponding remote block.  A block number is
    removed from it as soon as the local copy of that block is known to be
    correct, so it always lists exactly the blocks not yet verified.
    """

    # How many blocks past the end of each read are requested ahead of time.
    READAHEAD = 2

    def __init__(self, localfile, redirfile):
        """Load *redirfile*; *localfile* is where the blocks are stored.

        Raises ValueError if *redirfile* does not look like a .redir file.
        """
        data = py.path.local(redirfile).read()
        if not (data.startswith('{') and data.endswith('}\n')):
            raise ValueError('%s: not a valid .redir file' % (redirfile,))
        self.redirfile = redirfile
        # literal_eval() accepts exactly what repr() of the redir dict
        # produces (str, int/long, tuple, dict) without executing arbitrary
        # code from a .redir file that happens to be in the directory.
        self.redir = ast.literal_eval(data)
        self.localfile = localfile
        self.changed = False    # True when self.redir differs from the file
        self.requested = []     # block numbers with a pending answer, in request order
        blocksize = self.redir['blocksize']
        totalblocks = (self.redir['size'] + blocksize - 1) // blocksize
        print('%s: %d/%d blocks to check' % (self.localfile,
                                            len(self.redir['checksums']),
                                            totalblocks))

    def size(self):
        """Return the size in bytes of the complete remote file."""
        return self.redir['size']

    def read(self):
        """Open the local copy (creating it if missing) as an OpenFile."""
        # Unbuffered (buffering=0), presumably so that blocks written through
        # one handle are immediately visible through the others -- confirm.
        try:
            f = open(self.localfile, 'r+b', 0)
        except IOError:
            f = open(self.localfile, 'w+b', 0)
        return OpenFile(self, self.redir, f)

    def direct_connect(self):
        """Connect over ssh to a helper that reads blocks of the remote file.

        Rebinds self.post_request to send a block number over the channel
        and self.receive_answer to receive that block's raw data.
        """
        print('connecting to %s' % (self.redir['host'],))
        self.gw = py.execnet.SshGateway(self.redir['host'])
        self.channel = self.gw.remote_exec("""
            filename = channel.receive()
            blocksize = channel.receive()
            f = open(filename, 'rb', 0)
            for n in channel:
                f.seek(n * blocksize)
                channel.send(f.read(blocksize))
            f.close()
        """)
        self.channel.send(self.redir['remotefile'])
        self.channel.send(self.redir['blocksize'])
        self.post_request = self.channel.send
        self.receive_answer = self.channel.receive

    def sha_connect(self):
        """Connect to the sha server and fetch blocks by their checksum.

        Rebinds self.post_request / self.receive_answer to the sha client.
        """
        print('connecting to sha server %s' % (self.redir['sha_client'],))
        self.client = sha_client_connect(*self.redir['sha_client'])

        def post_request(n):
            self.client.post_read_request(self.redir['checksums'][n])
        self.post_request = post_request
        self.receive_answer = self.client.receive_delayed_answer

    def post_request(self, n):
        """Connect on first use, then send the request for block *n*.

        Connecting rebinds self.post_request as an instance attribute, so
        this method itself only runs for the very first request.
        """
        if 'sha_client' in self.redir:
            self.sha_connect()
        else:
            self.direct_connect()
        self.post_request(n)

    def block_valid(self, n, f):
        """Return True if block *n* does not need to be downloaded.

        That is the case when *n* is no longer (or never was, e.g. past the
        end of the file) in the checksums table, or when the local data in
        *f* already matches the checksum -- the block is then marked as
        acquired.  The position of *f* is preserved.
        """
        if n not in self.redir['checksums']:
            return True
        blocksize = self.redir['blocksize']
        saved_pos = f.tell()
        f.seek(n * blocksize)
        data = f.read(blocksize)
        f.seek(saved_pos)
        digest = sha.sha(data).digest()
        if digest == self.redir['checksums'][n]:
            self._acquired(n)
            return True
        return False

    def getanswer(self, n, f):
        """Receive the data of block *n*, verify it and store it into *f*.

        Raises IOError if the data does not match the expected checksum.
        The position of *f* is preserved.
        """
        data = self.receive_answer()
        digest = sha.sha(data).digest()
        # Explicit check rather than an assert: corrupted data must never
        # be written to the local file, even when running under -O.
        if digest != self.redir['checksums'][n]:
            raise IOError('%s: checksum mismatch for block %d'
                          % (self.localfile, n))
        blocksize = self.redir['blocksize']
        saved_pos = f.tell()
        f.seek(n * blocksize)
        f.write(data)
        f.seek(saved_pos)
        self._acquired(n)

    def _acquired(self, n):
        """Record that block *n* is valid locally; finish when all are."""
        del self.redir['checksums'][n]
        self.changed = True
        if not self.redir['checksums']:
            self.flush()    # removes the now-useless .redir file

    def request(self, n, f):
        """Make sure block *n* is being fetched unless it is already valid.

        Returns True if an answer for *n* is pending (requested now or
        earlier), False if the local block is already known to be valid.
        """
        if n in self.requested:
            return True
        elif self.block_valid(n, f):
            return False
        else:
            self.post_request(n)
            self.requested.append(n)
            return True

    def loadblocks(self, nums, f):
        """Ensure all blocks in the sequence *nums* are valid in *f*.

        Also requests the READAHEAD following blocks without waiting for
        them.  Answers arrive in request order, so any earlier read-ahead
        answers are consumed too before this returns.
        """
        if not nums:
            return
        pending = set()
        for n in nums:
            if self.request(n, f):
                pending.add(n)
        last = nums[-1]
        for k in range(1, self.READAHEAD + 1):
            self.request(last + k, f)
        while pending:
            n = self.requested.pop(0)
            self.getanswer(n, f)
            pending.discard(n)

    def flush(self):
        """Save the remaining checksums, or delete the .redir file if none."""
        if not self.changed:
            return
        if self.redir['checksums']:
            _write_redir(self.redirfile, self.redir)
        else:
            try:
                os.unlink(self.redirfile)
            except OSError:
                pass    # best effort: the file may already be gone
        self.changed = False


class OpenFile(object):
    """An open handle on the local copy of a RedirFile.

    Reads first make sure that every block they touch has been verified or
    downloaded.
    """

    def __init__(self, _file, redir, flocal):
        self._file = _file
        self.redir = redir
        self.flocal = flocal
        self.flocal.seek(0, 2)
        totalsize = self.flocal.tell()
        self.flocal.seek(0)
        # the local copy can never be larger than the remote file
        assert totalsize <= self.redir['size']

    def seek(self, pos):
        self.flocal.seek(pos)

    def read(self, count):
        """Read up to *count* bytes at the current position."""
        pos = self.flocal.tell()
        blocksize = self.redir['blocksize']
        first = pos // blocksize
        last = (pos + count - 1) // blocksize
        self.loadblocks(range(first, last + 1))
        # loading blocks saves and restores the file position
        assert pos == self.flocal.tell()
        return self.flocal.read(count)

    def loadblocks(self, nums):
        self._file.loadblocks(nums, self.flocal)


def build_redir(host, remotefile, blocksize=64*1024):
    """Compute over ssh the redir dict describing *remotefile* on *host*.

    Returns a dict with keys 'host', 'remotefile', 'blocksize', 'size' and
    'checksums' (block number -> SHA-1 digest of that block).
    """
    gw = py.execnet.SshGateway(host)
    try:
        channel = gw.remote_exec("""
            import sha
            filename = channel.receive()
            blocksize = channel.receive()
            f = open(filename, 'rb')
            while True:
                data = f.read(blocksize)
                if not data:
                    break
                channel.send(sha.sha(data).digest())
            channel.send(f.tell())
            f.close()
        """)
        channel.send(remotefile)
        channel.send(blocksize)
        # the remote side sends one digest (str) per block, then the total
        # file size as an integer
        checksums = {}
        n = 0
        while True:
            msg = channel.receive()
            if type(msg) is not str:
                break
            checksums[n] = msg
            n += 1
    finally:
        gw.exit()
    assert type(msg) in (int, long)
    return {
        'host': host,
        'remotefile': remotefile,
        'blocksize': blocksize,
        'checksums': checksums,
        'size': msg,
        }


def sha_client_connect(host, dir):
    """Return a client.Connexion to the sha server (host, dir).

    The 'client' module is imported from OFFLOAD_CLIENT_PATH.
    """
    if OFFLOAD_CLIENT_PATH not in sys.path:
        sys.path.insert(0, OFFLOAD_CLIENT_PATH)
    import client
    return client.Connexion(host, dir)


def build_sha_redir(host, dir, checksum):
    """Build the redir dict of the file with the given *checksum* on the
    sha server (host, dir).

    client.filestat() returns the file size and the concatenation of the
    per-block digests, which is split into one digest per block here.
    """
    client = sha_client_connect(host, dir)
    st_size, chain = client.filestat(checksum)
    size = sha.digest_size
    checksums = {}
    for n in range(len(chain) // size):
        checksums[n] = chain[n * size:(n + 1) * size]
    return {
        'sha_client': (host, dir),
        'sha': checksum,
        'blocksize': client.BLOCKSIZE,
        'checksums': checksums,
        'size': st_size,
        }


def serve(mountpoint):
    """Mount the current directory's contents at *mountpoint* via FUSE.

    Each 'X.redir' file is exposed as a lazily downloaded file 'X'; every
    other file is exposed as a symlink.  Pending checksums are saved back
    to the .redir files when the main loop exits.
    """
    root = {}
    names = os.listdir('.')
    for fn in names:
        if fn.endswith('.redir'):
            localfile = fn[:-len('.redir')]
            root[localfile] = RedirFile(localfile, fn)
    for fn in names:
        if fn not in root and not fn.endswith('.redir'):
            root[fn] = SymLink(os.path.abspath(fn))
    try:
        handler = Handler(mountpoint, ObjectFs(root), max_read=256*1024)
        handler.loop_forever()
    finally:
        for f in root.values():
            if isinstance(f, RedirFile):
                f.flush()


def make_redir(localfile, host, remotedir):
    """Create 'localfile.redir' describing the remote copy of *localfile*.

    If *localfile* is a symlink into SHA_LINK_PREFIX, the file is looked up
    on the sha server and the symlink is replaced by an empty local file;
    otherwise the file 'remotedir/basename(localfile)' on *host* is used.
    """
    replace_link = os.path.islink(localfile)
    if replace_link:
        shalink = os.readlink(localfile)
        if not shalink.startswith(SHA_LINK_PREFIX):
            raise ValueError('%s: symlink does not point into %s'
                             % (localfile, SHA_LINK_PREFIX))
        checksum = shalink[len(SHA_LINK_PREFIX):].decode('hex')
        redir = build_sha_redir(host, remotedir, checksum)
    else:
        remotefile = os.path.join(remotedir, os.path.basename(localfile))
        redir = build_redir(host, remotefile)
    redirpath = localfile + '.redir'
    _write_redir(redirpath, redir)
    if replace_link:
        # only drop the symlink once the .redir file is safely written
        os.unlink(localfile)
        open(localfile, 'wb').close()
    print('wrote %s' % (redirpath,))


if __name__ == '__main__':
    if len(sys.argv) == 2:
        serve(sys.argv[1])
    elif len(sys.argv) >= 3 and ':' in sys.argv[-1]:
        # split only at the first ':' so remote paths may contain colons
        host, remotedir = sys.argv[-1].split(':', 1)
        for localfile in sys.argv[1:-1]:
            make_redir(localfile, host, remotedir)
    else:
        sys.stderr.write(__doc__)
        sys.exit(2)