[py-svn] r39973 - in py/trunk/py/net: . pipe server test

fijal at codespeak.net fijal at codespeak.net
Tue Mar 6 10:07:55 CET 2007


Author: fijal
Date: Tue Mar  6 10:07:53 2007
New Revision: 39973

Added:
   py/trunk/py/net/   (props changed)
   py/trunk/py/net/__init__.py   (contents, props changed)
   py/trunk/py/net/greenexecnet.py   (contents, props changed)
   py/trunk/py/net/greensock2.py   (contents, props changed)
   py/trunk/py/net/msgstruct.py   (contents, props changed)
   py/trunk/py/net/pipe/   (props changed)
   py/trunk/py/net/pipe/__init__.py   (contents, props changed)
   py/trunk/py/net/pipe/common.py   (contents, props changed)
   py/trunk/py/net/pipe/fd.py   (contents, props changed)
   py/trunk/py/net/pipe/gsocket.py   (contents, props changed)
   py/trunk/py/net/pipe/mp.py   (contents, props changed)
   py/trunk/py/net/pipelayer.py   (contents, props changed)
   py/trunk/py/net/server/   (props changed)
   py/trunk/py/net/server/__init__.py
   py/trunk/py/net/server/httpserver.py   (contents, props changed)
   py/trunk/py/net/test/   (props changed)
   py/trunk/py/net/test/test_greenexecnet.py   (contents, props changed)
   py/trunk/py/net/test/test_greensock2.py   (contents, props changed)
   py/trunk/py/net/test/test_pipelayer.py   (contents, props changed)
Log:
Add a (stolen almost directly from arigo/hack) network layer
based on top of greenlets. Needs some tweaking, so it's not exposed
as a py.xxx, but rather py.__.net


Added: py/trunk/py/net/__init__.py
==============================================================================

Added: py/trunk/py/net/greenexecnet.py
==============================================================================
--- (empty file)
+++ py/trunk/py/net/greenexecnet.py	Tue Mar  6 10:07:53 2007
@@ -0,0 +1,203 @@
+import sys, os, py, inspect
+from py.__.net import greensock2
+from py.__.net.msgstruct import message, decodemessage
+
+MSG_REMOTE_EXEC = 'r'
+MSG_OBJECT      = 'o'
+MSG_ERROR       = 'e'
+MSG_CHAN_CLOSE  = 'c'
+MSG_FATAL       = 'f'
+MSG_CHANNEL  = 'n'
+
+class Gateway(object):
+
+    def __init__(self, input, output, is_remote=False):
+        self.input  = input
+        self.output = output
+        self.nextchannum = int(is_remote)
+        self.receivers = {}
+        self.greenlet = greensock2.autogreenlet(self.serve_forever, is_remote)
+
+    def remote_exec(self, remote_source):
+        remote_source = py.code.Source(remote_source)
+        chan = self.newchannel()
+        msg = message(MSG_REMOTE_EXEC, chan.n, str(remote_source))
+        self.output.sendall(msg)
+        return chan
+
+    def newchannel(self):
+        n = self.nextchannum
+        self.nextchannum += 2
+        return self.make_channel(n)
+
+    def make_channel(self, n):
+        giver, accepter = greensock2.meetingpoint()
+        assert n not in self.receivers
+        self.receivers[n] = giver
+        return Channel(self, n, accepter)
+
+    def serve_forever(self, is_remote=False):
+        try:
+            buffer = ""
+            while 1:
+                msg, buffer = decodemessage(buffer)
+                if msg is None:
+                    buffer += self.input.recv(16384)
+                else:
+                    handler = HANDLERS[msg[0]]
+                    handler(self, *msg[1:])
+        except greensock2.greenlet.GreenletExit:
+            raise
+        except:
+            if is_remote:
+                msg = message(MSG_FATAL, format_error(*sys.exc_info()))
+                self.output.sendall(msg)
+            else:
+                raise
+
+    def msg_remote_exec(self, n, source):
+        def do_execute(channel):
+            try:
+                d = {'channel': channel}
+                exec source in d
+            except:
+                channel.report_error(*sys.exc_info())
+            else:
+                channel.close()
+        greensock2.autogreenlet(do_execute, self.make_channel(n))
+
+    def msg_object(self, n, objrepr):
+        obj = eval(objrepr)
+        if n in self.receivers:
+            self.receivers[n].give_queued(obj)
+
+    def msg_error(self, n, s):
+        if n in self.receivers:
+            self.receivers[n].give_queued(RemoteError(s))
+            self.receivers[n].close()
+            del self.receivers[n]
+
+    def msg_chan_close(self, n):
+        if n in self.receivers:
+            self.receivers[n].close()
+            del self.receivers[n]
+
+    def msg_channel(self, n, m):
+        if n in self.receivers:
+            self.receivers[n].give_queued(self.make_channel(m))
+
+    def msg_fatal(self, s):
+        raise RemoteError(s)
+
+HANDLERS = {
+    MSG_REMOTE_EXEC: Gateway.msg_remote_exec,
+    MSG_OBJECT:      Gateway.msg_object,
+    MSG_ERROR:       Gateway.msg_error,
+    MSG_CHAN_CLOSE:  Gateway.msg_chan_close,
+    MSG_CHANNEL:     Gateway.msg_channel,
+    MSG_FATAL:       Gateway.msg_fatal,
+    }
+
+
+class Channel(object):
+
+    def __init__(self, gw, n, accepter):
+        self.gw = gw
+        self.n = n
+        self.accepter = accepter
+
+    def send(self, obj):
+        if isinstance(obj, Channel):
+            assert obj.gw is self.gw
+            msg = message(MSG_CHANNEL, self.n, obj.n)
+        else:
+            msg = message(MSG_OBJECT, self.n, repr(obj))
+        self.gw.output.sendall(msg)
+
+    def receive(self):
+        obj = self.accepter.accept()
+        if isinstance(obj, RemoteError):
+            raise obj
+        else:
+            return obj
+
+    def close(self):
+        try:
+            self.gw.output.sendall(message(MSG_CHAN_CLOSE, self.n))
+        except OSError:
+            pass
+
+    def report_error(self, exc_type, exc_value, exc_traceback=None):
+        s = format_error(exc_type, exc_value, exc_traceback)
+        try:
+            self.gw.output.sendall(message(MSG_ERROR, self.n, s))
+        except OSError:
+            pass
+
+
+class RemoteError(Exception):
+    pass
+
+def format_error(exc_type, exc_value, exc_traceback=None):
+    import traceback, StringIO
+    s = StringIO.StringIO()
+    traceback.print_exception(exc_type, exc_value, exc_traceback, file=s)
+    return s.getvalue()
+
+
+class PopenCmdGateway(Gateway):
+    action = "exec input()"
+
+    def __init__(self, cmdline):
+        from py.__.net.pipe.fd import FDInput, FDOutput
+        child_in, child_out = os.popen2(cmdline, 't', 0)
+        fdin  = FDInput(child_out.fileno(), child_out.close)
+        fdout = FDOutput(child_in.fileno(), child_in.close)
+        fdout.sendall(self.get_bootstrap_code())
+        super(PopenCmdGateway, self).__init__(input = fdin, output = fdout)
+
+    def get_bootstrap_code():
+        # XXX assumes that the py lib is installed on the remote side
+        src = []
+        src.append('from py.__.net import greenexecnet')
+        src.append('greenexecnet.PopenCmdGateway.run_server()')
+        src.append('')
+        return '%r\n' % ('\n'.join(src),)
+    get_bootstrap_code = staticmethod(get_bootstrap_code)
+
+    def run_server():
+        from py.__.net.pipe.fd import FDInput, FDOutput
+        gw = Gateway(input = FDInput(os.dup(0)),
+                     output = FDOutput(os.dup(1)),
+                     is_remote = True)
+        # for now, ignore normal I/O
+        fd = os.open('/dev/null', os.O_RDWR)
+        os.dup2(fd, 0)
+        os.dup2(fd, 1)
+        os.close(fd)
+        greensock2.wait(gw.greenlet)
+    run_server = staticmethod(run_server)
+
+class PopenGateway(PopenCmdGateway):
+    def __init__(self, python=sys.executable):
+        cmdline = '"%s" -u -c "%s"' % (python, self.action)
+        super(PopenGateway, self).__init__(cmdline)
+
+class SshGateway(PopenCmdGateway):
+    def __init__(self, sshaddress, remotepython='python', identity=None):
+        self.sshaddress = sshaddress
+        remotecmd = '%s -u -c "%s"' % (remotepython, self.action)
+        cmdline = [sshaddress, remotecmd]
+        # XXX Unix style quoting
+        for i in range(len(cmdline)):
+            cmdline[i] = "'" + cmdline[i].replace("'", "'\\''") + "'"
+        cmd = 'ssh -C'
+        if identity is not None: 
+            cmd += ' -i %s' % (identity,)
+        cmdline.insert(0, cmd) 
+        super(SshGateway, self).__init__(' '.join(cmdline))
+
+
+##f = open('LOG', 'a')
+##import os; print >> f, '[%d] READY' % (os.getpid(),)
+##f.close()

