import os, thread, sys from collections import deque from select import select as _select from time import sleep as _sleep class Channel(object): def __init__(self): self.receivers = deque() self.senders = deque() def send(self, obj, block=True): assert g_is_greenlet() # XXX for now self.content = obj if self.receivers: other = self.receivers.popleft() other.release() print 'send1', obj g_global_lock.acquire() return True elif block: lock = thread.allocate_lock() lock.acquire() self.senders.append((lock, obj)) g_inc_count(-1) g_global_lock.release() print 'send2', obj lock.acquire() g_global_lock.acquire() return True else: return False def receive(self): assert g_is_greenlet() # XXX for now while self.senders: other, obj = self.senders.popleft() other.release() print 'receive1', obj g_inc_count(+1) if obj is not EMPTY: return obj lock = thread.allocate_lock() lock.acquire() self.receivers.append(lock) g_inc_count(-1) g_global_lock.release() print 'receive2' lock.acquire() g_inc_count(+1) obj = self.content del self.content print 'got', obj return obj def wait_sendable(self): assert g_is_greenlet() # XXX for now if not self.receivers: self.send(EMPTY) EMPTY = object() def wait_input(sock): withlock_autogreenlet(_select, [sock], [], []) def recv(sock, bufsize): return withlock_autogreenlet(sock.recv, bufsize) def read(fd, bufsize): return withlock_autogreenlet(os.read, fd, bufsize) def wait_output(sock): withlock_autogreenlet(_select, [], [sock], []) def sendall(sock, buffer): withlock_autogreenlet(sock.sendall, buffer) def writeall(fd, buffer): withlock_autogreenlet(_os_writeall, fd, buffer) def _os_writeall(fd, buffer): while buffer: count = os.write(fd, buffer) if not count: raise EOFError("outbound connexion closed") buffer = buffer[count:] def sleep(duration): withlock_autogreenlet(_sleep, duration) def withlock_autogreenlet(fn, *args): if g_is_greenlet(): g_global_lock.release() try: return fn(*args) finally: g_global_lock.acquire() else: return fn(*args) # ____________________________________________________________ g_global_lock = thread.allocate_lock() g_global_lock.acquire() g_greenlet_ids = {thread.get_ident(): True} g_active_count = 0 g_active_lock = thread.allocate_lock() class GreenletExit(Exception): pass def g_is_greenlet(): return thread.get_ident() in g_greenlet_ids def g_inc_count(n): global g_active_count assert g_is_greenlet() # XXX for now print 'g_inc_count', g_active_count, n if g_active_count <= 0: g_active_lock.acquire() g_active_count += n if g_active_count <= 0: g_active_lock.release() assert g_active_count >= 0 print 'ok' def autogreenlet(function, *args, **kwds): glock = g_global_lock def greenlet(): print 'greenlet starting...' glock.acquire() print 'greenlet started' try: id = thread.get_ident() g_greenlet_ids[id] = True try: function(*args, **kwds) except GreenletExit: pass except: global g_exception g_exception = sys.exc_info() g_active_lock.release() return g_inc_count(-1) del g_greenlet_ids[id] finally: glock.release() g_inc_count(+1) return thread.start_new_thread(greenlet, ()) def mainloop(): global g_exception assert g_is_greenlet() g_exception = None g_global_lock.release() try: print 'mainloop waiting...' g_active_lock.acquire() print 'mainloop done' g_active_lock.release() finally: g_global_lock.acquire() print 'mainloop finished' if g_exception is not None: etype, evalue, etb = g_exception raise etype, evalue, etb