import os, pty import greensock2 from vt102 import VT102 class PtyController(object): def __init__(self, cmd, args, env=None): childpid, ptyfd = pty.fork() if childpid == 0: # in the child os.execvpe(cmd, args, env) self.childpid = childpid self.ptyfd = ptyfd input_giver, self.input = greensock2.meetingpoint() self.output, output_accepter = greensock2.meetingpoint() greensock2.autogreenlet(self._read_from_child, input_giver) greensock2.autogreenlet(self._write_to_child, output_accepter) def _read_from_child(self, input_giver): try: while True: try: buf = greensock2.read(self.ptyfd, 2048) except OSError: break #print `buf` input_giver.give(buf) finally: input_giver.close() os.close(self.ptyfd) def _write_to_child(self, output_accepter): try: while True: buf = output_accepter.accept() try: greensock2.writeall(self.ptyfd, buf) except OSError: break finally: output_accepter.close() def write(self, buf): self.output.give(buf) def read(self): return self.input.accept() def readall(self): result = [] try: while True: result.append(self.input.accept()) except (greensock2.MeetingPointClosed, greensock2.Interrupted): pass return ''.join(result) class VTermBuffer(VT102): def __init__(self, input_accepter, output_giver, width=80, height=24): VT102.__init__(self, width=width, height=height) self.output_giver = output_giver greensock2.autogreenlet(self._handle_input, input_accepter) def screenshot(self, delay=0.01): # delay can be >= 0, but even sleep(0) is necessary to make sure # that at least some fresh data can arrive greensock2.sleep(delay) return VT102.screenshot(self) def _handle_input(self, input_accepter): while True: self.terminal_input(input_accepter.accept()) def expect(self, text): # for tests for i in range(50): greensock2.sleep(0.1) image = self.screenshot() print print '-'*self.width print image print '-'*self.width if text in image: break # ok else: raise AssertionError("expected text %r not showing up" % (text,)) def write(self, buf): self.output_giver.give(buf) def run_vterm(cmd, args, env=None, **kwds): if env is None: env = os.environ.copy() env['TERM'] = 'vt102' ptyc = PtyController(cmd, args, env) return VTermBuffer(ptyc.input, ptyc.output, **kwds)