[py-svn] r39993 - py/branch/lessthread/py/execnet
hpk at codespeak.net
hpk at codespeak.net
Tue Mar 6 18:51:34 CET 2007
Author: hpk
Date: Tue Mar 6 18:51:31 2007
New Revision: 39993
Modified:
py/branch/lessthread/py/execnet/gateway.py
py/branch/lessthread/py/execnet/register.py
Log:
strike support for exec threads, and disallow execution
from remote by default.
Modified: py/branch/lessthread/py/execnet/gateway.py
==============================================================================
--- py/branch/lessthread/py/execnet/gateway.py (original)
+++ py/branch/lessthread/py/execnet/gateway.py Tue Mar 6 18:51:31 2007
@@ -22,7 +22,6 @@
from py.__.execnet.channel import ChannelFactory, Channel
from py.__.execnet.message import Message
ThreadOut = py._thread.ThreadOut
- WorkerPool = py._thread.WorkerPool
import os
debug = 0 # open('/tmp/execnet-debug-%d' % os.getpid() , 'wa')
@@ -31,15 +30,11 @@
class Gateway(object):
_ThreadOut = ThreadOut
- _workerpool = None
+ _requestqueue = None
remoteaddress = ""
- def __init__(self, io, execthreads=None, _startcount=2):
+ def __init__(self, io, allowexec=False, _startcount=2):
""" initialize core gateway, using the given
- inputoutput object and 'execthreads' execution
- threads. if 'execthreads' is -1, no execution
- will be allowed on this side. If it is None,
- requests will be queued and _servemain() may
- be used to execute them.
+ inputoutput object.
"""
global registered_cleanup, _activegateways
self._io = io
@@ -48,12 +43,8 @@
atexit.register(cleanup_atexit)
registered_cleanup = True
_activegateways[self] = True
- if execthreads != -1:
- if execthreads is not None:
- self._workerpool = WorkerPool(maxthreads=execthreads)
- else:
- self._requestqueue = Queue.Queue()
-
+ if allowexec:
+ self._requestqueue = Queue.Queue()
self._receiverthread = threading.Thread(name="receiver",
target=self._thread_receiver)
self._receiverthread.setDaemon(0)
@@ -154,9 +145,7 @@
return close
def _local_schedulexec(self, channel, sourcetask):
- if self._workerpool:
- self._workerpool.dispatch(self._executetask, channel, sourcetask)
- elif hasattr(self, '_requestqueue'):
+ if self._requestqueue is not None:
self._requestqueue.put((channel, sourcetask))
def _servemain(self, joining=True):
@@ -280,23 +269,17 @@
self._send(None)
def _stopexec(self):
- if hasattr(self, '_requestqueue'):
+ if self._requestqueue is not None:
self._requestqueue.put(None)
- if self._workerpool:
- self._workerpool.shutdown()
def join(self, joinexec=True):
""" Wait for all IO (and by default all execution activity)
- to stop.
+ to stop. the joinexec parameter is obsolete.
"""
current = threading.currentThread()
if self._receiverthread.isAlive():
self._trace("joining receiver thread")
self._receiverthread.join()
- if joinexec:
- if self._workerpool:
- self._workerpool.join()
- self._trace("joining execution threads finished, current %r" % current)
def getid(gw, cache={}):
name = gw.__class__.__name__
Modified: py/branch/lessthread/py/execnet/register.py
==============================================================================
--- py/branch/lessthread/py/execnet/register.py (original)
+++ py/branch/lessthread/py/execnet/register.py Tue Mar 6 18:51:31 2007
@@ -11,7 +11,6 @@
startup_modules = [
'py.__.thread.io',
- 'py.__.thread.pool',
'py.__.execnet.inputoutput',
'py.__.execnet.gateway',
'py.__.execnet.message',
@@ -41,7 +40,7 @@
bootstrap = [extra]
bootstrap += [getsource(x) for x in startup_modules]
bootstrap += [io.server_stmt,
- "Gateway(io=io, _startcount=2)._servemain()",
+ "Gateway(io=io, allowexec=True, _startcount=2)._servemain()",
]
source = "\n".join(bootstrap)
self._trace("sending gateway bootstrap code")
More information about the py-svn mailing list