"""Buffer that exchanges diffs with a remote branch through a gateway.

The module builds ``source``, the program run on the remote side (the text of
``branch.py`` followed by the snippet below), and defines ``ExecnetBuffer``,
the local counterpart that talks to that program over a channel.
"""

import py

# Python 2 names the module ``Queue``; fall back to the Python 3 name so the
# rest of this file can keep using ``Queue.Queue`` / ``Queue.Empty``.
try:
    import Queue
except ImportError:
    import queue as Queue

import branch
from buffer import Buffer, Cursor


# Remote-side program.  It is the text of ``branch.py`` (same directory as this
# file) followed by the snippet below, so names such as ``Branch``,
# ``shiftpatch``, ``OutOfDateError`` and ``activeclients`` are presumably
# supplied by ``branch.py`` -- TODO confirm.
#
# What the snippet does:
#   * fetches (or creates) a Branch stored in ``sys.__dict__`` under the key
#     '__execnet_buffer_branch', and, while holding ``branch.lock``, registers
#     the first unused cursor number in ``head.cursors`` at position 0;
#   * sends ``head.data`` and the cursor number, then receives the subchannel
#     used for all further traffic;
#   * ``channel_reader`` copies every subchannel message into ``queue`` and
#     puts STOP when the channel is exhausted;
#   * ``head_watcher`` acquires and releases ``head.waitlock``, puts None into
#     ``queue`` and moves on to ``head.next``, until ``finished`` is non-empty;
#   * ``event_handler`` forwards pending revisions as ``(0, nextdiff)``; for a
#     client message ``(nb_ack, diff)`` it advances ``knownremotehead`` by
#     ``nb_ack`` revisions, shifts ``diff`` over the revisions not authored by
#     MYID, and commits it, retrying on OutOfDateError; a successful commit is
#     echoed back as ``(1, diff)``.
# Failures are appended to the file 'shped.ERROR-LOG'.
source = py.path.local(__file__).new(basename='branch.py').read('r')
source = py.code.Source(source, """
try:
    import sys, Queue, thread
    branch = sys.__dict__.setdefault('__execnet_buffer_branch', Branch())
    branch.lock.acquire()
    try:
        head = branch.head
        mycursornb = 0
        while mycursornb in head.cursors:
            mycursornb += 1
        head.cursors[mycursornb] = 0
    finally:
        branch.lock.release()
    channel.send(head.data)
    channel.send(mycursornb)
    subchannel = channel.receive()
    queue = Queue.Queue()
    STOP = object()
    MYID = object()
except:
    import traceback
    traceback.print_exc(file=open('shped.ERROR-LOG','a'))

def channel_reader(channel, queue):
    for msg in channel:
        queue.put(msg)
    queue.put(STOP)

def head_watcher(head, queue, finished):
    while not finished:
        head.waitlock.acquire()
        head.waitlock.release()
        queue.put(None)
        head = head.next

def event_handler(head, queue, channel, finished):
    knownremotehead = head
    try:
        while True:
            msg = queue.get()
            if msg is STOP:
                finished.append(1)
                break
            while True:
                while head.next is not None:
                    channel.send((0, head.nextdiff))
                    head = head.next
                if msg is None:
                    break
                nb_ack, diff = msg
                for i in range(nb_ack):
                    assert knownremotehead is not head
                    knownremotehead = knownremotehead.next
                rev = knownremotehead
                while rev is not head:
                    if rev.nextdiffauthor is not MYID:
                        diff = shiftpatch(diff, rev.nextdiff)
                    rev = rev.next
                try:
                    head = branch.commit(diff, head, MYID)
                except OutOfDateError:
                    continue
                channel.send((1, diff))
                break
    except:
        import traceback
        traceback.print_exc(file=open('shped.ERROR-LOG','a'))
    del activeclients[mycursornb]

marker = []
thread.start_new_thread(channel_reader, (subchannel, queue))
thread.start_new_thread(head_watcher, (head, queue, marker))
thread.start_new_thread(event_handler, (head, queue, subchannel, marker))
""")


def _clamp(pos, size):
    """Return *pos* limited to the inclusive range ``0..size``."""
    if pos < 0:
        return 0
    if pos > size:
        return size
    return pos


