From commits-noreply at bitbucket.org Mon Oct 24 20:42:06 2011 From: commits-noreply at bitbucket.org (Bitbucket) Date: Mon, 24 Oct 2011 18:42:06 -0000 Subject: [execnet-commit] commit/execnet: 3 new changesets Message-ID: <20111024184206.20749.79644@bitbucket02.managed.contegix.com> 3 new commits in execnet: https://bitbucket.org/hpk42/execnet/changeset/9d6f3620d52a/ changeset: 9d6f3620d52a user: RonnyPfannschmidt date: 2011-10-24 16:00:22 summary: turn the rinfo retrieval into a pure function affected #: 1 file diff -r b6e1dfaa89d1f975a31786a692ddaf36fffb2e67 -r 9d6f3620d52a222417c260b7ac31663d64366d7f execnet/gateway.py --- a/execnet/gateway.py +++ b/execnet/gateway.py @@ -152,16 +152,15 @@ RemoteStatus = RInfo -rinfo_source = """ -import sys, os -channel.send(dict( - executable = sys.executable, - version_info = sys.version_info[:5], - platform = sys.platform, - cwd = os.getcwd(), - pid = os.getpid(), -)) -""" +def rinfo_source(channel): + import sys, os + channel.send(dict( + executable = sys.executable, + version_info = sys.version_info[:5], + platform = sys.platform, + cwd = os.getcwd(), + pid = os.getpid(), + )) def _find_non_builtin_globals(source, codeobj): https://bitbucket.org/hpk42/execnet/changeset/404c1c4108e9/ changeset: 404c1c4108e9 user: RonnyPfannschmidt date: 2011-10-24 17:58:36 summary: add kwargs support to Group.remote_exec affected #: 2 files diff -r 9d6f3620d52a222417c260b7ac31663d64366d7f -r 404c1c4108e94b967ba1a2c1f4b417b1bbd91115 execnet/multi.py --- a/execnet/multi.py +++ b/execnet/multi.py @@ -168,13 +168,13 @@ if popen: killpopen(popen) - def remote_exec(self, source): + def remote_exec(self, source, **kwargs): """ remote_exec source on all member gateways and return MultiChannel connecting to all sub processes. """ channels = [] for gw in self: - channels.append(gw.remote_exec(source)) + channels.append(gw.remote_exec(source, **kwargs)) return MultiChannel(channels) class MultiChannel: diff -r 9d6f3620d52a222417c260b7ac31663d64366d7f -r 404c1c4108e94b967ba1a2c1f4b417b1bbd91115 testing/test_multi.py --- a/testing/test_multi.py +++ b/testing/test_multi.py @@ -161,3 +161,12 @@ assert gw in newlist assert gw not in oldlist + def test_remote_exec_args(self): + group = Group() + gw = group.makegateway('popen') + + def fun(channel, arg): + channel.send(arg) + mch = group.remote_exec(fun, arg=1) + result = mch.receive_each() + assert result == [1] https://bitbucket.org/hpk42/execnet/changeset/8900be7ee856/ changeset: 8900be7ee856 user: RonnyPfannschmidt date: 2011-10-24 18:03:24 summary: update changelog affected #: 1 file diff -r 404c1c4108e94b967ba1a2c1f4b417b1bbd91115 -r 8900be7ee8563378561b8d433a2bb257fd7e139e CHANGELOG --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,8 @@ 1.0.10.dev -------------------------------- +- group.remote_exec now supports *kwargs as well + - added a ``dont_write_bytecode`` option to Popen gateways, this sets the ``sys.dont_write_bytecode`` flag on the spawned process, this only works on CPython 2.6 and higher. Thanks to Alex Gaynor. Repository URL: https://bitbucket.org/hpk42/execnet/ -- This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email. From commits-noreply at bitbucket.org Sat Nov 19 08:34:09 2011 From: commits-noreply at bitbucket.org (Bitbucket) Date: Sat, 19 Nov 2011 07:34:09 -0000 Subject: [execnet-commit] commit/execnet: 2 new changesets Message-ID: <20111119073409.6326.55491@bitbucket05.managed.contegix.com> 2 new commits in execnet: https://bitbucket.org/hpk42/execnet/changeset/1a63deea3424/ changeset: 1a63deea3424 user: hpk42 date: 2011-11-19 08:24:11 summary: try harder to get the actual debuffilename when testing filetracing aka EXECNET_DEBUG=1 affected #: 1 file diff -r 8900be7ee8563378561b8d433a2bb257fd7e139e -r 1a63deea34245078197ab0dfd06bb480feb4749c testing/test_gateway.py --- a/testing/test_gateway.py +++ b/testing/test_gateway.py @@ -350,8 +350,11 @@ monkeypatch.setenv("TEMP", tmpdir) # windows monkeypatch.setenv('EXECNET_DEBUG', "1") gw = execnet.makegateway("popen") - pid = gw.remote_exec("import os ; channel.send(os.getpid())").receive() - slavefile = tmpdir.join("execnet-debug-%s" % pid) + # hack out the debuffilename + fn = gw.remote_exec( + "import execnet;channel.send(execnet.gateway_base.fn)" + ).receive() + slavefile = py.path.local(fn) assert slavefile.check() slave_line = "creating slavegateway" for line in slavefile.readlines(): https://bitbucket.org/hpk42/execnet/changeset/02bbee2639c2/ changeset: 02bbee2639c2 user: hpk42 date: 2011-11-19 08:33:54 summary: adapt to pytest-2.2 and update tests abit, there still is one palce with metafunc.addcall()s remaining in test_serializer.py affected #: 3 files diff -r 1a63deea34245078197ab0dfd06bb480feb4749c -r 02bbee2639c2c8ea63ccc4b486172f3e670634ae conftest.py --- a/conftest.py +++ b/conftest.py @@ -13,6 +13,8 @@ 'python2.5': r'C:\Python25\python.exe', 'python2.4': r'C:\Python24\python.exe', 'python3.1': r'C:\Python31\python.exe', + 'python3.2': r'C:\Python31\python.exe', + 'python3.3': r'C:\Python31\python.exe', } def pytest_runtest_setup(item, __multicall__): @@ -81,12 +83,12 @@ gwtypes = [metafunc.cls.gwtype] else: gwtypes = ['popen', 'socket', 'ssh'] - for gwtype in gwtypes: - metafunc.addcall(id=gwtype, param=gwtype) + metafunc.parametrize("gw", gwtypes, indirect=True) elif 'anypython' in metafunc.funcargnames: - for name in ('python3.1', 'python2.4', 'python2.5', 'python2.6', - 'python2.7', 'pypy-c', 'jython'): - metafunc.addcall(id=name, param=name) + metafunc.parametrize("anypython", indirect=True, argvalues= + ('python3.3', 'python3.2', 'python2.4', 'python2.5', + 'python2.6', 'python2.7', 'pypy-c', 'jython') + ) def getexecutable(name, cache={}): try: @@ -113,6 +115,7 @@ executable = py.path.local(executable) if executable.check(): return executable + executable = None py.test.skip("no %s found" % (name,)) return executable diff -r 1a63deea34245078197ab0dfd06bb480feb4749c -r 02bbee2639c2c8ea63ccc4b486172f3e670634ae execnet/gateway_base.py --- a/execnet/gateway_base.py +++ b/execnet/gateway_base.py @@ -45,6 +45,7 @@ elif DEBUG: import tempfile, os.path fn = os.path.join(tempfile.gettempdir(), 'execnet-debug-%d' % pid) + #sys.stderr.write("execnet-debug at %r" %(fn,)) debugfile = open(fn, 'w') def trace(*msg): try: diff -r 1a63deea34245078197ab0dfd06bb480feb4749c -r 02bbee2639c2c8ea63ccc4b486172f3e670634ae tox.ini --- a/tox.ini +++ b/tox.ini @@ -14,7 +14,7 @@ [testenv] changedir=testing -deps=pytest +deps=pytest>=2.2.0.dev8 commands=py.test -rsfxX --junitxml={envlogdir}/junit-{envname}.xml [] [testenv:py26-py134] @@ -30,7 +30,7 @@ basepython=python changedir=doc deps=:pypi:sphinx - py + pytest commands= py.test \ -rsfxX -v --junitxml={envlogdir}/junit-{envname}.xml check_sphinx.py Repository URL: https://bitbucket.org/hpk42/execnet/ -- This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email. From commits-noreply at bitbucket.org Mon Nov 21 21:15:29 2011 From: commits-noreply at bitbucket.org (Bitbucket) Date: Mon, 21 Nov 2011 20:15:29 -0000 Subject: [execnet-commit] commit/execnet: 2 new changesets Message-ID: <20111121201529.10695.98993@bitbucket05.managed.contegix.com> 2 new commits in execnet: https://bitbucket.org/hpk42/execnet/changeset/7c591c4dd050/ changeset: 7c591c4dd050 user: gutworth date: 2011-11-21 21:13:24 summary: replace pytest_generate_tests with the parametrize decorator affected #: 1 file diff -r 02bbee2639c2c8ea63ccc4b486172f3e670634ae -r 7c591c4dd050868538a592e4bcb7225eb3306a5d testing/test_serializer.py --- a/testing/test_serializer.py +++ b/testing/test_serializer.py @@ -111,38 +111,23 @@ py_dump = request.getfuncargvalue(request.param[1]) return py_dump.load -def pytest_generate_tests(metafunc): - if 'dump' in metafunc.funcargnames and 'load' in metafunc.funcargnames: - pys = 'py2', 'py3' - for dump in pys: - for load in pys: - param = (dump, load) - conversion = '%s to %s'%param - if 'repr' not in metafunc.funcargnames: - metafunc.addcall(id=conversion, param=param) - else: - for tp, repr in simple_tests.items(): - metafunc.addcall( - id='%s:%s'%(tp, conversion), - param=param, - funcargs={'tp_name':tp, 'repr':repr}, - ) +simple_tests = [ +# type: expected before/after repr + ('int', '4'), + ('float', '3.25'), + ('list', '[1, 2, 3]'), + ('tuple', '(1, 2, 3)'), + ('dict', '{(1, 2, 3): 32}'), +] - -simple_tests = { -# type: expected before/after repr - 'int': '4', - 'float':'3.25', - 'list': '[1, 2, 3]', - 'tuple': '(1, 2, 3)', - 'dict': '{(1, 2, 3): 32}', -} - -def test_simple(tp_name, repr, dump, load): - p = dump(repr) - tp , v = load(p) - assert tp == tp_name - assert v == repr + at py.test.mark.parametrize(["tp_name", "repr"], simple_tests) +def test_simple(tp_name, repr, py2, py3): + for load in py2.load, py3.load: + for dump in py3.dump, py2.dump: + p = dump(repr) + tp , v = load(p) + assert tp == tp_name + assert v == repr def test_set(py2, py3): for dump in py2.dump, py3.dump: https://bitbucket.org/hpk42/execnet/changeset/4cd5a2a6afbb/ changeset: 4cd5a2a6afbb user: gutworth date: 2011-11-21 21:15:19 summary: remove dead funcargs affected #: 1 file diff -r 7c591c4dd050868538a592e4bcb7225eb3306a5d -r 4cd5a2a6afbb784fe3c13f915331d30dd6aea3f4 testing/test_serializer.py --- a/testing/test_serializer.py +++ b/testing/test_serializer.py @@ -103,14 +103,6 @@ def pytest_funcarg__py3(request): return _py3_wrapper -def pytest_funcarg__dump(request): - py_dump = request.getfuncargvalue(request.param[0]) - return py_dump.dump - -def pytest_funcarg__load(request): - py_dump = request.getfuncargvalue(request.param[1]) - return py_dump.load - simple_tests = [ # type: expected before/after repr ('int', '4'), Repository URL: https://bitbucket.org/hpk42/execnet/ -- This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email. From commits-noreply at bitbucket.org Tue Nov 22 09:49:02 2011 From: commits-noreply at bitbucket.org (Bitbucket) Date: Tue, 22 Nov 2011 08:49:02 -0000 Subject: [execnet-commit] commit/execnet: 22 new changesets Message-ID: <20111122084902.23151.87982@bitbucket12.managed.contegix.com> 22 new commits in execnet: https://bitbucket.org/hpk42/execnet/changeset/6aa62cca635c/ changeset: 6aa62cca635c user: RonnyPfannschmidt date: 2011-10-24 21:04:02 summary: fix the readloop of Popen2IO affected #: 3 files diff -r 8900be7ee8563378561b8d433a2bb257fd7e139e -r 6aa62cca635c2268c53158e29936dbfd1221b1c5 CHANGELOG --- a/CHANGELOG +++ b/CHANGELOG @@ -3,6 +3,8 @@ - group.remote_exec now supports *kwargs as well +- Popen2IO.read now reads correct amounts of bytes from nonblocking fd's + - added a ``dont_write_bytecode`` option to Popen gateways, this sets the ``sys.dont_write_bytecode`` flag on the spawned process, this only works on CPython 2.6 and higher. Thanks to Alex Gaynor. diff -r 8900be7ee8563378561b8d433a2bb257fd7e139e -r 6aa62cca635c2268c53158e29936dbfd1221b1c5 execnet/gateway_base.py --- a/execnet/gateway_base.py +++ b/execnet/gateway_base.py @@ -81,8 +81,8 @@ """Read exactly 'numbytes' bytes from the pipe. """ # a file in non-blocking mode may return less bytes, so we loop buf = bytes() - while len(buf) < numbytes: - data = self._read(numbytes) + while numbytes > len(buf): + data = self._read(numbytes-len(buf)) if not data: raise EOFError("expected %d bytes, got %d" %(numbytes, len(buf))) buf += data diff -r 8900be7ee8563378561b8d433a2bb257fd7e139e -r 6aa62cca635c2268c53158e29936dbfd1221b1c5 testing/test_basics.py --- a/testing/test_basics.py +++ b/testing/test_basics.py @@ -4,7 +4,12 @@ import execnet from execnet import gateway_base, gateway from execnet.gateway_base import Message, Channel, ChannelFactory, serialize, \ - Unserializer + Unserializer, Popen2IO + +try: + from StringIO import StringIO as BytesIO +except: + from io import BytesIO def test_errors_on_execnet(): assert hasattr(execnet, 'RemoteError') @@ -107,6 +112,17 @@ ret = proc.wait() assert "hello".encode('ascii') in stdout +def test_popen_io_readloop(monkeypatch): + sio = BytesIO('test'.encode('ascii')) + io = Popen2IO(sio, sio) + real_read = io._read + def newread(numbytes): + if numbytes > 1: + numbytes = numbytes-1 + return real_read(numbytes) + io._read = newread + result = io.read(3) + assert result == 'tes'.encode('ascii') def test_rinfo_source(anypython, tmpdir): check = tmpdir.join("check.py") https://bitbucket.org/hpk42/execnet/changeset/9adf43adec3b/ changeset: 9adf43adec3b branch: message-framing user: RonnyPfannschmidt date: 2011-10-24 23:21:56 summary: first step torwards more generic message transfer and per channel string serialization configuration affected #: 4 files diff -r 6aa62cca635c2268c53158e29936dbfd1221b1c5 -r 9adf43adec3b2ed39df97aa5c5262e7e11eca56e execnet/gateway.py --- a/execnet/gateway.py +++ b/execnet/gateway.py @@ -6,7 +6,7 @@ import sys, os, inspect, types, linecache import textwrap import execnet -from execnet.gateway_base import Message, Popen2IO +from execnet.gateway_base import Message, Popen2IO, serialize from execnet import gateway_base importdir = os.path.dirname(os.path.dirname(execnet.__file__)) @@ -54,9 +54,9 @@ the default is to try to convert py2 str as py3 str, but not to try and convert py3 str to py2 str """ - self._unserializer.py2str_as_py3str = py2str_as_py3str - self._unserializer.py3str_as_py2str = py3str_as_py2str - self._send(Message.RECONFIGURE, data=(py2str_as_py3str, py3str_as_py2str)) + self._strconfig = (py2str_as_py3str, py3str_as_py2str) + data = serialize(self._strconfig + (None,)) + self._send(Message.RECONFIGURE, data=data) def _remote_bootstrap_gateway(self, io): """ send gateway bootstrap code to a remote Python interpreter @@ -124,7 +124,7 @@ channel = self.newchannel() self._send(Message.CHANNEL_EXEC, channel.id, - (source, call_name, kwargs)) + serialize((source, call_name, kwargs))) return channel def remote_init_threads(self, num=None): diff -r 6aa62cca635c2268c53158e29936dbfd1221b1c5 -r 9adf43adec3b2ed39df97aa5c5262e7e11eca56e execnet/gateway_base.py --- a/execnet/gateway_base.py +++ b/execnet/gateway_base.py @@ -12,6 +12,11 @@ except ImportError: import Queue as queue +try: + from io import BytesIO +except: + from StringIO import StringIO as BytesIO + ISPY3 = sys.version_info >= (3, 0) if ISPY3: exec("def do_exec(co, loc): exec(co, loc)\n" @@ -109,6 +114,16 @@ self.channelid = channelid self.data = data + @staticmethod + def from_io(io): + header = io.read(9) # type 1, channel 4, payload 4 + msgtype, channel, payload = struct.unpack('!bii', header) + return Message(msgtype, channel, io.read(payload)) + + def to_io(self, io): + header = struct.pack('!bii', self.msgcode, self.channelid, len(self.data)) + io.write(header+self.data) + def received(self, gateway): self._types[self.msgcode](self, gateway) @@ -119,7 +134,7 @@ return "" %(name, self.channelid, len(r)) else: - return "" %(name, + return "" %(name, self.channelid, self.data) def _setupmessages(): @@ -135,11 +150,11 @@ 'numchannels': len(active_channels), 'numexecuting': numexec } - gateway._send(Message.CHANNEL_DATA, message.channelid, d) + gateway._send(Message.CHANNEL_DATA, message.channelid, serialize(d)) def channel_exec(message, gateway): channel = gateway._channelfactory.new(message.channelid) - gateway._local_schedulexec(channel=channel, sourcetask=message.data) + gateway._local_schedulexec(channel=channel,sourcetask=message.data) def channel_data(message, gateway): gateway._channelfactory._local_receive(message.channelid, message.data) @@ -148,7 +163,7 @@ gateway._channelfactory._local_close(message.channelid) def channel_close_error(message, gateway): - remote_error = RemoteError(message.data) + remote_error = RemoteError(deserialize(message.data)) gateway._channelfactory._local_close(message.channelid, remote_error) def channel_last_message(message, gateway): @@ -159,9 +174,10 @@ raise SystemExit(0) def reconfigure(message, gateway): - py2str_as_py3str, py3str_as_py2str = message.data - gateway._unserializer.py2str_as_py3str = py2str_as_py3str - gateway._unserializer.py3str_as_py2str = py3str_as_py2str + py2str_as_py3str, py3str_as_py2str, target = deserialize(message.data) + if target is None: + target = gateway + target._strconfig = py2str_as_py3str, py3str_as_py2str types = [ status, reconfigure, gateway_terminate, @@ -340,7 +356,7 @@ if not self._receiveclosed.isSet(): put = self.gateway._send if error is not None: - put(Message.CHANNEL_CLOSE_ERROR, self.id, error) + put(Message.CHANNEL_CLOSE_ERROR, self.id, serialize(error)) else: put(Message.CHANNEL_CLOSE, self.id) self._trace("sent channel close message") @@ -380,7 +396,7 @@ """ if self.isclosed(): raise IOError("cannot send to %r" %(self,)) - self.gateway._send(Message.CHANNEL_DATA, self.id, item) + self.gateway._send(Message.CHANNEL_DATA, self.id, serialize(item)) def receive(self, timeout=-1): """receive a data item that was sent from the other side. @@ -494,6 +510,7 @@ def _local_receive(self, id, data): # executes in receiver thread + data = deserialize(data, self) try: callback, endmarker = self._callbacks[id] except KeyError: @@ -512,7 +529,7 @@ excinfo = sys.exc_info() self.gateway._trace("exception during callback: %s" % excinfo[1]) errortext = self.gateway._geterrortext(excinfo) - self.gateway._send(Message.CHANNEL_CLOSE_ERROR, id, errortext) + self.gateway._send(Message.CHANNEL_CLOSE_ERROR, id, serialize(errortext)) self._local_close(id, errortext) def _finished_receiving(self): @@ -589,7 +606,6 @@ self._io = io self.id = id self._channelfactory = ChannelFactory(self, _startcount) - self._unserializer = Unserializer(self._io, self._channelfactory) self._receivelock = threading.RLock() # globals may be NONE at process-termination self._trace = trace @@ -607,10 +623,11 @@ def _thread_receiver(self): self._trace("RECEIVERTHREAD: starting to run") eof = False + io = self._io try: try: while 1: - msg = Message(*self._unserializer.load()) + msg = Message.from_io(io) self._trace("received", msg) _receivelock = self._receivelock _receivelock.acquire() @@ -643,8 +660,9 @@ def _terminate_execution(self): pass - def _send(self, msgcode, channelid=0, data=''): - serialize(self._io, (msgcode, channelid, data)) + def _send(self, msgcode, channelid=0, data=bytes()): + header = struct.pack('!bii', msgcode, channelid, len(data)) + self._io.write(header+data) self._trace('sent', Message(msgcode, channelid, data)) def _local_schedulexec(self, channel, sourcetask): @@ -671,6 +689,7 @@ class SlaveGateway(BaseGateway): def _local_schedulexec(self, channel, sourcetask): + sourcetask = deserialize(sourcetask, self._channelfactory) self._execqueue.put((channel, sourcetask)) def _terminate_execution(self): @@ -782,9 +801,15 @@ py2str_as_py3str = True # True py3str_as_py2str = False # false means py2 will get unicode - def __init__(self, stream, channelfactory=None): + def __init__(self, stream, channel_or_gateway=None): + strconfig = default = self.py2str_as_py3str, self.py3str_as_py2str + gateway = getattr(channel_or_gateway, 'gateway', channel_or_gateway) + strconfig = getattr(channel_or_gateway, '_strconfig', default) + if strconfig is default: + strconfig = getattr(gateway, '_strconfig', default) + self.py2str_as_py3str, self.py3str_as_py2str = strconfig self.stream = stream - self.channelfactory = channelfactory + self.channelfactory = gateway and gateway._channelfactory def load(self): self.stack = [] @@ -930,8 +955,15 @@ _buildopcodes() -def serialize(io, obj): +def serialize(obj): + io = BytesIO() _Serializer(io).save(obj) + return io.getvalue() + +def deserialize(data, channelfactory=None): + io = BytesIO(data) + return Unserializer(io, channelfactory).load() + class _Serializer(object): _dispatch = {} diff -r 6aa62cca635c2268c53158e29936dbfd1221b1c5 -r 9adf43adec3b2ed39df97aa5c5262e7e11eca56e testing/test_basics.py --- a/testing/test_basics.py +++ b/testing/test_basics.py @@ -77,16 +77,16 @@ print ("checking %s %s" %(i, handler)) for data in "hello", "hello".encode('ascii'): msg1 = Message(i, i, data) - serialize(io, (i, i, data)) + msg1.to_io(io) x = io.outfile.getvalue() io.outfile.truncate(0) io.outfile.seek(0) io.infile.seek(0) io.infile.write(x) io.infile.seek(0) - msg2 = Message(*unserializer.load()) + msg2 = Message.from_io(io) assert msg1.channelid == msg2.channelid, (msg1, msg2) - assert msg1.data == msg2.data + assert msg1.data == msg2.data, (msg1.data, msg2.data) assert msg1.msgcode == msg2.msgcode print ("all passed") """)) @@ -194,9 +194,9 @@ for i, handler in enumerate(Message._types): one = py.io.BytesIO() data = '23'.encode('ascii') - serialize(one, (i, 42, data)) + Message(i, 42, data).to_io(one) two = py.io.BytesIO(one.getvalue()) - msg = Message(*Unserializer(two, None).load()) + msg = Message.from_io(two) assert msg.msgcode == i assert isinstance(msg, Message) assert msg.channelid == 42 diff -r 6aa62cca635c2268c53158e29936dbfd1221b1c5 -r 9adf43adec3b2ed39df97aa5c5262e7e11eca56e testing/test_serializer.py --- a/testing/test_serializer.py +++ b/testing/test_serializer.py @@ -56,7 +56,7 @@ from execnet import gateway_base as serializer if sys.version_info > (3, 0): # Need binary output sys.stdout = sys.stdout.detach() -saver = serializer.serialize(sys.stdout, %s) +sys.stdout.write(serializer.serialize(%s)) """ % (pyimportdir, obj_rep,)) popen = subprocess.Popen([str(self.executable), str(script_file)], stdin=subprocess.PIPE, https://bitbucket.org/hpk42/execnet/changeset/7619dfe47e41/ changeset: 7619dfe47e41 branch: message-framing user: RonnyPfannschmidt date: 2011-10-25 00:15:19 summary: give messages a deseralized repr with fake channels affected #: 1 file diff -r 9adf43adec3b2ed39df97aa5c5262e7e11eca56e -r 7619dfe47e415b88eb7f5aa90220c04eadbcad98 execnet/gateway_base.py --- a/execnet/gateway_base.py +++ b/execnet/gateway_base.py @@ -128,14 +128,26 @@ self._types[self.msgcode](self, gateway) def __repr__(self): + class FakeChannel(object): + _strconfig = False, False # never transform, never fail + def __init__(self, id): + self.id = id + def __repr__(self): + return '' % self.id + FakeChannel.new = FakeChannel + FakeChannel.gateway = FakeChannel name = self._types[self.msgcode].__name__.upper() - r = repr(self.data) + try: + data = deserialize(self.data, FakeChannel) + except UnserializationError: + data = self.data + r = repr(data) if len(r) > 50: return "" %(name, self.channelid, len(r)) else: - return "" %(name, - self.channelid, self.data) + return "" %(name, + self.channelid, r) def _setupmessages(): def status(message, gateway): https://bitbucket.org/hpk42/execnet/changeset/5aa89a3e602b/ changeset: 5aa89a3e602b branch: message-framing user: RonnyPfannschmidt date: 2011-10-25 00:17:34 summary: initial support for per channel string coercion affected #: 4 files diff -r 7619dfe47e415b88eb7f5aa90220c04eadbcad98 -r 5aa89a3e602b8d8a18ccf62b9d7f9b0e0639f850 CHANGELOG --- a/CHANGELOG +++ b/CHANGELOG @@ -3,6 +3,8 @@ - group.remote_exec now supports *kwargs as well +- support per channel string coercion configuration + - Popen2IO.read now reads correct amounts of bytes from nonblocking fd's - added a ``dont_write_bytecode`` option to Popen gateways, this sets the diff -r 7619dfe47e415b88eb7f5aa90220c04eadbcad98 -r 5aa89a3e602b8d8a18ccf62b9d7f9b0e0639f850 execnet/gateway.py --- a/execnet/gateway.py +++ b/execnet/gateway.py @@ -55,7 +55,7 @@ but not to try and convert py3 str to py2 str """ self._strconfig = (py2str_as_py3str, py3str_as_py2str) - data = serialize(self._strconfig + (None,)) + data = serialize(self._strconfig) self._send(Message.RECONFIGURE, data=data) def _remote_bootstrap_gateway(self, io): diff -r 7619dfe47e415b88eb7f5aa90220c04eadbcad98 -r 5aa89a3e602b8d8a18ccf62b9d7f9b0e0639f850 execnet/gateway_base.py --- a/execnet/gateway_base.py +++ b/execnet/gateway_base.py @@ -186,10 +186,11 @@ raise SystemExit(0) def reconfigure(message, gateway): - py2str_as_py3str, py3str_as_py2str, target = deserialize(message.data) - if target is None: + if message.channelid == 0: target = gateway - target._strconfig = py2str_as_py3str, py3str_as_py2str + else: + target = gateway._channelfactory.new(message.channelid) + target._strconfig = deserialize(message.data, gateway) types = [ status, reconfigure, gateway_terminate, @@ -452,6 +453,17 @@ raise StopIteration __next__ = next + + def reconfigure(self, py2str_as_py3str=True, py3str_as_py2str=False): + """ + set the string coercion for this channel + the default is to try to convert py2 str as py3 str, + but not to try and convert py3 str to py2 str + """ + self._strconfig = (py2str_as_py3str, py3str_as_py2str) + data = serialize(self._strconfig) + self.gateway._send(Message.RECONFIGURE, self.id, data=data) + ENDMARKER = object() INTERRUPT_TEXT = "keyboard-interrupted" @@ -522,7 +534,6 @@ def _local_receive(self, id, data): # executes in receiver thread - data = deserialize(data, self) try: callback, endmarker = self._callbacks[id] except KeyError: @@ -531,9 +542,10 @@ if queue is None: pass # drop data else: - queue.put(data) + queue.put(deserialize(data, channel)) else: try: + data = deserialize(data, self.gateway) #XXX loss of coercion data callback(data) # even if channel may be already closed except KeyboardInterrupt: raise @@ -821,7 +833,7 @@ strconfig = getattr(gateway, '_strconfig', default) self.py2str_as_py3str, self.py3str_as_py2str = strconfig self.stream = stream - self.channelfactory = gateway and gateway._channelfactory + self.channelfactory = getattr(gateway, '_channelfactory', gateway) def load(self): self.stack = [] diff -r 7619dfe47e415b88eb7f5aa90220c04eadbcad98 -r 5aa89a3e602b8d8a18ccf62b9d7f9b0e0639f850 testing/test_channel.py --- a/testing/test_channel.py +++ b/testing/test_channel.py @@ -5,10 +5,10 @@ import py import execnet from execnet import gateway_base, gateway +from testing.test_gateway import _find_version needs_early_gc = py.test.mark.skipif("not hasattr(sys, 'getrefcount')") needs_osdup = py.test.mark.skipif("not hasattr(os, 'dup')") queue = py.builtin._tryimport('queue', 'Queue') - TESTTIMEOUT = 10.0 # seconds class TestChannelBasicBehaviour: @@ -341,3 +341,37 @@ +class TestStringCoerce: + @py.test.mark.skipif('sys.version>="3.0"') + def test_2to3(self): + python = _find_version('3') + gw = execnet.makegateway('popen//python=%s'%python) + ch = gw.remote_exec('channel.send(channel.receive());'*2) + ch.send('a') + res = ch.receive() + assert isinstance(res, unicode) + + ch.reconfigure(py3str_as_py2str=True) + + ch.send('a') + res = ch.receive() + assert isinstance(res, str) + gw.exit() + + @py.test.mark.skipif('sys.version<"3.0"') + def test_3to2(self): + python = _find_version('2') + gw = execnet.makegateway('popen//python=%s'%python) + + ch = gw.remote_exec('channel.send(channel.receive());'*2) + ch.send(bytes('a', 'ascii')) + res = ch.receive() + assert isinstance(res, str) + + ch.reconfigure(py3str_as_py2str=True, py2str_as_py3str=False) + + ch.send('a') + res = ch.receive() + assert isinstance(res, bytes) + gw.exit() + https://bitbucket.org/hpk42/execnet/changeset/0c0c56a3ee94/ changeset: 0c0c56a3ee94 branch: message-framing user: RonnyPfannschmidt date: 2011-10-25 00:23:06 summary: fix string coercion data for channel local receive affected #: 1 file diff -r 5aa89a3e602b8d8a18ccf62b9d7f9b0e0639f850 -r 0c0c56a3ee94f1e8d301b2e54821d869d2773e31 execnet/gateway_base.py --- a/execnet/gateway_base.py +++ b/execnet/gateway_base.py @@ -536,6 +536,7 @@ # executes in receiver thread try: callback, endmarker = self._callbacks[id] + channel = self._channels.get(id) except KeyError: channel = self._channels.get(id) queue = channel and channel._items @@ -545,7 +546,7 @@ queue.put(deserialize(data, channel)) else: try: - data = deserialize(data, self.gateway) #XXX loss of coercion data + data = deserialize(data, channel) callback(data) # even if channel may be already closed except KeyboardInterrupt: raise https://bitbucket.org/hpk42/execnet/changeset/b68bc001748d/ changeset: b68bc001748d branch: message-framing user: RonnyPfannschmidt date: 2011-10-25 10:21:01 summary: we dont need a bytesio for serializing affected #: 1 file diff -r 0c0c56a3ee94f1e8d301b2e54821d869d2773e31 -r b68bc001748da04e389e2e0059cd6e29cda16f03 execnet/gateway_base.py --- a/execnet/gateway_base.py +++ b/execnet/gateway_base.py @@ -981,9 +981,7 @@ _buildopcodes() def serialize(obj): - io = BytesIO() - _Serializer(io).save(obj) - return io.getvalue() + return _Serializer().save(obj) def deserialize(data, channelfactory=None): io = BytesIO(data) @@ -993,8 +991,7 @@ class _Serializer(object): _dispatch = {} - def __init__(self, stream): - self._stream = stream + def __init__(self): self._streamlist = [] def _write(self, data): @@ -1008,7 +1005,7 @@ self._write(opcode.STOP) s = type(self._streamlist[0])().join(self._streamlist) # atomic write - self._stream.write(s) + return s def _save(self, obj): tp = type(obj) https://bitbucket.org/hpk42/execnet/changeset/f49be4b565c7/ changeset: f49be4b565c7 branch: message-framing user: RonnyPfannschmidt date: 2011-10-25 11:41:16 summary: normal debug tracing used exc_info instad of sys.exc_info affected #: 1 file diff -r b68bc001748da04e389e2e0059cd6e29cda16f03 -r f49be4b565c7e348f91129755dd84b4765a0ffd1 execnet/gateway_base.py --- a/execnet/gateway_base.py +++ b/execnet/gateway_base.py @@ -58,7 +58,7 @@ debugfile.flush() except Exception: try: - v = exc_info()[1] + v = sys.exc_info()[1] sys.stderr.write( "[%s] exception during tracing: %r\n" % (pid, v)) except Exception: https://bitbucket.org/hpk42/execnet/changeset/2042207ca5c7/ changeset: 2042207ca5c7 branch: message-framing user: RonnyPfannschmidt date: 2011-10-25 11:44:06 summary: extend the EOFError message string when reading message headers affected #: 1 file diff -r f49be4b565c7e348f91129755dd84b4765a0ffd1 -r 2042207ca5c70b99e1dbbfa55d43cf71688153b1 execnet/gateway_base.py --- a/execnet/gateway_base.py +++ b/execnet/gateway_base.py @@ -116,7 +116,11 @@ @staticmethod def from_io(io): - header = io.read(9) # type 1, channel 4, payload 4 + try: + header = io.read(9) # type 1, channel 4, payload 4 + except EOFError: + e = sys.exc_info()[1] + raise EOFError('couldnt load message header, ' + e.args[0]) msgtype, channel, payload = struct.unpack('!bii', header) return Message(msgtype, channel, io.read(payload)) https://bitbucket.org/hpk42/execnet/changeset/ffeef5a0b8f6/ changeset: ffeef5a0b8f6 branch: message-framing user: RonnyPfannschmidt date: 2011-10-28 15:04:41 summary: remove a * in changelog that makes i invalid restructuredtext affected #: 1 file diff -r 2042207ca5c70b99e1dbbfa55d43cf71688153b1 -r ffeef5a0b8f655999397978cdfd7d9c3abdee48b CHANGELOG --- a/CHANGELOG +++ b/CHANGELOG @@ -1,7 +1,7 @@ 1.0.10.dev -------------------------------- -- group.remote_exec now supports *kwargs as well +- group.remote_exec now supports kwargs as well - support per channel string coercion configuration https://bitbucket.org/hpk42/execnet/changeset/67e3b85ac2a4/ changeset: 67e3b85ac2a4 branch: message-framing user: RonnyPfannschmidt date: 2011-10-29 09:57:52 summary: include string deserialization configuration in callback data affected #: 1 file diff -r ffeef5a0b8f655999397978cdfd7d9c3abdee48b -r 67e3b85ac2a47417f5e5e57e2a82076ffb834447 execnet/gateway_base.py --- a/execnet/gateway_base.py +++ b/execnet/gateway_base.py @@ -284,7 +284,8 @@ olditem = items.get(block=False) except queue.Empty: if not (self._closed or self._receiveclosed.isSet()): - _callbacks[self.id] = (callback, endmarker) + _callbacks[self.id] = (callback, endmarker, + getattr(self, '_strconfig', None)) break else: if olditem is ENDMARKER: @@ -510,7 +511,7 @@ except KeyError: pass try: - callback, endmarker = self._callbacks.pop(id) + callback, endmarker, strconfig = self._callbacks.pop(id) except KeyError: pass else: @@ -539,7 +540,7 @@ def _local_receive(self, id, data): # executes in receiver thread try: - callback, endmarker = self._callbacks[id] + callback, endmarker, strconfig= self._callbacks[id] channel = self._channels.get(id) except KeyError: channel = self._channels.get(id) @@ -550,7 +551,7 @@ queue.put(deserialize(data, channel)) else: try: - data = deserialize(data, channel) + data = deserialize(data, channel, strconfig) callback(data) # even if channel may be already closed except KeyboardInterrupt: raise @@ -830,8 +831,10 @@ py2str_as_py3str = True # True py3str_as_py2str = False # false means py2 will get unicode - def __init__(self, stream, channel_or_gateway=None): - strconfig = default = self.py2str_as_py3str, self.py3str_as_py2str + def __init__(self, stream, channel_or_gateway=None, strconfig=None): + default = self.py2str_as_py3str, self.py3str_as_py2str + if strconfig is None: + strconfig = default gateway = getattr(channel_or_gateway, 'gateway', channel_or_gateway) strconfig = getattr(channel_or_gateway, '_strconfig', default) if strconfig is default: @@ -987,9 +990,9 @@ def serialize(obj): return _Serializer().save(obj) -def deserialize(data, channelfactory=None): +def deserialize(data, channelfactory=None, strconfig=None): io = BytesIO(data) - return Unserializer(io, channelfactory).load() + return Unserializer(io, channelfactory, strconfig).load() class _Serializer(object): https://bitbucket.org/hpk42/execnet/changeset/a6a6d319bd5d/ changeset: a6a6d319bd5d branch: message-framing user: RonnyPfannschmidt date: 2011-10-30 12:10:03 summary: add pytest as dependency for the test env affected #: 1 file diff -r 67e3b85ac2a47417f5e5e57e2a82076ffb834447 -r a6a6d319bd5dcd74d14a38b91452ee59921143ca tox.ini --- a/tox.ini +++ b/tox.ini @@ -31,6 +31,7 @@ changedir=doc deps=:pypi:sphinx py + pytest commands= py.test \ -rsfxX -v --junitxml={envlogdir}/junit-{envname}.xml check_sphinx.py https://bitbucket.org/hpk42/execnet/changeset/283c89d1ecdd/ changeset: 283c89d1ecdd user: RonnyPfannschmidt date: 2011-11-17 21:25:39 summary: dont overwrite BaseGateway._trace with a copy of trace (using __trace instead), since we want that to be used affected #: 1 file diff -r 6aa62cca635c2268c53158e29936dbfd1221b1c5 -r 283c89d1ecddb9b25271c027df353951c4c71be1 execnet/gateway_base.py --- a/execnet/gateway_base.py +++ b/execnet/gateway_base.py @@ -592,11 +592,11 @@ self._unserializer = Unserializer(self._io, self._channelfactory) self._receivelock = threading.RLock() # globals may be NONE at process-termination - self._trace = trace + self.__trace = trace self._geterrortext = geterrortext def _trace(self, *msg): - self._trace(self.id, *msg) + self.__trace(self.id, *msg) def _initreceive(self): self._receiverthread = threading.Thread(name="receiver", https://bitbucket.org/hpk42/execnet/changeset/2a695f79f98d/ changeset: 2a695f79f98d branch: message-framing user: RonnyPfannschmidt date: 2011-11-17 22:08:34 summary: merge default affected #: 1 file diff -r a6a6d319bd5dcd74d14a38b91452ee59921143ca -r 2a695f79f98de731f60ffd9fb62e222aea44aa22 execnet/gateway_base.py --- a/execnet/gateway_base.py +++ b/execnet/gateway_base.py @@ -638,11 +638,11 @@ self._channelfactory = ChannelFactory(self, _startcount) self._receivelock = threading.RLock() # globals may be NONE at process-termination - self._trace = trace + self.__trace = trace self._geterrortext = geterrortext def _trace(self, *msg): - self._trace(self.id, *msg) + self.__trace(self.id, *msg) def _initreceive(self): self._receiverthread = threading.Thread(name="receiver", https://bitbucket.org/hpk42/execnet/changeset/fd16cecde8f3/ changeset: fd16cecde8f3 user: RonnyPfannschmidt date: 2011-11-20 12:30:54 summary: merge upstream affected #: 3 files diff -r 02bbee2639c2c8ea63ccc4b486172f3e670634ae -r fd16cecde8f328cfe9624569aeb61cca95c9f19d CHANGELOG --- a/CHANGELOG +++ b/CHANGELOG @@ -3,6 +3,8 @@ - group.remote_exec now supports *kwargs as well +- Popen2IO.read now reads correct amounts of bytes from nonblocking fd's + - added a ``dont_write_bytecode`` option to Popen gateways, this sets the ``sys.dont_write_bytecode`` flag on the spawned process, this only works on CPython 2.6 and higher. Thanks to Alex Gaynor. diff -r 02bbee2639c2c8ea63ccc4b486172f3e670634ae -r fd16cecde8f328cfe9624569aeb61cca95c9f19d execnet/gateway_base.py --- a/execnet/gateway_base.py +++ b/execnet/gateway_base.py @@ -82,8 +82,8 @@ """Read exactly 'numbytes' bytes from the pipe. """ # a file in non-blocking mode may return less bytes, so we loop buf = bytes() - while len(buf) < numbytes: - data = self._read(numbytes) + while numbytes > len(buf): + data = self._read(numbytes-len(buf)) if not data: raise EOFError("expected %d bytes, got %d" %(numbytes, len(buf))) buf += data @@ -593,11 +593,11 @@ self._unserializer = Unserializer(self._io, self._channelfactory) self._receivelock = threading.RLock() # globals may be NONE at process-termination - self._trace = trace + self.__trace = trace self._geterrortext = geterrortext def _trace(self, *msg): - self._trace(self.id, *msg) + self.__trace(self.id, *msg) def _initreceive(self): self._receiverthread = threading.Thread(name="receiver", diff -r 02bbee2639c2c8ea63ccc4b486172f3e670634ae -r fd16cecde8f328cfe9624569aeb61cca95c9f19d testing/test_basics.py --- a/testing/test_basics.py +++ b/testing/test_basics.py @@ -4,7 +4,12 @@ import execnet from execnet import gateway_base, gateway from execnet.gateway_base import Message, Channel, ChannelFactory, serialize, \ - Unserializer + Unserializer, Popen2IO + +try: + from StringIO import StringIO as BytesIO +except: + from io import BytesIO def test_errors_on_execnet(): assert hasattr(execnet, 'RemoteError') @@ -107,6 +112,17 @@ ret = proc.wait() assert "hello".encode('ascii') in stdout +def test_popen_io_readloop(monkeypatch): + sio = BytesIO('test'.encode('ascii')) + io = Popen2IO(sio, sio) + real_read = io._read + def newread(numbytes): + if numbytes > 1: + numbytes = numbytes-1 + return real_read(numbytes) + io._read = newread + result = io.read(3) + assert result == 'tes'.encode('ascii') def test_rinfo_source(anypython, tmpdir): check = tmpdir.join("check.py") https://bitbucket.org/hpk42/execnet/changeset/8afeb958dd14/ changeset: 8afeb958dd14 branch: message-framing user: RonnyPfannschmidt date: 2011-11-21 12:55:54 summary: close finished message-framing branch affected #: 0 files https://bitbucket.org/hpk42/execnet/changeset/00ecac93ba72/ changeset: 00ecac93ba72 branch: message-framing user: RonnyPfannschmidt date: 2011-11-21 15:36:25 summary: merge from default (causes test fails) affected #: 4 files diff -r 8afeb958dd14888f29c8c75e306cd331d6c341cb -r 00ecac93ba728defdd0a21dd74fbc21eb7ff618d conftest.py --- a/conftest.py +++ b/conftest.py @@ -13,6 +13,8 @@ 'python2.5': r'C:\Python25\python.exe', 'python2.4': r'C:\Python24\python.exe', 'python3.1': r'C:\Python31\python.exe', + 'python3.2': r'C:\Python31\python.exe', + 'python3.3': r'C:\Python31\python.exe', } def pytest_runtest_setup(item, __multicall__): @@ -81,12 +83,12 @@ gwtypes = [metafunc.cls.gwtype] else: gwtypes = ['popen', 'socket', 'ssh'] - for gwtype in gwtypes: - metafunc.addcall(id=gwtype, param=gwtype) + metafunc.parametrize("gw", gwtypes, indirect=True) elif 'anypython' in metafunc.funcargnames: - for name in ('python3.1', 'python2.4', 'python2.5', 'python2.6', - 'python2.7', 'pypy-c', 'jython'): - metafunc.addcall(id=name, param=name) + metafunc.parametrize("anypython", indirect=True, argvalues= + ('python3.3', 'python3.2', 'python2.4', 'python2.5', + 'python2.6', 'python2.7', 'pypy-c', 'jython') + ) def getexecutable(name, cache={}): try: @@ -113,6 +115,7 @@ executable = py.path.local(executable) if executable.check(): return executable + executable = None py.test.skip("no %s found" % (name,)) return executable diff -r 8afeb958dd14888f29c8c75e306cd331d6c341cb -r 00ecac93ba728defdd0a21dd74fbc21eb7ff618d execnet/gateway_base.py --- a/execnet/gateway_base.py +++ b/execnet/gateway_base.py @@ -50,6 +50,7 @@ elif DEBUG: import tempfile, os.path fn = os.path.join(tempfile.gettempdir(), 'execnet-debug-%d' % pid) + #sys.stderr.write("execnet-debug at %r" %(fn,)) debugfile = open(fn, 'w') def trace(*msg): try: diff -r 8afeb958dd14888f29c8c75e306cd331d6c341cb -r 00ecac93ba728defdd0a21dd74fbc21eb7ff618d testing/test_gateway.py --- a/testing/test_gateway.py +++ b/testing/test_gateway.py @@ -350,8 +350,11 @@ monkeypatch.setenv("TEMP", tmpdir) # windows monkeypatch.setenv('EXECNET_DEBUG', "1") gw = execnet.makegateway("popen") - pid = gw.remote_exec("import os ; channel.send(os.getpid())").receive() - slavefile = tmpdir.join("execnet-debug-%s" % pid) + # hack out the debuffilename + fn = gw.remote_exec( + "import execnet;channel.send(execnet.gateway_base.fn)" + ).receive() + slavefile = py.path.local(fn) assert slavefile.check() slave_line = "creating slavegateway" for line in slavefile.readlines(): diff -r 8afeb958dd14888f29c8c75e306cd331d6c341cb -r 00ecac93ba728defdd0a21dd74fbc21eb7ff618d tox.ini --- a/tox.ini +++ b/tox.ini @@ -14,7 +14,7 @@ [testenv] changedir=testing -deps=pytest +deps=pytest>=2.2.0.dev8 commands=py.test -rsfxX --junitxml={envlogdir}/junit-{envname}.xml [] [testenv:py26-py134] @@ -30,7 +30,7 @@ basepython=python changedir=doc deps=:pypi:sphinx - py + pytest pytest commands= py.test \ https://bitbucket.org/hpk42/execnet/changeset/d6ed9c4ddde0/ changeset: d6ed9c4ddde0 branch: message-framing user: RonnyPfannschmidt date: 2011-11-21 15:41:38 summary: correct the test for message io affected #: 1 file diff -r 00ecac93ba728defdd0a21dd74fbc21eb7ff618d -r d6ed9c4ddde0cc2fed86f5a78ea2b2353a9c6cf4 testing/test_basics.py --- a/testing/test_basics.py +++ b/testing/test_basics.py @@ -72,11 +72,10 @@ temp_out = BytesIO() temp_in = BytesIO() io = Popen2IO(temp_out, temp_in) - unserializer = Unserializer(io) for i, handler in enumerate(Message._types): print ("checking %s %s" %(i, handler)) for data in "hello", "hello".encode('ascii'): - msg1 = Message(i, i, data) + msg1 = Message(i, i, serialize(data)) msg1.to_io(io) x = io.outfile.getvalue() io.outfile.truncate(0) https://bitbucket.org/hpk42/execnet/changeset/e19a34aa9599/ changeset: e19a34aa9599 branch: message-framing user: RonnyPfannschmidt date: 2011-11-21 16:08:19 summary: add a Noop flushable object to test_terminate_implicit_does_trykill in order to compensate for cpython 3.2 stdout flush behaviour affected #: 1 file diff -r d6ed9c4ddde0cc2fed86f5a78ea2b2353a9c6cf4 -r e19a34aa959903977ee5c29dac1c48f8673cd1b1 testing/test_termination.py --- a/testing/test_termination.py +++ b/testing/test_termination.py @@ -94,6 +94,13 @@ sys.stdout.write("1\\n") sys.stdout.flush() sys.stdout.close() + class FlushNoOp(object): + def flush(self): + pass + #replace stdout since some python implementations flush and print errors (for example 3.2) + # see Issue #5319 (from the release notes of 3.2 Alpha 2) + sys.stdout = FlushNoOp() + # use process at-exit group.terminate call """ % str(execnetdir)) popen = subprocess.Popen([str(anypython), str(p)], stdout=subprocess.PIPE) https://bitbucket.org/hpk42/execnet/changeset/9cd737edf513/ changeset: 9cd737edf513 branch: message-framing user: RonnyPfannschmidt date: 2011-11-21 16:24:11 summary: close the message-framing branch again affected #: 0 files https://bitbucket.org/hpk42/execnet/changeset/8d4c5071de22/ changeset: 8d4c5071de22 user: RonnyPfannschmidt date: 2011-11-21 16:24:39 summary: merge message-framing to default affected #: 8 files diff -r fd16cecde8f328cfe9624569aeb61cca95c9f19d -r 8d4c5071de22c882196015c47b8defb689b12a8a CHANGELOG --- a/CHANGELOG +++ b/CHANGELOG @@ -1,7 +1,9 @@ 1.0.10.dev -------------------------------- -- group.remote_exec now supports *kwargs as well +- group.remote_exec now supports kwargs as well + +- support per channel string coercion configuration - Popen2IO.read now reads correct amounts of bytes from nonblocking fd's diff -r fd16cecde8f328cfe9624569aeb61cca95c9f19d -r 8d4c5071de22c882196015c47b8defb689b12a8a execnet/gateway.py --- a/execnet/gateway.py +++ b/execnet/gateway.py @@ -6,7 +6,7 @@ import sys, os, inspect, types, linecache import textwrap import execnet -from execnet.gateway_base import Message, Popen2IO +from execnet.gateway_base import Message, Popen2IO, serialize from execnet import gateway_base importdir = os.path.dirname(os.path.dirname(execnet.__file__)) @@ -54,9 +54,9 @@ the default is to try to convert py2 str as py3 str, but not to try and convert py3 str to py2 str """ - self._unserializer.py2str_as_py3str = py2str_as_py3str - self._unserializer.py3str_as_py2str = py3str_as_py2str - self._send(Message.RECONFIGURE, data=(py2str_as_py3str, py3str_as_py2str)) + self._strconfig = (py2str_as_py3str, py3str_as_py2str) + data = serialize(self._strconfig) + self._send(Message.RECONFIGURE, data=data) def _remote_bootstrap_gateway(self, io): """ send gateway bootstrap code to a remote Python interpreter @@ -124,7 +124,7 @@ channel = self.newchannel() self._send(Message.CHANNEL_EXEC, channel.id, - (source, call_name, kwargs)) + serialize((source, call_name, kwargs))) return channel def remote_init_threads(self, num=None): diff -r fd16cecde8f328cfe9624569aeb61cca95c9f19d -r 8d4c5071de22c882196015c47b8defb689b12a8a execnet/gateway_base.py --- a/execnet/gateway_base.py +++ b/execnet/gateway_base.py @@ -12,6 +12,11 @@ except ImportError: import Queue as queue +try: + from io import BytesIO +except: + from StringIO import StringIO as BytesIO + ISPY3 = sys.version_info >= (3, 0) if ISPY3: exec("def do_exec(co, loc): exec(co, loc)\n" @@ -54,7 +59,7 @@ debugfile.flush() except Exception: try: - v = exc_info()[1] + v = sys.exc_info()[1] sys.stderr.write( "[%s] exception during tracing: %r\n" % (pid, v)) except Exception: @@ -110,18 +115,44 @@ self.channelid = channelid self.data = data + @staticmethod + def from_io(io): + try: + header = io.read(9) # type 1, channel 4, payload 4 + except EOFError: + e = sys.exc_info()[1] + raise EOFError('couldnt load message header, ' + e.args[0]) + msgtype, channel, payload = struct.unpack('!bii', header) + return Message(msgtype, channel, io.read(payload)) + + def to_io(self, io): + header = struct.pack('!bii', self.msgcode, self.channelid, len(self.data)) + io.write(header+self.data) + def received(self, gateway): self._types[self.msgcode](self, gateway) def __repr__(self): + class FakeChannel(object): + _strconfig = False, False # never transform, never fail + def __init__(self, id): + self.id = id + def __repr__(self): + return '' % self.id + FakeChannel.new = FakeChannel + FakeChannel.gateway = FakeChannel name = self._types[self.msgcode].__name__.upper() - r = repr(self.data) + try: + data = deserialize(self.data, FakeChannel) + except UnserializationError: + data = self.data + r = repr(data) if len(r) > 50: return "" %(name, self.channelid, len(r)) else: return "" %(name, - self.channelid, self.data) + self.channelid, r) def _setupmessages(): def status(message, gateway): @@ -136,11 +167,11 @@ 'numchannels': len(active_channels), 'numexecuting': numexec } - gateway._send(Message.CHANNEL_DATA, message.channelid, d) + gateway._send(Message.CHANNEL_DATA, message.channelid, serialize(d)) def channel_exec(message, gateway): channel = gateway._channelfactory.new(message.channelid) - gateway._local_schedulexec(channel=channel, sourcetask=message.data) + gateway._local_schedulexec(channel=channel,sourcetask=message.data) def channel_data(message, gateway): gateway._channelfactory._local_receive(message.channelid, message.data) @@ -149,7 +180,7 @@ gateway._channelfactory._local_close(message.channelid) def channel_close_error(message, gateway): - remote_error = RemoteError(message.data) + remote_error = RemoteError(deserialize(message.data)) gateway._channelfactory._local_close(message.channelid, remote_error) def channel_last_message(message, gateway): @@ -160,9 +191,11 @@ raise SystemExit(0) def reconfigure(message, gateway): - py2str_as_py3str, py3str_as_py2str = message.data - gateway._unserializer.py2str_as_py3str = py2str_as_py3str - gateway._unserializer.py3str_as_py2str = py3str_as_py2str + if message.channelid == 0: + target = gateway + else: + target = gateway._channelfactory.new(message.channelid) + target._strconfig = deserialize(message.data, gateway) types = [ status, reconfigure, gateway_terminate, @@ -252,7 +285,8 @@ olditem = items.get(block=False) except queue.Empty: if not (self._closed or self._receiveclosed.isSet()): - _callbacks[self.id] = (callback, endmarker) + _callbacks[self.id] = (callback, endmarker, + getattr(self, '_strconfig', None)) break else: if olditem is ENDMARKER: @@ -341,7 +375,7 @@ if not self._receiveclosed.isSet(): put = self.gateway._send if error is not None: - put(Message.CHANNEL_CLOSE_ERROR, self.id, error) + put(Message.CHANNEL_CLOSE_ERROR, self.id, serialize(error)) else: put(Message.CHANNEL_CLOSE, self.id) self._trace("sent channel close message") @@ -381,7 +415,7 @@ """ if self.isclosed(): raise IOError("cannot send to %r" %(self,)) - self.gateway._send(Message.CHANNEL_DATA, self.id, item) + self.gateway._send(Message.CHANNEL_DATA, self.id, serialize(item)) def receive(self, timeout=-1): """receive a data item that was sent from the other side. @@ -425,6 +459,17 @@ raise StopIteration __next__ = next + + def reconfigure(self, py2str_as_py3str=True, py3str_as_py2str=False): + """ + set the string coercion for this channel + the default is to try to convert py2 str as py3 str, + but not to try and convert py3 str to py2 str + """ + self._strconfig = (py2str_as_py3str, py3str_as_py2str) + data = serialize(self._strconfig) + self.gateway._send(Message.RECONFIGURE, self.id, data=data) + ENDMARKER = object() INTERRUPT_TEXT = "keyboard-interrupted" @@ -467,7 +512,7 @@ except KeyError: pass try: - callback, endmarker = self._callbacks.pop(id) + callback, endmarker, strconfig = self._callbacks.pop(id) except KeyError: pass else: @@ -496,16 +541,18 @@ def _local_receive(self, id, data): # executes in receiver thread try: - callback, endmarker = self._callbacks[id] + callback, endmarker, strconfig= self._callbacks[id] + channel = self._channels.get(id) except KeyError: channel = self._channels.get(id) queue = channel and channel._items if queue is None: pass # drop data else: - queue.put(data) + queue.put(deserialize(data, channel)) else: try: + data = deserialize(data, channel, strconfig) callback(data) # even if channel may be already closed except KeyboardInterrupt: raise @@ -513,7 +560,7 @@ excinfo = sys.exc_info() self.gateway._trace("exception during callback: %s" % excinfo[1]) errortext = self.gateway._geterrortext(excinfo) - self.gateway._send(Message.CHANNEL_CLOSE_ERROR, id, errortext) + self.gateway._send(Message.CHANNEL_CLOSE_ERROR, id, serialize(errortext)) self._local_close(id, errortext) def _finished_receiving(self): @@ -590,7 +637,6 @@ self._io = io self.id = id self._channelfactory = ChannelFactory(self, _startcount) - self._unserializer = Unserializer(self._io, self._channelfactory) self._receivelock = threading.RLock() # globals may be NONE at process-termination self.__trace = trace @@ -608,10 +654,11 @@ def _thread_receiver(self): self._trace("RECEIVERTHREAD: starting to run") eof = False + io = self._io try: try: while 1: - msg = Message(*self._unserializer.load()) + msg = Message.from_io(io) self._trace("received", msg) _receivelock = self._receivelock _receivelock.acquire() @@ -644,8 +691,9 @@ def _terminate_execution(self): pass - def _send(self, msgcode, channelid=0, data=''): - serialize(self._io, (msgcode, channelid, data)) + def _send(self, msgcode, channelid=0, data=bytes()): + header = struct.pack('!bii', msgcode, channelid, len(data)) + self._io.write(header+data) self._trace('sent', Message(msgcode, channelid, data)) def _local_schedulexec(self, channel, sourcetask): @@ -672,6 +720,7 @@ class SlaveGateway(BaseGateway): def _local_schedulexec(self, channel, sourcetask): + sourcetask = deserialize(sourcetask, self._channelfactory) self._execqueue.put((channel, sourcetask)) def _terminate_execution(self): @@ -783,9 +832,17 @@ py2str_as_py3str = True # True py3str_as_py2str = False # false means py2 will get unicode - def __init__(self, stream, channelfactory=None): + def __init__(self, stream, channel_or_gateway=None, strconfig=None): + default = self.py2str_as_py3str, self.py3str_as_py2str + if strconfig is None: + strconfig = default + gateway = getattr(channel_or_gateway, 'gateway', channel_or_gateway) + strconfig = getattr(channel_or_gateway, '_strconfig', default) + if strconfig is default: + strconfig = getattr(gateway, '_strconfig', default) + self.py2str_as_py3str, self.py3str_as_py2str = strconfig self.stream = stream - self.channelfactory = channelfactory + self.channelfactory = getattr(gateway, '_channelfactory', gateway) def load(self): self.stack = [] @@ -931,14 +988,18 @@ _buildopcodes() -def serialize(io, obj): - _Serializer(io).save(obj) +def serialize(obj): + return _Serializer().save(obj) + +def deserialize(data, channelfactory=None, strconfig=None): + io = BytesIO(data) + return Unserializer(io, channelfactory, strconfig).load() + class _Serializer(object): _dispatch = {} - def __init__(self, stream): - self._stream = stream + def __init__(self): self._streamlist = [] def _write(self, data): @@ -952,7 +1013,7 @@ self._write(opcode.STOP) s = type(self._streamlist[0])().join(self._streamlist) # atomic write - self._stream.write(s) + return s def _save(self, obj): tp = type(obj) diff -r fd16cecde8f328cfe9624569aeb61cca95c9f19d -r 8d4c5071de22c882196015c47b8defb689b12a8a testing/test_basics.py --- a/testing/test_basics.py +++ b/testing/test_basics.py @@ -72,21 +72,20 @@ temp_out = BytesIO() temp_in = BytesIO() io = Popen2IO(temp_out, temp_in) - unserializer = Unserializer(io) for i, handler in enumerate(Message._types): print ("checking %s %s" %(i, handler)) for data in "hello", "hello".encode('ascii'): - msg1 = Message(i, i, data) - serialize(io, (i, i, data)) + msg1 = Message(i, i, serialize(data)) + msg1.to_io(io) x = io.outfile.getvalue() io.outfile.truncate(0) io.outfile.seek(0) io.infile.seek(0) io.infile.write(x) io.infile.seek(0) - msg2 = Message(*unserializer.load()) + msg2 = Message.from_io(io) assert msg1.channelid == msg2.channelid, (msg1, msg2) - assert msg1.data == msg2.data + assert msg1.data == msg2.data, (msg1.data, msg2.data) assert msg1.msgcode == msg2.msgcode print ("all passed") """)) @@ -194,9 +193,9 @@ for i, handler in enumerate(Message._types): one = py.io.BytesIO() data = '23'.encode('ascii') - serialize(one, (i, 42, data)) + Message(i, 42, data).to_io(one) two = py.io.BytesIO(one.getvalue()) - msg = Message(*Unserializer(two, None).load()) + msg = Message.from_io(two) assert msg.msgcode == i assert isinstance(msg, Message) assert msg.channelid == 42 diff -r fd16cecde8f328cfe9624569aeb61cca95c9f19d -r 8d4c5071de22c882196015c47b8defb689b12a8a testing/test_channel.py --- a/testing/test_channel.py +++ b/testing/test_channel.py @@ -5,10 +5,10 @@ import py import execnet from execnet import gateway_base, gateway +from testing.test_gateway import _find_version needs_early_gc = py.test.mark.skipif("not hasattr(sys, 'getrefcount')") needs_osdup = py.test.mark.skipif("not hasattr(os, 'dup')") queue = py.builtin._tryimport('queue', 'Queue') - TESTTIMEOUT = 10.0 # seconds class TestChannelBasicBehaviour: @@ -341,3 +341,37 @@ +class TestStringCoerce: + @py.test.mark.skipif('sys.version>="3.0"') + def test_2to3(self): + python = _find_version('3') + gw = execnet.makegateway('popen//python=%s'%python) + ch = gw.remote_exec('channel.send(channel.receive());'*2) + ch.send('a') + res = ch.receive() + assert isinstance(res, unicode) + + ch.reconfigure(py3str_as_py2str=True) + + ch.send('a') + res = ch.receive() + assert isinstance(res, str) + gw.exit() + + @py.test.mark.skipif('sys.version<"3.0"') + def test_3to2(self): + python = _find_version('2') + gw = execnet.makegateway('popen//python=%s'%python) + + ch = gw.remote_exec('channel.send(channel.receive());'*2) + ch.send(bytes('a', 'ascii')) + res = ch.receive() + assert isinstance(res, str) + + ch.reconfigure(py3str_as_py2str=True, py2str_as_py3str=False) + + ch.send('a') + res = ch.receive() + assert isinstance(res, bytes) + gw.exit() + diff -r fd16cecde8f328cfe9624569aeb61cca95c9f19d -r 8d4c5071de22c882196015c47b8defb689b12a8a testing/test_serializer.py --- a/testing/test_serializer.py +++ b/testing/test_serializer.py @@ -56,7 +56,7 @@ from execnet import gateway_base as serializer if sys.version_info > (3, 0): # Need binary output sys.stdout = sys.stdout.detach() -saver = serializer.serialize(sys.stdout, %s) +sys.stdout.write(serializer.serialize(%s)) """ % (pyimportdir, obj_rep,)) popen = subprocess.Popen([str(self.executable), str(script_file)], stdin=subprocess.PIPE, diff -r fd16cecde8f328cfe9624569aeb61cca95c9f19d -r 8d4c5071de22c882196015c47b8defb689b12a8a testing/test_termination.py --- a/testing/test_termination.py +++ b/testing/test_termination.py @@ -94,6 +94,13 @@ sys.stdout.write("1\\n") sys.stdout.flush() sys.stdout.close() + class FlushNoOp(object): + def flush(self): + pass + #replace stdout since some python implementations flush and print errors (for example 3.2) + # see Issue #5319 (from the release notes of 3.2 Alpha 2) + sys.stdout = FlushNoOp() + # use process at-exit group.terminate call """ % str(execnetdir)) popen = subprocess.Popen([str(anypython), str(p)], stdout=subprocess.PIPE) diff -r fd16cecde8f328cfe9624569aeb61cca95c9f19d -r 8d4c5071de22c882196015c47b8defb689b12a8a tox.ini --- a/tox.ini +++ b/tox.ini @@ -31,6 +31,7 @@ changedir=doc deps=:pypi:sphinx pytest + pytest commands= py.test \ -rsfxX -v --junitxml={envlogdir}/junit-{envname}.xml check_sphinx.py https://bitbucket.org/hpk42/execnet/changeset/166e57dbf990/ changeset: 166e57dbf990 user: RonnyPfannschmidt date: 2011-11-21 16:28:54 summary: remove a duplicate pytest dep entry from tox.ini (cherrypick merge fuzz) affected #: 1 file diff -r 8d4c5071de22c882196015c47b8defb689b12a8a -r 166e57dbf99016eb037c012a8da754cf4c62151b tox.ini --- a/tox.ini +++ b/tox.ini @@ -31,7 +31,6 @@ changedir=doc deps=:pypi:sphinx pytest - pytest commands= py.test \ -rsfxX -v --junitxml={envlogdir}/junit-{envname}.xml check_sphinx.py https://bitbucket.org/hpk42/execnet/changeset/c24469809810/ changeset: c24469809810 user: RonnyPfannschmidt date: 2011-11-22 00:29:20 summary: merge upstream affected #: 1 file diff -r 166e57dbf99016eb037c012a8da754cf4c62151b -r c2446980981075060ec9188b33b50135c4909df7 testing/test_serializer.py --- a/testing/test_serializer.py +++ b/testing/test_serializer.py @@ -103,46 +103,23 @@ def pytest_funcarg__py3(request): return _py3_wrapper -def pytest_funcarg__dump(request): - py_dump = request.getfuncargvalue(request.param[0]) - return py_dump.dump +simple_tests = [ +# type: expected before/after repr + ('int', '4'), + ('float', '3.25'), + ('list', '[1, 2, 3]'), + ('tuple', '(1, 2, 3)'), + ('dict', '{(1, 2, 3): 32}'), +] -def pytest_funcarg__load(request): - py_dump = request.getfuncargvalue(request.param[1]) - return py_dump.load - -def pytest_generate_tests(metafunc): - if 'dump' in metafunc.funcargnames and 'load' in metafunc.funcargnames: - pys = 'py2', 'py3' - for dump in pys: - for load in pys: - param = (dump, load) - conversion = '%s to %s'%param - if 'repr' not in metafunc.funcargnames: - metafunc.addcall(id=conversion, param=param) - else: - for tp, repr in simple_tests.items(): - metafunc.addcall( - id='%s:%s'%(tp, conversion), - param=param, - funcargs={'tp_name':tp, 'repr':repr}, - ) - - -simple_tests = { -# type: expected before/after repr - 'int': '4', - 'float':'3.25', - 'list': '[1, 2, 3]', - 'tuple': '(1, 2, 3)', - 'dict': '{(1, 2, 3): 32}', -} - -def test_simple(tp_name, repr, dump, load): - p = dump(repr) - tp , v = load(p) - assert tp == tp_name - assert v == repr + at py.test.mark.parametrize(["tp_name", "repr"], simple_tests) +def test_simple(tp_name, repr, py2, py3): + for load in py2.load, py3.load: + for dump in py3.dump, py2.dump: + p = dump(repr) + tp , v = load(p) + assert tp == tp_name + assert v == repr def test_set(py2, py3): for dump in py2.dump, py3.dump: Repository URL: https://bitbucket.org/hpk42/execnet/ -- This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email. From commits-noreply at bitbucket.org Fri Dec 2 15:22:47 2011 From: commits-noreply at bitbucket.org (Bitbucket) Date: Fri, 02 Dec 2011 14:22:47 -0000 Subject: [execnet-commit] commit/execnet: RonnyPfannschmidt: ensure RSync will reconfigure the channels to keep strings as is, fixes #2 Message-ID: <20111202142247.13673.8031@bitbucket13.managed.contegix.com> 1 new commit in execnet: https://bitbucket.org/hpk42/execnet/changeset/b986b7655fa9/ changeset: b986b7655fa9 user: RonnyPfannschmidt date: 2011-12-02 15:22:05 summary: ensure RSync will reconfigure the channels to keep strings as is, fixes #2 affected #: 2 files diff -r c2446980981075060ec9188b33b50135c4909df7 -r b986b7655fa9ae4080528c9e81bf2df8cad656c2 execnet/rsync.py --- a/execnet/rsync.py +++ b/execnet/rsync.py @@ -152,6 +152,7 @@ def itemcallback(req): self._receivequeue.put((channel, req)) channel = gateway.remote_exec(execnet.rsync_remote) + channel.reconfigure(py2str_as_py3str=False, py3str_as_py2str=False) channel.setcallback(itemcallback, endmarker = None) channel.send((str(destdir), options)) self._channels[channel] = finishedcallback diff -r c2446980981075060ec9188b33b50135c4909df7 -r b986b7655fa9ae4080528c9e81bf2df8cad656c2 testing/test_rsync.py --- a/testing/test_rsync.py +++ b/testing/test_rsync.py @@ -200,3 +200,15 @@ assert len(dest.listdir()) == 1 assert len(source.listdir()) == 1 + + @py.test.mark.skip_if('sys.version_info >= (3)') + def test_2_to_3_bridge_can_send_binary_files(self, tmpdir): + gw = execnet.makegateway('popen//python=python3') + source = tmpdir.ensure('source', dir=1) + for i, content in enumerate('foo bar baz \x10foo'): + source.join(str(i)).write(content) + rsync = RSync(source) + + target = tmpdir.join('target') + rsync.add_target(gw, target) + rsync.send() Repository URL: https://bitbucket.org/hpk42/execnet/ -- This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email. From commits-noreply at bitbucket.org Fri Dec 2 15:42:25 2011 From: commits-noreply at bitbucket.org (Bitbucket) Date: Fri, 02 Dec 2011 14:42:25 -0000 Subject: [execnet-commit] commit/execnet: RonnyPfannschmidt: changelog entry and test enhancement for issue #2 (rsync from 2 to 3) Message-ID: <20111202144225.32583.87924@bitbucket12.managed.contegix.com> 1 new commit in execnet: https://bitbucket.org/hpk42/execnet/changeset/7bdfdda4dfb6/ changeset: 7bdfdda4dfb6 user: RonnyPfannschmidt date: 2011-12-02 15:41:52 summary: changelog entry and test enhancement for issue #2 (rsync from 2 to 3) affected #: 2 files diff -r b986b7655fa9ae4080528c9e81bf2df8cad656c2 -r 7bdfdda4dfb68a8f9b3be89ab390e847078d626e CHANGELOG --- a/CHANGELOG +++ b/CHANGELOG @@ -15,6 +15,9 @@ DNS queries for unknown hosts actually are resolved as such (Thanks Alex Gaynor) +- fix issue #2 - propperly reconfigure the channels string coercion for rsync, + so it can send from python2 to python3 + 1.0.9 -------------------------------- diff -r b986b7655fa9ae4080528c9e81bf2df8cad656c2 -r 7bdfdda4dfb68a8f9b3be89ab390e847078d626e testing/test_rsync.py --- a/testing/test_rsync.py +++ b/testing/test_rsync.py @@ -1,6 +1,7 @@ import py from execnet import RSync import execnet +from testing.test_serializer import _find_version def pytest_funcarg__gw1(request): return request.cached_setup( @@ -203,7 +204,8 @@ @py.test.mark.skip_if('sys.version_info >= (3)') def test_2_to_3_bridge_can_send_binary_files(self, tmpdir): - gw = execnet.makegateway('popen//python=python3') + python = _find_version('3') + gw = execnet.makegateway('popen//python=%s'%(python,)) source = tmpdir.ensure('source', dir=1) for i, content in enumerate('foo bar baz \x10foo'): source.join(str(i)).write(content) Repository URL: https://bitbucket.org/hpk42/execnet/ -- This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email. From commits-noreply at bitbucket.org Tue Dec 20 12:56:47 2011 From: commits-noreply at bitbucket.org (Bitbucket) Date: Tue, 20 Dec 2011 11:56:47 -0000 Subject: [execnet-commit] commit/execnet: RonnyPfannschmidt: decouple str coercion of gateways and channels, so channels dont change if gateway is reconfigured, fixes #1 Message-ID: <20111220115647.10872.36037@bitbucket12.managed.contegix.com> 1 new commit in execnet: https://bitbucket.org/hpk42/execnet/changeset/dea1a338781d/ changeset: dea1a338781d user: RonnyPfannschmidt date: 2011-12-20 12:54:44 summary: decouple str coercion of gateways and channels, so channels dont change if gateway is reconfigured, fixes #1 affected #: 3 files diff -r 7bdfdda4dfb68a8f9b3be89ab390e847078d626e -r dea1a338781d8f40d335339fc3dcc7040bb08f7f CHANGELOG --- a/CHANGELOG +++ b/CHANGELOG @@ -15,6 +15,8 @@ DNS queries for unknown hosts actually are resolved as such (Thanks Alex Gaynor) +- fix issue 1 - decouple string coercion of channels and gateway + - fix issue #2 - propperly reconfigure the channels string coercion for rsync, so it can send from python2 to python3 diff -r 7bdfdda4dfb68a8f9b3be89ab390e847078d626e -r dea1a338781d8f40d335339fc3dcc7040bb08f7f execnet/gateway_base.py --- a/execnet/gateway_base.py +++ b/execnet/gateway_base.py @@ -253,6 +253,8 @@ def __init__(self, gateway, id): assert isinstance(id, int) self.gateway = gateway + #XXX: defaults copied from Unserializer + self._strconfig = getattr(gateway, '_strconfig', (True, False)) self.id = id self._items = queue.Queue() self._closed = False @@ -285,8 +287,11 @@ olditem = items.get(block=False) except queue.Empty: if not (self._closed or self._receiveclosed.isSet()): - _callbacks[self.id] = (callback, endmarker, - getattr(self, '_strconfig', None)) + _callbacks[self.id] = ( + callback, + endmarker, + self._strconfig, + ) break else: if olditem is ENDMARKER: @@ -636,6 +641,7 @@ def __init__(self, io, id, _startcount=2): self._io = io self.id = id + self._strconfig = Unserializer.py2str_as_py3str, Unserializer.py3str_as_py2str self._channelfactory = ChannelFactory(self, _startcount) self._receivelock = threading.RLock() # globals may be NONE at process-termination @@ -720,7 +726,7 @@ class SlaveGateway(BaseGateway): def _local_schedulexec(self, channel, sourcetask): - sourcetask = deserialize(sourcetask, self._channelfactory) + sourcetask = deserialize(sourcetask, self) self._execqueue.put((channel, sourcetask)) def _terminate_execution(self): @@ -833,14 +839,10 @@ py3str_as_py2str = False # false means py2 will get unicode def __init__(self, stream, channel_or_gateway=None, strconfig=None): - default = self.py2str_as_py3str, self.py3str_as_py2str - if strconfig is None: - strconfig = default gateway = getattr(channel_or_gateway, 'gateway', channel_or_gateway) - strconfig = getattr(channel_or_gateway, '_strconfig', default) - if strconfig is default: - strconfig = getattr(gateway, '_strconfig', default) - self.py2str_as_py3str, self.py3str_as_py2str = strconfig + strconfig = getattr(channel_or_gateway, '_strconfig', strconfig) + if strconfig: + self.py2str_as_py3str, self.py3str_as_py2str = strconfig self.stream = stream self.channelfactory = getattr(gateway, '_channelfactory', gateway) diff -r 7bdfdda4dfb68a8f9b3be89ab390e847078d626e -r dea1a338781d8f40d335339fc3dcc7040bb08f7f testing/test_channel.py --- a/testing/test_channel.py +++ b/testing/test_channel.py @@ -356,6 +356,18 @@ ch.send('a') res = ch.receive() assert isinstance(res, str) + + gw.reconfigure(py3str_as_py2str=True) + ch = gw.remote_exec('channel.send(channel.receive());'*2) + + ch.send('a') + res = ch.receive() + assert isinstance(res, str) + ch.reconfigure(py3str_as_py2str=False, py2str_as_py3str=False) + + ch.send('a') + res = ch.receive() + assert isinstance(res, str) gw.exit() @py.test.mark.skipif('sys.version<"3.0"') @@ -373,5 +385,18 @@ ch.send('a') res = ch.receive() assert isinstance(res, bytes) + + gw.reconfigure(py3str_as_py2str=True, py2str_as_py3str=False) + ch = gw.remote_exec('channel.send(channel.receive());'*2) + + ch.send('a') + res = ch.receive() + assert isinstance(res, bytes) + + ch.reconfigure(py3str_as_py2str=False, py2str_as_py3str=True) + ch.send(bytes('a', 'ascii')) + res = ch.receive() + assert isinstance(res, str) + gw.exit() Repository URL: https://bitbucket.org/hpk42/execnet/ -- This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email. From commits-noreply at bitbucket.org Tue Dec 20 13:04:48 2011 From: commits-noreply at bitbucket.org (Bitbucket) Date: Tue, 20 Dec 2011 12:04:48 -0000 Subject: [execnet-commit] commit/execnet: RonnyPfannschmidt: enable socketserver as remote_exec module Message-ID: <20111220120448.11304.25912@bitbucket03.managed.contegix.com> 1 new commit in execnet: https://bitbucket.org/hpk42/execnet/changeset/76211fbbacf3/ changeset: 76211fbbacf3 user: RonnyPfannschmidt date: 2011-12-20 12:56:56 summary: enable socketserver as remote_exec module affected #: 3 files diff -r dea1a338781d8f40d335339fc3dcc7040bb08f7f -r 76211fbbacf3284e9b98d5fb68d3b7603572b095 CHANGELOG --- a/CHANGELOG +++ b/CHANGELOG @@ -20,6 +20,8 @@ - fix issue #2 - propperly reconfigure the channels string coercion for rsync, so it can send from python2 to python3 +- refactor socketserver, so it can be directly remote_exec'd for starting a socket gateway on a remote + 1.0.9 -------------------------------- diff -r dea1a338781d8f40d335339fc3dcc7040bb08f7f -r 76211fbbacf3284e9b98d5fb68d3b7603572b095 execnet/gateway_socket.py --- a/execnet/gateway_socket.py +++ b/execnet/gateway_socket.py @@ -73,17 +73,11 @@ host, port = hostport mydir = os.path.dirname(__file__) - socketserver = os.path.join(mydir, 'script', 'socketserver.py') - socketserverbootstrap = "\n".join([ - open(socketserver, 'r').read(), """if 1: - import socket - sock = bind_and_listen((%r, %r)) - port = sock.getsockname() - channel.send(port) - startserver(sock) - """ % (host, port)]) + from execnet.script import socketserver + # execute the above socketserverbootstrap on the other side - channel = gateway.remote_exec(socketserverbootstrap) + channel = gateway.remote_exec(socketserver) + channel.send((host, port)) (realhost, realport) = channel.receive() #self._trace("new_remote received" # "port=%r, hostname = %r" %(realport, hostname)) diff -r dea1a338781d8f40d335339fc3dcc7040bb08f7f -r 76211fbbacf3284e9b98d5fb68d3b7603572b095 execnet/script/socketserver.py --- a/execnet/script/socketserver.py +++ b/execnet/script/socketserver.py @@ -2,6 +2,13 @@ """ start socket based minimal readline exec server + + it can exeuted in 2 modes of operation + + 1. as normal script, that listens for new connections + + 2. via existing_gateway.remote_exec (as imported module) + """ # this part of the program only executes on the server side # @@ -97,4 +104,9 @@ hostport = ':8888' serversock = bind_and_listen(hostport) startserver(serversock, loop=False) - +elif __name__=='__channelexec__': + bindname = channel.receive() + sock = bind_and_listen(bindname) + port = sock.getsockname() + channel.send(port) + startserver(sock) Repository URL: https://bitbucket.org/hpk42/execnet/ -- This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email. From commits-noreply at bitbucket.org Tue Dec 20 13:05:16 2011 From: commits-noreply at bitbucket.org (Bitbucket) Date: Tue, 20 Dec 2011 12:05:16 -0000 Subject: [execnet-commit] commit/execnet: RonnyPfannschmidt: the popen gateway only needs a single sendexec Message-ID: <20111220120516.7497.89506@bitbucket02.managed.contegix.com> 1 new commit in execnet: https://bitbucket.org/hpk42/execnet/changeset/3030f8d2191d/ changeset: 3030f8d2191d user: RonnyPfannschmidt date: 2011-12-20 13:04:54 summary: the popen gateway only needs a single sendexec affected #: 1 file diff -r 76211fbbacf3284e9b98d5fb68d3b7603572b095 -r 3030f8d2191df3346aebf9ed130b964a6e4cacd9 execnet/gateway.py --- a/execnet/gateway.py +++ b/execnet/gateway.py @@ -250,13 +250,11 @@ def _remote_bootstrap_gateway(self, io): sendexec(io, - "import sys", - "sys.stdout.write('1')", - "sys.stdout.flush()", - popen_bootstrapline) - sendexec(io, - "import sys ; sys.path.insert(0, %r)" % importdir, + "import sys", + "sys.path.insert(0, %r)" % importdir, "from execnet.gateway_base import serve, init_popen_io", + "sys.stdout.write('1')", + "sys.stdout.flush()", "serve(init_popen_io(), id='%s-slave')" % self.id, ) s = io.read(1) Repository URL: https://bitbucket.org/hpk42/execnet/ -- This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email. From commits-noreply at bitbucket.org Tue Dec 20 13:07:14 2011 From: commits-noreply at bitbucket.org (Bitbucket) Date: Tue, 20 Dec 2011 12:07:14 -0000 Subject: [execnet-commit] commit/execnet: RonnyPfannschmidt: refactor _find_non_builtin_globals to work without the ast module Message-ID: <20111220120714.1806.93050@bitbucket13.managed.contegix.com> 1 new commit in execnet: https://bitbucket.org/hpk42/execnet/changeset/411f4e9fd277/ changeset: 411f4e9fd277 user: RonnyPfannschmidt date: 2011-12-20 13:06:45 summary: refactor _find_non_builtin_globals to work without the ast module affected #: 2 files diff -r 3030f8d2191df3346aebf9ed130b964a6e4cacd9 -r 411f4e9fd277d5723f94d474ffdf34695179d40c execnet/gateway.py --- a/execnet/gateway.py +++ b/execnet/gateway.py @@ -165,20 +165,15 @@ def _find_non_builtin_globals(source, codeobj): try: - import ast - except ImportError: - return None - try: import __builtin__ except ImportError: import builtins as __builtin__ - vars = dict.fromkeys(codeobj.co_varnames) all = [] - for node in ast.walk(ast.parse(source)): - if (isinstance(node, ast.Name) and node.id not in vars and - node.id not in __builtin__.__dict__): - all.append(node.id) + for name in codeobj.co_names: + if name not in __builtin__.__dict__ \ + and name not in codeobj.co_varnames: + all.append(name) return all diff -r 3030f8d2191df3346aebf9ed130b964a6e4cacd9 -r 411f4e9fd277d5723f94d474ffdf34695179d40c testing/test_basics.py --- a/testing/test_basics.py +++ b/testing/test_basics.py @@ -265,9 +265,6 @@ class TestGlobalFinder(object): - def setup_class(cls): - py.test.importorskip('ast') - def check(self, func): src = py.code.Source(func) code = py.code.Code(func) Repository URL: https://bitbucket.org/hpk42/execnet/ -- This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email. From commits-noreply at bitbucket.org Tue Dec 20 15:01:42 2011 From: commits-noreply at bitbucket.org (Bitbucket) Date: Tue, 20 Dec 2011 14:01:42 -0000 Subject: [execnet-commit] commit/execnet: RonnyPfannschmidt: bring back the ast based global finder, since the codeobject based heusteric was wrong, also add a test that triggers the issue Message-ID: <20111220140142.24375.47032@bitbucket03.managed.contegix.com> 1 new commit in execnet: https://bitbucket.org/hpk42/execnet/changeset/4def7f15ac3a/ changeset: 4def7f15ac3a user: RonnyPfannschmidt date: 2011-12-20 13:57:06 summary: bring back the ast based global finder, since the codeobject based heusteric was wrong, also add a test that triggers the issue affected #: 2 files diff -r 411f4e9fd277d5723f94d474ffdf34695179d40c -r 4def7f15ac3ad07ea5ca689d7ad60a5c72271262 execnet/gateway.py --- a/execnet/gateway.py +++ b/execnet/gateway.py @@ -165,15 +165,20 @@ def _find_non_builtin_globals(source, codeobj): try: + import ast + except ImportError: + return None + try: import __builtin__ except ImportError: import builtins as __builtin__ + vars = dict.fromkeys(codeobj.co_varnames) all = [] - for name in codeobj.co_names: - if name not in __builtin__.__dict__ \ - and name not in codeobj.co_varnames: - all.append(name) + for node in ast.walk(ast.parse(source)): + if (isinstance(node, ast.Name) and node.id not in vars and + node.id not in __builtin__.__dict__): + all.append(node.id) return all diff -r 411f4e9fd277d5723f94d474ffdf34695179d40c -r 4def7f15ac3ad07ea5ca689d7ad60a5c72271262 testing/test_basics.py --- a/testing/test_basics.py +++ b/testing/test_basics.py @@ -297,6 +297,12 @@ test py.test.raises(ValueError, gateway._source_of_function, func) + def test_method_call(self): + # method names are reason for the simple code object based heusteric failing + def f(channel): + channel.send(dict(testing=2)) + assert self.check(f) == [] + def test_remote_exec_function_with_kwargs(anypython): import sys Repository URL: https://bitbucket.org/hpk42/execnet/ -- This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email. From commits-noreply at bitbucket.org Wed Dec 28 15:04:26 2011 From: commits-noreply at bitbucket.org (Bitbucket) Date: Wed, 28 Dec 2011 14:04:26 -0000 Subject: [execnet-commit] commit/execnet: RonnyPfannschmidt: skip global finder tests on python < 2.6 again Message-ID: <20111228140426.23013.81318@bitbucket01.managed.contegix.com> 1 new commit in execnet: https://bitbucket.org/hpk42/execnet/changeset/da6994ca8402/ changeset: da6994ca8402 user: RonnyPfannschmidt date: 2011-12-28 15:04:11 summary: skip global finder tests on python < 2.6 again affected #: 1 file diff -r 4def7f15ac3ad07ea5ca689d7ad60a5c72271262 -r da6994ca8402d9d29a367ed312cce0717ea46dad testing/test_basics.py --- a/testing/test_basics.py +++ b/testing/test_basics.py @@ -264,6 +264,7 @@ class TestGlobalFinder(object): + pytestmark = py.test.mark.skipif('sys.version_info < (2, 6)') def check(self, func): src = py.code.Source(func) Repository URL: https://bitbucket.org/hpk42/execnet/ -- This is a commit notification from bitbucket.org. You are receiving this because you have the service enabled, addressing the recipient of this email.