[py-svn] r39973 - in py/trunk/py/net: . pipe server test
fijal at codespeak.net
fijal at codespeak.net
Tue Mar 6 10:07:55 CET 2007
Author: fijal
Date: Tue Mar 6 10:07:53 2007
New Revision: 39973
Added:
py/trunk/py/net/ (props changed)
py/trunk/py/net/__init__.py (contents, props changed)
py/trunk/py/net/greenexecnet.py (contents, props changed)
py/trunk/py/net/greensock2.py (contents, props changed)
py/trunk/py/net/msgstruct.py (contents, props changed)
py/trunk/py/net/pipe/ (props changed)
py/trunk/py/net/pipe/__init__.py (contents, props changed)
py/trunk/py/net/pipe/common.py (contents, props changed)
py/trunk/py/net/pipe/fd.py (contents, props changed)
py/trunk/py/net/pipe/gsocket.py (contents, props changed)
py/trunk/py/net/pipe/mp.py (contents, props changed)
py/trunk/py/net/pipelayer.py (contents, props changed)
py/trunk/py/net/server/ (props changed)
py/trunk/py/net/server/__init__.py
py/trunk/py/net/server/httpserver.py (contents, props changed)
py/trunk/py/net/test/ (props changed)
py/trunk/py/net/test/test_greenexecnet.py (contents, props changed)
py/trunk/py/net/test/test_greensock2.py (contents, props changed)
py/trunk/py/net/test/test_pipelayer.py (contents, props changed)
Log:
Add a (stolen almost directly from arigo/hack) network layer
based on top of greenlets. Needs some tweaking, so it's not exposed
as a py.xxx, but rather py.__.net
Added: py/trunk/py/net/__init__.py
==============================================================================
Added: py/trunk/py/net/greenexecnet.py
==============================================================================
--- (empty file)
+++ py/trunk/py/net/greenexecnet.py Tue Mar 6 10:07:53 2007
@@ -0,0 +1,203 @@
+import sys, os, py, inspect
+from py.__.net import greensock2
+from py.__.net.msgstruct import message, decodemessage
+
+MSG_REMOTE_EXEC = 'r'
+MSG_OBJECT = 'o'
+MSG_ERROR = 'e'
+MSG_CHAN_CLOSE = 'c'
+MSG_FATAL = 'f'
+MSG_CHANNEL = 'n'
+
+class Gateway(object):
+
+ def __init__(self, input, output, is_remote=False):
+ self.input = input
+ self.output = output
+ self.nextchannum = int(is_remote)
+ self.receivers = {}
+ self.greenlet = greensock2.autogreenlet(self.serve_forever, is_remote)
+
+ def remote_exec(self, remote_source):
+ remote_source = py.code.Source(remote_source)
+ chan = self.newchannel()
+ msg = message(MSG_REMOTE_EXEC, chan.n, str(remote_source))
+ self.output.sendall(msg)
+ return chan
+
+ def newchannel(self):
+ n = self.nextchannum
+ self.nextchannum += 2
+ return self.make_channel(n)
+
+ def make_channel(self, n):
+ giver, accepter = greensock2.meetingpoint()
+ assert n not in self.receivers
+ self.receivers[n] = giver
+ return Channel(self, n, accepter)
+
+ def serve_forever(self, is_remote=False):
+ try:
+ buffer = ""
+ while 1:
+ msg, buffer = decodemessage(buffer)
+ if msg is None:
+ buffer += self.input.recv(16384)
+ else:
+ handler = HANDLERS[msg[0]]
+ handler(self, *msg[1:])
+ except greensock2.greenlet.GreenletExit:
+ raise
+ except:
+ if is_remote:
+ msg = message(MSG_FATAL, format_error(*sys.exc_info()))
+ self.output.sendall(msg)
+ else:
+ raise
+
+ def msg_remote_exec(self, n, source):
+ def do_execute(channel):
+ try:
+ d = {'channel': channel}
+ exec source in d
+ except:
+ channel.report_error(*sys.exc_info())
+ else:
+ channel.close()
+ greensock2.autogreenlet(do_execute, self.make_channel(n))
+
+ def msg_object(self, n, objrepr):
+ obj = eval(objrepr)
+ if n in self.receivers:
+ self.receivers[n].give_queued(obj)
+
+ def msg_error(self, n, s):
+ if n in self.receivers:
+ self.receivers[n].give_queued(RemoteError(s))
+ self.receivers[n].close()
+ del self.receivers[n]
+
+ def msg_chan_close(self, n):
+ if n in self.receivers:
+ self.receivers[n].close()
+ del self.receivers[n]
+
+ def msg_channel(self, n, m):
+ if n in self.receivers:
+ self.receivers[n].give_queued(self.make_channel(m))
+
+ def msg_fatal(self, s):
+ raise RemoteError(s)
+
+HANDLERS = {
+ MSG_REMOTE_EXEC: Gateway.msg_remote_exec,
+ MSG_OBJECT: Gateway.msg_object,
+ MSG_ERROR: Gateway.msg_error,
+ MSG_CHAN_CLOSE: Gateway.msg_chan_close,
+ MSG_CHANNEL: Gateway.msg_channel,
+ MSG_FATAL: Gateway.msg_fatal,
+ }
+
+
+class Channel(object):
+
+ def __init__(self, gw, n, accepter):
+ self.gw = gw
+ self.n = n
+ self.accepter = accepter
+
+ def send(self, obj):
+ if isinstance(obj, Channel):
+ assert obj.gw is self.gw
+ msg = message(MSG_CHANNEL, self.n, obj.n)
+ else:
+ msg = message(MSG_OBJECT, self.n, repr(obj))
+ self.gw.output.sendall(msg)
+
+ def receive(self):
+ obj = self.accepter.accept()
+ if isinstance(obj, RemoteError):
+ raise obj
+ else:
+ return obj
+
+ def close(self):
+ try:
+ self.gw.output.sendall(message(MSG_CHAN_CLOSE, self.n))
+ except OSError:
+ pass
+
+ def report_error(self, exc_type, exc_value, exc_traceback=None):
+ s = format_error(exc_type, exc_value, exc_traceback)
+ try:
+ self.gw.output.sendall(message(MSG_ERROR, self.n, s))
+ except OSError:
+ pass
+
+
+class RemoteError(Exception):
+ pass
+
+def format_error(exc_type, exc_value, exc_traceback=None):
+ import traceback, StringIO
+ s = StringIO.StringIO()
+ traceback.print_exception(exc_type, exc_value, exc_traceback, file=s)
+ return s.getvalue()
+
+
+class PopenCmdGateway(Gateway):
+ action = "exec input()"
+
+ def __init__(self, cmdline):
+ from py.__.net.pipe.fd import FDInput, FDOutput
+ child_in, child_out = os.popen2(cmdline, 't', 0)
+ fdin = FDInput(child_out.fileno(), child_out.close)
+ fdout = FDOutput(child_in.fileno(), child_in.close)
+ fdout.sendall(self.get_bootstrap_code())
+ super(PopenCmdGateway, self).__init__(input = fdin, output = fdout)
+
+ def get_bootstrap_code():
+ # XXX assumes that the py lib is installed on the remote side
+ src = []
+ src.append('from py.__.net import greenexecnet')
+ src.append('greenexecnet.PopenCmdGateway.run_server()')
+ src.append('')
+ return '%r\n' % ('\n'.join(src),)
+ get_bootstrap_code = staticmethod(get_bootstrap_code)
+
+ def run_server():
+ from py.__.net.pipe.fd import FDInput, FDOutput
+ gw = Gateway(input = FDInput(os.dup(0)),
+ output = FDOutput(os.dup(1)),
+ is_remote = True)
+ # for now, ignore normal I/O
+ fd = os.open('/dev/null', os.O_RDWR)
+ os.dup2(fd, 0)
+ os.dup2(fd, 1)
+ os.close(fd)
+ greensock2.wait(gw.greenlet)
+ run_server = staticmethod(run_server)
+
+class PopenGateway(PopenCmdGateway):
+ def __init__(self, python=sys.executable):
+ cmdline = '"%s" -u -c "%s"' % (python, self.action)
+ super(PopenGateway, self).__init__(cmdline)
+
+class SshGateway(PopenCmdGateway):
+ def __init__(self, sshaddress, remotepython='python', identity=None):
+ self.sshaddress = sshaddress
+ remotecmd = '%s -u -c "%s"' % (remotepython, self.action)
+ cmdline = [sshaddress, remotecmd]
+ # XXX Unix style quoting
+ for i in range(len(cmdline)):
+ cmdline[i] = "'" + cmdline[i].replace("'", "'\\''") + "'"
+ cmd = 'ssh -C'
+ if identity is not None:
+ cmd += ' -i %s' % (identity,)
+ cmdline.insert(0, cmd)
+ super(SshGateway, self).__init__(' '.join(cmdline))
+
+
+##f = open('LOG', 'a')
+##import os; print >> f, '[%d] READY' % (os.getpid(),)
+##f.close()
Added: py/trunk/py/net/greensock2.py
==============================================================================
--- (empty file)
+++ py/trunk/py/net/greensock2.py Tue Mar 6 10:07:53 2007
@@ -0,0 +1,526 @@
+import os, sys
+try:
+ from stackless import greenlet
+except ImportError:
+ import py
+ greenlet = py.magic.greenlet
+from collections import deque
+from select import select as _select
+from time import time as _time
+from heapq import heappush, heappop, heapify
+
+TRACE = True
+
+def meetingpoint():
+ senders = deque() # list of senders, or [None] if Giver closed
+ receivers = deque() # list of receivers, or [None] if Receiver closed
+ return (MeetingPointGiver(senders, receivers),
+ MeetingPointAccepter(senders, receivers))
+
+def producer(func, *args, **kwds):
+ iterable = func(*args, **kwds)
+ giver, accepter = meetingpoint()
+ def autoproducer():
+ try:
+ giver.wait()
+ for obj in iterable:
+ giver.give(obj)
+ giver.wait()
+ finally:
+ giver.close()
+ autogreenlet(autoproducer)
+ return accepter
+
+
+class MeetingPointBase(object):
+
+ def __init__(self, senders, receivers):
+ self.senders = senders
+ self.receivers = receivers
+ self.g_active = g_active
+
+ def close(self):
+ while self.senders:
+ if self.senders[0] is None:
+ break
+ packet = self.senders.popleft()
+ if packet.g_from is not None:
+ self.g_active.append(packet.g_from)
+ else:
+ self.senders.append(None)
+ while self.receivers:
+ if self.receivers[0] is None:
+ break
+ other = self.receivers.popleft()
+ self.g_active.append(other)
+ else:
+ self.receivers.append(None)
+
+ __del__ = close
+
+ def closed(self):
+ return self.receivers and self.receivers[0] is None
+
+
+class MeetingPointGiver(MeetingPointBase):
+
+ def give(self, obj):
+ if self.receivers:
+ if self.receivers[0] is None:
+ raise MeetingPointClosed
+ other = self.receivers.popleft()
+ g_active.append(g_getcurrent())
+ packet = _Packet()
+ packet.payload = obj
+ other.switch(packet)
+ if not packet.accepted:
+ raise Interrupted("packet not accepted")
+ else:
+ packet = _Packet()
+ packet.g_from = g_getcurrent()
+ packet.payload = obj
+ try:
+ self.senders.append(packet)
+ g_dispatcher.switch()
+ if not packet.accepted:
+ raise Interrupted("packet not accepted")
+ except:
+ remove_by_id(self.senders, packet)
+ raise
+
+ def give_queued(self, obj):
+ if self.receivers:
+ self.give(obj)
+ else:
+ packet = _Packet()
+ packet.g_from = None
+ packet.payload = obj
+ self.senders.append(packet)
+
+ def ready(self):
+ return self.receivers and self.receivers[0] is not None
+
+ def wait(self):
+ if self.receivers:
+ if self.receivers[0] is None:
+ raise MeetingPointClosed
+ else:
+ packet = _Packet()
+ packet.g_from = g_getcurrent()
+ packet.empty = True
+ self.senders.append(packet)
+ try:
+ g_dispatcher.switch()
+ if not packet.accepted:
+ raise Interrupted("no accepter found")
+ except:
+ remove_by_id(self.senders, packet)
+ raise
+
+ def trigger(self):
+ if self.ready():
+ self.give(None)
+
+
+class MeetingPointAccepter(MeetingPointBase):
+
+ def accept(self):
+ while self.senders:
+ if self.senders[0] is None:
+ raise MeetingPointClosed
+ packet = self.senders.popleft()
+ packet.accepted = True
+ if packet.g_from is not None:
+ g_active.append(packet.g_from)
+ if not packet.empty:
+ return packet.payload
+ g = g_getcurrent()
+ self.receivers.append(g)
+ try:
+ packet = g_dispatcher.switch()
+ except:
+ remove_by_id(self.receivers, g)
+ raise
+ if type(packet) is not _Packet:
+ remove_by_id(self.receivers, g)
+ raise Interrupted("no packet")
+ packet.accepted = True
+ return packet.payload
+
+ def ready(self):
+ for packet in self.senders:
+ if packet is None:
+ return False
+ if not packet.empty:
+ return True
+ return False
+
+ def wait_trigger(self, timeout=None, default=None):
+ if timeout is None:
+ return self.accept()
+ else:
+ timer = Timer(timeout)
+ try:
+ try:
+ return self.accept()
+ finally:
+ timer.stop()
+ except Interrupted:
+ if timer.finished:
+ return default
+ raise
+
+
+class MeetingPointClosed(greenlet.GreenletExit):
+ pass
+
+class Interrupted(greenlet.GreenletExit):
+ pass
+
+class ConnexionClosed(greenlet.GreenletExit):
+ pass
+
+class _Packet(object):
+ empty = False
+ accepted = False
+
+def remove_by_id(d, obj):
+ lst = [x for x in d if x is not obj]
+ d.clear()
+ d.extend(lst)
+
+# ____________________________________________________________
+
+##class Queue(object):
+
+## def __init__(self):
+## self.giver, self.accepter = meetingpoint()
+## self.pending = deque()
+
+## def put(self, item): # preserve the caller's atomicity
+## self.pending.append(item)
+## if self.accepter.ready():
+## self.accepter.accept()
+
+## def get(self, block=True):
+## if self.pending:
+## return self.pending.popleft()
+## elif block:
+## self.giver.give(None)
+## return self.pending.popleft()
+## else:
+## raise Empty
+
+##class Empty(Interrupted):
+## pass
+
+##class Event(object):
+
+## def __init__(self):
+## self.giver, self.accepter = meetingpoint()
+
+## clear = __init__
+
+## def isSet(self):
+## return self.accepter is None
+
+## def set(self): # preserve the caller's atomicity
+## if self.accepter is not None:
+## accepter = self.accepter
+## self.giver = self.accepter = None
+## while accepter.ready(): # wake up all waiters
+## accepter.accept()
+
+## def wait(self, timeout=None):
+## if self.accepter is not None:
+## if timeout is None:
+## self.giver.give(None)
+## else:
+## timer = Timer(timeout)
+## try:
+## try:
+## self.giver.give(None)
+## except Interrupted:
+## pass
+## finally:
+## timer.stop()
+
+##class Semaphore(object):
+
+## def __init__(self, value=1):
+## self.giver, self.accepter = meetingpoint()
+## for i in range(value):
+## self.release()
+
+## def acquire(self, blocking=True):
+## if blocking or self.accepter.ready():
+## return self.accepter.accept()
+## else:
+## return False
+
+## def release(self):
+## autogreenlet(self.giver.put, True)
+
+# ____________________________________________________________
+
+def wait_input(sock):
+ _register(g_iwtd, sock)
+
+def recv(sock, bufsize):
+ wait_input(sock)
+ buf = sock.recv(bufsize)
+ if not buf:
+ raise ConnexionClosed("inbound connexion closed")
+ return buf
+
+def recvall(sock, bufsize):
+ in_front = False
+ data = []
+ while bufsize > 0:
+ _register(g_iwtd, sock, in_front=in_front)
+ buf = sock.recv(bufsize)
+ if not buf:
+ raise ConnexionClosed("inbound connexion closed")
+ data.append(buf)
+ bufsize -= len(buf)
+ in_front = True
+ return ''.join(data)
+
+def read(fd, bufsize):
+ assert fd >= 0
+ wait_input(fd)
+ buf = os.read(fd, bufsize)
+ if not buf:
+ raise ConnexionClosed("inbound connexion closed")
+ return buf
+
+def readall(fd, bufsize):
+ assert fd >= 0
+ in_front = False
+ data = []
+ while bufsize > 0:
+ _register(g_iwtd, fd, in_front=in_front)
+ buf = os.read(fd, bufsize)
+ if not buf:
+ raise ConnexionClosed("inbound connexion closed")
+ data.append(buf)
+ bufsize -= len(buf)
+ in_front = True
+ return ''.join(data)
+
+
+def wait_output(sock):
+ _register(g_owtd, sock)
+
+def sendall(sock, buffer):
+ in_front = False
+ while buffer:
+ _register(g_owtd, sock, in_front=in_front)
+ count = sock.send(buffer)
+ buffer = buffer[count:]
+ in_front = True
+
+def writeall(fd, buffer):
+ assert fd >= 0
+ in_front = False
+ while buffer:
+ _register(g_owtd, fd, in_front=in_front)
+ count = os.write(fd, buffer)
+ if not count:
+ raise ConnexionClosed("outbound connexion closed")
+ buffer = buffer[count:]
+ in_front = True
+
+
+def sleep(duration, *greenlets):
+ timer = Timer(duration)
+ try:
+ wait(*greenlets)
+ finally:
+ ok = timer.finished
+ timer.stop()
+ if not ok:
+ raise Interrupted
+
+def _wait():
+ g_dispatcher.switch()
+
+def wait(*greenlets):
+ assert greenlets#, "should not wait without events to wait on"
+ current = g_getcurrent()
+ for g in greenlets:
+ if g in g_waiters:
+ g_waiters[g].append(current)
+ else:
+ g_waiters[g] = [current]
+ g_dispatcher.switch()
+
+class Timer(object):
+ started = False
+ finished = False
+
+ def __init__(self, timeout):
+ self.g = g_getcurrent()
+ entry = (_time() + timeout, self)
+ if g_timers_mixed:
+ g_timers.append(entry)
+ else:
+ heappush(g_timers, entry)
+
+ def stop(self):
+ global g_timers_mixed
+ if not self.finished:
+ for i, (activationtime, timer) in enumerate(g_timers):
+ if timer is self:
+ g_timers[i] = g_timers[-1]
+ g_timers.pop()
+ g_timers_mixed = True
+ break
+ self.finished = True
+
+# ____________________________________________________________
+
+class autogreenlet(greenlet):
+ def __init__(self, function, *args, **kwds):
+ self.parent = g_dispatcher
+ self.function = function
+ self.args = args
+ self.kwds = kwds
+ g_active.append(self)
+
+ def run(self):
+ self.trace("start")
+ try:
+ self.function(*self.args, **self.kwds)
+ except Exception, e:
+ self.trace("stop (%s%s)", e.__class__.__name__,
+ str(e) and (': '+str(e)))
+ raise
+ else:
+ self.trace("done")
+
+ def __repr__(self):
+## args = ', '.join([repr(s) for s in self.args] +
+## ['%s=%r' % keyvalue for keyvalue in self.kwds.items()])
+## return '<autogreenlet %s(%s)>' % (self.function.__name__, args)
+ return '<autogreenlet %s at %s>' % (self.function.__name__,
+ hex(id(self)))
+
+ def trace(self, msg, *args):
+ if TRACE:
+ print self, msg % args
+
+ def interrupt(self):
+ self.throw(Interrupted)
+
+
+g_active = deque()
+g_iwtd = {}
+g_owtd = {}
+g_timers = []
+g_timers_mixed = False
+
+g_getcurrent = greenlet.getcurrent
+
+def _register(g_wtd, sock, in_front=False):
+ d = g_wtd.setdefault(sock, deque())
+ g = g_getcurrent()
+ if in_front:
+ d.appendleft(g)
+ else:
+ d.append(g)
+ try:
+ if g_dispatcher.switch() is not g_wtd:
+ raise Interrupted
+ except:
+ remove_by_id(d, g)
+ raise
+
+##def _unregister_timer():
+## ...
+
+
+def check_dead_greenlets(mapping):
+ to_remove = [i for i, v in mapping.items() if not v]
+ for k in to_remove:
+ del mapping[k]
+
+def check_waiters(active):
+ if active in g_waiters:
+ for g in g_waiters[active]:
+ g.switch()
+ del g_waiters[active]
+
+
+def dispatcher_mainloop():
+ global g_timers_mixed
+ while 1:
+ try:
+ while g_active:
+ print 'active:', g_active[0]
+ active = g_active.popleft()
+ active.switch()
+ if active.dead:
+ check_waiters(active)
+ del active
+ if g_timers:
+ if g_timers_mixed:
+ heapify(g_timers)
+ g_timers_mixed = False
+ activationtime, timer = g_timers[0]
+ delay = activationtime - _time()
+ if delay <= 0.0:
+ if timer.started:
+ heappop(g_timers)
+ #print 'timeout:', g
+ timer.finished = True
+ timer.g.switch()
+ if timer.g.dead:
+ check_waiters(timer.g)
+ continue
+ delay = 0.0
+ timer.started = True
+ else:
+ check_dead_greenlets(g_iwtd)
+ check_dead_greenlets(g_owtd)
+ if not (g_iwtd or g_owtd):
+ # nothing to do, switch to the main greenlet
+ g_dispatcher.parent.switch()
+ continue
+ delay = None
+
+ print 'selecting...', g_iwtd.keys(), g_owtd.keys(), delay
+ iwtd, owtd, _ = _select(g_iwtd.keys(), g_owtd.keys(), [], delay)
+ print 'done'
+ for s in owtd:
+ if s in g_owtd:
+ d = g_owtd[s]
+ #print 'owtd:', d[0]
+ g = d.popleft()
+ if not d:
+ try:
+ del g_owtd[s]
+ except KeyError:
+ pass
+ g.switch(g_owtd)
+ if g.dead:
+ check_waiters(g)
+ for s in iwtd:
+ if s in g_iwtd:
+ d = g_iwtd[s]
+ #print 'iwtd:', d[0]
+ g = d.popleft()
+ if not d:
+ try:
+ del g_iwtd[s]
+ except KeyError:
+ pass
+ g.switch(g_iwtd)
+ if g.dead:
+ check_waiters(g)
+ except:
+ import sys
+ g_dispatcher.parent.throw(*sys.exc_info())
+
+g_dispatcher = greenlet(dispatcher_mainloop)
+g_waiters = {}
Added: py/trunk/py/net/msgstruct.py
==============================================================================
--- (empty file)
+++ py/trunk/py/net/msgstruct.py Tue Mar 6 10:07:53 2007
@@ -0,0 +1,29 @@
+from struct import pack, unpack, calcsize
+
+
+def message(tp, *values):
+ strtype = type('')
+ typecodes = ['']
+ for v in values:
+ if type(v) is strtype:
+ typecodes.append('%ds' % len(v))
+ elif 0 <= v < 256:
+ typecodes.append('B')
+ else:
+ typecodes.append('l')
+ typecodes = ''.join(typecodes)
+ assert len(typecodes) < 256
+ return pack(("!B%dsc" % len(typecodes)) + typecodes,
+ len(typecodes), typecodes, tp, *values)
+
+def decodemessage(data):
+ if data:
+ limit = ord(data[0]) + 1
+ if len(data) >= limit:
+ typecodes = "!c" + data[1:limit]
+ end = limit + calcsize(typecodes)
+ if len(data) >= end:
+ return unpack(typecodes, data[limit:end]), data[end:]
+ #elif end > 1000000:
+ # raise OverflowError
+ return None, data
Added: py/trunk/py/net/pipe/__init__.py
==============================================================================
Added: py/trunk/py/net/pipe/common.py
==============================================================================
--- (empty file)
+++ py/trunk/py/net/pipe/common.py Tue Mar 6 10:07:53 2007
@@ -0,0 +1,38 @@
+import greensock2
+from pypeers.tool import log
+
+
+class BufferedInput(object):
+ in_buf = ''
+
+ def recv(self, bufsize):
+ self.wait_input()
+ buf = self.in_buf[:bufsize]
+ self.in_buf = self.in_buf[bufsize:]
+ return buf
+
+ def recvall(self, bufsize):
+ result = []
+ while bufsize > 0:
+ buf = self.recv(bufsize)
+ result.append(buf)
+ bufsize -= len(buf)
+ return ''.join(result)
+
+# ____________________________________________________________
+
+def forwardpipe(s1, s2):
+ try:
+ while 1:
+ s2.wait_output()
+ buffer = s1.recv(32768)
+ log('[%r -> %r] %r', s1, s2, buffer)
+ s2.sendall(buffer)
+ del buffer
+ finally:
+ s2.shutdown_wr()
+ s1.shutdown_rd()
+
+def linkpipes(s1, s2):
+ greensock2.autogreenlet(forwardpipe, s1, s2)
+ greensock2.autogreenlet(forwardpipe, s2, s1)
Added: py/trunk/py/net/pipe/fd.py
==============================================================================
--- (empty file)
+++ py/trunk/py/net/pipe/fd.py Tue Mar 6 10:07:53 2007
@@ -0,0 +1,69 @@
+import os
+from py.__.net import greensock2
+
+
+class FDInput(object):
+
+ def __init__(self, read_fd, close=True):
+ self.read_fd = read_fd
+ self._close = close # a flag or a callback
+
+ def shutdown_rd(self):
+ fd = self.read_fd
+ if fd is not None:
+ self.read_fd = None
+ close = self._close
+ if close:
+ self._close = False
+ if close == True:
+ os.close(fd)
+ else:
+ close()
+
+ __del__ = shutdown_rd
+
+ def wait_input(self):
+ greensock2.wait_input(self.read_fd)
+
+ def recv(self, bufsize):
+## f = open('LOG', 'a')
+## import os; print >> f, '[%d] RECV' % (os.getpid(),)
+## f.close()
+ res = greensock2.read(self.read_fd, bufsize)
+## f = open('LOG', 'a')
+## import os; print >> f, '[%d] RECV %r' % (os.getpid(), res)
+## f.close()
+ return res
+
+ def recvall(self, bufsize):
+ return greensock2.readall(self.read_fd, bufsize)
+
+
+class FDOutput(object):
+
+ def __init__(self, write_fd, close=True):
+ self.write_fd = write_fd
+ self._close = close # a flag or a callback
+
+ def shutdown_wr(self):
+ fd = self.write_fd
+ if fd is not None:
+ self.write_fd = None
+ close = self._close
+ if close:
+ self._close = False
+ if close == True:
+ os.close(fd)
+ else:
+ close()
+
+ __del__ = shutdown_wr
+
+ def wait_output(self):
+ greensock2.wait_output(self.write_fd)
+
+ def sendall(self, buffer):
+## f = open('LOG', 'a')
+## import os; print >> f, '[%d] %r' % (os.getpid(), buffer)
+## f.close()
+ greensock2.writeall(self.write_fd, buffer)
Added: py/trunk/py/net/pipe/gsocket.py
==============================================================================
--- (empty file)
+++ py/trunk/py/net/pipe/gsocket.py Tue Mar 6 10:07:53 2007
@@ -0,0 +1,114 @@
+import greensock2
+import socket, errno, os
+
+error = socket.error
+
+
+class _delegate(object):
+ def __init__(self, methname):
+ self.methname = methname
+ def __get__(self, obj, typ=None):
+ result = getattr(obj._s, self.methname)
+ setattr(obj, self.methname, result)
+ return result
+
+
+class GreenSocket(object):
+
+ def __init__(self, family = socket.AF_INET,
+ type = socket.SOCK_STREAM,
+ proto = 0):
+ self._s = socket.socket(family, type, proto)
+ self._s.setblocking(False)
+
+ def fromsocket(cls, s):
+ if isinstance(s, GreenSocket):
+ s = s._s
+ result = GreenSocket.__new__(cls)
+ result._s = s
+ s.setblocking(False)
+ return result
+ fromsocket = classmethod(fromsocket)
+
+ def accept(self):
+ while 1:
+ try:
+ s, addr = self._s.accept()
+ break
+ except error, e:
+ import pdb;pdb.set_trace()
+ if e.args[0] not in (errno.EAGAIN, errno.EWOULDBLOCK):
+ raise
+ self.wait_input()
+ return self.fromsocket(s), addr
+
+ bind = _delegate("bind")
+ close = _delegate("close")
+
+ def connect(self, addr):
+ err = self.connect_ex(addr)
+ if err:
+ raise error(err, os.strerror(err))
+
+ def connect_ex(self, addr):
+ err = self._s.connect_ex(addr)
+ if err == errno.EINPROGRESS:
+ greensock2.wait_output(self._s)
+ err = self._s.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
+ return err
+
+ #XXX dup
+ fileno = _delegate("fileno")
+ getpeername = _delegate("getpeername")
+ getsockname = _delegate("getsockname")
+ getsockopt = _delegate("getsockopt")
+ listen = _delegate("listen")
+
+ def makefile(self, mode='r', bufsize=-1):
+ # hack, but reusing the internal socket._fileobject should just work
+ return socket._fileobject(self, mode, bufsize)
+
+ def recv(self, bufsize):
+ return greensock2.recv(self._s, bufsize)
+
+ def recvall(self, bufsize):
+ return greensock2.recvall(self._s, bufsize)
+
+ def recvfrom(self, bufsize):
+ self.wait_input()
+ buf, addr = self._s.recvfrom(bufsize)
+ if not buf:
+ raise ConnexionClosed("inbound connexion closed")
+ return buf, addr
+
+ def send(self, data):
+ self.wait_output()
+ return self._s.send(data)
+
+ def sendto(self, data, addr):
+ self.wait_output()
+ return self._s.sendto(data, addr)
+
+ def sendall(self, data):
+ greensock2.sendall(self._s, data)
+
+ setsockopt = _delegate("setsockopt")
+ shutdown = _delegate("shutdown")
+
+ def shutdown_rd(self):
+ try:
+ self._s.shutdown(socket.SHUT_RD)
+ except error:
+ pass
+
+ def shutdown_wr(self):
+ try:
+ self._s.shutdown(socket.SHUT_WR)
+ except error:
+ pass
+
+ def wait_input(self):
+ greensock2.wait_input(self._s)
+
+ def wait_output(self):
+ greensock2.wait_output(self._s)
Added: py/trunk/py/net/pipe/mp.py
==============================================================================
--- (empty file)
+++ py/trunk/py/net/pipe/mp.py Tue Mar 6 10:07:53 2007
@@ -0,0 +1,29 @@
+from pypeers.stream.common import BufferedInput
+
+
+class MeetingPointInput(BufferedInput):
+
+ def __init__(self, accepter):
+ self.accepter = accepter
+
+ def wait_input(self):
+ while not self.in_buf:
+ self.in_buf = self.accepter.accept()
+
+ def shutdown_rd(self):
+ self.accepter.close()
+
+
+class MeetingPointOutput(BufferedInput):
+
+ def __init__(self, giver):
+ self.giver = giver
+
+ def wait_output(self):
+ self.giver.wait()
+
+ def sendall(self, buffer):
+ self.giver.give(buffer)
+
+ def shutdown_wr(self):
+ self.giver.close()
Added: py/trunk/py/net/pipelayer.py
==============================================================================
--- (empty file)
+++ py/trunk/py/net/pipelayer.py Tue Mar 6 10:07:53 2007
@@ -0,0 +1,306 @@
+#import os
+import struct
+from collections import deque
+
+
+class InvalidPacket(Exception):
+ pass
+
+
+FLAG_NAK1 = 0xE0
+FLAG_NAK = 0xE1
+FLAG_REG = 0xE2
+FLAG_CFRM = 0xE3
+
+FLAG_RANGE_START = 0xE0
+FLAG_RANGE_STOP = 0xE4
+
+max_old_packets = 256 # must be <= 256
+
+
+class PipeLayer(object):
+ timeout = 1
+ headersize = 4
+
+ def __init__(self):
+ #self.localid = os.urandom(4)
+ #self.remoteid = None
+ self.cur_time = 0
+ self.out_queue = deque()
+ self.out_nextseqid = 0
+ self.out_nextrepeattime = None
+ self.in_nextseqid = 0
+ self.in_outoforder = {}
+ self.out_oldpackets = deque()
+ self.out_flags = FLAG_REG
+ self.out_resend = 0
+ self.out_resend_skip = False
+
+ def queue(self, data):
+ if data:
+ self.out_queue.appendleft(data)
+
+ def queue_size(self):
+ total = 0
+ for data in self.out_queue:
+ total += len(data)
+ return total
+
+ def in_sync(self):
+ return not self.out_queue and self.out_nextrepeattime is None
+
+ def settime(self, curtime):
+ self.cur_time = curtime
+ if self.out_queue:
+ if len(self.out_oldpackets) < max_old_packets:
+ return 0 # more data to send now
+ if self.out_nextrepeattime is not None:
+ return max(0, self.out_nextrepeattime - curtime)
+ else:
+ return None
+
+ def encode(self, maxlength):
+ #print ' '*self._dump_indent, '--- OUTQ', self.out_resend, self.out_queue
+ if len(self.out_oldpackets) >= max_old_packets:
+ # congestion, stalling
+ payload = 0
+ else:
+ payload = maxlength - 4
+ if payload <= 0:
+ raise ValueError("encode(): buffer too small")
+ if (self.out_nextrepeattime is not None and
+ self.out_nextrepeattime <= self.cur_time):
+ # no ACK received so far, send a packet (possibly empty)
+ if not self.out_queue:
+ payload = 0
+ else:
+ if not self.out_queue: # no more data to send
+ return None
+ if payload == 0: # congestion
+ return None
+ # prepare a packet
+ seqid = self.out_nextseqid
+ flags = self.out_flags
+ self.out_flags = FLAG_REG # clear out the flags for the next time
+ if payload > 0:
+ self.out_nextseqid = (seqid + 1) & 0xFFFF
+ data = self.out_queue.pop()
+ packetlength = len(data)
+ if self.out_resend > 0:
+ if packetlength > payload:
+ raise ValueError("XXX need constant buffer size for now")
+ self.out_resend -= 1
+ if self.out_resend_skip:
+ if self.out_resend > 0:
+ self.out_queue.pop()
+ self.out_resend -= 1
+ self.out_nextseqid = (seqid + 2) & 0xFFFF
+ self.out_resend_skip = False
+ packetpayload = data
+ else:
+ packet = []
+ while packetlength <= payload:
+ packet.append(data)
+ if not self.out_queue:
+ break
+ data = self.out_queue.pop()
+ packetlength += len(data)
+ else:
+ rest = len(data) + payload - packetlength
+ packet.append(data[:rest])
+ self.out_queue.append(data[rest:])
+ packetpayload = ''.join(packet)
+ self.out_oldpackets.appendleft(packetpayload)
+ #print ' '*self._dump_indent, '--- OLDPK', self.out_oldpackets
+ else:
+ # a pure ACK packet, no payload
+ if self.out_oldpackets and flags == FLAG_REG:
+ flags = FLAG_CFRM
+ packetpayload = ''
+ packet = struct.pack("!BBH", flags,
+ self.in_nextseqid & 0xFF,
+ seqid) + packetpayload
+ if self.out_oldpackets:
+ self.out_nextrepeattime = self.cur_time + self.timeout
+ else:
+ self.out_nextrepeattime = None
+ #self.dump('OUT', packet)
+ return packet
+
+ def decode(self, rawdata):
+ if len(rawdata) < 4:
+ raise InvalidPacket
+ #print ' '*self._dump_indent, '------ out %d (+%d) in %d' % (self.out_nextseqid, self.out_resend, self.in_nextseqid)
+ #self.dump('IN ', rawdata)
+ in_flags, ack_seqid, in_seqid = struct.unpack("!BBH", rawdata[:4])
+ if not (FLAG_RANGE_START <= in_flags < FLAG_RANGE_STOP):
+ raise InvalidPacket
+ in_diff = (in_seqid - self.in_nextseqid ) & 0xFFFF
+ ack_diff = (self.out_nextseqid + self.out_resend - ack_seqid) & 0xFF
+ if in_diff >= max_old_packets:
+ return '' # invalid, but can occur as a late repetition
+ if ack_diff != len(self.out_oldpackets):
+ # forget all acknowledged packets
+ if ack_diff > len(self.out_oldpackets):
+ return '' # invalid, but can occur with packet reordering
+ while len(self.out_oldpackets) > ack_diff:
+ #print ' '*self._dump_indent, '--- POP', repr(self.out_oldpackets[-1])
+ self.out_oldpackets.pop()
+ if self.out_oldpackets:
+ self.out_nextrepeattime = self.cur_time + self.timeout
+ else:
+ self.out_nextrepeattime = None # all packets ACKed
+ if in_flags == FLAG_NAK or in_flags == FLAG_NAK1:
+ # this is a NAK: resend the old packets as far as they've not
+ # also been ACK'ed in the meantime (can occur with reordering)
+ while self.out_resend < len(self.out_oldpackets):
+ self.out_queue.append(self.out_oldpackets[self.out_resend])
+ self.out_resend += 1
+ self.out_nextseqid = (self.out_nextseqid - 1) & 0xFFFF
+ #print ' '*self._dump_indent, '--- REP', self.out_nextseqid, repr(self.out_queue[-1])
+ self.out_resend_skip = in_flags == FLAG_NAK1
+ elif in_flags == FLAG_CFRM:
+ # this is a CFRM: request for confirmation
+ self.out_nextrepeattime = self.cur_time
+ # receive this packet's payload if it is the next in the sequence
+ if in_diff == 0:
+ if len(rawdata) > 4:
+ #print ' '*self._dump_indent, 'RECV ', self.in_nextseqid, repr(rawdata[4:])
+ self.in_nextseqid = (self.in_nextseqid + 1) & 0xFFFF
+ result = [rawdata[4:]]
+ while self.in_nextseqid in self.in_outoforder:
+ result.append(self.in_outoforder.pop(self.in_nextseqid))
+ self.in_nextseqid = (self.in_nextseqid + 1) & 0xFFFF
+ return ''.join(result)
+ else:
+ # we missed at least one intermediate packet: send a NAK
+ if len(rawdata) > 4:
+ self.in_outoforder[in_seqid] = rawdata[4:]
+ if ((self.in_nextseqid + 1) & 0xFFFF) in self.in_outoforder:
+ self.out_flags = FLAG_NAK1
+ else:
+ self.out_flags = FLAG_NAK
+ self.out_nextrepeattime = self.cur_time
+ return ''
+
+ _dump_indent = 0
+ def dump(self, dir, rawdata):
+ in_flags, ack_seqid, in_seqid = struct.unpack("!BBH", rawdata[:4])
+ print ' ' * self._dump_indent, dir,
+ if in_flags == FLAG_NAK:
+ print 'NAK',
+ elif in_flags == FLAG_NAK1:
+ print 'NAK1',
+ elif in_flags == FLAG_CFRM:
+ print 'CFRM',
+ #print ack_seqid, in_seqid, '(%d bytes)' % (len(rawdata)-4,)
+ print ack_seqid, in_seqid, repr(rawdata[4:])
+
+
+def pipe_over_udp(udpsock, send_fd=-1, recv_fd=-1,
+ timeout=1.0, inactivity_timeout=None):
+ """Example: send all data showing up in send_fd over the given UDP
+ socket, and write incoming data into recv_fd. The send_fd and
+ recv_fd are plain file descriptors. When an EOF is read from
+ send_fd, this function returns (after making sure that all data was
+ received by the remote side).
+ """
+ import os
+ from select import select
+ from time import time
+ p = PipeLayer()
+ p.timeout = timeout
+ iwtdlist = [udpsock]
+ if send_fd >= 0:
+ iwtdlist.append(send_fd)
+ running = True
+ while running or not p.in_sync():
+ delay = delay1 = p.settime(time())
+ if delay is None:
+ delay = inactivity_timeout
+ iwtd, owtd, ewtd = select(iwtdlist, [], [], delay)
+ if iwtd:
+ if send_fd in iwtd:
+ data = os.read(send_fd, 1500 - p.headersize)
+ if not data:
+ # EOF
+ iwtdlist.remove(send_fd)
+ running = False
+ else:
+ #print 'queue', len(data)
+ p.queue(data)
+ if udpsock in iwtd:
+ packet = udpsock.recv(65535)
+ #print 'decode', len(packet)
+ p.settime(time())
+ data = p.decode(packet)
+ i = 0
+ while i < len(data):
+ i += os.write(recv_fd, data[i:])
+ elif delay1 is None:
+ break # long inactivity
+ p.settime(time())
+ packet = p.encode(1500)
+ if packet:
+ #print 'send', len(packet)
+ #if os.urandom(1) >= '\x08': # emulate packet losses
+ udpsock.send(packet)
+
+
+class PipeOverUdp(object):
+
+ def __init__(self, udpsock, timeout=1.0):
+ import thread, os
+ self.os = os
+ self.sendpipe = os.pipe()
+ self.recvpipe = os.pipe()
+ thread.start_new_thread(pipe_over_udp, (udpsock,
+ self.sendpipe[0],
+ self.recvpipe[1],
+ timeout))
+
+ def __del__(self):
+ os = self.os
+ if self.sendpipe:
+ os.close(self.sendpipe[0])
+ os.close(self.sendpipe[1])
+ self.sendpipe = None
+ if self.recvpipe:
+ os.close(self.recvpipe[0])
+ os.close(self.recvpipe[1])
+ self.recvpipe = None
+
+ close = __del__
+
+ def send(self, data):
+ if not self.sendpipe:
+ raise IOError("I/O operation on a closed PipeOverUdp")
+ return self.os.write(self.sendpipe[1], data)
+
+ def sendall(self, data):
+ i = 0
+ while i < len(data):
+ i += self.send(data[i:])
+
+ def recv(self, bufsize):
+ if not self.recvpipe:
+ raise IOError("I/O operation on a closed PipeOverUdp")
+ return self.os.read(self.recvpipe[0], bufsize)
+
+ def recvall(self, bufsize):
+ buf = []
+ while bufsize > 0:
+ data = self.recv(bufsize)
+ buf.append(data)
+ bufsize -= len(data)
+ return ''.join(buf)
+
+ def fileno(self):
+ if not self.recvpipe:
+ raise IOError("I/O operation on a closed PipeOverUdp")
+ return self.recvpipe[0]
+
+ def ofileno(self):
+ if not self.sendpipe:
+ raise IOError("I/O operation on a closed PipeOverUdp")
+ return self.sendpipe[1]
Added: py/trunk/py/net/server/__init__.py
==============================================================================
Added: py/trunk/py/net/server/httpserver.py
==============================================================================
--- (empty file)
+++ py/trunk/py/net/server/httpserver.py Tue Mar 6 10:07:53 2007
@@ -0,0 +1,42 @@
+import BaseHTTPServer
+from pypeers import greensock2
+from pypeers.pipe.gsocket import GreenSocket
+
+
+class GreenMixIn:
+ """Mix-in class to handle each request in a new greenlet."""
+
+ def process_request_greenlet(self, request, client_address):
+ """Same as in BaseServer but as a greenlet.
+ In addition, exception handling is done here.
+ """
+ try:
+ self.finish_request(request, client_address)
+ self.close_request(request)
+ except:
+ self.handle_error(request, client_address)
+ self.close_request(request)
+
+ def process_request(self, request, client_address):
+ """Start a new greenlet to process the request."""
+ greensock2.autogreenlet(self.process_request_greenlet,
+ request, client_address)
+
+
+class GreenHTTPServer(GreenMixIn, BaseHTTPServer.HTTPServer):
+ protocol_version = "HTTP/1.1"
+
+ def server_bind(self):
+ self.socket = GreenSocket.fromsocket(self.socket)
+ BaseHTTPServer.HTTPServer.server_bind(self)
+
+
+def test_simple(handler_class=None):
+ if handler_class is None:
+ from SimpleHTTPServer import SimpleHTTPRequestHandler
+ handler_class = SimpleHTTPRequestHandler
+ server_address = ('', 8000)
+ httpd = GreenHTTPServer(server_address, handler_class)
+ sa = httpd.socket.getsockname()
+ print "Serving HTTP on", sa[0], "port", sa[1], "..."
+ httpd.serve_forever()
Added: py/trunk/py/net/test/test_greenexecnet.py
==============================================================================
--- (empty file)
+++ py/trunk/py/net/test/test_greenexecnet.py Tue Mar 6 10:07:53 2007
@@ -0,0 +1,41 @@
+import py
+from py.__.net.greenexecnet import *
+
+def test_simple():
+ gw = PopenGateway()
+ channel = gw.remote_exec("x = channel.receive(); channel.send(x * 6)")
+ channel.send(7)
+ res = channel.receive()
+ assert res == 42
+
+def test_ssh():
+ py.test.skip("Bootstrapping")
+ gw = SshGateway('codespeak.net')
+ channel = gw.remote_exec("""
+ import socket
+ channel.send(socket.gethostname())
+ """)
+ res = channel.receive()
+ assert res.endswith('codespeak.net')
+
+def test_remote_error():
+ gw = PopenGateway()
+ channel = gw.remote_exec("x = channel.receive(); channel.send(x + 1)")
+ channel.send("hello")
+ py.test.raises(RemoteError, channel.receive)
+
+def test_invalid_object():
+ class X(object):
+ pass
+ gw = PopenGateway()
+ channel = gw.remote_exec("x = channel.receive(); channel.send(x + 1)")
+ channel.send(X())
+ py.test.raises(RemoteError, channel.receive)
+
+def test_channel_over_channel():
+ gw = PopenGateway()
+ chan1 = gw.newchannel()
+ channel = gw.remote_exec("chan1 = channel.receive(); chan1.send(42)")
+ channel.send(chan1)
+ res = chan1.receive()
+ assert res == 42
Added: py/trunk/py/net/test/test_greensock2.py
==============================================================================
--- (empty file)
+++ py/trunk/py/net/test/test_greensock2.py Tue Mar 6 10:07:53 2007
@@ -0,0 +1,213 @@
+import py
+from socket import *
+from py.__.net.greensock2 import *
+
+def test_meetingpoint():
+ giv1, acc1 = meetingpoint()
+ giv2, acc2 = meetingpoint()
+ giv3, acc3 = meetingpoint()
+
+ lst = []
+
+ def g1():
+ lst.append(0)
+ x = acc2.accept()
+ assert x == 'hello'
+ lst.append(2)
+ giv1.give('world')
+ lst.append(5)
+ x = acc3.accept()
+ assert x == 'middle'
+ lst.append(6)
+ giv3.give('done')
+
+ def g2():
+ lst.append(1)
+ giv2.give('hello')
+ lst.append(3)
+ y = acc1.accept()
+ assert y == 'world'
+ lst.append(4)
+
+ autogreenlet(g1)
+ autogreenlet(g2)
+ giv3.give('middle')
+ tag = acc3.accept()
+ assert tag == 'done'
+ assert lst == [0, 1, 2, 3, 4, 5, 6]
+
+
+def test_producer():
+ lst = []
+
+ def prod(n):
+ lst.append(1)
+ yield n
+ lst.append(2)
+ yield 87
+ lst.append(3)
+
+ def cons():
+ lst.append(4)
+ accepter = producer(prod, 145)
+ lst.append(5)
+ lst.append(accepter.accept())
+ lst.append(6)
+ lst.append(accepter.accept())
+ lst.append(7)
+ try:
+ accepter.accept()
+ except Interrupted:
+ lst.append(8)
+
+ g = autogreenlet(cons)
+ wait(g)
+ assert lst == [4, 5, 1, 145, 6, 2, 87, 7, 3, 8]
+
+
+def test_timer():
+ lst = []
+
+ def g1():
+ sleep(0.1, g_1)
+ lst.append(1)
+ sleep(0.2, g_1)
+ lst.append(3)
+
+ def g2():
+ lst.append(0)
+ sleep(0.2, g_2)
+ lst.append(2)
+ sleep(0.2, g_2)
+ lst.append(4)
+
+ g_1 = autogreenlet(g1)
+ g_2 = autogreenlet(g2)
+ wait(g_1)
+ wait(g_2)
+ assert lst == [0, 1, 2, 3, 4]
+
+
+def test_socket():
+ s1 = socket(AF_INET, SOCK_DGRAM)
+ s2 = socket(AF_INET, SOCK_DGRAM)
+ s1.bind(('', INADDR_ANY))
+ s2.bind(('', INADDR_ANY))
+ s1.connect(s2.getsockname())
+ s2.connect(s1.getsockname())
+
+ lst = []
+
+ def g1():
+ lst.append(0)
+ x = recv(s1, 5)
+ assert x == 'hello'
+ lst.append(3)
+ sendall(s1, 'world')
+ lst.append(4)
+
+ def g2():
+ lst.append(1)
+ sendall(s2, 'hello')
+ lst.append(2)
+ y = recv(s2, 5)
+ assert y == 'world'
+ lst.append(5)
+
+ g_1 = autogreenlet(g1)
+ g_2 = autogreenlet(g2)
+ wait(g_1)
+ wait(g_2)
+ assert lst == [0, 1, 2, 3, 4, 5]
+
+
+##def test_Queue():
+
+## def g1():
+## lst.append(5)
+## q.put(6)
+## lst.append(7)
+## q.put(8)
+## lst.append(9)
+## q.put(10)
+## lst.append(11)
+## q.put(12) # not used
+
+## def g2():
+## lst.append(1)
+## lst.append(q.get())
+## lst.append(2)
+## lst.append(q.get())
+## lst.append(3)
+## lst.append(q.get())
+## lst.append(4)
+
+## q = Queue()
+## lst = []
+## autogreenlet(g1)
+## autogreenlet(g2)
+## wait()
+## assert lst == [5, 7, 9, 11, 1, 6, 2, 8, 3, 10, 4]
+
+## q = Queue()
+## lst = []
+## autogreenlet(g2)
+## autogreenlet(g1)
+## wait()
+## assert lst == [1, 5, 7, 9, 11, 6, 2, 8, 3, 10, 4]
+
+
+##def test_Event():
+
+## def g1():
+## assert not e.isSet()
+## e.wait()
+## assert not e.isSet() # no longer set
+## lst.append(1)
+## e.set()
+## e.wait()
+## lst.append(2)
+## assert e.isSet()
+## e.clear()
+## assert not e.isSet()
+## lst.append(0)
+## e.set()
+## lst.append(3)
+## assert e.isSet()
+
+## def g2():
+## assert not e.isSet()
+## lst.append(4)
+## e.set()
+## lst.append(7)
+## e.clear()
+## e.set()
+## e.clear()
+## assert not e.isSet()
+## lst.append(5)
+## e.wait()
+## assert e.isSet()
+## lst.append(6)
+
+## e = Event()
+## lst = []
+## autogreenlet(g1)
+## autogreenlet(g2)
+## wait()
+## assert lst == [4, 7, 5, 1, 2, 0, 3, 6]
+
+
+##def test_Event_timeout():
+## def g1():
+## lst.append(5)
+## e.wait(0.1)
+## lst.append(e.isSet())
+## e.wait(60.0)
+## lst.append(e.isSet())
+## lst = []
+## e = Event()
+## autogreenlet(g1)
+## sleep(0.5)
+## e.set()
+## wait()
+## assert lst == [5, False, True]
Added: py/trunk/py/net/test/test_pipelayer.py
==============================================================================
--- (empty file)
+++ py/trunk/py/net/test/test_pipelayer.py Tue Mar 6 10:07:53 2007
@@ -0,0 +1,215 @@
+import os, random
+from py.__.net.pipelayer import PipeLayer, pipe_over_udp, PipeOverUdp
+
+def test_simple():
+ data1 = os.urandom(1000)
+ data2 = os.urandom(1000)
+ p1 = PipeLayer()
+ p2 = PipeLayer()
+ p2._dump_indent = 40
+ p1.queue(data1)
+ p1.queue(data2)
+ recv = ''
+ while len(recv) < 2000:
+ raw = p1.encode(64)
+ assert raw is not None
+ res = p2.decode(raw)
+ assert res is not None
+ recv += res
+ assert recv == data1 + data2
+ raw = p1.encode(64)
+ assert raw is None
+
+def test_stabilize():
+ data1 = os.urandom(28)
+ p1 = PipeLayer()
+ p2 = PipeLayer()
+ p2._dump_indent = 40
+ p1.queue(data1)
+ recv = ''
+ t = 0.0
+ print
+ while True:
+ delay1 = p1.settime(t)
+ delay2 = p2.settime(t)
+ t += 0.100000001
+ if delay1 is delay2 is None:
+ break
+ if delay1 == 0:
+ raw = p1.encode(10)
+ p1.dump('OUT', raw)
+ assert raw is not None
+ res = p2.decode(raw)
+ assert res is not None
+ recv += res
+ if delay2 == 0:
+ raw = p2.encode(10)
+ p2.dump('OUT', raw)
+ assert raw is not None
+ res = p1.decode(raw)
+ assert res == ''
+ assert recv == data1
+
+def test_bidir():
+ data1 = os.urandom(1000)
+ data2 = os.urandom(1000)
+ p1 = PipeLayer()
+ p2 = PipeLayer()
+ p2._dump_indent = 40
+ p1.queue(data1)
+ p2.queue(data2)
+ recv = ['', '']
+ while len(recv[0]) < 1000 or len(recv[1]) < 1000:
+ progress = False
+ for (me, other, i) in [(p1, p2, 1), (p2, p1, 0)]:
+ raw = me.encode(64)
+ if raw is not None:
+ res = other.decode(raw)
+ assert res is not None
+ recv[i] += res
+ if res:
+ progress = True
+ assert progress
+ assert recv[0] == data2
+ assert recv[1] == data1
+ raw = p1.encode(64)
+ assert raw is None
+ raw = p2.encode(64)
+ assert raw is None
+
+def test_with_loss():
+ data1 = os.urandom(10000).encode('hex')
+ data2 = os.urandom(10000).encode('hex')
+ #data1 = '0123456789'
+ #data2 = 'ABCDEFGHIJ'
+ p1 = PipeLayer()
+ p2 = PipeLayer()
+ p2._dump_indent = 40
+ p1.queue(data1)
+ p2.queue(data2)
+ recv = ['', '']
+ time = 0
+ active = 1
+ while active:
+ active = 0
+ time += 0.2
+ #print '.'
+ exchange = []
+ for (me, other, i) in [(p1, p2, 1), (p2, p1, 0)]:
+ to = me.settime(time)
+ packet = me.encode(12)
+ assert (packet is not None) == (to == 0)
+ if to is not None:
+ active = 1
+ if to == 0:
+ exchange.append((packet, other, i))
+ for (packet, other, i) in exchange:
+ if random.random() < 0.5:
+ pass # drop packet
+ else:
+ res = other.decode(packet)
+ assert res is not None
+ recv[i] += res
+ assert data2.startswith(recv[0])
+ assert data1.startswith(recv[1])
+ assert recv[0] == data2
+ assert recv[1] == data1
+ print time
+
+def test_massive_reordering():
+ data1 = os.urandom(10000).encode('hex')
+ data2 = os.urandom(10000).encode('hex')
+ #data1 = '0123456789'
+ #data2 = 'ABCDEFGHIJ'
+ p1 = PipeLayer()
+ p2 = PipeLayer()
+ p2._dump_indent = 40
+ p1.queue(data1)
+ p2.queue(data2)
+ recv = ['', '']
+ time = 0
+ active = 1
+ exchange = []
+ while active or exchange:
+ active = 0
+ time += 0.2
+ #print '.'
+ for (me, other, i) in [(p1, p2, 1), (p2, p1, 0)]:
+ to = me.settime(time)
+ packet = me.encode(12)
+ assert (packet is not None) == (to == 0)
+ if to is not None:
+ active = 1
+ if to == 0:
+ exchange.append((packet, other, i))
+ if random.random() < 0.02:
+ random.shuffle(exchange)
+ for (packet, other, i) in exchange:
+ res = other.decode(packet)
+ assert res is not None
+ recv[i] += res
+ exchange = []
+ assert data2.startswith(recv[0])
+ assert data1.startswith(recv[1])
+ assert recv[0] == data2
+ assert recv[1] == data1
+ print time
+
+def udpsockpair():
+ from socket import socket, AF_INET, SOCK_DGRAM, INADDR_ANY
+ s1 = socket(AF_INET, SOCK_DGRAM)
+ s2 = socket(AF_INET, SOCK_DGRAM)
+ s1.bind(('127.0.0.1', INADDR_ANY))
+ s2.bind(('127.0.0.1', INADDR_ANY))
+ s2.connect(s1.getsockname())
+ s1.connect(s2.getsockname())
+ return s1, s2
+
+def test_pipe_over_udp():
+ import thread
+ s1, s2 = udpsockpair()
+
+ fd1 = os.open(__file__, os.O_RDONLY)
+ fd2 = os.open('test_pipelayer.py~copy', os.O_WRONLY|os.O_CREAT|os.O_TRUNC)
+
+ thread.start_new_thread(pipe_over_udp, (s1, fd1))
+ pipe_over_udp(s2, recv_fd=fd2, inactivity_timeout=2.5)
+ os.close(fd1)
+ os.close(fd2)
+ f = open(__file__, 'rb')
+ data1 = f.read()
+ f.close()
+ f = open('test_pipelayer.py~copy', 'rb')
+ data2 = f.read()
+ f.close()
+ assert data1 == data2
+ os.unlink('test_pipelayer.py~copy')
+
+def test_PipeOverUdp():
+ s1, s2 = udpsockpair()
+ p1 = PipeOverUdp(s1, timeout=0.2)
+ p2 = PipeOverUdp(s2, timeout=0.2)
+ p2.sendall('goodbye')
+ for k in range(10):
+ p1.sendall('hello world')
+ input = p2.recvall(11)
+ assert input == 'hello world'
+ input = p1.recvall(7)
+ assert input == 'goodbye'
+
+ bigchunk1 = os.urandom(500000)
+ bigchunk2 = os.urandom(500000)
+ i1 = i2 = 0
+ j1 = j2 = 0
+ while j1 < len(bigchunk1) or j2 < len(bigchunk2):
+ i1 += p1.send(bigchunk1[i1:i1+512])
+ i2 += p2.send(bigchunk2[i2:i2+512])
+ data = p1.recv(512)
+ assert data == bigchunk2[j2:j2+len(data)]
+ j2 += len(data)
+ data = p2.recv(512)
+ assert data == bigchunk1[j1:j1+len(data)]
+ j1 += len(data)
+ #print i1, i2, j1, j2
+ p1.close()
+ p2.close()
More information about the py-svn
mailing list