[shpy-commit] r2823 - in shpy/trunk/dist/shpy/net: . test
hpk@codespeak.net
hpk@codespeak.net
Sun, 18 Jan 2004 18:14:03 +0100 (MET)
Author: hpk
Date: Sun Jan 18 18:14:02 2004
New Revision: 2823
Modified:
shpy/trunk/dist/shpy/net/gateway.py
shpy/trunk/dist/shpy/net/startserver.py
shpy/trunk/dist/shpy/net/test/test_basic.py
Log:
- new way of executing remote code
done via incoming and outgoing queue
- see README.txt for a basic explanation
- there is only a one-shot asynchronous
interface so far (exec_remote which executes a string on
the other side)
Modified: shpy/trunk/dist/shpy/net/gateway.py
==============================================================================
--- shpy/trunk/dist/shpy/net/gateway.py (original)
+++ shpy/trunk/dist/shpy/net/gateway.py Sun Jan 18 18:14:02 2004
@@ -1,67 +1,83 @@
-import threading, pickle
+import autopath
+
+import threading, pickle, Queue, select
from shpy.net import common
-class SocketGateway(threading.Thread):
+class SocketGateway:
def __init__(self, sock):
- import threading # XXX well
+ self.sock = sock
+ self.incoming = Queue.Queue()
+ self.outgoing = Queue.Queue()
+ self.threads = [
+ SockQueueReader(self.sock, self.incoming),
+ QueueExecutor(self.incoming),
+ SockQueueSender(self.sock, self.outgoing)
+ ]
+ for t in self.threads:
+ t.setDaemon(1)
+ t.running = 1
+ t.gateway = self
+ t.start()
+
+ def exit(self):
+ for t in self.threads:
+ t.running = 0
+
+ def exec_remote(self, source):
+ self.outgoing.put(source)
+
+class SockQueueReader(threading.Thread):
+ def __init__(self, sock, queue):
threading.Thread.__init__(self)
self.sock = sock
- self.lock = threading.RLock()
- self.running = 1
- self.setDaemon(1)
- self.start()
+ self.queue = queue
def run(self):
# here we want to select for incoming calls.
# but only if we have the lock.
- import select
while self.running:
- self.lock.acquire()
+ readlist = [self.sock]
+ r,w,e = select.select(readlist, [], [], 0.2)
+ if self.sock in r:
+ try:
+ sockfile = self.sock.makefile('r+b')
+ unpickler = pickle.Unpickler(sockfile)
+ string = unpickler.load()
+ self.queue.put(string)
+ except:
+ import traceback
+ traceback.print_exc()
+ self.gateway.exit()
+
+
+class QueueExecutor(threading.Thread):
+ def __init__(self, queue):
+ threading.Thread.__init__(self)
+ self.queue = queue
+
+ def run(self):
+ while self.running:
+ source = self.queue.get()
try:
- readlist = [self.sock]
- r,w,e = select.select(readlist, [], [], 0.1)
- if self.sock in r:
- self.exec_incoming()
- finally:
- self.lock.release()
-
- def exec_incoming(self):
- # we have the lock here already!
- print "executing incoming data. "
- sockfile = self.sock.makefile('r+b')
- pickler = pickle.Pickler(sockfile)
- unpickler = pickle.Unpickler(sockfile)
-
- source, funcname, args, kwargs = unpickler.load()
- print "received source", source
- print "will call %s(*%s, **%s)" % (funcname, args, kwargs)
-
- try:
- ns = {}
- exec source in ns
- func = ns[funcname]
- res = func(*args, **kwargs)
- except:
- import sys
- res = common.ExceptionInfo(sys.exc_info())
-
- pickler.dump(res)
-
- def exec_remote(self, func, *args, **kwargs):
- import inspect
- self.lock.acquire()
- try:
- funcsource = inspect.getsource(func)
- sockfile = self.sock.makefile('r+b')
- pickler = pickle.Pickler(sockfile)
- unpickler = pickle.Unpickler(sockfile)
-
- print "sending %s(*%s, **%s)" %(func.func_name,
- args, kwargs)
- pickler.dump((funcsource, func.func_name, args, kwargs))
- res = unpickler.load()
- print "received result", res
- return res
- finally:
- self.lock.release()
+ ns = { 'fromgateway' : self.gateway}
+ print "executing source", source[:10]
+ exec source in ns
+ except:
+ import traceback, sys
+ l = traceback.format_exception(*sys.exc_info())
+ errortext = "".join(l)
+ self.gateway.exec_remote("print %r" % errortext)
+
+class SockQueueSender(threading.Thread):
+ def __init__(self, sock, queue):
+ threading.Thread.__init__(self)
+ self.sock = sock
+ self.queue = queue
+ def run(self):
+ while self.running:
+ obj = self.queue.get()
+ f = self.sock.makefile('wb')
+ pickler = pickle.Pickler(f)
+ pickler.dump(obj)
+
Modified: shpy/trunk/dist/shpy/net/startserver.py
==============================================================================
--- shpy/trunk/dist/shpy/net/startserver.py (original)
+++ shpy/trunk/dist/shpy/net/startserver.py Sun Jan 18 18:14:02 2004
@@ -6,6 +6,13 @@
#
import autopath
+# XXX
+# a hack for convenience of displaying tracebacks
+from unittest2.tool import dyncode # just convenience
+import linecache
+linecache.getline = dyncode.getline
+# end hack
+
progname = 'unpickling_exec_server-1.0'
# starting the agent server
import sys, pickle
@@ -26,7 +33,12 @@
clientfile = clientsock.makefile('r+b',0)
unpickler = pickle.Unpickler(clientfile)
pickler = pickle.Pickler(clientfile)
- exec unpickler.load()
+ g = {'clientsock' : clientsock,
+ 'address' : address,
+ 'unpickler' : unpickler,
+ 'pickler' : pickler}
+ co = dyncode.makecode(unpickler.load())
+ exec co in g,g
except KeyboardInterrupt:
sys.exit(1)
except:
@@ -34,7 +46,6 @@
print traceback.print_exc()
break
-
def startserver(hostport):
if isinstance(hostport, str):
host, port = hostport.split(':')
Modified: shpy/trunk/dist/shpy/net/test/test_basic.py
==============================================================================
--- shpy/trunk/dist/shpy/net/test/test_basic.py (original)
+++ shpy/trunk/dist/shpy/net/test/test_basic.py Sun Jan 18 18:14:02 2004
@@ -25,11 +25,14 @@
gw = register(hostport)
atexit.register(setattr, gw, 'running', 0)
started.append(gw)
+ return gw
-def test_server_initialization():
+def test_1_server_initialization():
gw = getservergateway()
- res = gw.exec_remote(functest.f)
- check.equal(res, 42)
+
+ gw.exec_remote('print hello\n')
+ #res = gw.call_synchronous(functest.f)
+ #check.equal(res, 42)
def test_callback():
gw = getservergateway()