Added: py/trunk/py/net/greensock2.py
==============================================================================
--- (empty file)
+++ py/trunk/py/net/greensock2.py	Tue Mar  6 10:07:53 2007
@@ -0,0 +1,526 @@
+import os, sys
+try:
+    from stackless import greenlet
+except ImportError:
+    import py
+    greenlet = py.magic.greenlet
+from collections import deque
+from select import select as _select
+from time import time as _time
+from heapq import heappush, heappop, heapify
+
+TRACE = True
+
+def meetingpoint():
+    senders = deque()    # list of senders, or [None] if Giver closed
+    receivers = deque()  # list of receivers, or [None] if Receiver closed
+    return (MeetingPointGiver(senders, receivers),
+            MeetingPointAccepter(senders, receivers))
+
+def producer(func, *args, **kwds):
+    iterable = func(*args, **kwds)
+    giver, accepter = meetingpoint()
+    def autoproducer():
+        try:
+            giver.wait()
+            for obj in iterable:
+                giver.give(obj)
+                giver.wait()
+        finally:
+            giver.close()
+    autogreenlet(autoproducer)
+    return accepter
+
+
+class MeetingPointBase(object):
+
+    def __init__(self, senders, receivers):
+        self.senders = senders
+        self.receivers = receivers
+        self.g_active = g_active
+
+    def close(self):
+        while self.senders:
+            if self.senders[0] is None:
+                break
+            packet = self.senders.popleft()
+            if packet.g_from is not None:
+                self.g_active.append(packet.g_from)
+        else:
+            self.senders.append(None)
+        while self.receivers:
+            if self.receivers[0] is None:
+                break
+            other = self.receivers.popleft()
+            self.g_active.append(other)
+        else:
+            self.receivers.append(None)
+
+    __del__ = close
+
+    def closed(self):
+        return self.receivers and self.receivers[0] is None
+
+
+class MeetingPointGiver(MeetingPointBase):
+
+    def give(self, obj):
+        if self.receivers:
+            if self.receivers[0] is None:
+                raise MeetingPointClosed
+            other = self.receivers.popleft()
+            g_active.append(g_getcurrent())
+            packet = _Packet()
+            packet.payload = obj
+            other.switch(packet)
+            if not packet.accepted:
+                raise Interrupted("packet not accepted")
+        else:
+            packet = _Packet()
+            packet.g_from = g_getcurrent()
+            packet.payload = obj
+            try:
+                self.senders.append(packet)
+                g_dispatcher.switch()
+                if not packet.accepted:
+                    raise Interrupted("packet not accepted")
+            except:
+                remove_by_id(self.senders, packet)
+                raise
+
+    def give_queued(self, obj):
+        if self.receivers:
+            self.give(obj)
+        else:
+            packet = _Packet()
+            packet.g_from = None
+            packet.payload = obj
+            self.senders.append(packet)
+
+    def ready(self):
+        return self.receivers and self.receivers[0] is not None
+
+    def wait(self):
+        if self.receivers:
+            if self.receivers[0] is None:
+                raise MeetingPointClosed
+        else:
+            packet = _Packet()
+            packet.g_from = g_getcurrent()
+            packet.empty = True
+            self.senders.append(packet)
+            try:
+                g_dispatcher.switch()
+                if not packet.accepted:
+                    raise Interrupted("no accepter found")
+            except:
+                remove_by_id(self.senders, packet)
+                raise
+
+    def trigger(self):
+        if self.ready():
+            self.give(None)
+
+
+class MeetingPointAccepter(MeetingPointBase):
+
+    def accept(self):
+        while self.senders:
+            if self.senders[0] is None:
+                raise MeetingPointClosed
+            packet = self.senders.popleft()
+            packet.accepted = True
+            if packet.g_from is not None:
+                g_active.append(packet.g_from)
+            if not packet.empty:
+                return packet.payload
+        g = g_getcurrent()
+        self.receivers.append(g)
+        try:
+            packet = g_dispatcher.switch()
+        except:
+            remove_by_id(self.receivers, g)
+            raise
+        if type(packet) is not _Packet:
+            remove_by_id(self.receivers, g)
+            raise Interrupted("no packet")
+        packet.accepted = True
+        return packet.payload
+
+    def ready(self):
+        for packet in self.senders:
+            if packet is None:
+                return False
+            if not packet.empty:
+                return True
+        return False
+
+    def wait_trigger(self, timeout=None, default=None):
+        if timeout is None:
+            return self.accept()
+        else:
+            timer = Timer(timeout)
+            try:
+                try:
+                    return self.accept()
+                finally:
+                    timer.stop()
+            except Interrupted:
+                if timer.finished:
+                    return default
+                raise
+
+
+class MeetingPointClosed(greenlet.GreenletExit):
+    pass
+
+class Interrupted(greenlet.GreenletExit):
+    pass
+
+class ConnexionClosed(greenlet.GreenletExit):
+    pass
+
+class _Packet(object):
+    empty = False
+    accepted = False
+
+def remove_by_id(d, obj):
+    lst = [x for x in d if x is not obj]
+    d.clear()
+    d.extend(lst)
+
+# ____________________________________________________________
+
+##class Queue(object):
+
+##    def __init__(self):
+##        self.giver, self.accepter = meetingpoint()
+##        self.pending = deque()
+
+##    def put(self, item):    # preserve the caller's atomicity
+##        self.pending.append(item)
+##        if self.accepter.ready():
+##            self.accepter.accept()
+
+##    def get(self, block=True):
+##        if self.pending:
+##            return self.pending.popleft()
+##        elif block:
+##            self.giver.give(None)
+##            return self.pending.popleft()
+##        else:
+##            raise Empty
+
+##class Empty(Interrupted):
+##    pass
+
+##class Event(object):
+
+##    def __init__(self):
+##        self.giver, self.accepter = meetingpoint()
+
+##    clear = __init__
+
+##    def isSet(self):
+##        return self.accepter is None
+
+##    def set(self):        # preserve the caller's atomicity
+##        if self.accepter is not None:
+##            accepter = self.accepter
+##            self.giver = self.accepter = None
+##            while accepter.ready():   # wake up all waiters
+##                accepter.accept()
+
+##    def wait(self, timeout=None):
+##        if self.accepter is not None:
+##            if timeout is None:
+##                self.giver.give(None)
+##            else:
+##                timer = Timer(timeout)
+##                try:
+##                    try:
+##                        self.giver.give(None)
+##                    except Interrupted:
+##                        pass
+##                finally:
+##                    timer.stop()
+
+##class Semaphore(object):
+
+##    def __init__(self, value=1):
+##        self.giver, self.accepter = meetingpoint()
+##        for i in range(value):
+##            self.release()
+
+##    def acquire(self, blocking=True):
+##        if blocking or self.accepter.ready():
+##            return self.accepter.accept()
+##        else:
+##            return False
+
+##    def release(self):
+##        autogreenlet(self.giver.put, True)
+
+# ____________________________________________________________
+
+def wait_input(sock):
+    _register(g_iwtd, sock)
+
+def recv(sock, bufsize):
+    wait_input(sock)
+    buf = sock.recv(bufsize)
+    if not buf:
+        raise ConnexionClosed("inbound connexion closed")
+    return buf
+
+def recvall(sock, bufsize):
+    in_front = False
+    data = []
+    while bufsize > 0:
+        _register(g_iwtd, sock, in_front=in_front)
+        buf = sock.recv(bufsize)
+        if not buf:
+            raise ConnexionClosed("inbound connexion closed")
+        data.append(buf)
+        bufsize -= len(buf)
+        in_front = True
+    return ''.join(data)
+
+def read(fd, bufsize):
+    assert fd >= 0
+    wait_input(fd)
+    buf = os.read(fd, bufsize)
+    if not buf:
+        raise ConnexionClosed("inbound connexion closed")
+    return buf
+
+def readall(fd, bufsize):
+    assert fd >= 0
+    in_front = False
+    data = []
+    while bufsize > 0:
+        _register(g_iwtd, fd, in_front=in_front)
+        buf = os.read(fd, bufsize)
+        if not buf:
+            raise ConnexionClosed("inbound connexion closed")
+        data.append(buf)
+        bufsize -= len(buf)
+        in_front = True
+    return ''.join(data)
+
+
+def wait_output(sock):
+    _register(g_owtd, sock)
+
+def sendall(sock, buffer):
+    in_front = False
+    while buffer:
+        _register(g_owtd, sock, in_front=in_front)
+        count = sock.send(buffer)
+        buffer = buffer[count:]
+        in_front = True
+
+def writeall(fd, buffer):
+    assert fd >= 0
+    in_front = False
+    while buffer:
+        _register(g_owtd, fd, in_front=in_front)
+        count = os.write(fd, buffer)
+        if not count:
+            raise ConnexionClosed("outbound connexion closed")
+        buffer = buffer[count:]
+        in_front = True
+
+
+def sleep(duration, *greenlets):
+    timer = Timer(duration)
+    try:
+        wait(*greenlets)
+    finally:
+        ok = timer.finished
+        timer.stop()
+    if not ok:
+        raise Interrupted
+
+def _wait():
+    g_dispatcher.switch()
+
+def wait(*greenlets):
+    assert greenlets#, "should not wait without events to wait on"
+    current = g_getcurrent()
+    for g in greenlets:
+        if g in g_waiters:
+            g_waiters[g].append(current)
+        else:
+            g_waiters[g] = [current]
+    g_dispatcher.switch()
+
+class Timer(object):
+    started = False
+    finished = False
+
+    def __init__(self, timeout):
+        self.g = g_getcurrent()
+        entry = (_time() + timeout, self)
+        if g_timers_mixed:
+            g_timers.append(entry)
+        else:
+            heappush(g_timers, entry)
+
+    def stop(self):
+        global g_timers_mixed
+        if not self.finished:
+            for i, (activationtime, timer) in enumerate(g_timers):
+                if timer is self:
+                    g_timers[i] = g_timers[-1]
+                    g_timers.pop()
+                    g_timers_mixed = True
+                    break
+            self.finished = True
+
+# ____________________________________________________________
+
+class autogreenlet(greenlet):
+    def __init__(self, function, *args, **kwds):
+        self.parent = g_dispatcher
+        self.function = function
+        self.args = args
+        self.kwds = kwds
+        g_active.append(self)
+
+    def run(self):
+        self.trace("start")
+        try:
+            self.function(*self.args, **self.kwds)
+        except Exception, e:
+            self.trace("stop (%s%s)", e.__class__.__name__,
+                       str(e) and (': '+str(e)))
+            raise
+        else:
+            self.trace("done")
+
+    def __repr__(self):
+##        args = ', '.join([repr(s) for s in self.args] +
+##                        ['%s=%r' % keyvalue for keyvalue in self.kwds.items()])
+##        return '<autogreenlet %s(%s)>' % (self.function.__name__, args)
+        return '<autogreenlet %s at %s>' % (self.function.__name__,
+                                            hex(id(self)))
+
+    def trace(self, msg, *args):
+        if TRACE:
+            print self, msg % args
+
+    def interrupt(self):
+        self.throw(Interrupted)
+
+
+g_active = deque()
+g_iwtd = {}
+g_owtd = {}
+g_timers = []
+g_timers_mixed = False
+
+g_getcurrent = greenlet.getcurrent
+
+def _register(g_wtd, sock, in_front=False):
+    d = g_wtd.setdefault(sock, deque())
+    g = g_getcurrent()
+    if in_front:
+        d.appendleft(g)
+    else:
+        d.append(g)
+    try:
+        if g_dispatcher.switch() is not g_wtd:
+            raise Interrupted
+    except:
+        remove_by_id(d, g) 
+        raise
+
+##def _unregister_timer():
+##    ...
+
+
+def check_dead_greenlets(mapping):
+    to_remove = [i for i, v in mapping.items() if not v]
+    for k in to_remove:
+        del mapping[k]
+
+def check_waiters(active):
+    if active in g_waiters:
+        for g in g_waiters[active]:
+            g.switch()
+        del g_waiters[active]
+
+
+def dispatcher_mainloop():
+    global g_timers_mixed
+    while 1:
+        try:
+            while g_active:
+                print 'active:', g_active[0]
+                active = g_active.popleft()
+                active.switch()
+                if active.dead:
+                    check_waiters(active)
+                    del active
+            if g_timers:
+                if g_timers_mixed:
+                    heapify(g_timers)
+                    g_timers_mixed = False
+                activationtime, timer = g_timers[0]
+                delay = activationtime - _time()
+                if delay <= 0.0:
+                    if timer.started:
+                        heappop(g_timers)
+                        #print 'timeout:', g
+                        timer.finished = True
+                        timer.g.switch()
+                        if timer.g.dead:
+                            check_waiters(timer.g)
+                        continue
+                    delay = 0.0
+                timer.started = True
+            else:
+                check_dead_greenlets(g_iwtd)
+                check_dead_greenlets(g_owtd)
+                if not (g_iwtd or g_owtd):
+                    # nothing to do, switch to the main greenlet
+                    g_dispatcher.parent.switch()
+                    continue
+                delay = None
+
+            print 'selecting...', g_iwtd.keys(), g_owtd.keys(), delay
+            iwtd, owtd, _ = _select(g_iwtd.keys(), g_owtd.keys(), [], delay)
+            print 'done'
+            for s in owtd:
+                if s in g_owtd:
+                    d = g_owtd[s]
+                    #print 'owtd:', d[0]
+                    g = d.popleft()
+                    if not d:
+                        try:
+                            del g_owtd[s]
+                        except KeyError:
+                            pass
+                    g.switch(g_owtd)
+                    if g.dead:
+                        check_waiters(g)
+            for s in iwtd:
+                if s in g_iwtd:
+                    d = g_iwtd[s]
+                    #print 'iwtd:', d[0]
+                    g = d.popleft()
+                    if not d:
+                        try:
+                            del g_iwtd[s]
+                        except KeyError:
+                            pass
+                    g.switch(g_iwtd)
+                    if g.dead:
+                        check_waiters(g)
+        except:
+            import sys
+            g_dispatcher.parent.throw(*sys.exc_info())
+
+g_dispatcher = greenlet(dispatcher_mainloop)
+g_waiters = {}

