[py-svn] r39985 - in py/branch/lessthread: . py py/execnet py/execnet/testing

hpk at codespeak.net hpk at codespeak.net
Tue Mar 6 15:00:49 CET 2007


Author: hpk
Date: Tue Mar  6 15:00:48 2007
New Revision: 39985

Added:
   py/branch/lessthread/
      - copied from r39978, py/trunk/
   py/branch/lessthread/py/
      - copied from r39981, py/trunk/py/
   py/branch/lessthread/py/execnet/channel.py
      - copied unchanged from r39982, py/trunk/py/execnet/channel.py
   py/branch/lessthread/py/execnet/gateway.py
      - copied, changed from r39982, py/trunk/py/execnet/gateway.py
Modified:
   py/branch/lessthread/py/execnet/inputoutput.py
   py/branch/lessthread/py/execnet/testing/test_gateway.py
Log:
create branch for reducing of threads in execnet

Copied: py/branch/lessthread/py/execnet/gateway.py (from r39982, py/trunk/py/execnet/gateway.py)
==============================================================================
--- py/trunk/py/execnet/gateway.py	(original)
+++ py/branch/lessthread/py/execnet/gateway.py	Tue Mar  6 15:00:48 2007
@@ -23,7 +23,6 @@
     from py.__.execnet.message import Message
     ThreadOut = py._thread.ThreadOut 
     WorkerPool = py._thread.WorkerPool 
-    NamedThreadPool = py._thread.NamedThreadPool 
 
 import os
 debug = 0 # open('/tmp/execnet-debug-%d' % os.getpid()  , 'wa')
@@ -38,17 +37,18 @@
             inputoutput object and 'execthreads' execution
             threads. 
         """
-        global registered_cleanup
+        global registered_cleanup, _activegateways
         self._execpool = WorkerPool(maxthreads=execthreads)
         self._io = io
-        self._outgoing = Queue.Queue()
         self._channelfactory = ChannelFactory(self, _startcount)
         if not registered_cleanup:
             atexit.register(cleanup_atexit)
             registered_cleanup = True
-        _active_sendqueues[self._outgoing] = True
-        self._pool = NamedThreadPool(receiver = self._thread_receiver, 
-                                     sender = self._thread_sender)
+        _activegateways[self] = True
+        self._receiverthread = threading.Thread(name="receiver", 
+                                 target=self._thread_receiver)
+        self._receiverthread.setDaemon(0)
+        self._receiverthread.start() 
 
     def __repr__(self):
         """ return string representing gateway type and status. """
@@ -58,10 +58,9 @@
         else:
             addr = ''
         try:
-            r = (len(self._pool.getstarted('receiver'))
-                 and "receiving" or "not receiving")
-            s = (len(self._pool.getstarted('sender')) 
-                 and "sending" or "not sending")
+            r = (self._receiverthread.isAlive() and "receiving" or 
+                 "not receiving")
+            s = "sending" # XXX
             i = len(self._channelfactory.channels())
         except AttributeError:
             r = s = "uninitialized"
@@ -116,30 +115,19 @@
             self._trace('leaving %r' % threading.currentThread())
 
     def _send(self, msg):
-        self._outgoing.put(msg)
-
-    def _thread_sender(self):
-        """ thread to send Messages over the wire. """
-        try:
-            from sys import exc_info
-            while 1:
-                msg = self._outgoing.get()
-                try:
-                    if msg is None:
-                        self._io.close_write()
-                        break
-                    msg.writeto(self._io)
-                except:
-                    excinfo = exc_info()
-                    self._traceex(excinfo)
-                    if msg is not None:
-                        msg.post_sent(self, excinfo)
-                    break
-                else:
-                    self._trace('sent -> %r' % msg)
-                    msg.post_sent(self)
-        finally:
-            self._trace('leaving %r' % threading.currentThread())
+        from sys import exc_info
+        if msg is None:
+            self._io.close_write()
+        else:
+            try:
+                msg.writeto(self._io) 
+            except: 
+                excinfo = exc_info()
+                self._traceex(excinfo)
+                msg.post_sent(self, excinfo)
+            else:
+                msg.post_sent(self)
+                self._trace('sent -> %r' % msg)
 
     def _local_redirect_thread_output(self, outid, errid): 
         l = []
@@ -258,23 +246,16 @@
 
     def exit(self):
         """ Try to stop all IO activity. """
-        try:
-            del _active_sendqueues[self._outgoing]
-        except KeyError:
-            pass
-        else:
-            self._send(None)
+        self._send(None) 
 
     def join(self, joinexec=True):
         """ Wait for all IO (and by default all execution activity) 
             to stop. 
         """
         current = threading.currentThread()
-        for x in self._pool.getstarted(): 
-            if x != current: 
-                self._trace("joining %s" % x)
-                x.join()
-        self._trace("joining sender/reciver threads finished, current %r" % current) 
+        if self._receiverthread.isAlive():
+            self._trace("joining receiver thread")
+            self._receiverthread.join()
         if joinexec: 
             self._execpool.join()
             self._trace("joining execution threads finished, current %r" % current) 
@@ -288,14 +269,12 @@
         return x
 
 registered_cleanup = False
-_active_sendqueues = weakref.WeakKeyDictionary()
+_activegateways = weakref.WeakKeyDictionary()
 def cleanup_atexit():
     if debug:
         print >>debug, "="*20 + "cleaning up" + "=" * 20
         debug.flush()
-    while True:
-        try:
-            queue, ignored = _active_sendqueues.popitem()
-        except KeyError:
-            break
-        queue.put(None)
+    while _activegateways:
+        gw, ignored = _activegateways.popitem()
+        gw.exit()
+        #gw.join() should work as well

Modified: py/branch/lessthread/py/execnet/inputoutput.py
==============================================================================
--- py/trunk/py/execnet/inputoutput.py	(original)
+++ py/branch/lessthread/py/execnet/inputoutput.py	Tue Mar  6 15:00:48 2007
@@ -43,11 +43,17 @@
 
     def close_read(self):
         if self.readable:
-            self.sock.shutdown(0)
+            try:
+                self.sock.shutdown(0)
+            except socket.error:
+                pass
             self.readable = None
     def close_write(self):
         if self.writeable:
-            self.sock.shutdown(1)
+            try:
+                self.sock.shutdown(1)
+            except socket.error:
+                pass
             self.writeable = None
 
 class Popen2IO:

Modified: py/branch/lessthread/py/execnet/testing/test_gateway.py
==============================================================================
--- py/trunk/py/execnet/testing/test_gateway.py	(original)
+++ py/branch/lessthread/py/execnet/testing/test_gateway.py	Tue Mar  6 15:00:48 2007
@@ -83,8 +83,7 @@
 
 class BasicRemoteExecution:
     def test_correct_setup(self):
-        for x in 'sender', 'receiver':  
-            assert self.gw._pool.getstarted(x) 
+        assert self.gw._receiverthread.isAlive()
 
     def test_repr_doesnt_crash(self):
         assert isinstance(repr(self), str)


More information about the py-svn mailing list