[py-svn] r40000 - in py/branch/lessthread/py/execnet: . testing

hpk at codespeak.net hpk at codespeak.net
Tue Mar 6 22:06:32 CET 2007


Author: hpk
Date: Tue Mar  6 22:06:30 2007
New Revision: 40000

Modified:
   py/branch/lessthread/py/execnet/gateway.py
   py/branch/lessthread/py/execnet/testing/test_gateway.py
Log:
re-adding thread support as a method "remote_init_threads" 
on top of the normal (by default non-thread) mechanisms


Modified: py/branch/lessthread/py/execnet/gateway.py
==============================================================================
--- py/branch/lessthread/py/execnet/gateway.py	(original)
+++ py/branch/lessthread/py/execnet/gateway.py	Tue Mar  6 22:06:30 2007
@@ -27,6 +27,8 @@
 debug = 0 # open('/tmp/execnet-debug-%d' % os.getpid()  , 'wa')
 
 sysex = (KeyboardInterrupt, SystemExit)
+class StopExecLoop(Exception):
+    pass
 
 class Gateway(object):
     _ThreadOut = ThreadOut 
@@ -156,12 +158,32 @@
                 if item is None:
                     self._stopsend()
                     break
-                self._executetask(item) # could be done in an exec thread 
+                stop = self._executetask(item) # could be done in an exec thread 
+                if stop:
+                    break
         finally:
             self._trace("_servemain finished") 
         if self.joining:
             self.join()
 
+    def remote_init_threads(self, num=None):
+        """ start up to 'num' threads for subsequent 
+            remote_exec() invocations to allow concurrent
+            execution. 
+        """
+        from py.__.thread import pool
+        source = py.code.Source(pool, """
+            execpool = WorkerPool(maxthreads=%r)
+            gw = channel.gateway
+            while 1:
+                task = gw._requestqueue.get()
+                if task is None:
+                    execpool.shutdown()
+                    raise StopExecLoop
+                execpool.dispatch(gw._executetask, task)
+        """ % num)
+        self.remote_exec(source)
+
     def _executetask(self, item):
         """ execute channel/source items. """
         from sys import exc_info
@@ -179,6 +201,9 @@
                 self._trace("execution finished:", repr(source)[:50])
         except (KeyboardInterrupt, SystemExit):
             pass 
+        except StopExecLoop:
+            channel.close()
+            return True
         except:
             excinfo = exc_info()
             l = traceback.format_exception(*excinfo)

Modified: py/branch/lessthread/py/execnet/testing/test_gateway.py
==============================================================================
--- py/branch/lessthread/py/execnet/testing/test_gateway.py	(original)
+++ py/branch/lessthread/py/execnet/testing/test_gateway.py	Tue Mar  6 22:06:30 2007
@@ -485,3 +485,19 @@
             # now it did
             py.test.raises(IOError, gw.remote_exec, "...")
 
+def test_threads():
+    gw = py.execnet.PopenGateway()
+    gw.remote_init_threads(3)
+    c1 = gw.remote_exec("channel.send(channel.receive())")
+    c2 = gw.remote_exec("channel.send(channel.receive())")
+    c2.send(1)
+    res = c2.receive()
+    assert res == 1
+    c1.send(42)
+    res = c1.receive()
+    assert res == 42
+    gw.exit()
+    
+ 
+    
+    


More information about the py-svn mailing list