Added: py/trunk/py/net/msgstruct.py
==============================================================================
--- (empty file)
+++ py/trunk/py/net/msgstruct.py	Tue Mar  6 10:07:53 2007
@@ -0,0 +1,29 @@
+from struct import pack, unpack, calcsize
+
+
+def message(tp, *values):
+    strtype = type('')
+    typecodes = ['']
+    for v in values:
+        if type(v) is strtype:
+            typecodes.append('%ds' % len(v))
+        elif 0 <= v < 256:
+            typecodes.append('B')
+        else:
+            typecodes.append('l')
+    typecodes = ''.join(typecodes)
+    assert len(typecodes) < 256
+    return pack(("!B%dsc" % len(typecodes)) + typecodes,
+                len(typecodes), typecodes, tp, *values)
+
+def decodemessage(data):
+    if data:
+        limit = ord(data[0]) + 1
+        if len(data) >= limit:
+            typecodes = "!c" + data[1:limit]
+            end = limit + calcsize(typecodes)
+            if len(data) >= end:
+                return unpack(typecodes, data[limit:end]), data[end:]
+            #elif end > 1000000:
+            #    raise OverflowError
+    return None, data

Added: py/trunk/py/net/pipe/__init__.py
==============================================================================

Added: py/trunk/py/net/pipe/common.py
==============================================================================
--- (empty file)
+++ py/trunk/py/net/pipe/common.py	Tue Mar  6 10:07:53 2007
@@ -0,0 +1,38 @@
+import greensock2
+from pypeers.tool import log
+
+
+class BufferedInput(object):
+    in_buf = ''
+
+    def recv(self, bufsize):
+        self.wait_input()
+        buf = self.in_buf[:bufsize]
+        self.in_buf = self.in_buf[bufsize:]
+        return buf
+
+    def recvall(self, bufsize):
+        result = []
+        while bufsize > 0:
+            buf = self.recv(bufsize)
+            result.append(buf)
+            bufsize -= len(buf)
+        return ''.join(result)
+
+# ____________________________________________________________
+
+def forwardpipe(s1, s2):
+    try:
+        while 1:
+            s2.wait_output()
+            buffer = s1.recv(32768)
+            log('[%r -> %r] %r', s1, s2, buffer)
+            s2.sendall(buffer)
+            del buffer
+    finally:
+        s2.shutdown_wr()
+        s1.shutdown_rd()
+
+def linkpipes(s1, s2):
+    greensock2.autogreenlet(forwardpipe, s1, s2)
+    greensock2.autogreenlet(forwardpipe, s2, s1)

