""" Generic VT102 emulator. """ import re class VT102: r_control_chars = re.compile(r"([\000-\033])") r_escape_sequence = re.compile(r"\033[\040-\057]*[\060-\176]") r_control_sequence = re.compile(r"\033\133([\060-\077]*)" "[\040-\057]*[\100-\176]") def __init__(self, width=80, height=24): self.width = width self.height = height self.reset() def reset(self): self.lines = [''] * self.height self.curx = self.cury = 0 self.topmargin = 0 self.bottommargin = self.height self.controlseq = '' self.modes = {} def screenshot(self): return '\n'.join(self.lines) def terminal_input(self, buf): """Update the virtual terminal screen from the input data in 'buf'.""" for part in self.r_control_chars.split(buf): if part >= ' ': if self.controlseq: part = self.process_sequence(self.controlseq + part) if not part or part.startswith('\033'): self.controlseq = part continue self.controlseq = '' self.scr_write(part) else: # a single control character HANDLER[part](self) def writekey(self, keyname): self.write(KEYSEQUENCE[keyname]) def write(self, buf): """Abstract method to send data as if coming from the terminal.""" def scr_write(self, text): # write regular text pstart = 0 line = self.lines[self.cury] if len(line) < self.curx: line += ' ' * (self.curx - len(line)) self.lines[self.cury] = line endx = self.curx + len(text) while endx > self.width: # line too long, wrap-around keep = self.width - self.curx self.lines[self.cury] = (line[:self.curx] + text[pstart:pstart+keep]) pstart += keep self.scr_newline() line = self.lines[self.cury] endx = len(text) - pstart self.lines[self.cury] = (line[:self.curx] + text[pstart:] + line[endx:]) self.curx = endx def scr_newline(self): self.curx = 0 self.scr_linefeed() def scr_null(self): pass def scr_enquire(self): self.write("\033[?1;2c") # vt100 answerback def scr_bell(self): pass def scr_backspace(self): # '\b' if self.curx > 0: self.curx -= 1 def scr_horizontal_tab(self): # '\t' self.curx = (self.curx + 8) & ~7 if self.curx > self.width: self.curx = self.width def scr_linefeed(self): # '\n' if self.cury == self.bottommargin-1: # scroll self.lines[self.topmargin:self.bottommargin] = ( self.lines[self.topmargin+1:self.bottommargin] + ['']) else: self.cury += 1 scr_vertical_tab = scr_linefeed scr_form_feed = scr_linefeed def scr_carriage_return(self): # '\r' self.curx = 0 def scr_cancel(self): self.controlseq = '' self.scr_write('?') scr_substitute = scr_cancel def scr_escape(self): self.controlseq = '\033' def process_sequence(self, controlseq): match = self.r_control_sequence.match(controlseq) if match: end = match.end() self.process_csi(match.group(1), controlseq[end-1]) return controlseq[end:] match = self.r_escape_sequence.match(controlseq) if match: end = match.end() self.process_oldstyle_escape(controlseq[:end]) return controlseq[end:] return controlseq def process_csi(self, parameters, op): if '<' <= parameters < '@': priv = parameters[0] parameters = parameters[1:] else: priv = '' if parameters: try: numeric_args = [int(n) for n in parameters.split(';')] except ValueError: return else: numeric_args = () CSI_HANDLER[op](self, priv, *numeric_args) def getline(self): if self.curx >= self.width: self.curx = self.width - 1 line = self.lines[self.cury] line += ' ' * (self.curx - len(line)) return line def process_csi_r(self, priv, pt=0, pb=0, *rest): "Set top and bottom margins" pb = pb or self.height pt = max(pt - 1, 0) if pb >= pt+2: self.topmargin = pt self.bottommargin = pb self.curx = 0 self.cury = self.topmargin def process_csi_A(self, priv, pn=0, *rest): "Cursor up" self.cury = max(self.cury - (pn or 1), self.topmargin) def process_csi_B(self, priv, pn=0, *rest): "Cursor down" self.cury = min(self.cury + (pn or 1), self.bottommargin-1) def process_csi_C(self, priv, pn=0, *rest): "Cursor forward" self.curx = min(self.curx + (pn or 1), self.width-1) def process_csi_D(self, priv, pn=0, *rest): "Cursor backward" self.curx = max(self.curx - (pn or 1), 0) def process_csi_H(self, priv, pl=0, pc=0, *rest): "Cursor position" self.curx = bound(pc-1, 0, self.width) self.cury = bound(pl-1, self.topmargin, self.bottommargin) process_csi_f = process_csi_H def process_csi_K(self, priv, what=0, *rest): "Erase in line" line = self.getline() if what == 0: # to end of line line = line[:self.curx] elif what == 1: # from start of line line = ' ' * (self.curx+1) + line[self.curx+1:] elif what == 2: # complete line line = ' ' * self.curx self.lines[self.cury] = line def process_csi_J(self, priv, what=0, *rest): "Erase in display" self.process_csi_K(priv, what) if what in (0, 2): # to end of screen for i in range(self.cury+1, self.height): self.lines[i] = '' if what in (1, 2): # from start of screen for i in range(self.cury): self.lines[i] = '' def process_csi_m(self, priv, *rest): pass # XXX attributes def process_csi_h(self, priv, mode=0, *rest): if priv == '?': self.modes[mode] = True def process_csi_l(self, priv, mode=0, *rest): if priv == '?': self.modes.pop(mode, False) def process_oldstyle_escape(self, seq): OLDSTYLE_HANDLER[seq[-1]](self, seq[1:-1]) def process_oldstyle_EQUALS_SIGN(self, params): self.modes[APPLICATION_KEYPAD_MODE] = True def process_oldstyle_GREATER_THAN_SIGN(self, params): self.modes.pop(APPLICATION_KEYPAD_MODE, False) def bound(x, min_included, max_excluded): return max(min(x, max_excluded-1), min_included) # ____________________________________________________________ CONTROL_CHARS = { 000: 'null', #003: 'end_of_text', #004: 'end_of_transmission', 005: 'enquire', 007: 'bell', 010: 'backspace', 011: 'horizontal_tab', 012: 'linefeed', 013: 'vertical_tab', 014: 'form_feed', 015: 'carriage_return', #016: 'shift_out', #017: 'shift_in', #021: 'device_control_1', #023: 'device_control_3', 030: 'cancel', 032: 'substitute', 033: 'escape', } CURSOR_KEY_MODE = 1 APPLICATION_KEYPAD_MODE = 'aplKP' def noop(*args): pass def ignored_csi_sequence(op, self, priv, *numeric_args): print 'ignored: ESC[%s%s%s' % (priv, ';'.join(map(str, numeric_args)), chr(op)) def ignored_oldstyle_sequence(op, params): print 'ignored: ESC %s %s' % (params, op) HANDLER = {'': noop} for num in range(32): name = CONTROL_CHARS.get(num, 'null') HANDLER[chr(num)] = VT102.__dict__['scr_' + name] del num, name CSI_HANDLER = {} for op in range(0100, 0177): CSI_HANDLER[chr(op)] = VT102.__dict__.get('process_csi_' + chr(op), ignored_csi_sequence.__get__(op)) OLDSTYLE_HANDLER = {} for op in range(0060, 0177): ch = chr(op) if not ch.isalnum(): import unicodedata # :-) ch = unicodedata.name(unicode(ch)).replace(' ', '_').replace('-', '_') OLDSTYLE_HANDLER[chr(op)] = VT102.__dict__.get( 'process_oldstyle_' + ch, ignored_oldstyle_sequence.__get__(ch)) del op, ch KEYSEQUENCE = { 'UP' : '\033OA', 'DOWN' : '\033OB', 'RIGHT' : '\033OC', 'LEFT' : '\033OD', 'ESCAPE' : '\033', #'PAGEUP' : '\033[5~', #'PAGEDOWN' : '\033[6~', #'HOME' : '\033[1~', #'END' : '\033[4~', 'BACKSPACE': '\b', #'DELETE' : 'mess mess mess', }