class ExecnetBuffer(Buffer):
    """Buffer whose edits are sent to, and merged from, a remote branch.

    Instance state:
      queue         -- messages delivered by the channel callback
      channel       -- channel used to send local diffs
      mycursornb    -- cursor number assigned by the remote side
      basedata      -- data obtained by applying every received diff to the
                       initial data
      basecursors   -- cursor mapping obtained the same way
      localchanges  -- diffs sent by modify() and not yet echoed back
      cursorcache   -- cursors that were placed from the merged cursor mapping
      nb_ack        -- messages received since the last diff was sent
    """

    def __init__(self, gateway):
        """Start the remote program on *gateway* and load the initial data.

        *gateway* must provide ``remote_exec(source)`` and ``newchannel()``.
        Blocks until the remote side has sent the initial data and the cursor
        number.
        """
        basechannel = gateway.remote_exec(source)
        self.queue = Queue.Queue()
        self.channel = gateway.newchannel()
        # Incoming messages are only queued here; poll() processes them.
        self.channel.setcallback(self.queue.put)
        basechannel.send(self.channel)
        print('waiting...')
        initialdata = basechannel.receive()
        self.mycursornb = basechannel.receive()
        print('ok')
        self.basedata = initialdata
        self.basecursors = {}
        self.localchanges = []
        self.cursorcache = {}
        self.nb_ack = 0
        Buffer.__init__(self, initialdata)

    def getmycursor(self):
        """Return the cursor whose color equals this buffer's cursor number.

        Raises ValueError if no such cursor exists.
        """
        for cur in self.getcursors():
            if cur.color == self.mycursornb:
                return cur
        raise ValueError("lost my cursor")

    def modify(self, start, length, ndata):
        """Apply a local edit and send it to the remote side.

        The diff sent is ``(start, length, ndata, {mycursornb: cursor pos})``
        together with the current ``nb_ack`` count, which is then reset.
        Nothing is sent when ``Buffer.modify`` returns a false value and the
        cursor position equals the one recorded in the latest pending diff.
        """
        cur = self.getmycursor()
        if not Buffer.modify(self, start, length, ndata):
            if (self.localchanges and
                    cur.pos == self.localchanges[-1][-1][self.mycursornb]):
                return
        diff = (start, length, ndata, {self.mycursornb: cur.pos})
        self.channel.send((self.nb_ack, diff))
        self.localchanges.append(diff)
        self.nb_ack = 0

    def receive_msg(self, msg):
        """Merge one ``(echo, diff)`` message received from the remote side.

        A true *echo* means the diff is the oldest pending local change, which
        is dropped from ``localchanges``.  Otherwise every pending local change
        is shifted over the incoming diff.  The buffer data and cursors are
        then rebuilt from the base state plus the pending local changes.
        """
        echo, diff = msg
        self.basedata = branch.patch(self.basedata, diff)
        self.basecursors = branch.patchcursors(self.basecursors, diff)
        if echo:
            del self.localchanges[0]
        else:
            self.localchanges = [branch.shiftpatch(localdiff, diff)
                                 for localdiff in self.localchanges]
        # apply the local changes over the current state
        data = self.basedata
        cursors = self.basecursors
        for localdiff in self.localchanges:
            data = branch.patch(data, localdiff)
            cursors = branch.patchcursors(cursors, localdiff)
        self._data = data
        # restore cursors
        size = len(data)
        ncursors = cursors.copy()
        ncache = {}
        for cur in self.getcursors():
            pos = cur.pos
            if cur.color is not None:
                if cur.color in ncursors:
                    pos = ncursors.pop(cur.color)
                    ncache[cur] = True
                elif cur.color != self.mycursornb:
                    # Color absent from the merged mapping and not ours.
                    cur.color = None
            cur.pos = _clamp(pos, size)
        # Colors left over have no cursor object yet: create one for each.
        for ncolor, npos in ncursors.items():
            cur = Cursor(self, _clamp(npos, size), ncolor)
            ncache[cur] = True
        self.cursorcache = ncache
        self.nb_ack += 1

    def poll(self):
        """Process every queued message without blocking.

        Returns True if at least one message was processed, False otherwise.
        """
        result = False
        while True:
            # Only the non-blocking get is guarded, so an error raised while
            # merging a message is never mistaken for an empty queue.
            try:
                msg = self.queue.get(block=False)
            except Queue.Empty:
                break
            self.receive_msg(msg)
            result = True
        return result