[shpy-commit] r2823 - in shpy/trunk/dist/shpy/net: . test

hpk@codespeak.net hpk@codespeak.net
Sun, 18 Jan 2004 18:14:03 +0100 (MET)


Author: hpk
Date: Sun Jan 18 18:14:02 2004
New Revision: 2823

Modified:
   shpy/trunk/dist/shpy/net/gateway.py
   shpy/trunk/dist/shpy/net/startserver.py
   shpy/trunk/dist/shpy/net/test/test_basic.py
Log:
- new way of executing remote code:
  it is now done via an incoming and an outgoing queue

- see README.txt for a basic explanation

- there is only a one-shot asynchronous
  interface so far (exec_remote, which executes a source string on
  the other side)



Modified: shpy/trunk/dist/shpy/net/gateway.py
==============================================================================
--- shpy/trunk/dist/shpy/net/gateway.py	(original)
+++ shpy/trunk/dist/shpy/net/gateway.py	Sun Jan 18 18:14:02 2004
@@ -1,67 +1,83 @@
-import threading, pickle
+import autopath
+
+import threading, pickle, Queue, select
 
 from shpy.net import common 
 
-class SocketGateway(threading.Thread):
+class SocketGateway:
     def __init__(self, sock):
-        import threading # XXX well 
+        self.sock = sock
+        self.incoming = Queue.Queue()
+        self.outgoing = Queue.Queue()
+        self.threads = [
+            SockQueueReader(self.sock, self.incoming),
+            QueueExecutor(self.incoming),
+            SockQueueSender(self.sock, self.outgoing)
+            ]
+        for t in self.threads:
+            t.setDaemon(1)
+            t.running = 1
+            t.gateway = self
+            t.start()
+
+    def exit(self):
+        for t in self.threads:
+            t.running = 0
+
+    def exec_remote(self, source):
+        self.outgoing.put(source)
+
+class SockQueueReader(threading.Thread):
+    def __init__(self, sock, queue):
         threading.Thread.__init__(self)
         self.sock = sock 
-        self.lock = threading.RLock()
-        self.running = 1
-        self.setDaemon(1)
-        self.start()
+        self.queue = queue
 
     def run(self):
         # here we want to select for incoming calls. 
         # but only if we have the lock.  
-        import select
         while self.running:
-            self.lock.acquire()
+            readlist = [self.sock]
+            r,w,e = select.select(readlist, [], [], 0.2)
+            if self.sock in r:
+                try:
+                    sockfile = self.sock.makefile('r+b')
+                    unpickler = pickle.Unpickler(sockfile)
+                    string = unpickler.load()
+                    self.queue.put(string)
+                except:
+                    import traceback
+                    traceback.print_exc()
+                    self.gateway.exit()
+
+
+class QueueExecutor(threading.Thread):
+    def __init__(self, queue):
+        threading.Thread.__init__(self)
+        self.queue = queue
+
+    def run(self):
+        while self.running:
+            source = self.queue.get()
             try:
-                readlist = [self.sock]
-                r,w,e = select.select(readlist, [], [], 0.1)
-                if self.sock in r:
-                    self.exec_incoming()
-            finally:
-                self.lock.release()
-
-    def exec_incoming(self):
-        # we have the lock here already!
-        print "executing incoming data. "
-        sockfile = self.sock.makefile('r+b')
-        pickler = pickle.Pickler(sockfile)
-        unpickler = pickle.Unpickler(sockfile)
-
-        source, funcname, args, kwargs = unpickler.load()
-        print "received source", source
-        print "will call %s(*%s, **%s)" % (funcname, args, kwargs)
-
-        try:
-            ns = {}
-            exec source in ns 
-            func = ns[funcname] 
-            res = func(*args, **kwargs)
-        except:
-            import sys
-            res = common.ExceptionInfo(sys.exc_info())
-        
-        pickler.dump(res)
-
-    def exec_remote(self, func, *args, **kwargs):
-        import inspect
-        self.lock.acquire()
-        try:
-            funcsource = inspect.getsource(func)
-            sockfile = self.sock.makefile('r+b')
-            pickler = pickle.Pickler(sockfile)
-            unpickler = pickle.Unpickler(sockfile)
-
-            print "sending %s(*%s, **%s)" %(func.func_name,
-                                            args, kwargs)
-            pickler.dump((funcsource, func.func_name, args, kwargs))
-            res = unpickler.load()
-            print "received result", res
-            return res
-        finally:
-            self.lock.release()
+                ns = { 'fromgateway' : self.gateway}
+                print "executing source", source[:10]
+                exec source in ns
+            except:
+                import traceback, sys
+                l = traceback.format_exception(*sys.exc_info())
+                errortext = "".join(l)
+                self.gateway.exec_remote("print %r" % errortext)
+
+class SockQueueSender(threading.Thread):
+    def __init__(self, sock, queue):
+        threading.Thread.__init__(self)
+        self.sock = sock
+        self.queue = queue
+    def run(self):
+        while self.running:
+            obj = self.queue.get()
+            f = self.sock.makefile('wb')
+            pickler = pickle.Pickler(f)
+            pickler.dump(obj)
+

Modified: shpy/trunk/dist/shpy/net/startserver.py
==============================================================================
--- shpy/trunk/dist/shpy/net/startserver.py	(original)
+++ shpy/trunk/dist/shpy/net/startserver.py	Sun Jan 18 18:14:02 2004
@@ -6,6 +6,13 @@
 #
 import autopath
 
+# XXX
+# a hack for convenience of displaying tracebacks 
+from unittest2.tool import dyncode  # just convenience 
+import linecache
+linecache.getline = dyncode.getline
+# end hack
+
 progname = 'unpickling_exec_server-1.0'
 # starting the agent server
 import sys, pickle
@@ -26,7 +33,12 @@
             clientfile = clientsock.makefile('r+b',0)
             unpickler = pickle.Unpickler(clientfile)
             pickler = pickle.Pickler(clientfile)
-            exec unpickler.load()
+            g = {'clientsock' : clientsock, 
+                 'address' : address,
+                 'unpickler' : unpickler,
+                 'pickler' : pickler}
+            co = dyncode.makecode(unpickler.load())
+            exec co in g,g 
         except KeyboardInterrupt:
             sys.exit(1)
         except:
@@ -34,7 +46,6 @@
             print traceback.print_exc()
             break
 
-
 def startserver(hostport):
     if isinstance(hostport, str):
         host, port = hostport.split(':')

Modified: shpy/trunk/dist/shpy/net/test/test_basic.py
==============================================================================
--- shpy/trunk/dist/shpy/net/test/test_basic.py	(original)
+++ shpy/trunk/dist/shpy/net/test/test_basic.py	Sun Jan 18 18:14:02 2004
@@ -25,11 +25,14 @@
     gw = register(hostport)
     atexit.register(setattr, gw, 'running', 0)
     started.append(gw)
+    return gw
 
-def test_server_initialization():
+def test_1_server_initialization():
     gw = getservergateway()
-    res = gw.exec_remote(functest.f)
-    check.equal(res, 42)
+
+    gw.exec_remote('print hello\n')
+    #res = gw.call_synchronous(functest.f)
+    #check.equal(res, 42)
 
 def test_callback():
     gw = getservergateway()