#! /usr/bin/env python
"""Read-only lufs file system served through a py.execnet gateway.

Directory listings, file blocks and symlink targets are obtained by running
small code snippets on the other side of the gateway.  Every request is
wrapped in a Thunk and kept in a bounded Cache, so repeated accesses do not
hit the gateway again.

Usage: <this script> HOSTNAME MOUNTPOINT
"""

# This one is READ-ONLY!

import py
import lufs
import os
import time
import thread
from lufs import FileSystem, SilentFSError

# Pause (in seconds) taken before every new remote request is issued.
STUPIDLY_SLEEP = 0.99

# ____________________________________________________________

# Every stat result handed out is reported as owned by the local user/group.
UID = os.getuid()
GID = os.getgid()

# Mask that clears the open() flags this file system tolerates; any bit that
# survives the mask makes ExecNetFS.open() refuse the request.
IGNORE_OS_FLAGS = ~(os.O_NOCTTY | os.O_APPEND | os.O_NONBLOCK | os.O_NDELAY |
                    os.O_SYNC | os.O_NOFOLLOW | os.O_DIRECTORY |
                    os.O_LARGEFILE)

# Source snippets executed on the other side of the gateway.  They are kept
# at column 0 so that they are valid Python whether or not remote_exec()
# dedents its argument.  Inside them, 'channel' is the name the snippets use
# to talk back to this process.
_ROOT_STAT_SOURCE = """
import os
root = channel.receive()
channel.send(tuple(os.stat(root)))
"""

_LISTDIR_SOURCE = """
import os
dir = channel.receive()
res = {}
for fn in os.listdir(dir):
    try:
        st = tuple(os.lstat(dir+'/'+fn))
    except OSError:
        st = None
    res[fn] = st
channel.send(res)
"""

# '%d' is filled in with the block size by ExecNetFSFactory.__init__.
_READ_BLOCK_SOURCE_TEMPLATE = """
filename, pos = channel.receive()
f = open(filename, 'rb')
try:
    f.seek(pos)
    channel.send(f.read(%d))
finally:
    f.close()
"""

_READLINK_SOURCE = """
import os
link = channel.receive()
channel.send(os.readlink(link))
"""


def normalize_stat(st):
    """Return the 10-item stat tuple 'st' with items 4 and 5 set to UID/GID.

    Raises OSError if 'st' is None, which is what the directory-listing
    snippet stores for an entry whose lstat() failed.
    """
    if st is None:
        raise OSError('lstat')
    fields = list(st)
    assert len(fields) == 10
    fields[4] = UID
    fields[5] = GID
    return tuple(fields)


