[py-svn] r40000 - in py/branch/lessthread/py/execnet: . testing
hpk at codespeak.net
hpk at codespeak.net
Tue Mar 6 22:06:32 CET 2007
Author: hpk
Date: Tue Mar 6 22:06:30 2007
New Revision: 40000
Modified:
py/branch/lessthread/py/execnet/gateway.py
py/branch/lessthread/py/execnet/testing/test_gateway.py
Log:
re-adding thread support as a method "remote_init_threads"
on top of the normal (by default non-thread) mechanisms
Modified: py/branch/lessthread/py/execnet/gateway.py
==============================================================================
--- py/branch/lessthread/py/execnet/gateway.py (original)
+++ py/branch/lessthread/py/execnet/gateway.py Tue Mar 6 22:06:30 2007
@@ -27,6 +27,8 @@
debug = 0 # open('/tmp/execnet-debug-%d' % os.getpid() , 'wa')
sysex = (KeyboardInterrupt, SystemExit)
+class StopExecLoop(Exception):
+ pass
class Gateway(object):
_ThreadOut = ThreadOut
@@ -156,12 +158,32 @@
if item is None:
self._stopsend()
break
- self._executetask(item) # could be done in an exec thread
+ stop = self._executetask(item) # could be done in an exec thread
+ if stop:
+ break
finally:
self._trace("_servemain finished")
if self.joining:
self.join()
+ def remote_init_threads(self, num=None):
+ """ start up to 'num' threads for subsequent
+ remote_exec() invocations to allow concurrent
+ execution.
+ """
+ from py.__.thread import pool
+ source = py.code.Source(pool, """
+ execpool = WorkerPool(maxthreads=%r)
+ gw = channel.gateway
+ while 1:
+ task = gw._requestqueue.get()
+ if task is None:
+ execpool.shutdown()
+ raise StopExecLoop
+ execpool.dispatch(gw._executetask, task)
+ """ % num)
+ self.remote_exec(source)
+
def _executetask(self, item):
""" execute channel/source items. """
from sys import exc_info
@@ -179,6 +201,9 @@
self._trace("execution finished:", repr(source)[:50])
except (KeyboardInterrupt, SystemExit):
pass
+ except StopExecLoop:
+ channel.close()
+ return True
except:
excinfo = exc_info()
l = traceback.format_exception(*excinfo)
Modified: py/branch/lessthread/py/execnet/testing/test_gateway.py
==============================================================================
--- py/branch/lessthread/py/execnet/testing/test_gateway.py (original)
+++ py/branch/lessthread/py/execnet/testing/test_gateway.py Tue Mar 6 22:06:30 2007
@@ -485,3 +485,19 @@
# now it did
py.test.raises(IOError, gw.remote_exec, "...")
+def test_threads():
+ gw = py.execnet.PopenGateway()
+ gw.remote_init_threads(3)
+ c1 = gw.remote_exec("channel.send(channel.receive())")
+ c2 = gw.remote_exec("channel.send(channel.receive())")
+ c2.send(1)
+ res = c2.receive()
+ assert res == 1
+ c1.send(42)
+ res = c1.receive()
+ assert res == 42
+ gw.exit()
+
+
+
+
More information about the py-svn
mailing list