Added: py/trunk/py/net/pipe/fd.py
==============================================================================
--- (empty file)
+++ py/trunk/py/net/pipe/fd.py	Tue Mar  6 10:07:53 2007
@@ -0,0 +1,69 @@
+import os
+from py.__.net import greensock2
+
+
+class FDInput(object):
+
+    def __init__(self, read_fd, close=True):
+        self.read_fd = read_fd
+        self._close = close     # a flag or a callback
+
+    def shutdown_rd(self):
+        fd = self.read_fd
+        if fd is not None:
+            self.read_fd = None
+            close = self._close
+            if close:
+                self._close = False
+                if close == True:
+                    os.close(fd)
+                else:
+                    close()
+
+    __del__ = shutdown_rd
+
+    def wait_input(self):
+        greensock2.wait_input(self.read_fd)
+
+    def recv(self, bufsize):
+##        f = open('LOG', 'a')
+##        import os; print >> f, '[%d] RECV' % (os.getpid(),)
+##        f.close()
+        res = greensock2.read(self.read_fd, bufsize)
+##        f = open('LOG', 'a')
+##        import os; print >> f, '[%d] RECV %r' % (os.getpid(), res)
+##        f.close()
+        return res
+
+    def recvall(self, bufsize):
+        return greensock2.readall(self.read_fd, bufsize)
+
+
+class FDOutput(object):
+
+    def __init__(self, write_fd, close=True):
+        self.write_fd = write_fd
+        self._close = close     # a flag or a callback
+
+    def shutdown_wr(self):
+        fd = self.write_fd
+        if fd is not None:
+            self.write_fd = None
+            close = self._close
+            if close:
+                self._close = False
+                if close == True:
+                    os.close(fd)
+                else:
+                    close()
+
+    __del__ = shutdown_wr
+
+    def wait_output(self):
+        greensock2.wait_output(self.write_fd)
+
+    def sendall(self, buffer):
+##        f = open('LOG', 'a')
+##        import os; print >> f, '[%d] %r' % (os.getpid(), buffer)
+##        f.close()
+        greensock2.writeall(self.write_fd, buffer)

Added: py/trunk/py/net/pipe/gsocket.py
==============================================================================
--- (empty file)
+++ py/trunk/py/net/pipe/gsocket.py	Tue Mar  6 10:07:53 2007
@@ -0,0 +1,114 @@
+import greensock2
+import socket, errno, os
+
+error = socket.error
+
+
+class _delegate(object):
+    def __init__(self, methname):
+        self.methname = methname
+    def __get__(self, obj, typ=None):
+        result = getattr(obj._s, self.methname)
+        setattr(obj, self.methname, result)
+        return result
+
+
+class GreenSocket(object):
+
+    def __init__(self, family = socket.AF_INET,
+                       type   = socket.SOCK_STREAM,
+                       proto  = 0):
+        self._s = socket.socket(family, type, proto)
+        self._s.setblocking(False)
+
+    def fromsocket(cls, s):
+        if isinstance(s, GreenSocket):
+            s = s._s
+        result = GreenSocket.__new__(cls)
+        result._s = s
+        s.setblocking(False)
+        return result
+    fromsocket = classmethod(fromsocket)
+
+    def accept(self):
+        while 1:
+            try:
+                s, addr = self._s.accept()
+                break
+            except error, e:
+                import pdb;pdb.set_trace()
+                if e.args[0] not in (errno.EAGAIN, errno.EWOULDBLOCK):
+                    raise
+            self.wait_input()
+        return self.fromsocket(s), addr
+
+    bind   = _delegate("bind")
+    close  = _delegate("close")
+
+    def connect(self, addr):
+        err = self.connect_ex(addr)
+        if err:
+            raise error(err, os.strerror(err))
+
+    def connect_ex(self, addr):
+        err = self._s.connect_ex(addr)
+        if err == errno.EINPROGRESS:
+            greensock2.wait_output(self._s)
+            err = self._s.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
+        return err
+
+    #XXX dup
+    fileno      = _delegate("fileno")
+    getpeername = _delegate("getpeername")
+    getsockname = _delegate("getsockname")
+    getsockopt  = _delegate("getsockopt")
+    listen      = _delegate("listen")
+
+    def makefile(self, mode='r', bufsize=-1):
+        # hack, but reusing the internal socket._fileobject should just work
+        return socket._fileobject(self, mode, bufsize)
+
+    def recv(self, bufsize):
+        return greensock2.recv(self._s, bufsize)
+
+    def recvall(self, bufsize):
+        return greensock2.recvall(self._s, bufsize)
+
+    def recvfrom(self, bufsize):
+        self.wait_input()
+        buf, addr = self._s.recvfrom(bufsize)
+        if not buf:
+            raise ConnexionClosed("inbound connexion closed")
+        return buf, addr
+
+    def send(self, data):
+        self.wait_output()
+        return self._s.send(data)
+
+    def sendto(self, data, addr):
+        self.wait_output()
+        return self._s.sendto(data, addr)
+
+    def sendall(self, data):
+        greensock2.sendall(self._s, data)
+
+    setsockopt = _delegate("setsockopt")
+    shutdown   = _delegate("shutdown")
+
+    def shutdown_rd(self):
+        try:
+            self._s.shutdown(socket.SHUT_RD)
+        except error:
+            pass
+
+    def shutdown_wr(self):
+        try:
+            self._s.shutdown(socket.SHUT_WR)
+        except error:
+            pass
+
+    def wait_input(self):
+        greensock2.wait_input(self._s)
+
+    def wait_output(self):
+        greensock2.wait_output(self._s)

