import os from py.magic import greenlet from collections import deque from select import select as _select from time import time as _time from heapq import heappush, heappop class Channel(object): def __init__(self): self.receivers = deque() self.senders = deque() def send(self, obj, block=True): if self.receivers: other = self.receivers.popleft() g_active.append(greenlet.getcurrent()) other.switch(obj) return True elif block: self.senders.append((greenlet.getcurrent(), obj)) g_dispatcher.switch() return True else: return False def receive(self): #, timeout=None): while self.senders: other, obj = self.senders.popleft() g_active.append(other) if obj is not EMPTY: return obj self.receivers.append(greenlet.getcurrent()) #if timeout is not None: # g_timers obj = g_dispatcher.switch() #if obj is Timeout: # raise Timeout return obj def wait_sendable(self): if not self.receivers: self.send(EMPTY) #class Timeout(Exception): # pass EMPTY = object() def wait_input(sock): _register(g_iwtd, sock) g_dispatcher.switch() def recv(sock, bufsize): wait_input(sock) return sock.recv(bufsize) def read(fd, bufsize): assert fd >= 0 wait_input(fd) return os.read(fd, bufsize) def wait_output(sock): _register(g_owtd, sock) g_dispatcher.switch() def sendall(sock, buffer): g = greenlet.getcurrent() in_front = False while buffer: _register(g_owtd, sock, in_front=in_front) g_dispatcher.switch() count = sock.send(buffer) buffer = buffer[count:] in_front = True def writeall(fd, buffer): assert fd >= 0 g = greenlet.getcurrent() in_front = False while buffer: _register(g_owtd, fd, in_front=in_front) g_dispatcher.switch() count = os.write(fd, buffer) if not count: raise EOFError("outbound connexion closed") buffer = buffer[count:] in_front = True def sleep(duration): _register_timer(duration) g_dispatcher.switch() # ____________________________________________________________ class autogreenlet(greenlet): def __init__(self, function, *args, **kwds): self.parent = g_dispatcher self.function = function self.args = args self.kwds = kwds g_active.append(self) def run(self): self.function(*self.args, **self.kwds) def __repr__(self): args = ', '.join([repr(s) for s in self.args] + ['%s=%r' % keyvalue for keyvalue in self.kwds.items()]) return '' % (self.function.__name__, args) g_dispatcher = greenlet.getcurrent() g_active = deque() g_iwtd = {} g_owtd = {} g_timers = [] def _register(g_wtd, sock, in_front=False): d = g_wtd.setdefault(sock, deque()) if in_front: d.appendleft(greenlet.getcurrent()) else: d.append(greenlet.getcurrent()) def _register_timer(duration): heappush(g_timers, (_time() + duration, greenlet.getcurrent())) ##def _unregister_timer(): ## ... def mainloop(): while 1: while g_active: #print 'active:', g_active[0] g_active.popleft().switch() if g_timers: activationtime, g = g_timers[0] delay = activationtime - _time() if delay <= 0.0: heappop(g_timers) #print 'timeout:', g g.switch() #Timeout) continue else: if not (g_iwtd or g_owtd): return delay = None #print 'selecting...', g_iwtd.keys(), g_owtd.keys(), delay iwtd, owtd, ewtd = _select(g_iwtd.keys(), g_owtd.keys(), [], delay) for s in owtd: if s in g_owtd: d = g_owtd[s] #print 'owtd:', d[0] d.popleft().switch() if not d: try: del g_owtd[s] except KeyError: pass for s in iwtd: if s in g_iwtd: d = g_iwtd[s] #print 'iwtd:', d[0] d.popleft().switch() if not d: try: del g_iwtd[s] except KeyError: pass