[py-svn] r39985 - in py/branch/lessthread: . py py/execnet py/execnet/testing
hpk at codespeak.net
hpk at codespeak.net
Tue Mar 6 15:00:49 CET 2007
Author: hpk
Date: Tue Mar 6 15:00:48 2007
New Revision: 39985
Added:
py/branch/lessthread/
- copied from r39978, py/trunk/
py/branch/lessthread/py/
- copied from r39981, py/trunk/py/
py/branch/lessthread/py/execnet/channel.py
- copied unchanged from r39982, py/trunk/py/execnet/channel.py
py/branch/lessthread/py/execnet/gateway.py
- copied, changed from r39982, py/trunk/py/execnet/gateway.py
Modified:
py/branch/lessthread/py/execnet/inputoutput.py
py/branch/lessthread/py/execnet/testing/test_gateway.py
Log:
Create a branch for reducing the number of threads in execnet.
Copied: py/branch/lessthread/py/execnet/gateway.py (from r39982, py/trunk/py/execnet/gateway.py)
==============================================================================
--- py/trunk/py/execnet/gateway.py (original)
+++ py/branch/lessthread/py/execnet/gateway.py Tue Mar 6 15:00:48 2007
@@ -23,7 +23,6 @@
from py.__.execnet.message import Message
ThreadOut = py._thread.ThreadOut
WorkerPool = py._thread.WorkerPool
- NamedThreadPool = py._thread.NamedThreadPool
import os
debug = 0 # open('/tmp/execnet-debug-%d' % os.getpid() , 'wa')
@@ -38,17 +37,18 @@
inputoutput object and 'execthreads' execution
threads.
"""
- global registered_cleanup
+ global registered_cleanup, _activegateways
self._execpool = WorkerPool(maxthreads=execthreads)
self._io = io
- self._outgoing = Queue.Queue()
self._channelfactory = ChannelFactory(self, _startcount)
if not registered_cleanup:
atexit.register(cleanup_atexit)
registered_cleanup = True
- _active_sendqueues[self._outgoing] = True
- self._pool = NamedThreadPool(receiver = self._thread_receiver,
- sender = self._thread_sender)
+ _activegateways[self] = True
+ self._receiverthread = threading.Thread(name="receiver",
+ target=self._thread_receiver)
+ self._receiverthread.setDaemon(0)
+ self._receiverthread.start()
def __repr__(self):
""" return string representing gateway type and status. """
@@ -58,10 +58,9 @@
else:
addr = ''
try:
- r = (len(self._pool.getstarted('receiver'))
- and "receiving" or "not receiving")
- s = (len(self._pool.getstarted('sender'))
- and "sending" or "not sending")
+ r = (self._receiverthread.isAlive() and "receiving" or
+ "not receiving")
+ s = "sending" # XXX
i = len(self._channelfactory.channels())
except AttributeError:
r = s = "uninitialized"
@@ -116,30 +115,19 @@
self._trace('leaving %r' % threading.currentThread())
def _send(self, msg):
- self._outgoing.put(msg)
-
- def _thread_sender(self):
- """ thread to send Messages over the wire. """
- try:
- from sys import exc_info
- while 1:
- msg = self._outgoing.get()
- try:
- if msg is None:
- self._io.close_write()
- break
- msg.writeto(self._io)
- except:
- excinfo = exc_info()
- self._traceex(excinfo)
- if msg is not None:
- msg.post_sent(self, excinfo)
- break
- else:
- self._trace('sent -> %r' % msg)
- msg.post_sent(self)
- finally:
- self._trace('leaving %r' % threading.currentThread())
+ from sys import exc_info
+ if msg is None:
+ self._io.close_write()
+ else:
+ try:
+ msg.writeto(self._io)
+ except:
+ excinfo = exc_info()
+ self._traceex(excinfo)
+ msg.post_sent(self, excinfo)
+ else:
+ msg.post_sent(self)
+ self._trace('sent -> %r' % msg)
def _local_redirect_thread_output(self, outid, errid):
l = []
@@ -258,23 +246,16 @@
def exit(self):
""" Try to stop all IO activity. """
- try:
- del _active_sendqueues[self._outgoing]
- except KeyError:
- pass
- else:
- self._send(None)
+ self._send(None)
def join(self, joinexec=True):
""" Wait for all IO (and by default all execution activity)
to stop.
"""
current = threading.currentThread()
- for x in self._pool.getstarted():
- if x != current:
- self._trace("joining %s" % x)
- x.join()
- self._trace("joining sender/reciver threads finished, current %r" % current)
+ if self._receiverthread.isAlive():
+ self._trace("joining receiver thread")
+ self._receiverthread.join()
if joinexec:
self._execpool.join()
self._trace("joining execution threads finished, current %r" % current)
@@ -288,14 +269,12 @@
return x
registered_cleanup = False
-_active_sendqueues = weakref.WeakKeyDictionary()
+_activegateways = weakref.WeakKeyDictionary()
def cleanup_atexit():
if debug:
print >>debug, "="*20 + "cleaning up" + "=" * 20
debug.flush()
- while True:
- try:
- queue, ignored = _active_sendqueues.popitem()
- except KeyError:
- break
- queue.put(None)
+ while _activegateways:
+ gw, ignored = _activegateways.popitem()
+ gw.exit()
+ #gw.join() should work as well
Modified: py/branch/lessthread/py/execnet/inputoutput.py
==============================================================================
--- py/trunk/py/execnet/inputoutput.py (original)
+++ py/branch/lessthread/py/execnet/inputoutput.py Tue Mar 6 15:00:48 2007
@@ -43,11 +43,17 @@
def close_read(self):
if self.readable:
- self.sock.shutdown(0)
+ try:
+ self.sock.shutdown(0)
+ except socket.error:
+ pass
self.readable = None
def close_write(self):
if self.writeable:
- self.sock.shutdown(1)
+ try:
+ self.sock.shutdown(1)
+ except socket.error:
+ pass
self.writeable = None
class Popen2IO:
Modified: py/branch/lessthread/py/execnet/testing/test_gateway.py
==============================================================================
--- py/trunk/py/execnet/testing/test_gateway.py (original)
+++ py/branch/lessthread/py/execnet/testing/test_gateway.py Tue Mar 6 15:00:48 2007
@@ -83,8 +83,7 @@
class BasicRemoteExecution:
def test_correct_setup(self):
- for x in 'sender', 'receiver':
- assert self.gw._pool.getstarted(x)
+ assert self.gw._receiverthread.isAlive()
def test_repr_doesnt_crash(self):
assert isinstance(repr(self), str)
More information about the py-svn mailing list