Added: py/trunk/py/net/pipe/mp.py
==============================================================================
--- (empty file)
+++ py/trunk/py/net/pipe/mp.py	Tue Mar  6 10:07:53 2007
@@ -0,0 +1,29 @@
+from pypeers.stream.common import BufferedInput
+
+
+class MeetingPointInput(BufferedInput):
+
+    def __init__(self, accepter):
+        self.accepter = accepter
+
+    def wait_input(self):
+        while not self.in_buf:
+            self.in_buf = self.accepter.accept()
+
+    def shutdown_rd(self):
+        self.accepter.close()
+
+
+class MeetingPointOutput(BufferedInput):
+
+    def __init__(self, giver):
+        self.giver = giver
+
+    def wait_output(self):
+        self.giver.wait()
+
+    def sendall(self, buffer):
+        self.giver.give(buffer)
+
+    def shutdown_wr(self):
+        self.giver.close()

Added: py/trunk/py/net/pipelayer.py
==============================================================================
--- (empty file)
+++ py/trunk/py/net/pipelayer.py	Tue Mar  6 10:07:53 2007
@@ -0,0 +1,306 @@
+#import os
+import struct
+from collections import deque
+
+
+class InvalidPacket(Exception):
+    pass
+
+
+FLAG_NAK1 = 0xE0
+FLAG_NAK  = 0xE1
+FLAG_REG  = 0xE2
+FLAG_CFRM = 0xE3
+
+FLAG_RANGE_START  = 0xE0
+FLAG_RANGE_STOP   = 0xE4
+
+max_old_packets = 256      # must be <= 256
+
+
+class PipeLayer(object):
+    timeout = 1
+    headersize = 4
+
+    def __init__(self):
+        #self.localid = os.urandom(4)
+        #self.remoteid = None
+        self.cur_time = 0
+        self.out_queue = deque()
+        self.out_nextseqid = 0
+        self.out_nextrepeattime = None
+        self.in_nextseqid = 0
+        self.in_outoforder = {}
+        self.out_oldpackets = deque()
+        self.out_flags = FLAG_REG
+        self.out_resend = 0
+        self.out_resend_skip = False
+
+    def queue(self, data):
+        if data:
+            self.out_queue.appendleft(data)
+
+    def queue_size(self):
+        total = 0
+        for data in self.out_queue:
+            total += len(data)
+        return total
+
+    def in_sync(self):
+        return not self.out_queue and self.out_nextrepeattime is None
+
+    def settime(self, curtime):
+        self.cur_time = curtime
+        if self.out_queue:
+            if len(self.out_oldpackets) < max_old_packets:
+                return 0   # more data to send now
+        if self.out_nextrepeattime is not None:
+            return max(0, self.out_nextrepeattime - curtime)
+        else:
+            return None
+
+    def encode(self, maxlength):
+        #print ' '*self._dump_indent, '--- OUTQ', self.out_resend, self.out_queue
+        if len(self.out_oldpackets) >= max_old_packets:
+            # congestion, stalling
+            payload = 0
+        else:
+            payload = maxlength - 4
+            if payload <= 0:
+                raise ValueError("encode(): buffer too small")
+        if (self.out_nextrepeattime is not None and
+            self.out_nextrepeattime <= self.cur_time):
+            # no ACK received so far, send a packet (possibly empty)
+            if not self.out_queue:
+                payload = 0
+        else:
+            if not self.out_queue:   # no more data to send
+                return None
+            if payload == 0:         # congestion
+                return None
+        # prepare a packet
+        seqid = self.out_nextseqid
+        flags = self.out_flags
+        self.out_flags = FLAG_REG     # clear out the flags for the next time
+        if payload > 0:
+            self.out_nextseqid = (seqid + 1) & 0xFFFF
+            data = self.out_queue.pop()
+            packetlength = len(data)
+            if self.out_resend > 0:
+                if packetlength > payload:
+                    raise ValueError("XXX need constant buffer size for now")
+                self.out_resend -= 1
+                if self.out_resend_skip:
+                    if self.out_resend > 0:
+                        self.out_queue.pop()
+                        self.out_resend -= 1
+                        self.out_nextseqid = (seqid + 2) & 0xFFFF
+                    self.out_resend_skip = False
+                packetpayload = data
+            else:
+                packet = []
+                while packetlength <= payload:
+                    packet.append(data)
+                    if not self.out_queue:
+                        break
+                    data = self.out_queue.pop()
+                    packetlength += len(data)
+                else:
+                    rest = len(data) + payload - packetlength
+                    packet.append(data[:rest])
+                    self.out_queue.append(data[rest:])
+                packetpayload = ''.join(packet)
+                self.out_oldpackets.appendleft(packetpayload)
+                #print ' '*self._dump_indent, '--- OLDPK', self.out_oldpackets
+        else:
+            # a pure ACK packet, no payload
+            if self.out_oldpackets and flags == FLAG_REG:
+                flags = FLAG_CFRM
+            packetpayload = ''
+        packet = struct.pack("!BBH", flags,
+                             self.in_nextseqid & 0xFF,
+                             seqid) + packetpayload
+        if self.out_oldpackets:
+            self.out_nextrepeattime = self.cur_time + self.timeout
+        else:
+            self.out_nextrepeattime = None
+        #self.dump('OUT', packet)
+        return packet
+
+    def decode(self, rawdata):
+        if len(rawdata) < 4:
+            raise InvalidPacket
+        #print ' '*self._dump_indent, '------ out %d (+%d) in %d' % (self.out_nextseqid, self.out_resend, self.in_nextseqid)
+        #self.dump('IN ', rawdata)
+        in_flags, ack_seqid, in_seqid = struct.unpack("!BBH", rawdata[:4])
+        if not (FLAG_RANGE_START <= in_flags < FLAG_RANGE_STOP):
+            raise InvalidPacket
+        in_diff  = (in_seqid  - self.in_nextseqid ) & 0xFFFF
+        ack_diff = (self.out_nextseqid + self.out_resend - ack_seqid) & 0xFF
+        if in_diff >= max_old_packets:
+            return ''    # invalid, but can occur as a late repetition
+        if ack_diff != len(self.out_oldpackets):
+            # forget all acknowledged packets
+            if ack_diff > len(self.out_oldpackets):
+                return ''   # invalid, but can occur with packet reordering
+            while len(self.out_oldpackets) > ack_diff:
+                #print ' '*self._dump_indent, '--- POP', repr(self.out_oldpackets[-1])
+                self.out_oldpackets.pop()
+            if self.out_oldpackets:
+                self.out_nextrepeattime = self.cur_time + self.timeout
+            else:
+                self.out_nextrepeattime = None   # all packets ACKed
+        if in_flags == FLAG_NAK or in_flags == FLAG_NAK1:
+            # this is a NAK: resend the old packets as far as they've not
+            # also been ACK'ed in the meantime (can occur with reordering)
+            while self.out_resend < len(self.out_oldpackets):
+                self.out_queue.append(self.out_oldpackets[self.out_resend])
+                self.out_resend += 1
+                self.out_nextseqid = (self.out_nextseqid - 1) & 0xFFFF
+                #print ' '*self._dump_indent, '--- REP', self.out_nextseqid, repr(self.out_queue[-1])
+            self.out_resend_skip = in_flags == FLAG_NAK1
+        elif in_flags == FLAG_CFRM:
+            # this is a CFRM: request for confirmation
+            self.out_nextrepeattime = self.cur_time
+        # receive this packet's payload if it is the next in the sequence
+        if in_diff == 0:
+            if len(rawdata) > 4:
+                #print ' '*self._dump_indent, 'RECV ', self.in_nextseqid, repr(rawdata[4:])
+                self.in_nextseqid = (self.in_nextseqid + 1) & 0xFFFF
+                result = [rawdata[4:]]
+                while self.in_nextseqid in self.in_outoforder:
+                    result.append(self.in_outoforder.pop(self.in_nextseqid))
+                    self.in_nextseqid = (self.in_nextseqid + 1) & 0xFFFF
+                return ''.join(result)
+        else:
+            # we missed at least one intermediate packet: send a NAK
+            if len(rawdata) > 4:
+                self.in_outoforder[in_seqid] = rawdata[4:]
+            if ((self.in_nextseqid + 1) & 0xFFFF) in self.in_outoforder:
+                self.out_flags = FLAG_NAK1
+            else:
+                self.out_flags = FLAG_NAK
+            self.out_nextrepeattime = self.cur_time
+        return ''
+
+    _dump_indent = 0
+    def dump(self, dir, rawdata):
+        in_flags, ack_seqid, in_seqid = struct.unpack("!BBH", rawdata[:4])
+        print ' ' * self._dump_indent, dir,
+        if in_flags == FLAG_NAK:
+            print 'NAK',
+        elif in_flags == FLAG_NAK1:
+            print 'NAK1',
+        elif in_flags == FLAG_CFRM:
+            print 'CFRM',
+        #print ack_seqid, in_seqid, '(%d bytes)' % (len(rawdata)-4,)
+        print ack_seqid, in_seqid, repr(rawdata[4:])
+
+
+def pipe_over_udp(udpsock, send_fd=-1, recv_fd=-1,
+                  timeout=1.0, inactivity_timeout=None):
+    """Example: send all data showing up in send_fd over the given UDP
+    socket, and write incoming data into recv_fd.  The send_fd and
+    recv_fd are plain file descriptors.  When an EOF is read from
+    send_fd, this function returns (after making sure that all data was
+    received by the remote side).
+    """
+    import os
+    from select import select
+    from time import time
+    p = PipeLayer()
+    p.timeout = timeout
+    iwtdlist = [udpsock]
+    if send_fd >= 0:
+        iwtdlist.append(send_fd)
+    running = True
+    while running or not p.in_sync():
+        delay = delay1 = p.settime(time())
+        if delay is None:
+            delay = inactivity_timeout
+        iwtd, owtd, ewtd = select(iwtdlist, [], [], delay)
+        if iwtd:
+            if send_fd in iwtd:
+                data = os.read(send_fd, 1500 - p.headersize)
+                if not data:
+                    # EOF
+                    iwtdlist.remove(send_fd)
+                    running = False
+                else:
+                    #print 'queue', len(data)
+                    p.queue(data)
+            if udpsock in iwtd:
+                packet = udpsock.recv(65535)
+                #print 'decode', len(packet)
+                p.settime(time())
+                data = p.decode(packet)
+                i = 0
+                while i < len(data):
+                    i += os.write(recv_fd, data[i:])
+        elif delay1 is None:
+            break    # long inactivity
+        p.settime(time())
+        packet = p.encode(1500)
+        if packet:
+            #print 'send', len(packet)
+            #if os.urandom(1) >= '\x08':    # emulate packet losses
+            udpsock.send(packet)
+
+
+class PipeOverUdp(object):
+
+    def __init__(self, udpsock, timeout=1.0):
+        import thread, os
+        self.os = os
+        self.sendpipe = os.pipe()
+        self.recvpipe = os.pipe()
+        thread.start_new_thread(pipe_over_udp, (udpsock,
+                                                self.sendpipe[0],
+                                                self.recvpipe[1],
+                                                timeout))
+
+    def __del__(self):
+        os = self.os
+        if self.sendpipe:
+            os.close(self.sendpipe[0])
+            os.close(self.sendpipe[1])
+            self.sendpipe = None
+        if self.recvpipe:
+            os.close(self.recvpipe[0])
+            os.close(self.recvpipe[1])
+            self.recvpipe = None
+
+    close = __del__
+
+    def send(self, data):
+        if not self.sendpipe:
+            raise IOError("I/O operation on a closed PipeOverUdp")
+        return self.os.write(self.sendpipe[1], data)
+
+    def sendall(self, data):
+        i = 0
+        while i < len(data):
+            i += self.send(data[i:])
+
+    def recv(self, bufsize):
+        if not self.recvpipe:
+            raise IOError("I/O operation on a closed PipeOverUdp")
+        return self.os.read(self.recvpipe[0], bufsize)
+
+    def recvall(self, bufsize):
+        buf = []
+        while bufsize > 0:
+            data = self.recv(bufsize)
+            buf.append(data)
+            bufsize -= len(data)
+        return ''.join(buf)
+
+    def fileno(self):
+        if not self.recvpipe:
+            raise IOError("I/O operation on a closed PipeOverUdp")
+        return self.recvpipe[0]
+
+    def ofileno(self):
+        if not self.sendpipe:
+            raise IOError("I/O operation on a closed PipeOverUdp")
+        return self.sendpipe[1]

