import os, sys try: from stackless import greenlet except ImportError: import py greenlet = py.magic.greenlet from collections import deque from select import select as _select from time import time as _time from heapq import heappush, heappop, heapify TRACE = False def meetingpoint(): senders = deque() # list of senders, or [None] if Giver closed receivers = deque() # list of receivers, or [None] if Receiver closed return (MeetingPointGiver(senders, receivers), MeetingPointAccepter(senders, receivers)) def producer(func, *args, **kwds): iterable = func(*args, **kwds) giver, accepter = meetingpoint() def autoproducer(): try: giver.wait() for obj in iterable: giver.give(obj) giver.wait() finally: giver.close() autogreenlet(autoproducer) return accepter class MeetingPointBase(object): def __init__(self, senders, receivers): self.senders = senders self.receivers = receivers self.g_active = g_active def close(self): while self.senders: if self.senders[0] is None: break packet = self.senders.popleft() if packet.g_from is not None: self.g_active.append(packet.g_from) else: self.senders.append(None) while self.receivers: if self.receivers[0] is None: break other = self.receivers.popleft() self.g_active.append(other) else: self.receivers.append(None) __del__ = close def closed(self): return self.receivers and self.receivers[0] is None class MeetingPointGiver(MeetingPointBase): def give(self, obj): if self.receivers: if self.receivers[0] is None: raise MeetingPointClosed other = self.receivers.popleft() g_active.append(g_getcurrent()) packet = _Packet() packet.payload = obj other.switch(packet) if not packet.accepted: raise Interrupted("packet not accepted") else: packet = _Packet() packet.g_from = g_getcurrent() packet.payload = obj try: self.senders.append(packet) g_dispatcher.switch() if not packet.accepted: raise Interrupted("packet not accepted") except: remove_by_id(self.senders, packet) raise def give_queued(self, obj): if self.receivers: self.give(obj) else: packet = _Packet() packet.g_from = None packet.payload = obj self.senders.append(packet) def ready(self): return self.receivers and self.receivers[0] is not None def wait(self): if self.receivers: if self.receivers[0] is None: raise MeetingPointClosed else: packet = _Packet() packet.g_from = g_getcurrent() packet.empty = True self.senders.append(packet) try: g_dispatcher.switch() if not packet.accepted: raise Interrupted("no accepter found") except: remove_by_id(self.senders, packet) raise def trigger(self): if self.ready(): self.give(None) class MeetingPointAccepter(MeetingPointBase): def accept(self): while self.senders: if self.senders[0] is None: raise MeetingPointClosed packet = self.senders.popleft() packet.accepted = True if packet.g_from is not None: g_active.append(packet.g_from) if not packet.empty: return packet.payload g = g_getcurrent() self.receivers.append(g) try: packet = g_dispatcher.switch() except: remove_by_id(self.receivers, g) raise if type(packet) is not _Packet: remove_by_id(self.receivers, g) raise Interrupted("no packet") packet.accepted = True return packet.payload def ready(self): for packet in self.senders: if packet is None: return False if not packet.empty: return True return False def wait_trigger(self, timeout=None, default=None): if timeout is None: return self.accept() else: timer = Timer(timeout) try: try: return self.accept() finally: timer.stop() except Interrupted: if timer.finished: return default raise class MeetingPointClosed(greenlet.GreenletExit): pass class Interrupted(greenlet.GreenletExit): pass class ConnexionClosed(greenlet.GreenletExit): pass class _Packet(object): empty = False accepted = False def remove_by_id(d, obj): lst = [x for x in d if x is not obj] d.clear() d.extend(lst) # ____________________________________________________________ ##class Queue(object): ## def __init__(self): ## self.giver, self.accepter = meetingpoint() ## self.pending = deque() ## def put(self, item): # preserve the caller's atomicity ## self.pending.append(item) ## if self.accepter.ready(): ## self.accepter.accept() ## def get(self, block=True): ## if self.pending: ## return self.pending.popleft() ## elif block: ## self.giver.give(None) ## return self.pending.popleft() ## else: ## raise Empty ##class Empty(Interrupted): ## pass ##class Event(object): ## def __init__(self): ## self.giver, self.accepter = meetingpoint() ## clear = __init__ ## def isSet(self): ## return self.accepter is None ## def set(self): # preserve the caller's atomicity ## if self.accepter is not None: ## accepter = self.accepter ## self.giver = self.accepter = None ## while accepter.ready(): # wake up all waiters ## accepter.accept() ## def wait(self, timeout=None): ## if self.accepter is not None: ## if timeout is None: ## self.giver.give(None) ## else: ## timer = Timer(timeout) ## try: ## try: ## self.giver.give(None) ## except Interrupted: ## pass ## finally: ## timer.stop() ##class Semaphore(object): ## def __init__(self, value=1): ## self.giver, self.accepter = meetingpoint() ## for i in range(value): ## self.release() ## def acquire(self, blocking=True): ## if blocking or self.accepter.ready(): ## return self.accepter.accept() ## else: ## return False ## def release(self): ## autogreenlet(self.giver.put, True) # ____________________________________________________________ def wait_input(sock): _register(g_iwtd, sock) def recv(sock, bufsize): wait_input(sock) buf = sock.recv(bufsize) if not buf: raise ConnexionClosed("inbound connexion closed") return buf def recvall(sock, bufsize): in_front = False data = [] while bufsize > 0: _register(g_iwtd, sock, in_front=in_front) buf = sock.recv(bufsize) if not buf: raise ConnexionClosed("inbound connexion closed") data.append(buf) bufsize -= len(buf) in_front = True return ''.join(data) def read(fd, bufsize): assert fd >= 0 wait_input(fd) buf = os.read(fd, bufsize) if not buf: raise ConnexionClosed("inbound connexion closed") return buf def readall(fd, bufsize): assert fd >= 0 in_front = False data = [] while bufsize > 0: _register(g_iwtd, fd, in_front=in_front) buf = os.read(fd, bufsize) if not buf: raise ConnexionClosed("inbound connexion closed") data.append(buf) bufsize -= len(buf) in_front = True return ''.join(data) def wait_output(sock): _register(g_owtd, sock) def sendall(sock, buffer): in_front = False while buffer: _register(g_owtd, sock, in_front=in_front) count = sock.send(buffer) buffer = buffer[count:] in_front = True def writeall(fd, buffer): assert fd >= 0 in_front = False while buffer: _register(g_owtd, fd, in_front=in_front) count = os.write(fd, buffer) if not count: raise ConnexionClosed("outbound connexion closed") buffer = buffer[count:] in_front = True def sleep(duration): timer = Timer(duration) try: wait() finally: ok = timer.finished timer.stop() if not ok: raise Interrupted def wait(): g_dispatcher.switch() class Timer(object): started = False finished = False def __init__(self, timeout): self.g = g_getcurrent() entry = (_time() + timeout, self) if g_timers_mixed: g_timers.append(entry) else: heappush(g_timers, entry) def stop(self): global g_timers_mixed if not self.finished: for i, (activationtime, timer) in enumerate(g_timers): if timer is self: g_timers[i] = g_timers[-1] g_timers.pop() g_timers_mixed = True break self.finished = True # ____________________________________________________________ class autogreenlet(greenlet): def __init__(self, function, *args, **kwds): self.parent = g_dispatcher self.function = function self.args = args self.kwds = kwds g_active.append(self) def run(self): self.trace("start") try: self.function(*self.args, **self.kwds) except Exception, e: self.trace("stop (%s%s)", e.__class__.__name__, str(e) and (': '+str(e))) raise else: self.trace("done") def __repr__(self): ## args = ', '.join([repr(s) for s in self.args] + ## ['%s=%r' % keyvalue for keyvalue in self.kwds.items()]) ## return '' % (self.function.__name__, args) return '' % (self.function.__name__, hex(id(self))) def trace(self, msg, *args): if TRACE: print self, msg % args def interrupt(self): self.throw(Interrupted) g_active = deque() g_iwtd = {} g_owtd = {} g_timers = [] g_timers_mixed = False g_getcurrent = greenlet.getcurrent def _register(g_wtd, sock, in_front=False): d = g_wtd.setdefault(sock, deque()) g = g_getcurrent() if in_front: d.appendleft(g) else: d.append(g) try: if g_dispatcher.switch() is not g_wtd: raise Interrupted except: remove_by_id(d, g) raise ##def _unregister_timer(): ## ... def check_dead_greenlets(mapping): to_remove = [i for i, v in mapping.items() if not v] for k in to_remove: del mapping[k] def dispatcher_mainloop(): global g_timers_mixed while 1: try: while g_active: #print 'active:', g_active[0] g_active.popleft().switch() if g_timers: if g_timers_mixed: heapify(g_timers) g_timers_mixed = False activationtime, timer = g_timers[0] delay = activationtime - _time() if delay <= 0.0: if timer.started: heappop(g_timers) #print 'timeout:', g timer.finished = True timer.g.switch() continue delay = 0.0 timer.started = True else: check_dead_greenlets(g_iwtd) check_dead_greenlets(g_owtd) if not (g_iwtd or g_owtd): # nothing to do, switch to the main greenlet g_dispatcher.parent.switch() continue delay = None #print 'selecting...', g_iwtd.keys(), g_owtd.keys(), delay iwtd, owtd, _ = _select(g_iwtd.keys(), g_owtd.keys(), [], delay) #print 'done' for s in owtd: if s in g_owtd: d = g_owtd[s] #print 'owtd:', d[0] g = d.popleft() if not d: try: del g_owtd[s] except KeyError: pass g.switch(g_owtd) for s in iwtd: if s in g_iwtd: d = g_iwtd[s] #print 'iwtd:', d[0] g = d.popleft() if not d: try: del g_iwtd[s] except KeyError: pass g.switch(g_iwtd) except: import sys g_dispatcher.parent.throw(*sys.exc_info()) g_dispatcher = greenlet(dispatcher_mainloop)