import os class Piper: def run(self, *args): readend, writeend = os.pipe() if not os.fork(): try: # in the child os.close(readend) self.generate(writeend, *args) finally: os._exit(0) os.close(writeend) self.readend = readend def __del__(self): if hasattr(self, 'readend'): os.close(self.readend) def fileno(self): return self.readend def read(self): result = [] while 1: t = os.read(self.readend, 4096) if not t: break result.append(t) return ''.join(result) def as_file(self): f = os.fdopen(self.readend) del self.readend return f class StringWriter(Piper): def __init__(self, data): self.run(data) def generate(self, writeend, data): while data: data = data[os.write(writeend, data):] class Process(Piper): def __init__(self, executable, *args): args = list(args) for i in range(len(args)): arg = args[i] if hasattr(arg, 'fileno'): fd = arg.fileno() args[i] = '/dev/fd/%d' % fd self.run(executable, args) def generate(self, writeend, executable, args): os.dup2(writeend, 1) os.close(writeend) os.close(0) os.execvp(executable, [executable] + args)