Added: py/trunk/py/net/server/__init__.py
==============================================================================

Added: py/trunk/py/net/server/httpserver.py
==============================================================================
--- (empty file)
+++ py/trunk/py/net/server/httpserver.py	Tue Mar  6 10:07:53 2007
@@ -0,0 +1,42 @@
+import BaseHTTPServer
+from pypeers import greensock2
+from pypeers.pipe.gsocket import GreenSocket
+
+
+class GreenMixIn:
+    """Mix-in class to handle each request in a new greenlet."""
+
+    def process_request_greenlet(self, request, client_address):
+        """Same as in BaseServer but as a greenlet.
+        In addition, exception handling is done here.
+        """
+        try:
+            self.finish_request(request, client_address)
+            self.close_request(request)
+        except:
+            self.handle_error(request, client_address)
+            self.close_request(request)
+
+    def process_request(self, request, client_address):
+        """Start a new greenlet to process the request."""
+        greensock2.autogreenlet(self.process_request_greenlet,
+                                request, client_address)
+
+
+class GreenHTTPServer(GreenMixIn, BaseHTTPServer.HTTPServer):
+    protocol_version = "HTTP/1.1"
+
+    def server_bind(self):
+        self.socket = GreenSocket.fromsocket(self.socket)
+        BaseHTTPServer.HTTPServer.server_bind(self)
+
+
+def test_simple(handler_class=None):
+    if handler_class is None:
+        from SimpleHTTPServer import SimpleHTTPRequestHandler
+        handler_class = SimpleHTTPRequestHandler
+    server_address = ('', 8000)
+    httpd = GreenHTTPServer(server_address, handler_class)
+    sa = httpd.socket.getsockname()
+    print "Serving HTTP on", sa[0], "port", sa[1], "..."
+    httpd.serve_forever()