class ExecNetFS(FileSystem):
    """File system object answering requests from its factory's caches."""

    def __init__(self, factory):
        self.factory = factory

    def readdir(self, dir):
        """Return the sorted list of names found in directory 'dir'."""
        thunk = self.factory.dircache[self.factory.fn(dir)]
        return sorted(thunk.get().keys())

    def stat(self, filename):
        """Return the normalized stat tuple of 'filename'.

        The entry is looked up in the cached listing of its parent
        directory.  Raises SilentFSError if the parent has no such entry,
        and OSError if the entry exists but could not be lstat'ed.
        """
        filename = filename.strip('/')
        if filename == '':
            return self.factory.rootstat
        # rfind() gives -1 when there is no slash, which makes dirname ''.
        cut = filename.rfind('/') + 1
        dirname = filename[:cut]
        basename = filename[cut:]
        listing = self.factory.dircache[self.factory.fn(dirname)].get()
        try:
            entry = listing[basename]
        except KeyError:
            raise SilentFSError(filename)
        return normalize_stat(entry)

    def open(self, filename, flags):
        """Accept the open only if no flag outside the tolerated set is given.

        Raises IOError otherwise; nothing is actually opened here.
        """
        if flags & IGNORE_OS_FLAGS:
            raise IOError("open(%s): read-only file system" % (filename,))

    def release(self, filename):
        """Nothing to release: open() keeps no per-file state."""
        pass

    def read(self, filename, offset, count):
        """Return up to 'count' bytes of 'filename' starting at 'offset'.

        The data is assembled from cached blocks of factory.BLOCKSIZE bytes.
        Reading stops early when a block yields no data for the requested
        range.  When the whole request could be satisfied, the next
        factory.READAHEAD blocks are looked up in the cache as well, which
        issues their requests without waiting for the results.
        """
        filename = self.factory.fn(filename)
        blocksize = self.factory.BLOCKSIZE
        blockcache = self.factory.fileblockcache
        pieces = []
        p = 0
        while count > 0:
            # Start position of the block that contains 'offset'.
            p = (offset // blocksize) * blocksize
            block = blockcache[filename, p].get()
            data = block[offset - p:offset + count - p]
            if not data:
                break
            pieces.append(data)
            offset += len(data)
            count -= len(data)
            p += blocksize
        else:
            # Not interrupted by 'break': request the following blocks.
            for _ in range(self.factory.READAHEAD):
                blockcache[filename, p]
                p += blocksize
        return ''.join(pieces)

    def readlink(self, link):
        """Return the target of the symbolic link 'link'."""
        thunk = self.factory.symlinkcache[self.factory.fn(link)]
        return thunk.get()


class Thunk:
    """Result of one remote request, received lazily from 'channel'."""

    def __init__(self, channel):
        self.channel = channel

    def get(self):
        """Return the remote result, receiving it on the first call.

        Raises EOFError if the channel ends without delivering a value.
        Nothing is stored in that case, so a later call tries to receive
        again.
        """
        try:
            return self.result
        except AttributeError:
            pass    # not received yet
        print('%s waiting' % (self,))
        try:
            self.result = self.channel.receive()
        except EOFError:
            print('%s EOFError' % (self,))
            # Propagate the real failure.  Falling through to
            # 'return self.result' would raise an unrelated AttributeError
            # because no result was ever stored.
            raise
        print('%s ok' % (self,))
        return self.result


class Cache:
    """Bounded mapping from request keys to Thunks.

    Looking up a missing key starts a remote request by executing 'source'
    on 'gateway' and sending it the key.  Each lookup stamps the thunk with
    an increasing counter; when more than 'max_entries' thunks are stored,
    those with the smallest stamp are dropped.
    """

    def __init__(self, gateway, max_entries, source):
        self.gateway = gateway
        self.source = source
        self.entries = {}
        self.max_entries = max_entries
        self.next = 0
        self.lock = thread.allocate_lock()

    def __getitem__(self, key):
        """Return the Thunk for 'key', issuing the remote request if needed."""
        try:
            thunk = self.entries[key]
        except KeyError:
            thunk = self._request(key)
            self._evict()
        thunk.time = self.next
        self.next += 1
        return thunk

    def _request(self, key):
        """Return the thunk for 'key', creating it under the lock if absent."""
        self.lock.acquire()
        try:
            # Another thread may have created the entry in the meantime.
            if key in self.entries:
                return self.entries[key]
            time.sleep(STUPIDLY_SLEEP)
            channel = self.gateway.remote_exec(self.source)
            channel.send(key)
            thunk = Thunk(channel)
            # Stamp before storing so that _evict() can always read .time.
            thunk.time = self.next
            print('%s requested %s' % (thunk, key))
            self.entries[key] = thunk
            return thunk
        finally:
            self.lock.release()

    def _evict(self):
        """Drop the entries with the smallest stamp until the cache fits."""
        while len(self.entries) > self.max_entries:
            _, oldestkey = min([(entry.time, k)
                                for k, entry in self.entries.items()])
            try:
                del self.entries[oldestkey]
            except KeyError:
                pass    # already removed, e.g. by a concurrent lookup


class ExecNetFSFactory:
    """Holds the gateway, the remote root and the caches shared by ExecNetFS.

    Calling the factory returns a new ExecNetFS bound to it.
    """

    BLOCKSIZE = 128 * 1024          # bytes fetched per file-block request
    READAHEAD = 2                   # extra blocks requested after a full read
    CACHESIZE = 10 * 1024*1024      # bytes of file data the block cache aims at

    def __init__(self, gw, root='/'):
        self.gw = gw
        if not root.endswith('/'):
            root += '/'
        self.root = root

        # The root directory is stat'ed once, up front.
        channel = gw.remote_exec(_ROOT_STAT_SOURCE)
        channel.send(root)
        self.rootstat = normalize_stat(channel.receive())

        # key: remote directory path -> {entry name: lstat tuple or None}
        self.dircache = Cache(gw, 100, _LISTDIR_SOURCE)
        # key: (remote file path, block start) -> block data
        self.fileblockcache = Cache(
            gw, self.CACHESIZE//self.BLOCKSIZE + 1,
            _READ_BLOCK_SOURCE_TEMPLATE % self.BLOCKSIZE)
        # key: remote link path -> link target
        self.symlinkcache = Cache(gw, 100, _READLINK_SOURCE)

    def __call__(self):
        return ExecNetFS(self)

    def fn(self, path):
        """Map a path of the mounted file system to the remote path."""
        return self.root + path.strip('/')

# ____________________________________________________________

if __name__ == '__main__':
    import sys
    hostname = sys.argv[1]
    gw = py.execnet.SshGateway(hostname)
    lufs.mount(ExecNetFSFactory(gw), sys.argv[2])
    # Alternative using a local subprocess gateway instead of ssh:
    ##    import sys
    ##    gw = py.execnet.PopenGateway()
    ##    lufs.mount(ExecNetFSFactory(gw), sys.argv[1])