#! /usr/bin/env python import py, lufs, os, time, thread from lufs import FileSystem, SilentFSError # ____________________________________________________________ UID = os.getuid() GID = os.getgid() IGNORE_OS_FLAGS = ~(os.O_NOCTTY | os.O_APPEND | os.O_NONBLOCK | os.O_NDELAY | os.O_SYNC | os.O_NOFOLLOW | os.O_DIRECTORY | os.O_LARGEFILE) def normalize_stat(st): if st is None: raise OSError, 'lstat' st = list(st) assert len(st) == 10 st[4] = UID st[5] = GID return tuple(st) class ExecNetFS(FileSystem): def __init__(self, factory): self.factory = factory def readdir(self, dir): lst = self.factory.getdir(dir).keys() lst.sort() return lst def stat(self, filename): filename = filename.strip('/') if filename == '': return self.factory.rootstat i = filename.rfind('/') dirname = filename[:i+1] basename = filename[i+1:] content = self.factory.getdir(dirname) try: return normalize_stat(content[basename]) except KeyError: raise SilentFSError, filename def setattr(self, filename, attribs): channel = self.factory.sync_exec('setattr', """ filename, mode, size, atime, mtime = channel.receive() st = os.lstat(filename) if st.st_size != size is not None: fd = os.open(filename, os.O_WRONLY) os.ftruncate(fd, size) os.close(fd) st = os.lstat(filename) if st.st_mode != mode is not None: os.chmod(filename, mode) if (st.st_mtime != mtime or st.st_atime != atime) and mtime is not None: os.utime(filename, (atime, mtime)) """) mode, nlink, uid, gid, size, atime, mtime, ctime = attribs channel.send((self.factory.modifying(filename), mode, size, atime, mtime)) def open(self, filename, flags): modif = flags & IGNORE_OS_FLAGS channel = self.factory.sync_exec(modif and 'open', """ filename, flags = channel.receive() fd = os.open(filename, flags) os.close(fd) """) if modif: self.factory.modifying(filename) filename = filename.strip('/') channel.send((self.factory.root + filename, flags)) def create(self, filename, mode): channel = self.factory.sync_exec('create', """ filename, mode = channel.receive() fd = os.open(filename, os.O_CREAT|os.O_WRONLY|os.O_TRUNC, mode) os.close(fd) """) channel.send((self.factory.modifying(filename), mode)) def release(self, filename): filename = filename.strip('/') try: del self.factory.openfiles[filename] except KeyError: pass def read(self, filename, offset, count): data = self.factory.getfile(filename) return data[offset:offset+count] def readlink(self, link): channel = self.factory.sync_exec(False, """ link = channel.receive() channel.send(os.readlink(link)) """) channel.send(self.factory.root + link) return channel.receive() def write(self, filename, offset, data): channel = self.factory.sync_exec('write', """ filename, offset, data = channel.receive() f = open(filename, 'wb') f.seek(offset) f.write(data) f.close() """) filename = filename.strip('/') channel.send((self.factory.modifying(filename), offset, data)) return len(data) def unlink(self, filename): channel = self.factory.sync_exec('unlink', """ filename = channel.receive() os.unlink(filename) """) channel.send(self.factory.modifying(filename)) channel.waitclose() def mkdir(self, dirname, mode): channel = self.factory.sync_exec('mkdir', """ dirname, mode = channel.receive() os.mkdir(dirname, mode) """) channel.send((self.factory.modifying(dirname), mode)) channel.waitclose() def rmdir(self, dirname): channel = self.factory.sync_exec('rmdir', """ dirname = channel.receive() os.rmdir(dirname) """) channel.send(self.factory.modifying(dirname)) channel.waitclose() def rename(self, fromname, toname): channel = self.factory.sync_exec('rename', """ fromname, toname = channel.receive() os.rename(fromname, toname) """) channel.send((self.factory.modifying(fromname), self.factory.modifying(toname))) channel.waitclose() def link(self, target, link): channel = self.factory.sync_exec('link', """ target, link = channel.receive() os.link(target, link) """) channel.send((self.factory.modifying(target), self.factory.modifying(link))) channel.waitclose() def symlink(self, target, link): channel = self.factory.sync_exec('symlink', """ target, link = channel.receive() os.symlink(target, link) """) channel.send((target, self.factory.modifying(link))) channel.waitclose() class ExecNetFSFactory: def __init__(self, gw, root='/'): self.gw = gw if not root.endswith('/'): root += '/' self.root = root self.dircache = {} self.openfiles = {} self.rev = 0 self.revlock = thread.allocate_lock() channel = gw.remote_exec(""" import os, thread LOCKS = {} HEAD = 0 def syncrev(rev): if rev <= HEAD: return try: lock = LOCKS[rev] except KeyError: lock = thread.allocate_lock() lock.acquire() lock = LOCKS.setdefault(rev, lock) if rev <= HEAD: try: del LOCKS[rev] except KeyError: pass return lock.acquire() lock.release() def precommit(newrev): assert newrev == HEAD+1 lock = thread.allocate_lock() lock.acquire() LOCKS.setdefault(newrev, lock) def postcommit(): global HEAD newrev = HEAD+1 LOCKS[newrev].release() HEAD = newrev try: del LOCKS[newrev] except KeyError: pass channel.gateway._enfs2syncrev = syncrev channel.gateway._enfs2precommit = precommit channel.gateway._enfs2postcommit = postcommit root = channel.receive() channel.send(tuple(os.stat(root))) """) channel.send(root) self.rootstat = normalize_stat(channel.receive()) def __call__(self): return ExecNetFS(self) def sync_exec(self, modif, source): if modif: source = py.code.Source(source).putaround(""" import os rev = channel.receive() __notif_channel = channel.receive() channel.gateway._enfs2syncrev(rev) channel.gateway._enfs2precommit(rev+1) try: """, """ finally: channel.gateway._enfs2postcommit() __notif_channel.send(1) """) self.revlock.acquire() try: rev = self.rev self.rev = rev + 1 print '%s: %d...' % (modif, rev+1) finally: self.revlock.release() channel = self.gw.remote_exec(source) channel.send(rev) def notif(dummy): print 'At revision %d.' % (rev+1,) channel2 = self.gw.newchannel(notif) channel.send(channel2) return channel else: source = py.code.Source(""" import os rev = channel.receive() channel.gateway._enfs2syncrev(rev) """, source) channel = self.gw.remote_exec(source) channel.send(self.rev) return channel def getdir(self, dir): dir = dir.strip('/') try: res = self.dircache[dir] except KeyError: res = thread.allocate_lock() while isinstance(res, thread.LockType): lock = res lock.acquire() try: res = self.dircache.setdefault(dir, lock) if lock is res: channel = self.sync_exec(False, """ dir = channel.receive() res = {} for fn in os.listdir(dir): try: st = tuple(os.lstat(dir+'/'+fn)) except OSError: st = None res[fn] = st channel.send(res) """) channel.send(self.root + dir) print 'readdir:', repr(dir) res = channel.receive() self.dircache[dir] = res finally: lock.release() return res def getfile(self, filename): filename = filename.strip('/') try: res = self.openfiles[filename] except KeyError: res = thread.allocate_lock() while isinstance(res, thread.LockType): lock = res lock.acquire() try: res = self.openfiles.setdefault(filename, lock) if lock is res: channel = self.sync_exec(False, """ filename = channel.receive() f = open(filename, 'rb') channel.send(f.read()) f.close() """) channel.send(self.root + filename) print 'read:', repr(filename) res = channel.receive() self.openfiles[filename] = res finally: lock.release() return res def modifying(self, filename): filename = filename.strip('/') try: del self.openfiles[filename] except KeyError: pass i = filename.rfind('/') if i < 0: i = 0 dirname = filename[:i] if dirname in self.dircache: self.getdir(dirname) del self.dircache[dirname] return self.root + filename # ____________________________________________________________ if __name__ == '__main__': import sys hostname = sys.argv[1] gw = py.execnet.SshGateway(hostname) lufs.mount(ExecNetFSFactory(gw), sys.argv[2]) ## import sys ## gw = py.execnet.PopenGateway() ## lufs.mount(ExecNetFSFactory(gw), sys.argv[1])