Added: py/trunk/py/net/test/test_greenexecnet.py
==============================================================================
--- (empty file)
+++ py/trunk/py/net/test/test_greenexecnet.py	Tue Mar  6 10:07:53 2007
@@ -0,0 +1,41 @@
+import py
+from py.__.net.greenexecnet import *
+
+def test_simple():
+    gw = PopenGateway()
+    channel = gw.remote_exec("x = channel.receive(); channel.send(x * 6)")
+    channel.send(7)
+    res = channel.receive()
+    assert res == 42
+
+def test_ssh():
+    py.test.skip("Bootstrapping")
+    gw = SshGateway('codespeak.net')
+    channel = gw.remote_exec("""
+        import socket
+        channel.send(socket.gethostname())
+    """)
+    res = channel.receive()
+    assert res.endswith('codespeak.net')
+
+def test_remote_error():
+    gw = PopenGateway()
+    channel = gw.remote_exec("x = channel.receive(); channel.send(x + 1)")
+    channel.send("hello")
+    py.test.raises(RemoteError, channel.receive)
+
+def test_invalid_object():
+    class X(object):
+        pass
+    gw = PopenGateway()
+    channel = gw.remote_exec("x = channel.receive(); channel.send(x + 1)")
+    channel.send(X())
+    py.test.raises(RemoteError, channel.receive)
+
+def test_channel_over_channel():
+    gw = PopenGateway()
+    chan1 = gw.newchannel()
+    channel = gw.remote_exec("chan1 = channel.receive(); chan1.send(42)")
+    channel.send(chan1)
+    res = chan1.receive()
+    assert res == 42

Added: py/trunk/py/net/test/test_greensock2.py
==============================================================================
--- (empty file)
+++ py/trunk/py/net/test/test_greensock2.py	Tue Mar  6 10:07:53 2007
@@ -0,0 +1,213 @@
+import py
+from socket import *
+from py.__.net.greensock2 import *
+
+def test_meetingpoint():
+    giv1, acc1 = meetingpoint()
+    giv2, acc2 = meetingpoint()
+    giv3, acc3 = meetingpoint()
+
+    lst = []
+
+    def g1():
+        lst.append(0)
+        x = acc2.accept()
+        assert x == 'hello'
+        lst.append(2)
+        giv1.give('world')
+        lst.append(5)
+        x = acc3.accept()
+        assert x == 'middle'
+        lst.append(6)
+        giv3.give('done')
+
+    def g2():
+        lst.append(1)
+        giv2.give('hello')
+        lst.append(3)
+        y = acc1.accept()
+        assert y == 'world'
+        lst.append(4)
+
+    autogreenlet(g1)
+    autogreenlet(g2)
+    giv3.give('middle')
+    tag = acc3.accept()
+    assert tag == 'done'
+    assert lst == [0, 1, 2, 3, 4, 5, 6]
+
+
+def test_producer():
+    lst = []
+
+    def prod(n):
+        lst.append(1)
+        yield n
+        lst.append(2)
+        yield 87
+        lst.append(3)
+
+    def cons():
+        lst.append(4)
+        accepter = producer(prod, 145)
+        lst.append(5)
+        lst.append(accepter.accept())
+        lst.append(6)
+        lst.append(accepter.accept())
+        lst.append(7)
+        try:
+            accepter.accept()
+        except Interrupted:
+            lst.append(8)
+
+    g = autogreenlet(cons)
+    wait(g)
+    assert lst == [4, 5, 1, 145, 6, 2, 87, 7, 3, 8]
+
+
+def test_timer():
+    lst = []
+
+    def g1():
+        sleep(0.1, g_1)
+        lst.append(1)
+        sleep(0.2, g_1)
+        lst.append(3)
+
+    def g2():
+        lst.append(0)
+        sleep(0.2, g_2)
+        lst.append(2)
+        sleep(0.2, g_2)
+        lst.append(4)
+
+    g_1 = autogreenlet(g1)
+    g_2 = autogreenlet(g2)
+    wait(g_1)
+    wait(g_2)
+    assert lst == [0, 1, 2, 3, 4]
+
+
+def test_socket():
+    s1 = socket(AF_INET, SOCK_DGRAM)
+    s2 = socket(AF_INET, SOCK_DGRAM)
+    s1.bind(('', INADDR_ANY))
+    s2.bind(('', INADDR_ANY))
+    s1.connect(s2.getsockname())
+    s2.connect(s1.getsockname())
+
+    lst = []
+
+    def g1():
+        lst.append(0)
+        x = recv(s1, 5)
+        assert x == 'hello'
+        lst.append(3)
+        sendall(s1, 'world')
+        lst.append(4)
+
+    def g2():
+        lst.append(1)
+        sendall(s2, 'hello')
+        lst.append(2)
+        y = recv(s2, 5)
+        assert y == 'world'
+        lst.append(5)
+
+    g_1 = autogreenlet(g1)
+    g_2 = autogreenlet(g2)
+    wait(g_1)
+    wait(g_2)
+    assert lst == [0, 1, 2, 3, 4, 5]
+
+
+##def test_Queue():
+
+##    def g1():
+##        lst.append(5)
+##        q.put(6)
+##        lst.append(7)
+##        q.put(8)
+##        lst.append(9)
+##        q.put(10)
+##        lst.append(11)
+##        q.put(12)    # not used
+
+##    def g2():
+##        lst.append(1)
+##        lst.append(q.get())
+##        lst.append(2)
+##        lst.append(q.get())
+##        lst.append(3)
+##        lst.append(q.get())
+##        lst.append(4)
+
+##    q = Queue()
+##    lst = []
+##    autogreenlet(g1)
+##    autogreenlet(g2)
+##    wait()
+##    assert lst == [5, 7, 9, 11, 1, 6, 2, 8, 3, 10, 4]
+
+##    q = Queue()
+##    lst = []
+##    autogreenlet(g2)
+##    autogreenlet(g1)
+##    wait()
+##    assert lst == [1, 5, 7, 9, 11, 6, 2, 8, 3, 10, 4]
+
+
+##def test_Event():
+
+##    def g1():
+##        assert not e.isSet()
+##        e.wait()
+##        assert not e.isSet()   # no longer set
+##        lst.append(1)
+##        e.set()
+##        e.wait()
+##        lst.append(2)
+##        assert e.isSet()
+##        e.clear()
+##        assert not e.isSet()
+##        lst.append(0)
+##        e.set()
+##        lst.append(3)
+##        assert e.isSet()
+
+##    def g2():
+##        assert not e.isSet()
+##        lst.append(4)
+##        e.set()
+##        lst.append(7)
+##        e.clear()
+##        e.set()
+##        e.clear()
+##        assert not e.isSet()
+##        lst.append(5)
+##        e.wait()
+##        assert e.isSet()
+##        lst.append(6)
+
+##    e = Event()
+##    lst = []
+##    autogreenlet(g1)
+##    autogreenlet(g2)
+##    wait()
+##    assert lst == [4, 7, 5, 1, 2, 0, 3, 6]
+
+
+##def test_Event_timeout():
+##    def g1():
+##        lst.append(5)
+##        e.wait(0.1)
+##        lst.append(e.isSet())
+##        e.wait(60.0)
+##        lst.append(e.isSet())
+##    lst = []
+##    e = Event()
+##    autogreenlet(g1)
+##    sleep(0.5)
+##    e.set()
+##    wait()
+##    assert lst == [5, False, True]

