#! /usr/bin/env python import py, lufs, thread, os, time, re from stat import * from Queue import Queue from lufs import FileSystem WRITE_POLICY = "SW" # RO: read only, # CW: copy on write (original file system not modified) # SW: synchronous writes # BW: background writes UID = os.getuid() GID = os.getgid() UMASK = os.umask(0); os.umask(UMASK) # ____________________________________________________________ class Node: modif = False def __init__(self, st): st = list(st) assert len(st) == 10 st[ST_UID] = UID st[ST_GID] = GID self.st = st class File(Node): content = None class Dir(Node): entries = None class SymLink(Node): data = None NODE_CLASS = { S_IFDIR: Dir, S_IFREG: File, S_IFLNK: SymLink, } def newstat(ifmt, mode): now = time.time() if ifmt == S_IFDIR: nlink = 2 size = 4096 else: nlink = 1 size = 0 return [ifmt | (mode & ~UMASK), 0, 0, nlink, UID, GID, size, now, now, now] def NO_OP(*args): pass class ROExecNetFS: def __init__(self, gateway): self.gateway = gateway self.root = Dir([040755, 0, 0, 2, 0, 0, 0, 0, 0, 0]) def grabparent(self, path): i = path.rfind('/') if i <= 0: parent = '/' else: parent = path[:i] node = self.grab(parent) if node.entries is None: self.readdir(parent) return node, path[i+1:] def grab(self, path): if path == '/': return self.root parent, name = self.grabparent(path) return parent.entries[name] def readdir(self, dir): node = self.grab(dir) assert isinstance(node, Dir) if node.entries is None: print dir channel = self.gateway.remote_exec(""" import os path = channel.receive() result = [] dirname = path if not dirname.endswith('/'): dirname += '/' for name in os.listdir(path): result.append((name, os.lstat(dirname+name))) channel.send(result) """) channel.send(dir) entries = {} dirname = dir if not dirname.endswith('/'): dirname += '/' for name, st in channel.receive(): subnode = NODE_CLASS.get(S_IFMT(st[ST_MODE]), Node)(st) entries[name] = subnode node.entries = entries return node.entries def stat(self, filename): return self.grab(filename).st open = NO_OP release = NO_OP def read(self, filename, offset, count): node = self.grab(filename) assert isinstance(node, File) if node.content is None: print filename channel = self.gateway.remote_exec(""" f = open(channel.receive(), 'rb') data = f.read() f.close() channel.send(data) """) channel.send(filename) node.content = channel.receive() return node.content[offset:offset+count] def readlink(self, path): link = self.grab(path) assert isinstance(link, SymLink) if link.data is None: print path channel = self.gateway.remote_exec(""" import os channel.send(os.readlink(channel.receive())) """) channel.send(path) link.data = channel.receive() return link.data class CWExecNetFS(ROExecNetFS): def changing(self, node, path, *args): if not node.modif: if isinstance(node, File): self.read(path, 0, 0) elif isinstance(node, Dir): self.readdir(path) elif isinstance(node, SymLink): self.readlink(path) node.modif = True self.operation(path, *args) def operation(self, *args): op = args[-1] print op.upper(), if op == 'write': print '%r -- %d bytes at position %d' % (args[0], len(args[2]), args[1]) else: print ', '.join([repr(a) for a in args[:-1]]) def mkdir(self, dirname, mode): node, name = self.grabparent(dirname) self.operation(dirname, mode, 'mkdir') newnode = Dir(newstat(S_IFDIR, mode)) newnode.entries = {} node.entries[name] = newnode node.st[ST_NLINK] += 1 def rmdir(self, dirname): node, name = self.grabparent(dirname) assert isinstance(node.entries[name], Dir) self.operation(dirname, 'rmdir') del node.entries[name] if node.st[ST_NLINK] > 2: node.st[ST_NLINK] -= 1 def unlink(self, filename): node, name = self.grabparent(filename) assert not isinstance(node.entries[name], Dir) self.operation(filename, 'unlink') del node.entries[name] def rename(self, fromname, toname): node1, name1 = self.grabparent(fromname) node2, name2 = self.grabparent(toname) node = node1.entries[name1] self.changing(node, fromname, toname, 'rename') node2.entries[name2] = node del node1.entries[name1] if isinstance(node, Dir): node2.st[ST_NLINK] += 1 if node1.st[ST_NLINK] > 2: node1.st[ST_NLINK] -= 1 def create(self, filename, mode): node, name = self.grabparent(filename) if name in node.entries: assert isinstance(node.entries[name], File) self.operation(filename, mode, 'create') newnode = File(newstat(S_IFREG, mode)) newnode.content = '' node.entries[name] = newnode def write(self, filename, offset, buf): node = self.grab(filename) assert isinstance(node, File) self.changing(node, filename, offset, buf, 'write') data = node.content[:offset] if len(data) < offset: data += '\x00' * (offset - len(data)) data += buf data += node.content[offset+len(buf):] node.content = data node.st[ST_MTIME] = time.time() node.st[ST_SIZE] = len(node.content) return len(buf) def link(self, target, link): node = self.grab(target) if isinstance(node, Dir): raise IOError("no directory hard links") self.changing(node, target, link, 'link') parent, name = self.grabparent(link) assert name not in parent.entries parent.entries[name] = node def symlink(self, target, link): parent, name = self.grabparent(link) assert name not in parent.entries self.operation(link, target, 'symlink') newnode = SymLink(newstat(S_IFLNK, 0777)) newnode.data = target parent.entries[name] = newnode def setattr(self, filename, attribs): node = self.grab(filename) self.changing(node, filename, attribs, 'setattr') mode, nlink, uid, gid, size, atime, mtime, ctime = attribs if size is not None and isinstance(node, File): if size <= len(node.content): node.content = node.content[:size] else: node.content += '\x00' * (size - len(node.content)) if mode is not None: node.st[ST_MODE] = (node.st[ST_MODE] &~ 07777) | (mode & 07777) if atime is not None: node.st[ST_ATIME] = atime if mtime is not None: node.st[ST_MTIME] = mtime if ctime is not None: node.st[ST_CTIME] = ctime REMOTE_WRITER = """ import os def mkdir(dirname, mode): os.mkdir(dirname, mode) def rmdir(dirname): os.rmdir(dirname) def unlink(filename): os.unlink(filename) def rename(fromname, toname): os.rename(fromname, toname) def create(filename, mode): fd = os.open(filename, os.O_CREAT|os.O_WRONLY|os.O_TRUNC, mode) os.close(fd) def write(filename, offset, buf): fd = os.open(filename, os.O_WRONLY) try: os.lseek(fd, offset, 0) while buf: count = os.write(fd, buf) buf = buf[count:] finally: os.close(fd) def link(target, link): os.link(target, link) # link/symlink: reversed argument order def symlink(link, target): os.symlink(target, link) def setattr(filename, attribs): st = os.lstat(filename) mode, nlink, uid, gid, size, atime, mtime, ctime = attribs if st.st_size != size is not None: fd = os.open(filename, os.O_WRONLY) os.ftruncate(fd, size) os.close(fd) st = os.lstat(filename) if st.st_mode != mode is not None: os.chmod(filename, mode) if (st.st_mtime != mtime or st.st_atime != atime) and mtime is not None: os.utime(filename, (atime, mtime)) operations = locals() for msg in channel: op = msg[-1] args = msg[:-1] try: operations[op](*args) except Exception, e: error = '%s: %s' % (e.__class__.__name__, e) else: error = None channel.send(error) """ class RemoteWriterError(Exception): pass class SWExecNetFS(CWExecNetFS): opchannel = None def operation(self, *args): CWExecNetFS.operation(self, *args) if self.opchannel is None: self.opchannel = self.gateway.remote_exec(REMOTE_WRITER) self.opchannel.send(args) error = self.opchannel.receive() if error: raise RemoteWriterError(error) class BWExecNetFS(CWExecNetFS): opchannel = None def __init__(self, gateway): CWExecNetFS.__init__(self, gateway) try: me = __file__ except NameError: import sys; me = sys.argv[0] executable = os.path.join(os.path.dirname(me), 'xcolorbox') self.xcolorbox = os.popen(executable, 'w') def operation(self, *args): CWExecNetFS.operation(self, *args) if self.opchannel is None: self.remotepending = 0 self.remotelock = thread.allocate_lock() self.opchannel = self.gateway.newchannel(self.remote_notif) self.gateway.remote_exec(REMOTE_WRITER, channel=self.opchannel) self.opchannel.send(args) self.remotelock.acquire() try: if self.remotepending == 0: print >> self.xcolorbox, "red" self.xcolorbox.flush() self.remotepending += 1 finally: self.remotelock.release() def remote_notif(self, error): self.remotelock.acquire() try: self.remotepending -= 1 if self.remotepending == 0: color = "black" elif self.remotepending & 1: color = "brown" else: color = "red" print >> self.xcolorbox, color self.xcolorbox.flush() finally: self.remotelock.release() # ____________________________________________________________ class Serializer(FileSystem): def __init__(self, fs): lock = thread.allocate_lock() for name in dir(fs): if not name.startswith('_'): obj = getattr(fs, name) if callable(obj): self._install(name, obj, lock) def _install(self, name, method, lock): if method.im_func == NO_OP: locker = NO_OP else: def locker(*args): lock.acquire() try: #print name, args return method(*args) finally: lock.release() setattr(self, name, locker) # ____________________________________________________________ def ExecNetFSFactory(gw): fs = globals()[WRITE_POLICY+'ExecNetFS'](gw) fs = Serializer(fs) return lambda: fs if __name__ == '__main__': import sys hostname = sys.argv[1] gw = py.execnet.SshGateway(hostname) lufs.mount(ExecNetFSFactory(gw), sys.argv[2])