[py-svn] r35804 - in py/dist/py/magic: . testing

stephan at codespeak.net stephan at codespeak.net
Fri Dec 15 15:53:59 CET 2006


Author: stephan
Date: Fri Dec 15 15:53:58 2006
New Revision: 35804

Added:
   py/dist/py/magic/coroutine.py
   py/dist/py/magic/testing/test_coroutine.py
Log:
coroutine is a thin experimental wrapper around greenlets

Added: py/dist/py/magic/coroutine.py
==============================================================================
--- (empty file)
+++ py/dist/py/magic/coroutine.py	Fri Dec 15 15:53:58 2006
@@ -0,0 +1,76 @@
+"""
+This is a thin wrapper around the greenlet module to provide the
+coroutine interface as used in pypy.
+
+>>> from py.magic import coroutine
+>>> def f(txt, coro=None):
+...     print txt,
+...     if coro is not None:
+...             coro.switch()
+...
+>>> coro1 = coroutine()
+>>> coro2 = coroutine()
+>>> coro1.bind(f,'hello',coro2)
+>>> coro2.bind(f,'world')
+>>> coro1.switch()
+hello world
+>>>
+"""
+
+try:
+    from functools import partial
+except ImportError: # we are not running python 2.5
+    class partial(object):
+        # just enough of 'partial' to be usefull
+        def __init__(self, func, *argl, **argd):
+            self.func = func
+            self.argl = argl
+            self.argd = argd
+
+        def __call__(self):
+            return self.func(*self.argl, **self.argd)
+
+from py.magic import greenlet
+
+reg = {}  # greenlet -> coroutine wrapping it; read by coroutine.getcurrent()
+
+class coroutine(object):
+
+    def __init__(self):
+        self._frame = greenlet()
+        reg[self._frame] = self
+
+    def __getattr__(self, attr):
+        return getattr(self._frame, attr)
+
+    def bind(self, func, *argl, **argd):
+        if self._frame.dead:
+            self._frame = greenlet()
+        if hasattr(self._frame, 'run') and self._frame.run:
+            raise ValueError("cannot bind a bound coroutine")
+        self._frame.run = partial(func, *argl, **argd)
+
+    def switch(self):
+        return greenlet.switch(self._frame)
+
+    def kill(self):
+        self._frame.throw()
+
+    @property
+    def is_zombie(self):
+        return self._frame.dead
+
+    @property
+    def is_alive(self):
+        return not self._frame.dead
+
+    @staticmethod
+    def getcurrent():
+        return reg[greenlet.getcurrent()]
+
+maincoro = coroutine()
+maingreenlet = greenlet.getcurrent()
+maincoro._frame = maingreenlet
+reg[maingreenlet] = maincoro
+del maincoro
+del maingreenlet

Added: py/dist/py/magic/testing/test_coroutine.py
==============================================================================
--- (empty file)
+++ py/dist/py/magic/testing/test_coroutine.py	Fri Dec 15 15:53:58 2006
@@ -0,0 +1,92 @@
+from py.magic import coroutine
+
+from py.test import skip, raises
+
+class Test_Coroutine:
+
+    def test_is_zombie(self):
+        co = coroutine()
+        def f():
+            print 'in coro'
+        co.bind(f)
+        assert not co.is_zombie
+
+    def test_raise_propagate(self):
+        co = coroutine()
+        def f():
+            return 1/0
+        co.bind(f)
+        try:
+            co.switch()
+        except ZeroDivisionError:
+            pass
+        else:
+            raise AssertionError("exception not propagated")
+
+    def test_strange_test(self):
+        def f():
+            return 42
+        def create():
+            b = coroutine()
+            b.bind(f)
+            b.switch()
+            return b
+        a = coroutine()
+        a.bind(create)
+        b = a.switch()
+        def nothing():
+            pass
+        a.bind(nothing)
+        def kill():
+            a.kill()
+        b.bind(kill)
+        b.switch()
+
+    def test_kill(self):
+        co = coroutine()
+        def f():
+            pass
+        co.bind(f)
+        assert co.is_alive
+        co.kill()
+        assert not co.is_alive
+
+    def test_bogus_bind(self):
+        co = coroutine()
+        def f():
+            pass
+        co.bind(f)
+        raises(ValueError, co.bind, f)
+
+    def test_simple_task(self):
+        maintask = coroutine.getcurrent()
+        def f():pass
+        co = coroutine()
+        co.bind(f)
+        co.switch()
+        assert not co.is_alive
+        assert maintask is coroutine.getcurrent()
+
+    def test_backto_main(self):
+        maintask = coroutine.getcurrent()
+        def f(task):
+            task.switch()
+        co = coroutine()
+        co.bind(f,maintask)
+        co.switch()
+
+    def test_wrapped_main(self):
+        class mwrap(object):
+            def __init__(self, coro):
+                self._coro = coro
+
+            def __getattr__(self, attr):
+                return getattr(self._coro, attr)
+
+        maintask = mwrap(coroutine.getcurrent())
+        def f(task):
+            task.switch()
+        co = coroutine()
+        co.bind(f,maintask)
+        co.switch()
+


More information about the py-svn mailing list