Added: py/trunk/py/net/test/test_pipelayer.py
==============================================================================
--- (empty file)
+++ py/trunk/py/net/test/test_pipelayer.py	Tue Mar  6 10:07:53 2007
@@ -0,0 +1,215 @@
+import os, random
+from py.__.net.pipelayer import PipeLayer, pipe_over_udp, PipeOverUdp
+
+def test_simple():
+    data1 = os.urandom(1000)
+    data2 = os.urandom(1000)
+    p1 = PipeLayer()
+    p2 = PipeLayer()
+    p2._dump_indent = 40
+    p1.queue(data1)
+    p1.queue(data2)
+    recv = ''
+    while len(recv) < 2000:
+        raw = p1.encode(64)
+        assert raw is not None
+        res = p2.decode(raw)
+        assert res is not None
+        recv += res
+    assert recv == data1 + data2
+    raw = p1.encode(64)
+    assert raw is None
+
+def test_stabilize():
+    data1 = os.urandom(28)
+    p1 = PipeLayer()
+    p2 = PipeLayer()
+    p2._dump_indent = 40
+    p1.queue(data1)
+    recv = ''
+    t = 0.0
+    print
+    while True:
+        delay1 = p1.settime(t)
+        delay2 = p2.settime(t)
+        t += 0.100000001
+        if delay1 is delay2 is None:
+            break
+        if delay1 == 0:
+            raw = p1.encode(10)
+            p1.dump('OUT', raw)
+            assert raw is not None
+            res = p2.decode(raw)
+            assert res is not None
+            recv += res
+        if delay2 == 0:
+            raw = p2.encode(10)
+            p2.dump('OUT', raw)
+            assert raw is not None
+            res = p1.decode(raw)
+            assert res == ''
+    assert recv == data1
+
+def test_bidir():
+    data1 = os.urandom(1000)
+    data2 = os.urandom(1000)
+    p1 = PipeLayer()
+    p2 = PipeLayer()
+    p2._dump_indent = 40
+    p1.queue(data1)
+    p2.queue(data2)
+    recv = ['', '']
+    while len(recv[0]) < 1000 or len(recv[1]) < 1000:
+        progress = False
+        for (me, other, i) in [(p1, p2, 1), (p2, p1, 0)]:
+            raw = me.encode(64)
+            if raw is not None:
+                res = other.decode(raw)
+                assert res is not None
+                recv[i] += res
+                if res:
+                    progress = True
+        assert progress
+    assert recv[0] == data2
+    assert recv[1] == data1
+    raw = p1.encode(64)
+    assert raw is None
+    raw = p2.encode(64)
+    assert raw is None
+
+def test_with_loss():
+    data1 = os.urandom(10000).encode('hex')
+    data2 = os.urandom(10000).encode('hex')
+    #data1 = '0123456789'
+    #data2 = 'ABCDEFGHIJ'
+    p1 = PipeLayer()
+    p2 = PipeLayer()
+    p2._dump_indent = 40
+    p1.queue(data1)
+    p2.queue(data2)
+    recv = ['', '']
+    time = 0
+    active = 1
+    while active:
+        active = 0
+        time += 0.2
+        #print '.'
+        exchange = []
+        for (me, other, i) in [(p1, p2, 1), (p2, p1, 0)]:
+            to = me.settime(time)
+            packet = me.encode(12)
+            assert (packet is not None) == (to == 0)
+            if to is not None:
+                active = 1
+                if to == 0:
+                    exchange.append((packet, other, i))
+        for (packet, other, i) in exchange:
+            if random.random() < 0.5:
+                pass   # drop packet
+            else:
+                res = other.decode(packet)
+                assert res is not None
+                recv[i] += res
+        assert data2.startswith(recv[0])
+        assert data1.startswith(recv[1])
+    assert recv[0] == data2
+    assert recv[1] == data1
+    print time
+
+def test_massive_reordering():
+    data1 = os.urandom(10000).encode('hex')
+    data2 = os.urandom(10000).encode('hex')
+    #data1 = '0123456789'
+    #data2 = 'ABCDEFGHIJ'
+    p1 = PipeLayer()
+    p2 = PipeLayer()
+    p2._dump_indent = 40
+    p1.queue(data1)
+    p2.queue(data2)
+    recv = ['', '']
+    time = 0
+    active = 1
+    exchange = []
+    while active or exchange:
+        active = 0
+        time += 0.2
+        #print '.'
+        for (me, other, i) in [(p1, p2, 1), (p2, p1, 0)]:
+            to = me.settime(time)
+            packet = me.encode(12)
+            assert (packet is not None) == (to == 0)
+            if to is not None:
+                active = 1
+                if to == 0:
+                    exchange.append((packet, other, i))
+        if random.random() < 0.02:
+            random.shuffle(exchange)
+            for (packet, other, i) in exchange:
+                res = other.decode(packet)
+                assert res is not None
+                recv[i] += res
+            exchange = []
+        assert data2.startswith(recv[0])
+        assert data1.startswith(recv[1])
+    assert recv[0] == data2
+    assert recv[1] == data1
+    print time
+
+def udpsockpair():
+    from socket import socket, AF_INET, SOCK_DGRAM, INADDR_ANY
+    s1 = socket(AF_INET, SOCK_DGRAM)
+    s2 = socket(AF_INET, SOCK_DGRAM)
+    s1.bind(('127.0.0.1', INADDR_ANY))
+    s2.bind(('127.0.0.1', INADDR_ANY))
+    s2.connect(s1.getsockname())
+    s1.connect(s2.getsockname())
+    return s1, s2
+
+def test_pipe_over_udp():
+    import thread
+    s1, s2 = udpsockpair()
+
+    fd1 = os.open(__file__, os.O_RDONLY)
+    fd2 = os.open('test_pipelayer.py~copy', os.O_WRONLY|os.O_CREAT|os.O_TRUNC)
+
+    thread.start_new_thread(pipe_over_udp, (s1, fd1))
+    pipe_over_udp(s2, recv_fd=fd2, inactivity_timeout=2.5)
+    os.close(fd1)
+    os.close(fd2)
+    f = open(__file__, 'rb')
+    data1 = f.read()
+    f.close()
+    f = open('test_pipelayer.py~copy', 'rb')
+    data2 = f.read()
+    f.close()
+    assert data1 == data2
+    os.unlink('test_pipelayer.py~copy')
+
+def test_PipeOverUdp():
+    s1, s2 = udpsockpair()
+    p1 = PipeOverUdp(s1, timeout=0.2)
+    p2 = PipeOverUdp(s2, timeout=0.2)
+    p2.sendall('goodbye')
+    for k in range(10):
+        p1.sendall('hello world')
+        input = p2.recvall(11)
+        assert input == 'hello world'
+    input = p1.recvall(7)
+    assert input == 'goodbye'
+
+    bigchunk1 = os.urandom(500000)
+    bigchunk2 = os.urandom(500000)
+    i1 = i2 = 0
+    j1 = j2 = 0
+    while j1 < len(bigchunk1) or j2 < len(bigchunk2):
+        i1 += p1.send(bigchunk1[i1:i1+512])
+        i2 += p2.send(bigchunk2[i2:i2+512])
+        data = p1.recv(512)
+        assert data == bigchunk2[j2:j2+len(data)]
+        j2 += len(data)
+        data = p2.recv(512)
+        assert data == bigchunk1[j1:j1+len(data)]
+        j1 += len(data)
+        #print i1, i2, j1, j2
+    p1.close()
+    p2.close()


More information about the py-svn mailing list