From commits-noreply at bitbucket.org Mon Jul 5 18:05:43 2010 From: commits-noreply at bitbucket.org (commits-noreply at bitbucket.org) Date: Mon, 5 Jul 2010 16:05:43 +0000 (UTC) Subject: [execnet-commit] execnet commit cb800a0014f3: try to avoid a random KeyboardInterrupt in the exec-thread during joining Message-ID: <20100705160543.DB9A07EF6B@bitbucket.org> # HG changeset patch -- Bitbucket.org # Project execnet # URL http://bitbucket.org/hpk42/execnet/overview # User holger krekel # Date 1278345916 -7200 # Node ID cb800a0014f34d07dba411bd135e2f78083002fe # Parent 0c5fe1527b51b4772702a470cb7b003e9472b7df try to avoid a random KeyboardInterrupt in the exec-thread during joining the receiver thread which itself may trigger a KeyboardInterrupt in the exec thread. --- a/execnet/gateway_base.py +++ b/execnet/gateway_base.py @@ -708,11 +708,11 @@ class SlaveGateway(BaseGateway): self._trace("io.close_write()") self._io.close_write() self._trace("slavegateway.serve finished") + if joining: + self.join() except KeyboardInterrupt: # in the slave we can't really do anything sensible self._trace("swallowing keyboardinterrupt in main-thread") - if joining: - self.join() def executetask(self, item): channel, source = item --- a/execnet/__init__.py +++ b/execnet/__init__.py @@ -3,7 +3,7 @@ execnet: pure python lib for connecting (c) 2010, Holger Krekel and others """ -__version__ = "1.0.6" +__version__ = "1.0.7dev" import execnet.apipkg --- a/CHANGELOG +++ b/CHANGELOG @@ -1,3 +1,16 @@ +1.0.7dev +-------------------------------- + +- try to avoid a random KeyboardInterrupt Error when threads + are ending. + +1.0.6 +-------------------------------- + +- fix jython/windows interactions +- fix waitclose/callback-with-endmarker race condition +- fix race condition where multiple threads sending data over channels + would crash the serializer and process 1.0.6 -------------------------------- From commits-noreply at bitbucket.org Mon Jul 5 18:05:43 2010 From: commits-noreply at bitbucket.org (commits-noreply at bitbucket.org) Date: Mon, 5 Jul 2010 16:05:43 +0000 (UTC) Subject: [execnet-commit] execnet commit 0c5fe1527b51: add tox configuration Message-ID: <20100705160543.C6C017EF48@bitbucket.org> # HG changeset patch -- Bitbucket.org # Project execnet # URL http://bitbucket.org/hpk42/execnet/overview # User holger krekel # Date 1278284824 -7200 # Node ID 0c5fe1527b51b4772702a470cb7b003e9472b7df # Parent aaff8a632c3713c19b060aa5c8c81f22351dfde9 add tox configuration --- /dev/null +++ b/doc/check_sphinx.py @@ -0,0 +1,16 @@ +import py +import subprocess +def test_linkcheck(tmpdir): + doctrees = tmpdir.join("doctrees") + htmldir = tmpdir.join("html") + subprocess.check_call( + ["sphinx-build", "-W", "-blinkcheck", + "-d", str(doctrees), ".", str(htmldir)]) + +def test_build_docs(tmpdir): + doctrees = tmpdir.join("doctrees") + htmldir = tmpdir.join("html") + subprocess.check_call([ + "sphinx-build", "-W", "-bhtml", + "-d", str(doctrees), ".", str(htmldir)]) + --- a/.hgignore +++ b/.hgignore @@ -8,3 +8,4 @@ syntax:glob *$py.class *.orig *~ +.tox --- a/doc/install.txt +++ b/doc/install.txt @@ -48,7 +48,8 @@ Next checkout the basic api and examples .. _`bitbucket repository`: http://bitbucket.org/hpk42/execnet/ .. _`execnet mercurial repository`: http://bitbucket.org/hpk42/execnet/ .. _`pypi release`: http://pypi.python.org/pypi/execnet +.. _setuptools: http://pypi.python.org/pypi/setuptools +.. _distribute: http://pypi.python.org/pypi/distribute - --- a/doc/example/svn-sync-repo.py +++ b/doc/example/svn-sync-repo.py @@ -7,12 +7,12 @@ uses execnet. """ -import execnet +import py, execnet import sys, os def usage(): arg0 = sys.argv[0] - print """%s [user@]remote-host:/repo/location localrepo [identity keyfile]""" % (arg0,) + print "%s [user@]remote-host:/repo/location localrepo [ssh-config-file]" % (arg0,) def main(args): @@ -20,13 +20,13 @@ def main(args): localrepo = py.path.local(args[1]) if not localrepo.check(dir=1): raise SystemExit("localrepo %s does not exist" %(localrepo,)) - if len(args) == 3: - keyfile = py.path.local(args[2]) + if len(args) ==3: + configfile = args[2] else: - keyfile = None + configfile = None remote_host, path = remote.split(':', 1) print "ssh-connecting to", remote_host - gw = getgateway(remote_host, keyfile) + gw = getgateway(remote_host, configfile) local_rev = get_svn_youngest(localrepo) @@ -104,8 +104,11 @@ def get_svn_youngest(repo): rev = py.process.cmdexec('svnlook youngest "%s"' % repo) return int(rev) -def getgateway(host, keyfile=None): - return execnet.SshGateway(host, identity=keyfile) +def getgateway(host, configfile=None): + xspec = "ssh=%s" % host + if configfile is not None: + xspec += "//ssh_config=%s" % configfile + return execnet.makegateway(xspec) if __name__ == '__main__': if len(sys.argv) < 3: From commits-noreply at bitbucket.org Tue Jul 6 12:33:19 2010 From: commits-noreply at bitbucket.org (commits-noreply at bitbucket.org) Date: Tue, 6 Jul 2010 10:33:19 +0000 (UTC) Subject: [execnet-commit] execnet commit 90b323b5c194: adding a toxini, correcting the version Message-ID: <20100706103319.272047EF69@bitbucket.org> # HG changeset patch -- Bitbucket.org # Project execnet # URL http://bitbucket.org/hpk42/execnet/overview # User holger krekel # Date 1278412349 -7200 # Node ID 90b323b5c1948d526a5f8e1543b1a1488cbe9052 # Parent cb800a0014f34d07dba411bd135e2f78083002fe adding a toxini, correcting the version --- /dev/null +++ b/tox.ini @@ -0,0 +1,34 @@ +[tox] +distshare={homedir}/.tox/distshare +[tox:hudson] +sdistsrc={distshare}/execnet-**LATEST** + +[testenv] +changedir=testing +deps= {distshare}/py-**LATEST** +commands=py.test -rsfxX --junitxml={envlogdir}/junit-{envname}.xml [] +[testenv:py27] +basepython=python2.7 +[testenv:py26] +basepython=python2.6 +[testenv:py26-py131] +basepython=python2.6 +deps= py==1.3.1 +[testenv:py25] +basepython=python2.5 +[testenv:py24] +basepython=python2.4 +[testenv:py31] +basepython=python3.1 +#[testenv:pypy] +#basepython=pypy-c +[testenv:jython] +basepython=jython + +[testenv:docs] +basepython=python +changedir=doc +deps=sphinx + py +commands= + py.test -rsfxX -v --junitxml={envlogdir}/junit-{envname}.xml check_sphinx.py --- a/execnet/__init__.py +++ b/execnet/__init__.py @@ -3,7 +3,7 @@ execnet: pure python lib for connecting (c) 2010, Holger Krekel and others """ -__version__ = "1.0.7dev" +__version__ = "1.0.7a1" import execnet.apipkg --- a/CHANGELOG +++ b/CHANGELOG @@ -1,4 +1,4 @@ -1.0.7dev +1.0.7a1 -------------------------------- - try to avoid a random KeyboardInterrupt Error when threads @@ -11,13 +11,6 @@ 1.0.6 - fix waitclose/callback-with-endmarker race condition - fix race condition where multiple threads sending data over channels would crash the serializer and process -1.0.6 --------------------------------- - -- fix jython/windows interactions -- fix waitclose/callback-with-endmarker race condition -- fix race condition where multiple threads sending data over channels - would crash the serializer and process 1.0.5 -------------------------------- From commits-noreply at bitbucket.org Tue Jul 6 13:06:03 2010 From: commits-noreply at bitbucket.org (commits-noreply at bitbucket.org) Date: Tue, 6 Jul 2010 11:06:03 +0000 (UTC) Subject: [execnet-commit] execnet commit 66bce8eb6d09: defining distshare for tox:hudson Message-ID: <20100706110603.2F6EB7EF69@bitbucket.org> # HG changeset patch -- Bitbucket.org # Project execnet # URL http://bitbucket.org/hpk42/execnet/overview # User holger krekel # Date 1278414318 -7200 # Node ID 66bce8eb6d0958cd4dfb7ebe338bd14ab9250754 # Parent 90b323b5c1948d526a5f8e1543b1a1488cbe9052 defining distshare for tox:hudson --- a/tox.ini +++ b/tox.ini @@ -1,6 +1,7 @@ [tox] distshare={homedir}/.tox/distshare [tox:hudson] +distshare={toxworkdir}/distshare sdistsrc={distshare}/execnet-**LATEST** [testenv] From commits-noreply at bitbucket.org Tue Jul 6 14:49:58 2010 From: commits-noreply at bitbucket.org (commits-noreply at bitbucket.org) Date: Tue, 6 Jul 2010 12:49:58 +0000 (UTC) Subject: [execnet-commit] execnet commit 319e77ba00e2: correction Message-ID: <20100706124958.D6AC37EF77@bitbucket.org> # HG changeset patch -- Bitbucket.org # Project execnet # URL http://bitbucket.org/hpk42/execnet/overview # User holger krekel # Date 1278420565 -7200 # Node ID 319e77ba00e2bf26f7ad265a5fed6a13091f735d # Parent 66bce8eb6d0958cd4dfb7ebe338bd14ab9250754 correction --- a/tox.ini +++ b/tox.ini @@ -7,7 +7,8 @@ sdistsrc={distshare}/execnet-**LATEST** [testenv] changedir=testing deps= {distshare}/py-**LATEST** -commands=py.test -rsfxX --junitxml={envlogdir}/junit-{envname}.xml [] +commands=py.test --confcutdir=.. \ + -rsfxX --junitxml={envlogdir}/junit-{envname}.xml [] [testenv:py27] basepython=python2.7 [testenv:py26] @@ -32,4 +33,5 @@ changedir=doc deps=sphinx py commands= - py.test -rsfxX -v --junitxml={envlogdir}/junit-{envname}.xml check_sphinx.py + py.test --confcutdir=.. + -rsfxX -v --junitxml={envlogdir}/junit-{envname}.xml check_sphinx.py From commits-noreply at bitbucket.org Tue Jul 6 15:03:31 2010 From: commits-noreply at bitbucket.org (commits-noreply at bitbucket.org) Date: Tue, 6 Jul 2010 13:03:31 +0000 (UTC) Subject: [execnet-commit] execnet commit 6dc0daade930: higher timeout for slow windows machines Message-ID: <20100706130331.54D287EF7C@bitbucket.org> # HG changeset patch -- Bitbucket.org # Project execnet # URL http://bitbucket.org/hpk42/execnet/overview # User holger krekel # Date 1278421380 -7200 # Node ID 6dc0daade930f142b4fcca5705d0ba98f38a8f18 # Parent 319e77ba00e2bf26f7ad265a5fed6a13091f735d higher timeout for slow windows machines --- a/testing/test_termination.py +++ b/testing/test_termination.py @@ -96,6 +96,6 @@ def test_terminate_implicit_does_trykill # sync with start-up line = popen.stdout.readline() reply = WorkerPool(1).dispatch(popen.communicate) - reply.get(timeout=10) + reply.get(timeout=50) out, err = capfd.readouterr() assert not err or "Killed" in err From commits-noreply at bitbucket.org Wed Jul 7 12:50:30 2010 From: commits-noreply at bitbucket.org (commits-noreply at bitbucket.org) Date: Wed, 7 Jul 2010 10:50:30 +0000 (UTC) Subject: [execnet-commit] execnet commit bf0cd0d5e506: search more explicitely for a Python2 version Message-ID: <20100707105030.3E6917EF70@bitbucket.org> # HG changeset patch -- Bitbucket.org # Project execnet # URL http://bitbucket.org/hpk42/execnet/overview # User holger krekel # Date 1278499776 -7200 # Node ID bf0cd0d5e506f9ddcbaf86a16b99395f6fe697ed # Parent 6dc0daade930f142b4fcca5705d0ba98f38a8f18 search more explicitely for a Python2 version (in a virtualenv3 'python' will be a python3.x) --- a/testing/test_serializer.py +++ b/testing/test_serializer.py @@ -12,6 +12,11 @@ def _find_version(suffix=""): name = "python" + suffix executable = py.path.local.sysfind(name) if executable is None: + if suffix == "2": + for name in ('python2.6', 'python2.7'): + executable = py.path.local.sysfind(name) + if executable: + return executable if sys.platform == "win32" and suffix == "3": for name in ('python31', 'python30'): executable = py.path.local(r"c:\\%s\python.exe" % (name,)) @@ -24,7 +29,7 @@ def setup_module(mod): mod.TEMPDIR = py.path.local(tempfile.mkdtemp()) if sys.version_info > (3, 0): mod._py3_wrapper = PythonWrapper(py.path.local(sys.executable)) - mod._py2_wrapper = PythonWrapper(_find_version()) + mod._py2_wrapper = PythonWrapper(_find_version("2")) else: mod._py3_wrapper = PythonWrapper(_find_version("3")) mod._py2_wrapper = PythonWrapper(py.path.local(sys.executable)) From commits-noreply at bitbucket.org Wed Jul 7 14:29:55 2010 From: commits-noreply at bitbucket.org (commits-noreply at bitbucket.org) Date: Wed, 7 Jul 2010 12:29:55 +0000 (UTC) Subject: [execnet-commit] execnet commit 180c18c4bac2: add a new env:NAME=value setting to xspecs Message-ID: <20100707122955.47D7F7EF83@bitbucket.org> # HG changeset patch -- Bitbucket.org # Project execnet # URL http://bitbucket.org/hpk42/execnet/overview # User holger krekel # Date 1278505756 -7200 # Node ID 180c18c4bac279e63ccb80b2f7ef6b3aaf7fc3e4 # Parent bf0cd0d5e506f9ddcbaf86a16b99395f6fe697ed add a new env:NAME=value setting to xspecs bump version to 1.0.7 --- a/tox.ini +++ b/tox.ini @@ -33,5 +33,5 @@ changedir=doc deps=sphinx py commands= - py.test --confcutdir=.. + py.test --confcutdir=.. \ -rsfxX -v --junitxml={envlogdir}/junit-{envname}.xml check_sphinx.py --- a/CHANGELOG +++ b/CHANGELOG @@ -1,9 +1,13 @@ -1.0.7a1 +1.0.7 -------------------------------- - try to avoid a random KeyboardInterrupt Error when threads are ending. +- extend xspec syntax to allow for one or multiple "env:NAME=value" + environment variable settings which will be set on the remote side. + (thanks Jakub Gustak) + 1.0.6 -------------------------------- --- a/doc/conf.py +++ b/doc/conf.py @@ -46,7 +46,7 @@ copyright = '2009, holger krekel and oth # # The short X.Y version. import execnet -version = "1.0.6" +version = "1.0.7" # The full version, including alpha/beta/rc tags. release = version --- a/execnet/__init__.py +++ b/execnet/__init__.py @@ -3,7 +3,7 @@ execnet: pure python lib for connecting (c) 2010, Holger Krekel and others """ -__version__ = "1.0.7a1" +__version__ = "1.0.7" import execnet.apipkg --- a/execnet/multi.py +++ b/execnet/multi.py @@ -65,6 +65,7 @@ class Group: python= specifies which python interpreter to execute chdir= specifies to which directory to change nice= specifies process priority of new process + env:NAME=value specifies a remote environment variable setting. If no spec is given, self.defaultspec is used. """ @@ -93,19 +94,22 @@ class Group: raise ValueError("no gateway type found for %r" % (spec._spec,)) gw.spec = spec self._register(gw) - if spec.chdir or spec.nice: + if spec.chdir or spec.nice or spec.env: channel = gw.remote_exec(""" import os - path, nice = channel.receive() + path, nice, env = channel.receive() if path: if not os.path.exists(path): os.mkdir(path) os.chdir(path) if nice and hasattr(os, 'nice'): os.nice(nice) + if env: + for name, value in env.items(): + os.environ[name] = value """) nice = spec.nice and int(spec.nice) or 0 - channel.send((spec.chdir, nice)) + channel.send((spec.chdir, nice, spec.env)) channel.waitclose() return gw --- a/testing/test_xspec.py +++ b/testing/test_xspec.py @@ -28,6 +28,10 @@ class TestXSpec: spec = XSpec("popen") assert spec.popen == True + def test_env(self): + xspec = XSpec("popen//env:NAME=value1") + assert xspec.env['NAME'] == "value1" + def test__samefilesystem(self): assert XSpec("popen")._samefilesystem() assert XSpec("popen//python=123")._samefilesystem() @@ -81,6 +85,15 @@ class TestMakegateway: if remotenice is not None: assert remotenice == 5 + def test_popen_env(self): + gw = execnet.makegateway("popen//env:NAME123=123") + ch = gw.remote_exec(""" + import os + channel.send(os.environ['NAME123']) + """) + value = ch.receive() + assert value == "123" + def test_popen_explicit(self): gw = execnet.makegateway("popen//python=%s" % py.std.sys.executable) assert gw.spec.python == py.std.sys.executable --- a/execnet/xspec.py +++ b/execnet/xspec.py @@ -16,6 +16,7 @@ class XSpec: def __init__(self, string): self._spec = string + self.env = {} for keyvalue in string.split("//"): i = keyvalue.find("=") if i == -1: @@ -26,7 +27,10 @@ class XSpec: raise AttributeError("%r not a valid XSpec key" % key) if key in self.__dict__: raise ValueError("duplicate key: %r in %r" %(key, string)) - setattr(self, key, value) + if key.startswith("env:"): + self.env[key[4:]] = value + else: + setattr(self, key, value) def __getattr__(self, name): if name[0] == "_": --- a/testing/test_serializer.py +++ b/testing/test_serializer.py @@ -17,6 +17,11 @@ def _find_version(suffix=""): executable = py.path.local.sysfind(name) if executable: return executable + elif suffix == "3": + for name in ('python3.1', 'python3.2'): + executable = py.path.local.sysfind(name) + if executable: + return executable if sys.platform == "win32" and suffix == "3": for name in ('python31', 'python30'): executable = py.path.local(r"c:\\%s\python.exe" % (name,)) --- a/doc/basics.txt +++ b/doc/basics.txt @@ -40,6 +40,10 @@ examples for valid gateway specification subprocess; running with the lowest CPU priority ("nice" level). By default current dir will be the current dir of the instantiator. +* ``popen//env:NAME=value`` specifies a subprocess that uses the + same interpreter as the one it is initiated from and additionally + remotely sets an environment variable ``NAME`` to ``value``. + * ``socket=192.168.1.4:8888`` specifies a Python Socket server process that listens on 192.168.1.4:8888`` From commits-noreply at bitbucket.org Wed Jul 7 15:33:28 2010 From: commits-noreply at bitbucket.org (commits-noreply at bitbucket.org) Date: Wed, 7 Jul 2010 13:33:28 +0000 (UTC) Subject: [execnet-commit] execnet commit e25a7d3c706f: catch all Message-ID: <20100707133328.54E197EF83@bitbucket.org> # HG changeset patch -- Bitbucket.org # Project execnet # URL http://bitbucket.org/hpk42/execnet/overview # User holger krekel # Date 1278509569 -7200 # Node ID e25a7d3c706f61911ae34d7828e07ad8ff316365 # Parent 180c18c4bac279e63ccb80b2f7ef6b3aaf7fc3e4 catch all --- a/execnet/gateway_base.py +++ b/execnet/gateway_base.py @@ -640,7 +640,7 @@ class BaseGateway(object): self._terminate_execution() self._channelfactory._finished_receiving() self._trace('RECEIVERTHREAD', 'leaving finalization') - except Exception: + except: pass # XXX be silent at interp-shutdown def _terminate_execution(self): From commits-noreply at bitbucket.org Wed Jul 7 15:50:23 2010 From: commits-noreply at bitbucket.org (commits-noreply at bitbucket.org) Date: Wed, 7 Jul 2010 13:50:23 +0000 (UTC) Subject: [execnet-commit] execnet commit eb2dd153170b: Added tag 1.0.7 for changeset e25a7d3c706f Message-ID: <20100707135023.125F77EF81@bitbucket.org> # HG changeset patch -- Bitbucket.org # Project execnet # URL http://bitbucket.org/hpk42/execnet/overview # User holger krekel # Date 1278510571 -7200 # Node ID eb2dd153170b2695f8849ff61862cce731bd81e9 # Parent e25a7d3c706f61911ae34d7828e07ad8ff316365 Added tag 1.0.7 for changeset e25a7d3c706f --- a/.hgtags +++ b/.hgtags @@ -10,3 +10,4 @@ 9e7b55db59f94009e739273916efbb356d385b96 61ae5d8a32582d48871672ba894b2761fcaeec43 1.0.6 61ae5d8a32582d48871672ba894b2761fcaeec43 1.0.6 f4f785cd543ec95455907841840ba2b965f1400b 1.0.6 +e25a7d3c706f61911ae34d7828e07ad8ff316365 1.0.7 From commits-noreply at bitbucket.org Wed Jul 7 18:13:50 2010 From: commits-noreply at bitbucket.org (commits-noreply at bitbucket.org) Date: Wed, 7 Jul 2010 16:13:50 +0000 (UTC) Subject: [execnet-commit] execnet commit fbb2bc1316d2: improved versioning Message-ID: <20100707161350.35E387EF81@bitbucket.org> # HG changeset patch -- Bitbucket.org # Project execnet # URL http://bitbucket.org/hpk42/execnet/overview # User holger krekel # Date 1278519194 -7200 # Node ID fbb2bc1316d211b2c547cb7d6b0521a7377191f7 # Parent eb2dd153170b2695f8849ff61862cce731bd81e9 improved versioning --- a/tox.ini +++ b/tox.ini @@ -2,11 +2,11 @@ distshare={homedir}/.tox/distshare [tox:hudson] distshare={toxworkdir}/distshare -sdistsrc={distshare}/execnet-**LATEST** +sdistsrc={distshare}/execnet-* [testenv] changedir=testing -deps= {distshare}/py-**LATEST** +deps= {distshare}/py-* commands=py.test --confcutdir=.. \ -rsfxX --junitxml={envlogdir}/junit-{envname}.xml [] [testenv:py27] From commits-noreply at bitbucket.org Sun Jul 11 12:29:01 2010 From: commits-noreply at bitbucket.org (commits-noreply at bitbucket.org) Date: Sun, 11 Jul 2010 10:29:01 +0000 (UTC) Subject: [execnet-commit] execnet commit 8414d8f19e46: small updates Message-ID: <20100711102901.84A8E7EF63@bitbucket.org> # HG changeset patch -- Bitbucket.org # Project execnet # URL http://bitbucket.org/hpk42/execnet/overview # User holger krekel # Date 1278844124 -7200 # Node ID 8414d8f19e464a7cb029b2bea587e59b88da78d5 # Parent fbb2bc1316d211b2c547cb7d6b0521a7377191f7 small updates --- a/doc/example/sysinfo.py +++ b/doc/example/sysinfo.py @@ -2,6 +2,8 @@ sysinfo.py [host1] [host2] [options] obtain system info from remote machine. + +(c) Holger Krekel, GPLv2 or 3 """ import py @@ -93,11 +95,15 @@ def error(*args): debug("ERROR", args[0] + ":", *args[1:]) def getinfo(sshname, ssh_config=None, loginfo=sys.stdout): - debug("connecting to", sshname) + import execnet + spec = "ssh=%s" % sshname + if ssh_config: + spec += "ssh_config=%s" % ssh_config + debug("connecting to", repr(spec)) try: - gw = execnet.SshGateway(sshname, ssh_config=ssh_config) + gw = execnet.makegateway(spec) except IOError: - error("could not get sshagteway", sshname) + error("could not get sshgatway", sshname) else: ri = RemoteInfo(gw) #print "%s info:" % sshname From charlessolar at gmail.com Tue Jul 13 16:40:56 2010 From: charlessolar at gmail.com (Charles Solar) Date: Tue, 13 Jul 2010 09:40:56 -0500 Subject: [execnet-commit] RSync Fixes Message-ID: Dunno if this is the best way to submit patches or if bitbucket has some sort of system, but here is a patch for the symlink issue I reported and to sync file permissions as reported in http://bitbucket.org/hpk42/py-trunk/issue/68/distributed-testing-rsync-does-not-maintain-file Charles -------------- next part -------------- A non-text attachment was scrubbed... Name: rsyncfixes.patch Type: application/octet-stream Size: 5422 bytes Desc: not available Url : http://codespeak.net/pipermail/execnet-commit/attachments/20100713/749d305e/attachment.obj From holger at merlinux.eu Tue Jul 13 16:50:54 2010 From: holger at merlinux.eu (holger krekel) Date: Tue, 13 Jul 2010 16:50:54 +0200 Subject: [execnet-commit] RSync Fixes In-Reply-To: References: Message-ID: <20100713145054.GM14601@trillke.net> Hi Charles, On Tue, Jul 13, 2010 at 09:40 -0500, Charles Solar wrote: > Dunno if this is the best way to submit patches or if bitbucket has > some sort of system, but here is a patch for the symlink issue I > reported and to sync file permissions as reported in > http://bitbucket.org/hpk42/py-trunk/issue/68/distributed-testing-rsync-does-not-maintain-file the attached patch looks like a binary file. as to Bitbucket: you can register and "fork" execnet and then notify me (through bitbucket or by private email) that you want me to "pull". best, holger From commits-noreply at bitbucket.org Mon Jul 26 21:19:16 2010 From: commits-noreply at bitbucket.org (commits-noreply at bitbucket.org) Date: Mon, 26 Jul 2010 19:19:16 +0000 (UTC) Subject: [execnet-commit] execnet commit 19148dc174d2: some fixes to examples and issues Message-ID: <20100726191916.C2FD247857@bitbucket.org> # HG changeset patch -- Bitbucket.org # Project execnet # URL http://bitbucket.org/hpk42/execnet/overview # User holger krekel # Date 1280171862 -3600 # Node ID 19148dc174d28e4086fe81f0549a27333673065a # Parent 487e31b019479cfa5807834c70bf219698247080 some fixes to examples and issues --- a/ISSUES.txt +++ b/ISSUES.txt @@ -83,9 +83,9 @@ have callback accept errors / errback tags: 1.1 wish If using channel.setcallback() it currently is not -impossible notice the exact errors that might happen -on the other side. Either introduce an "errback" -or (probably better) optionally allow an extra 'error' +possible to notice the exact errors that might happen +on the other side. Either introduce an "errback" +or (probably better) optionally allow an extra 'error' parameter to execnet callbacks. fix rsync between python/jython --- a/doc/example/sysinfo.py +++ b/doc/example/sysinfo.py @@ -8,6 +8,7 @@ obtain system info from remote machine. import py import sys +import execnet parser = py.std.optparse.OptionParser(usage=__doc__) --- a/doc/example/remotecmd.py +++ b/doc/example/remotecmd.py @@ -1,8 +1,12 @@ +import os # contents of: remotecmd.py def simple(arg): return arg + 1 +def listdir(path): + return os.listdir(path) + if __name__ == '__channelexec__': for item in channel: channel.send(eval(item)) From commits-noreply at bitbucket.org Mon Jul 26 21:17:08 2010 From: commits-noreply at bitbucket.org (commits-noreply at bitbucket.org) Date: Mon, 26 Jul 2010 19:17:08 +0000 (UTC) Subject: [execnet-commit] execnet commit 487e31b01947: remove trailing whitespace everywhere Message-ID: <20100726191708.BF76947831@bitbucket.org> # HG changeset patch -- Bitbucket.org # Project execnet # URL http://bitbucket.org/hpk42/execnet/overview # User holger krekel # Date 1280170721 -7200 # Node ID 487e31b019479cfa5807834c70bf219698247080 # Parent 8414d8f19e464a7cb029b2bea587e59b88da78d5 remove trailing whitespace everywhere --- a/ISSUES.txt +++ b/ISSUES.txt @@ -1,45 +1,45 @@ -rsync links from posix to windows +rsync links from posix to windows ----------------------------------------- tags: 1.1 feature bb: http://bitbucket.org/hpk42/py-trunk/issue/35 -path: execnet/rsync.py +path: execnet/rsync.py currently rsyncing bails out with a strange message when trying to rsync links. should provide better message or even some support for doing it. -socketservice process for keeping cross-invocation state +socketservice process for keeping cross-invocation state --------------------------------------------------------- -tags: 1.1 feature newdist +tags: 1.1 feature newdist -A building block for keeping permanent network -connections would be a started-on-first-access process +A building block for keeping permanent network +connections would be a started-on-first-access process that keeps state. Considered: - a rendevouz process that starts threads for each incoming connection. On each such connection - a SocketGateway can be instantiated but it + a SocketGateway can be instantiated but it will re-use the already-imported gateway base code so again avoid parse/compile on arbitrary strings. Possible code example: # connect to on-demand started port-8888 socket server - # similar to popen+socket//installvia=popen + # similar to popen+socket//installvia=popen gw = group.makegateway("socketservice//port=8888") gw.remote_exec("...") # execute code in a thread in the socket server - ... + ... # exit the gateway-connection, but leave socket server running - gw.exit() + gw.exit() a channel-IO based gateway -------------------------------------------------- -tags: 1.1 feature newdist +tags: 1.1 feature newdist A building block for establishing a permanent hierarchy of processes is a Gateway that operates on a Channel object. This way a gateway can be instantiated through another -intermediating gateway which bi-directionallry forwads +intermediating gateway which bi-directionallry forwads Messages through a existing channel. A code example could look like this: @@ -47,9 +47,9 @@ A code example could look like this: group.setlocalfactory(port=8888) gw = group.makegateway("ssh=codespeak.net") # local ssh-process would be a child of the factory service - # which could have timeouts for killing, or allow resuming. - # for instantiating remote it could also install a multiplexer - # on the remote place such that the same ssh connection is re-used + # which could have timeouts for killing, or allow resuming. + # for instantiating remote it could also install a multiplexer + # on the remote place such that the same ssh connection is re-used # for multiple ssh=codespeak.net connections, makes for small latency. some first simple implemenation might look like this:: @@ -64,32 +64,32 @@ some first simple implemenation might lo subchannel = ch.receive() return ChannelGateway(subchannel) -creating virtualenv environments on the fly +creating virtualenv environments on the fly ----------------------------------------------- tags: 1.1 wish there should be a (plugin-provided?) way to create -a virtual environment: +a virtual environment: gw = makegateway("ssh=codespeak.net") newgw = xdeploy.create_venv_gateway(gw, "venvname", depstring) - newgw # lives in home/ dir of remote virtualenv - # maybe have get_temproot default to tmp/ there? + newgw # lives in home/ dir of remote virtualenv + # maybe have get_temproot default to tmp/ there? sf = xdeploy.SetupFile("..setup.py") sf.sdist_remote_install(newgw) - - -have callback accept errors / errback + + +have callback accept errors / errback ----------------------------------------------- tags: 1.1 wish If using channel.setcallback() it currently is not impossible notice the exact errors that might happen -on the other side. Either introduce an "errback" -or (probably better) optionally allow an extra 'error' +on the other side. Either introduce an "errback" +or (probably better) optionally allow an extra 'error' parameter to execnet callbacks. -fix rsync between python/jython +fix rsync between python/jython ----------------------------------------------- tags: 1.0 bug -rsync does not complete with jython. +rsync does not complete with jython. --- a/testing/test_gateway.py +++ b/testing/test_gateway.py @@ -22,9 +22,9 @@ def test_deprecation(recwarn, monkeypatc class TestBasicGateway: def test_correct_setup(self, gw): assert gw.hasreceiver() - assert gw in gw._group - assert gw.id in gw._group - assert gw.spec + assert gw in gw._group + assert gw.id in gw._group + assert gw.spec def test_repr_doesnt_crash(self, gw): assert isinstance(repr(gw), str) @@ -64,7 +64,7 @@ class TestBasicGateway: status = gw.remote_status() assert status.execqsize == 0 assert status.numexecuting == 0 - # race condition + # race condition assert status.numchannels <= numchannels + 1 def test_remote_exec_module(self, tmpdir, gw): @@ -104,7 +104,7 @@ class TestBasicGateway: def test_remote_exec_no_explicit_close(self, gw): channel = gw.remote_exec('channel.close()') - excinfo = py.test.raises(channel.RemoteError, + excinfo = py.test.raises(channel.RemoteError, "channel.waitclose(TESTTIMEOUT)") assert "explicit" in excinfo.value.formatted @@ -182,9 +182,9 @@ class TestPopenGateway: assert x.lower() == str(waschangedir).lower() def test_remoteerror_readable_traceback(self, gw): - e = py.test.raises(gateway_base.RemoteError, + e = py.test.raises(gateway_base.RemoteError, 'gw.remote_exec("x y").waitclose()') - assert "gateway_base" in e.value.formatted + assert "gateway_base" in e.value.formatted def test_many_popen(self): num = 4 @@ -278,7 +278,7 @@ class TestThreads: assert res == 42 def test_threads_race_sending(self): - # multiple threads sending data in parallel + # multiple threads sending data in parallel gw = execnet.makegateway("popen") num = 5 gw.remote_init_threads(num) @@ -287,7 +287,7 @@ class TestThreads: for x in range(num): ch = gw.remote_exec(""" for x in range(10): - channel.send(''*1000) + channel.send(''*1000) channel.receive() """) channels.append(ch) @@ -322,7 +322,7 @@ class TestThreads: py.test.raises(IOError, gw.remote_init_threads, 3) -class TestTracing: +class TestTracing: def test_popen_filetracing(self, testdir, monkeypatch): tmpdir = testdir.tmpdir monkeypatch.setenv("TMP", tmpdir) --- a/execnet/gateway_base.py +++ b/execnet/gateway_base.py @@ -169,7 +169,7 @@ def _setupmessages(): raise SystemExit(0) classes = [ - STATUS, CHANNEL_EXEC, CHANNEL_DATA, CHANNEL_CLOSE, + STATUS, CHANNEL_EXEC, CHANNEL_DATA, CHANNEL_CLOSE, CHANNEL_CLOSE_ERROR, CHANNEL_LAST_MESSAGE, GATEWAY_TERMINATE ] for i, cls in enumerate(classes): @@ -179,7 +179,7 @@ def _setupmessages(): _setupmessages() -def geterrortext(excinfo, +def geterrortext(excinfo, format_exception=traceback.format_exception, sysex=sysex): try: l = format_exception(*excinfo) @@ -328,9 +328,9 @@ class Channel(object): raise ValueError("mode %r not availabe" %(mode,)) def close(self, error=None): - """ close down this channel with an optional error message. - Note that closing of a channel tied to remote_exec happens - automatically at the end of execution and cannot be done explicitely. + """ close down this channel with an optional error message. + Note that closing of a channel tied to remote_exec happens + automatically at the end of execution and cannot be done explicitely. """ if self._executing: raise IOError("cannot explicitly close channel within remote_exec") @@ -362,10 +362,10 @@ class Channel(object): """ wait until this channel is closed (or the remote side otherwise signalled that no more data was being sent). The channel may still hold receiveable items, but not receive - any more after waitclose() has returned. Exceptions from executing + any more after waitclose() has returned. Exceptions from executing code on the other side are reraised as local channel.RemoteErrors. - EOFError is raised if the reading-connection was prematurely closed, - which often indicates a dying process. + EOFError is raised if the reading-connection was prematurely closed, + which often indicates a dying process. self.TimeoutError is raised after the specified number of seconds (default is None, i.e. wait indefinitely). """ @@ -379,9 +379,9 @@ class Channel(object): def send(self, item): """sends the given item to the other side of the channel, possibly blocking if the sender queue is full. - The item must be a simple python type and will be - copied to the other side by value. IOError is - raised if the write pipe was prematurely closed. + The item must be a simple python type and will be + copied to the other side by value. IOError is + raised if the write pipe was prematurely closed. """ if self.isclosed(): raise IOError("cannot send to %r" %(self,)) @@ -390,10 +390,10 @@ class Channel(object): def receive(self, timeout=-1): """receive a data item that was sent from the other side. - timeout: -1 [default] blocked waiting, but wake up periodically + timeout: -1 [default] blocked waiting, but wake up periodically to let CTRL-C through. A positive number indicates the number of seconds after which a channel.TimeoutError exception - will be raised if no item was received. + will be raised if no item was received. Note that exceptions from the remotely executing code will be reraised as channel.RemoteError exceptions containing a textual representation of the remote traceback. @@ -405,8 +405,8 @@ class Channel(object): internal_timeout = self._INTERNALWAKEUP else: internal_timeout = timeout - - while 1: + + while 1: try: x = itemqueue.get(timeout=internal_timeout) break @@ -453,7 +453,7 @@ class ChannelFactory(object): id = self.count self.count += 2 try: - channel = self._channels[id] + channel = self._channels[id] except KeyError: channel = self._channels[id] = Channel(self.gateway, id) return channel @@ -594,7 +594,7 @@ class BaseGateway(object): self._channelfactory = ChannelFactory(self, _startcount) self._receivelock = threading.RLock() # globals may be NONE at process-termination - self._trace = trace + self._trace = trace self._geterrortext = geterrortext def _trace(self, *msg): @@ -627,7 +627,7 @@ class BaseGateway(object): self._io.close_read() except EOFError: self._trace("RECEIVERTHREAD: got EOFError") - self._trace("RECEIVERTHREAD: traceback was: ", + self._trace("RECEIVERTHREAD: traceback was: ", self._geterrortext(self.exc_info())) self._error = self.exc_info()[1] eof = True @@ -900,10 +900,10 @@ class Unserializer(object): newchannel = self.channelfactory.new(id) self.stack.append(newchannel) -# automatically build opcodes and byte-encoding +# automatically build opcodes and byte-encoding class opcode: - """ container for name -> num mappings. """ + """ container for name -> num mappings. """ def _buildopcodes(): l = [] @@ -915,7 +915,7 @@ def _buildopcodes(): for i,(opname, func) in enumerate(l): assert i < 26, "xxx" i = b(chr(64+i)) - Unserializer.num2func[i] = func + Unserializer.num2func[i] = func setattr(opcode, opname, i) _buildopcodes() @@ -934,8 +934,8 @@ class _Serializer(object): self._streamlist.append(data) def save(self, obj): - # calling here is not re-entrant but multiple instances - # may write to the same stream because of the common platform + # calling here is not re-entrant but multiple instances + # may write to the same stream because of the common platform # atomic-write guaruantee (concurrent writes each happen atomicly) self._save(obj) self._write(opcode.STOP) --- a/doc/example/test_debug.txt +++ b/doc/example/test_debug.txt @@ -1,16 +1,16 @@ -Debugging execnet / Wire messages +Debugging execnet / Wire messages =============================================================== By setting the environment variable ``EXECNET_DEBUG`` you can -configure the execnet tracing mechanism: +configure the execnet tracing mechanism: -:EXECNET_DEBUG=1: write per-process trace-files to ``${TEMPROOT}/execnet-debug-PID`` +:EXECNET_DEBUG=1: write per-process trace-files to ``${TEMPROOT}/execnet-debug-PID`` :EXECNET_DEBUG=2: perform tracing to stderr (popen-gateway slaves will send this to their instantiator) Here is a simple example to see what goes on with a simple execution:: - EXECNET_DEBUG=2 # or "set EXECNET_DEBUG=2" on windows + EXECNET_DEBUG=2 # or "set EXECNET_DEBUG=2" on windows python -c 'import execnet ; execnet.makegateway().remote_exec("42")' --- a/execnet/gateway_socket.py +++ b/execnet/gateway_socket.py @@ -64,14 +64,14 @@ class SocketGateway(Gateway): super(SocketGateway, self).__init__(io=io, id=id) def new_remote(cls, gateway, id, hostport=None): - """ return a new (connected) socket gateway, + """ return a new (connected) socket gateway, instantiated through the given 'gateway'. """ if hostport is None: host, port = ('localhost', 0) else: host, port = hostport - + mydir = os.path.dirname(__file__) socketserver = os.path.join(mydir, 'script', 'socketserver.py') socketserverbootstrap = "\n".join([ --- a/doc/rel-1.0.0.txt +++ b/doc/rel-1.0.0.txt @@ -2,8 +2,8 @@ Hi all, execnet enables zero-install ad-hoc instantiation of local or remote Python processes. It establishes channels for basic data communication. -Data Serialization is independent from Pickle and is tested between -Python2.4, 2.5, 2.6, 3.1 and Jython interpreters. +Data Serialization is independent from Pickle and is tested between +Python2.4, 2.5, 2.6, 3.1 and Jython interpreters. execnet-1.0.0 (compared to 1.0.0b3) has bug fixes, new tested examples and introduces execnet.Group for managing a dynamic @@ -13,7 +13,7 @@ bunch of hosts. See the improved docs and below the changelog. I am set to improve and develop execnet further and thus am very interested in feedback -and suggestions. +and suggestions. cheers, -holger +holger --- a/testing/test_basics.py +++ b/testing/test_basics.py @@ -162,13 +162,13 @@ class PseudoChannel: self._sent.append(obj) def close(self, errortext=None): self._closed.append(errortext) - + def test_exectask(): io = py.io.BytesIO() gw = gateway_base.SlaveGateway(io, id="something") ch = PseudoChannel() gw.executetask((ch, "raise ValueError()")) - assert "ValueError" in str(ch._closed[0]) + assert "ValueError" in str(ch._closed[0]) class TestMessage: --- a/execnet/__init__.py +++ b/execnet/__init__.py @@ -3,7 +3,7 @@ execnet: pure python lib for connecting (c) 2010, Holger Krekel and others """ -__version__ = "1.0.7" +__version__ = "1.0.8a1" import execnet.apipkg --- a/doc/example/redirect_remote_output.py +++ b/doc/example/redirect_remote_output.py @@ -1,10 +1,10 @@ """ -redirect output from remote to a local function +redirect output from remote to a local function showcasing features of the channel object: -- sending a channel over a channel -- adapting a channel to a file object -- setting a callback for receiving channel data +- sending a channel over a channel +- adapting a channel to a file object +- setting a callback for receiving channel data """ @@ -16,10 +16,10 @@ outchan = gw.remote_exec(""" import sys outchan = channel.gateway.newchannel() sys.stdout = outchan.makefile("w") - channel.send(outchan) + channel.send(outchan) """).receive() -# note: callbacks execute in receiver thread! +# note: callbacks execute in receiver thread! def write(data): print "received:", repr(data) outchan.setcallback(write) --- a/testing/test_channel.py +++ b/testing/test_channel.py @@ -265,7 +265,7 @@ class TestChannelBasicBehaviour: """) subchan = channel.receive() subchan.send(1) - excinfo = py.test.raises(subchan.RemoteError, + excinfo = py.test.raises(subchan.RemoteError, "subchan.waitclose(TESTTIMEOUT)") assert "42" in excinfo.value.formatted channel.send(1) --- a/README.txt +++ b/README.txt @@ -1,8 +1,8 @@ -Welcome to execnet and elastic distributed computing! +Welcome to execnet and elastic distributed computing! -Rapidly deploy tools and code to local or remote Python interpreters. +Rapidly deploy tools and code to local or remote Python interpreters. -See doc/ for more info, examples and contact info. +See doc/ for more info, examples and contact info. have fun, --- a/doc/example/remote1.py +++ b/doc/example/remote1.py @@ -1,4 +1,4 @@ -# content of a module remote1.py +# content of a module remote1.py if __name__ == '__channelexec__': channel.send('initialization complete') --- a/doc/example/hybridpython.txt +++ b/doc/example/hybridpython.txt @@ -1,7 +1,7 @@ Connecting different Python interpreters ========================================== -Connect to Python2/Numpy from Python3 +Connect to Python2/Numpy from Python3 ---------------------------------------- Here we run a Python3 interpreter to connect to a Python2.6 interpreter @@ -29,8 +29,8 @@ will print on the CPython3.1 side:: array([1, 2, 3, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9]) -A more refined real-life example of python3/python2 interaction -is the anyvc_ project which uses version-control bindings in +A more refined real-life example of python3/python2 interaction +is the anyvc_ project which uses version-control bindings in a Python2 subprocess in order to offer Python3-based library functionality. @@ -86,9 +86,9 @@ using Mono 2.0 and IronPython-1.1 this w System.Double[](1.0, 2.0) -.. note:: +.. note:: Using IronPython needs more testing, likely newer versions - will work better. please feedback if you have information. + will work better. please feedback if you have information. .. _IronPython: http://ironpython.net --- a/testing/test_termination.py +++ b/testing/test_termination.py @@ -51,7 +51,7 @@ def test_termination_on_remote_channel_r gw.remote_exec("channel.receive()") group.terminate() command = ["ps", "-p", str(pid)] - popen = subprocess.Popen(command, stdout=subprocess.PIPE, + popen = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) out, err = popen.communicate() out = py.builtin._totext(out, 'utf8') @@ -71,7 +71,7 @@ def test_close_initiating_remote_no_erro ch3 = gw.remote_exec("channel.receive()") execnet.default_group.terminate() """ % str(execnetdir)) - popen = subprocess.Popen([str(anypython), str(p)], + popen = subprocess.Popen([str(anypython), str(p)], stdout=subprocess.PIPE, stderr=subprocess.PIPE,) stdout, stderr = popen.communicate() print (stdout) --- a/doc/index.txt +++ b/doc/index.txt @@ -3,15 +3,15 @@ .. image:: _static/pythonring.png :align: right -Python_ is a mature dynamic language whose interpreters can interact with -all major computing platforms today. +Python_ is a mature dynamic language whose interpreters can interact with +all major computing platforms today. **execnet** provides carefully tested means to easily interact with Python -interpreters across version, platform and network barriers. It has +interpreters across version, platform and network barriers. It has a minimal and fast API targetting the following uses: * distribute tasks to local or remote CPUs -* write and deploy hybrid multi-process applications +* write and deploy hybrid multi-process applications * write scripts to administer a bunch of exec environments .. _Python: http://www.python.org @@ -23,31 +23,31 @@ Features * safe and simple serialization of python builtin types (no pickle) -* flexible communication: synchronous send/receive as well as +* flexible communication: synchronous send/receive as well as callback/queue mechanisms supported * easy creation, handling and termination of multiple processes -* well tested interactions between CPython 2.4-2.7, CPython3.1, Jython 2.5.1 +* well tested interactions between CPython 2.4-2.7, CPython3.1, Jython 2.5.1 and PyPy 1.1 interpreters. -* fully interoperable between Windows and Unix-ish systems. +* fully interoperable between Windows and Unix-ish systems. * many tested :doc:`examples` Known uses ------------------- -* `py.test`_ uses it for its `distributed testing`_ mechanism. +* `py.test`_ uses it for its `distributed testing`_ mechanism. -* Jacob Perkins uses it for his `Distributed NTLK with execnet`_ +* Jacob Perkins uses it for his `Distributed NTLK with execnet`_ project to launch computation processes through ssh. He also - compares `disco and execnet`_ in a subsequent post. + compares `disco and execnet`_ in a subsequent post. * Ronny Pfannschmidt uses it for his `anyvc`_ VCS-abstraction project - to bridge the Python2/Python3 version gap. + to bridge the Python2/Python3 version gap. -* sysadmins and developers are using it for ad-hoc custom scripting +* sysadmins and developers are using it for ad-hoc custom scripting .. _`py.test`: http://pytest.org .. _`distributed testing`: http://codespeak.net/py/dist/test/dist.html @@ -62,10 +62,10 @@ The current 1.0 series aims at :doc:`bas The 1.1 series will target setting up permanent networks and offering unix-shell-like capabilities to spawn processes and applications. -execnet was conceived and is `actively developed`_ by `Holger Krekel`_. -The package is licensed under the GPL Version 2 or later, at your choice. +execnet was conceived and is `actively developed`_ by `Holger Krekel`_. +The package is licensed under the GPL Version 2 or later, at your choice. Armin Rigo and Benjamin Peterson have done major contributions which -are MIT-licensed. +are MIT-licensed. .. _`basic API`: basics.html .. _`actively developed`: http://bitbucket.org/hpk42/execnet/changesets @@ -76,5 +76,5 @@ are MIT-licensed. support implnotes - install + install rel-1.0.0 --- a/testing/test_xspec.py +++ b/testing/test_xspec.py @@ -66,7 +66,7 @@ class TestMakegateway: def test_popen_default(self): gw = execnet.makegateway("") - assert gw.spec.popen + assert gw.spec.popen assert gw.spec.python == None rinfo = gw._rinfo() #assert rinfo.executable == py.std.sys.executable --- a/testing/test_multi.py +++ b/testing/test_multi.py @@ -19,7 +19,7 @@ class TestMultiChannelAndGateway: assert channels[0] in mch assert channels[1] in mch assert channels[2] in mch - + def test_multichannel_receive_each(self): class pseudochannel: def receive(self): --- a/setup.py +++ b/setup.py @@ -1,5 +1,5 @@ """ -execnet: rapid multi-Python deployment +execnet: rapid multi-Python deployment ======================================================== .. _execnet: http://codespeak.net/execnet @@ -8,8 +8,8 @@ execnet_ provides carefully tested means interpreters across version, platform and network barriers. It provides a minimal and fast API targetting the following uses: -* distribute tasks to local or remote CPUs -* write and deploy hybrid multi-process applications +* distribute tasks to local or remote CPUs +* write and deploy hybrid multi-process applications * write scripts to administer a bunch of exec environments Features @@ -17,7 +17,7 @@ Features * zero-install bootstrapping: no remote installation required! -* flexible communication: send/receive as well as +* flexible communication: send/receive as well as callback/queue mechanisms supported * simple serialization of python builtin types (no pickling) @@ -27,7 +27,7 @@ Features * well tested between CPython 2.4-3.1, Jython 2.5.1 and PyPy 1.1 interpreters. -* fully interoperable between Windows and Unix-ish systems. +* fully interoperable between Windows and Unix-ish systems. """ try: --- a/testing/test_serializer.py +++ b/testing/test_serializer.py @@ -222,11 +222,11 @@ def test_str(py2, py3): assert tp == "str" assert s == "'xyz'" tp, s = py3.load(p, "py2str_as_py3str=True") - assert tp == "str" + assert tp == "str" assert s == "'xyz'" tp, s = py3.load(p, "py2str_as_py3str=False") assert s == "b'xyz'" - assert tp == "bytes" + assert tp == "bytes" def test_unicode(py2, py3): p = py2.dump("u'hi'") --- a/doc/basics.txt +++ b/doc/basics.txt @@ -2,11 +2,11 @@ API in a nutshell ============================================================================== -execnet ad-hoc instantiates local and remote Python interpreters. +execnet ad-hoc instantiates local and remote Python interpreters. Each interpreter is accessible through a **Gateway** which manages code and data communication. **Channels** allow to exchange data between the local and the remote end. **Groups** -help to manage creation and termination of sub interpreters. +help to manage creation and termination of sub interpreters. .. image:: _static/basic1.png @@ -15,8 +15,8 @@ Gateways: bootstrapping Python interpret .. currentmodule:: execnet -All Gateways are instantiated via a call to ``makegateway()`` -passing it a gateway specification or URL. +All Gateways are instantiated via a call to ``makegateway()`` +passing it a gateway specification or URL. .. _xspec: @@ -26,22 +26,22 @@ Here is an example which instantiates a >>> gateway = execnet.makegateway() -gateways allow to `remote execute code`_ and -`exchange data`_ bidirectionally. +gateways allow to `remote execute code`_ and +`exchange data`_ bidirectionally. examples for valid gateway specifications ------------------------------------------- * ``ssh=wyvern//python=python2.4//chdir=mycache`` specifies a Python2.4 - interpreter on the host ``wyvern``. The remote process will have - ``mycache`` as its current working directory. + interpreter on the host ``wyvern``. The remote process will have + ``mycache`` as its current working directory. * ``popen//python=2.5//nice=20`` specification of a python2.5 - subprocess; running with the lowest CPU priority ("nice" level). + subprocess; running with the lowest CPU priority ("nice" level). By default current dir will be the current dir of the instantiator. -* ``popen//env:NAME=value`` specifies a subprocess that uses the - same interpreter as the one it is initiated from and additionally +* ``popen//env:NAME=value`` specifies a subprocess that uses the + same interpreter as the one it is initiated from and additionally remotely sets an environment variable ``NAME`` to ``value``. * ``socket=192.168.1.4:8888`` specifies a Python Socket server @@ -49,25 +49,25 @@ examples for valid gateway specification .. _`remote execute code`: -remote_exec: execute source code remotely +remote_exec: execute source code remotely =================================================== .. currentmodule:: execnet.gateway -All gateways offer a simple method to execute source code +All gateways offer a simple method to execute source code in the connected interpreter: .. automethod:: Gateway.remote_exec(source) -It is allowed to pass a module object as source code -in which case it's source code will be obtained and +It is allowed to pass a module object as source code +in which case it's source code will be obtained and get sent for remote execution. ``remote_exec`` returns -a channel object whose symmetric counterpart channel -is available to the remotely executing source. +a channel object whose symmetric counterpart channel +is available to the remotely executing source. -.. _`Channel`: -.. _`channel-api`: +.. _`Channel`: +.. _`channel-api`: .. _`exchange data`: @@ -76,8 +76,8 @@ Channels: exchanging data with remote co .. currentmodule:: execnet.gateway_base -A channel object allows to send and receive data between -two asynchronously running programs. +A channel object allows to send and receive data between +two asynchronously running programs. .. automethod:: Channel.send(item) .. automethod:: Channel.receive(timeout) @@ -98,30 +98,30 @@ Grouped Gateways and robust termination All created gateway instances are part of a group. If you call ``execnet.makegateway`` it actually is forwarded to -the ``execnet.default_group``. Group objects are container -objects (see :doc:`group examples `) +the ``execnet.default_group``. Group objects are container +objects (see :doc:`group examples `) and manage the final termination procedure: .. automethod:: Group.terminate(timeout=None) -This method is implicitely called for each gateway group at +This method is implicitely called for each gateway group at process-exit, using a small timeout. This is fine for interactive sessions or random scripts which you rather like to error out than hang. If you start many processes then you often want to call ``group.terminate()`` yourself and specify a larger or not timeout. -remote_status: get low-level execution info +remote_status: get low-level execution info =================================================== .. currentmodule:: execnet.gateway All gateways offer a simple method to obtain some status -information from the remote side. +information from the remote side. .. automethod:: Gateway.remote_status(source) -Calling this method tells you e.g. how many execution +Calling this method tells you e.g. how many execution tasks are queued, how many are executing and how many channels are active. Note that ``remote_status()`` works even if the other side is busy executing code @@ -133,7 +133,7 @@ rsync: synchronise filesystem with remot .. currentmodule:: execnet -``execnet`` implements a simple efficient rsyncing protocol. +``execnet`` implements a simple efficient rsyncing protocol. Here is a basic example for using RSync:: rsync = execnet.RSync('/tmp/source') @@ -152,8 +152,8 @@ Debugging execnet =============================================================== By setting the environment variable ``EXECNET_DEBUG`` you can -configure a tracing mechanism: +configure a tracing mechanism: -:EXECNET_DEBUG=1: write per-process trace-files to ``execnet-debug-PID`` +:EXECNET_DEBUG=1: write per-process trace-files to ``execnet-debug-PID`` :EXECNET_DEUBG=2: perform tracing to stderr (popen-gateway slaves will send this to their instantiator) --- a/execnet/gateway.py +++ b/execnet/gateway.py @@ -32,12 +32,12 @@ class Gateway(gateway_base.BaseGateway): def exit(self): """ trigger gateway exit. Defer waiting for finishing of receiver-thread and subprocess activity to when - group.terminate() is called. + group.terminate() is called. """ self._trace("gateway.exit() called") if self not in self._group: self._trace("gateway already unregistered with group") - return + return self._group._unregister(self) self._trace("--> sending GATEWAY_TERMINATE") try: @@ -50,10 +50,10 @@ class Gateway(gateway_base.BaseGateway): def _remote_bootstrap_gateway(self, io): """ send gateway bootstrap code to a remote Python interpreter - endpoint, which reads from io for a string to execute. + endpoint, which reads from io for a string to execute. """ - sendexec(io, - inspect.getsource(gateway_base), + sendexec(io, + inspect.getsource(gateway_base), self._remotesetup, "io.write('1'.encode('ascii'))", "serve(io, id='%s-slave')" % self.id, @@ -139,7 +139,7 @@ class PopenCmdGateway(Gateway): self._popen = p = Popen(args, stdin=PIPE, stdout=PIPE) io = Popen2IO(p.stdin, p.stdout) super(PopenCmdGateway, self).__init__(io=io, id=id) - # fix for jython 2.5.1 + # fix for jython 2.5.1 if p.pid is None: p.pid = self.remote_exec( "import os; channel.send(os.getpid())").receive() @@ -159,12 +159,12 @@ class PopenGateway(PopenCmdGateway): super(PopenGateway, self).__init__(args, id=id) def _remote_bootstrap_gateway(self, io): - sendexec(io, + sendexec(io, "import sys", "sys.stdout.write('1')", "sys.stdout.flush()", popen_bootstrapline) - sendexec(io, + sendexec(io, "import sys ; sys.path.insert(0, %r)" % importdir, "from execnet.gateway_base import serve, init_popen_io", "serve(init_popen_io(), id='%s-slave')" % self.id, --- a/doc/install.txt +++ b/doc/install.txt @@ -3,15 +3,15 @@ Info in a nutshell **Pythons**: 2.4, 2.5, 2.6, 2.7, 3.0, 3.1.x, Jython-2.5.1, PyPy-1.2 -**Operating systems**: Linux, Windows, OSX, Unix +**Operating systems**: Linux, Windows, OSX, Unix -**Requirements**: plain distutils, setuptools_ or Distribute_ +**Requirements**: plain distutils, setuptools_ or Distribute_ -**Installers**: easy_install_ and pip_ or distutils +**Installers**: easy_install_ and pip_ or distutils **Distribution names**: -* PyPI name: ``execnet`` +* PyPI name: ``execnet`` * redhat fedora: ``python-execnet`` * debian: ``python-execnet`` * gentoo: ``dev-python/execnet`` @@ -24,15 +24,15 @@ Installation Install a public `pypi release`_ via `easy_install`_ or pip_:: easy_install -U execnet - + or - + pip install execnet or hg clone https://hpk42 at bitbucket.org/hpk42/execnet/ - python setup.py install # or 'develop' or add checkout path to PYTHONPATH + python setup.py install # or 'develop' or add checkout path to PYTHONPATH Next checkout the basic api and examples: --- a/CHANGELOG +++ b/CHANGELOG @@ -1,3 +1,8 @@ +1.0.8 +-------------------------------- + +- removed all trailing whitespace from source files + 1.0.7 -------------------------------- --- a/execnet/multi.py +++ b/execnet/multi.py @@ -61,13 +61,13 @@ class Group: Valid types: ``popen``, ``ssh=hostname``, ``socket=host:port``. Valid configuration:: - id= specifies the gateway id + id= specifies the gateway id python= specifies which python interpreter to execute chdir= specifies to which directory to change nice= specifies process priority of new process - env:NAME=value specifies a remote environment variable setting. + env:NAME=value specifies a remote environment variable setting. - If no spec is given, self.defaultspec is used. + If no spec is given, self.defaultspec is used. """ if not spec: spec = self.defaultspec @@ -77,7 +77,7 @@ class Group: if spec.popen: gw = gateway.PopenGateway(python=spec.python, id=id) elif spec.ssh: - gw = gateway.SshGateway(spec.ssh, remotepython=spec.python, + gw = gateway.SshGateway(spec.ssh, remotepython=spec.python, ssh_config=spec.ssh_config, id=id) elif spec.socket: assert not spec.python, ( @@ -137,10 +137,10 @@ class Group: self.terminate(timeout=1.0) def terminate(self, timeout=None): - """ trigger exit of member gateways and wait for termination - of member gateways and associated subprocesses. After waiting - timeout seconds try to to kill local sub processes of popen- - and ssh-gateways. Timeout defaults to None meaning + """ trigger exit of member gateways and wait for termination + of member gateways and associated subprocesses. After waiting + timeout seconds try to to kill local sub processes of popen- + and ssh-gateways. Timeout defaults to None meaning open-ended waiting and no kill attempts. """ for gw in self: @@ -159,7 +159,7 @@ class Group: try: reply.get(timeout=timeout) except IOError: - trace("Gateways did not come down after timeout: %r" + trace("Gateways did not come down after timeout: %r" %(self._gateways_to_join)) while self._gateways_to_join: gw = self._gateways_to_join.pop(0) @@ -185,10 +185,10 @@ class MultiChannel: def __iter__(self): return iter(self._channels) - + def __getitem__(self, key): return self._channels[key] - + def __contains__(self, chan): return chan in self._channels @@ -251,7 +251,7 @@ def killpid(pid): import ctypes except ImportError: import subprocess - # T: treekill, F: Force + # T: treekill, F: Force cmd = ("taskkill /T /F /PID %d" %(pid)).split() ret = subprocess.call(cmd) if ret != 0: --- a/doc/example/sysinfo.py +++ b/doc/example/sysinfo.py @@ -1,7 +1,7 @@ """ sysinfo.py [host1] [host2] [options] -obtain system info from remote machine. +obtain system info from remote machine. (c) Holger Krekel, GPLv2 or 3 """ @@ -56,7 +56,7 @@ class RemoteInfo: def getmemswap(self): if self.islinux(): - return self.exreceive(""" + return self.exreceive(""" import commands, re out = commands.getoutput("free") mem = re.search(r"Mem:\s+(\S*)", out).group(1) @@ -110,8 +110,8 @@ def getinfo(sshname, ssh_config=None, lo prefix = sshname.upper() + " " print >>loginfo, prefix, "fqdn:", ri.getfqdn() for attr in ( - "sys.platform", - "sys.version_info", + "sys.platform", + "sys.version_info", ): loginfo.write("%s %s: " %(prefix, attr,)) loginfo.flush() @@ -122,14 +122,14 @@ def getinfo(sshname, ssh_config=None, lo memswap = ri.getmemswap() if memswap: mem,swap = memswap - print >>loginfo, prefix, "Memory:", mem, "Swap:", swap + print >>loginfo, prefix, "Memory:", mem, "Swap:", swap cpuinfo = ri.getcpuinfo() if cpuinfo: numcpu, model = cpuinfo print >>loginfo, prefix, "number of cpus:", numcpu - print >>loginfo, prefix, "cpu model", model + print >>loginfo, prefix, "cpu model", model return ri - + if __name__ == '__main__': options, args = parser.parse_args() hosts = list(args) @@ -142,4 +142,4 @@ if __name__ == '__main__': for host in hosts: if host not in ignores: getinfo(host, ssh_config=ssh_config) - + --- a/execnet/xspec.py +++ b/execnet/xspec.py @@ -28,7 +28,7 @@ class XSpec: if key in self.__dict__: raise ValueError("duplicate key: %r in %r" %(key, string)) if key.startswith("env:"): - self.env[key[4:]] = value + self.env[key[4:]] = value else: setattr(self, key, value) --- a/doc/check_sphinx.py +++ b/doc/check_sphinx.py @@ -4,13 +4,13 @@ def test_linkcheck(tmpdir): doctrees = tmpdir.join("doctrees") htmldir = tmpdir.join("html") subprocess.check_call( - ["sphinx-build", "-W", "-blinkcheck", + ["sphinx-build", "-W", "-blinkcheck", "-d", str(doctrees), ".", str(htmldir)]) def test_build_docs(tmpdir): doctrees = tmpdir.join("doctrees") htmldir = tmpdir.join("html") subprocess.check_call([ - "sphinx-build", "-W", "-bhtml", + "sphinx-build", "-W", "-bhtml", "-d", str(doctrees), ".", str(htmldir)]) --- a/doc/changelog.txt +++ b/doc/changelog.txt @@ -2,7 +2,7 @@ .. _changes: -execnet CHANGELOG +execnet CHANGELOG ******************** .. include:: ../CHANGELOG --- a/conftest.py +++ b/conftest.py @@ -75,7 +75,7 @@ def pytest_generate_tests(metafunc): for gwtype in gwtypes: metafunc.addcall(id=gwtype, param=gwtype) elif 'anypython' in metafunc.funcargnames: - for name in ('python3.1', 'python2.4', 'python2.5', 'python2.6', + for name in ('python3.1', 'python2.4', 'python2.5', 'python2.6', 'python2.7', 'pypy-c', 'jython'): metafunc.addcall(id=name, param=name) @@ -86,7 +86,7 @@ def getexecutable(name, cache={}): executable = py.path.local.sysfind(name) if executable: if name == "jython": - popen = subprocess.Popen([str(executable), "--version"], + popen = subprocess.Popen([str(executable), "--version"], universal_newlines=True, stderr=subprocess.PIPE) out, err = popen.communicate() if not err or "2.5" not in err: @@ -110,7 +110,7 @@ def pytest_funcarg__anypython(request): def pytest_funcarg__gw(request): scope = request.config.option.scope group = request.cached_setup( - setup=execnet.Group, + setup=execnet.Group, teardown=lambda group: group.terminate(timeout=1), extrakey="testgroup", scope=scope, @@ -118,7 +118,7 @@ def pytest_funcarg__gw(request): try: return group[request.param] except KeyError: - if request.param == "popen": + if request.param == "popen": gw = group.makegateway("popen//id=popen") elif request.param == "socket": pname = 'sproxy1' @@ -128,7 +128,7 @@ def pytest_funcarg__gw(request): gw = group.makegateway("socket//id=socket//installvia=%s" % pname) gw.proxygw = proxygw assert pname in group - + elif request.param == "ssh": sshhost = request.getfuncargvalue('specssh').ssh gw = group.makegateway("ssh=%s//id=ssh" %(sshhost,)) --- a/doc/example/svn-sync-repo.py +++ b/doc/example/svn-sync-repo.py @@ -1,9 +1,9 @@ -#!/usr/bin/env python +#!/usr/bin/env python """ -small utility for hot-syncing a svn repository through ssh. -uses execnet. +small utility for hot-syncing a svn repository through ssh. +uses execnet. """ @@ -25,22 +25,22 @@ def main(args): else: configfile = None remote_host, path = remote.split(':', 1) - print "ssh-connecting to", remote_host + print "ssh-connecting to", remote_host gw = getgateway(remote_host, configfile) local_rev = get_svn_youngest(localrepo) - # local protocol - # 1. client sends rev/repo -> server - # 2. server checks for newer revisions and sends dumps - # 3. client receives dumps, updates local repo + # local protocol + # 1. client sends rev/repo -> server + # 2. server checks for newer revisions and sends dumps + # 3. client receives dumps, updates local repo # 4. client goes back to step 1 c = gw.remote_exec(""" import py import os remote_rev, repopath = channel.receive() - while 1: - rev = py.process.cmdexec('svnlook youngest "%s"' % repopath) + while 1: + rev = py.process.cmdexec('svnlook youngest "%s"' % repopath) rev = int(rev) if rev > remote_rev: revrange = (remote_rev+1, rev) @@ -49,11 +49,11 @@ def main(args): channel.send(dumpchannel) f = os.popen( - "svnadmin dump -q --incremental -r %s:%s %s" + "svnadmin dump -q --incremental -r %s:%s %s" % (revrange[0], revrange[1], repopath), 'r') try: maxcount = dumpchannel.receive() - count = maxcount + count = maxcount while 1: s = f.read(8192) if not s: @@ -62,11 +62,11 @@ def main(args): count = count - 1 if count <= 0: ack = dumpchannel.receive() - count = maxcount - + count = maxcount + except EOFError: dumpchannel.close() - remote_rev = rev + remote_rev = rev else: # using svn-hook instead would be nice here py.std.time.sleep(30) @@ -74,17 +74,17 @@ def main(args): c.send((local_rev, path)) print "checking revisions from %d in %s" %(local_rev, remote) - while 1: + while 1: revstart, revend = c.receive() - dumpchannel = c.receive() + dumpchannel = c.receive() print "receiving revisions", revstart, "-", revend, "replaying..." svn_load(localrepo, dumpchannel) - print "current revision", revend + print "current revision", revend def svn_load(repo, dumpchannel, maxcount=100): # every maxcount we will send an ACK to the other # side in order to synchronise and avoid our side - # growing buffers (execnet does not control + # growing buffers (execnet does not control # RAM usage or receive queue sizes) dumpchannel.send(maxcount) f = os.popen("svnadmin load -q %s" %(repo, ), "w") @@ -98,10 +98,10 @@ def svn_load(repo, dumpchannel, maxcount dumpchannel.send(maxcount) count = maxcount print >>sys.stdout - f.close() + f.close() def get_svn_youngest(repo): - rev = py.process.cmdexec('svnlook youngest "%s"' % repo) + rev = py.process.cmdexec('svnlook youngest "%s"' % repo) return int(rev) def getgateway(host, configfile=None): --- a/testing/test_rsync.py +++ b/testing/test_rsync.py @@ -94,7 +94,7 @@ class TestRSync: assert not err def test_symlink_rsync(self, dirs, gw1): - if (py.std.sys.platform == 'win32' or + if (py.std.sys.platform == 'win32' or getattr(py.std.os, '_name', '') == 'nt'): py.test.skip("symlinks are unsupported on Windows.") source = dirs.source --- a/doc/example/popen_read_multiple.py +++ b/doc/example/popen_read_multiple.py @@ -1,7 +1,7 @@ """ example -reading results from possibly blocking code running in sub processes. +reading results from possibly blocking code running in sub processes. """ import execnet @@ -9,7 +9,7 @@ NUM_PROCESSES = 5 channels = [] for i in range(NUM_PROCESSES): - gw = execnet.makegateway() # or use SSH or socket gateways + gw = execnet.makegateway() # or use SSH or socket gateways channel = gw.remote_exec(""" import time secs = channel.receive() @@ -24,12 +24,12 @@ queue = mc.make_receive_queue() print "***", "verifying that timeout on receiving results from blocked subprocesses works" try: - queue.get(timeout=1.0) + queue.get(timeout=1.0) except Exception: pass print "*** sending subprocesses some data to have them unblock" -mc.send_each(1) +mc.send_each(1) print "*** receiving results asynchronously" for i in range(NUM_PROCESSES): --- a/doc/example/test_group.txt +++ b/doc/example/test_group.txt @@ -1,7 +1,7 @@ Managing multiple gateways and clusters ================================================== - -Usings Groups for managing multiple gateways + +Usings Groups for managing multiple gateways ------------------------------------------------------ Use ``execnet.Group`` to manage membership and lifetime of @@ -13,23 +13,23 @@ of multiple gateways:: 2 >>> group - >>> list(group) + >>> list(group) [, ] >>> 'gw0' in group and 'gw1' in group True - >>> group['gw0'] == group[0] + >>> group['gw0'] == group[0] True - >>> group['gw1'] == group[1] + >>> group['gw1'] == group[1] True >>> group.terminate() # exit all member gateways >>> group -Assigning Gateway IDs +Assigning Gateway IDs ------------------------------------------------------ All gateways are created as part of a group and receive -a per-group unique ``id`` after successful initialization. +a per-group unique ``id`` after successful initialization. Pass an ``id=MYNAME`` part to ``group.makegateway``. Example:: >>> import execnet @@ -56,7 +56,7 @@ you actually use the ``execnet.default_g Robust Termination of ssh/popen processes ----------------------------------------------- -Use ``group.terminate(timeout)`` if you want to terminate +Use ``group.terminate(timeout)`` if you want to terminate member gateways and ensure that no local sub processes remain you can specify a ``timeout`` after which an attempt at killing the related process is made:: @@ -71,8 +71,8 @@ the related process is made:: >>> group -execnet aims to provide totally robust termination so if -you have left-over processes or other termination issues +execnet aims to provide totally robust termination so if +you have left-over processes or other termination issues please :doc:`report them <../support>`. thanks! @@ -85,7 +85,7 @@ specification used by ``group.makegatewa >>> import execnet >>> group = execnet.Group() >>> group.defaultspec = "ssh=localhost//chdir=mytmp//nice=20" - >>> gw = group.makegateway() + >>> gw = group.makegateway() >>> ch = gw.remote_exec(""" ... import os.path ... basename = os.path.basename(os.getcwd()) --- a/doc/example/test_multi.txt +++ b/doc/example/test_multi.txt @@ -1,7 +1,7 @@ -advanced (multi) channel communication +advanced (multi) channel communication ===================================================== -MultiChannel: container for multiple channels +MultiChannel: container for multiple channels ------------------------------------------------------ Use ``execnet.MultiChannel`` to work with multiple channels:: @@ -29,9 +29,9 @@ from which to obtain results:: >>> ch2 = execnet.makegateway().remote_exec("channel.send(2)") >>> mch = execnet.MultiChannel([ch1, ch2]) >>> queue = mch.make_receive_queue() - >>> chan1, res1 = queue.get() + >>> chan1, res1 = queue.get() >>> chan2, res2 = queue.get(timeout=3) - >>> res1 + res2 + >>> res1 + res2 3 Working asynchronously/event-based with channels @@ -46,18 +46,18 @@ data immediately and without blocking ex >>> l = [] >>> ch.setcallback(l.append) >>> ch.send(1) - >>> ch.waitclose() + >>> ch.waitclose() >>> assert l == [42] Note that the callback function will be executed in the -receiver thread and should not block or run for too long. +receiver thread and should not block or run for too long. -robustly receive results and termination notification +robustly receive results and termination notification ----------------------------------------------------- Use ``MultiChannel.make_receive_queue(endmarker)`` to specify an object to be put to the queue when the remote side of a channel -is closed. The endmarker will also be put to the Queue if the gateway +is closed. The endmarker will also be put to the Queue if the gateway is blocked in execution and is terminated/killed:: >>> group = execnet.Group(['popen'] * 3) # create three gateways --- a/doc/example/test_info.txt +++ b/doc/example/test_info.txt @@ -4,11 +4,11 @@ basic local and remote communication Execute code in subprocess, communicate through a channel --------------------------------------------------------- -You can instantiate a subprocess gateway, execute code +You can instantiate a subprocess gateway, execute code in it and exchange data dynamically:: >>> import execnet - >>> gw = execnet.makegateway() + >>> gw = execnet.makegateway() >>> channel = gw.remote_exec("channel.send(channel.receive()+1)") >>> channel.send(1) >>> channel.receive() @@ -20,7 +20,7 @@ Compare current working directories A local subprocess gateway has the same working directory as the instantiatior:: >>> import execnet, os - >>> gw = execnet.makegateway() + >>> gw = execnet.makegateway() >>> ch = gw.remote_exec("import os; channel.send(os.getcwd())") >>> res = ch.receive() >>> assert res == os.getcwd() @@ -44,13 +44,13 @@ Use simple execution to obtain informati >>> version_info (2, 4, 2, 'final', 0) -Use a callback instead of receive() and wait for completion +Use a callback instead of receive() and wait for completion ------------------------------------------------------------- Set a channel callback to immediately react on incoming data:: >>> import execnet - >>> gw = execnet.makegateway() + >>> gw = execnet.makegateway() >>> channel = gw.remote_exec("for i in range(10): channel.send(i)") >>> l = [] >>> channel.setcallback(l.append, endmarker=None) @@ -59,9 +59,9 @@ Set a channel callback to immediately re [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, None] Note that the callback function will execute in the receiver thread -so it should not block on IO or long to execute. +so it should not block on IO or long to execute. -Sending channels over channels +Sending channels over channels ------------------------------------------------------ You can create and transfer a channel over an existing channel @@ -71,7 +71,7 @@ and use it to transfer information:: >>> gw = execnet.makegateway() >>> channel = gw.remote_exec(""" ... ch1, ch2 = channel.receive() - ... ch2.send("world") + ... ch2.send("world") ... ch1.send("hello") ... """) >>> c1 = gw.newchannel() # create new channel @@ -87,9 +87,9 @@ Avoid "inlined" source strings with remo You can pass a module object to ``remote_exec`` in which case its source code will be sent. No dependencies will be transferred -so the module must be self-contained or only use modules that are -installed on the "other" side. Module code can detect if it is -running in a remote_exec situation by checking for the special +so the module must be self-contained or only use modules that are +installed on the "other" side. Module code can detect if it is +running in a remote_exec situation by checking for the special ``__name__`` attribute. .. include:: remote1.py @@ -103,7 +103,7 @@ You can now remote-execute the module li >>> print (ch.receive()) initialization complete -which will print the 'initialization complete' string. +which will print the 'initialization complete' string. a simple command loop pattern -------------------------------------------------------------- @@ -128,29 +128,29 @@ Our remotecmd module starts up remote se through the ``for item in channel`` loop which will terminate when the channel closes. It evaluates all incoming requests in the global name space and -sends back the results. +sends back the results. -Instantiate gateways through sockets +Instantiate gateways through sockets ----------------------------------------------------- .. _`socketserver.py`: http://bitbucket.org/hpk42/execnet/raw/80baab4140de/execnet/script/socketserver.py In cases where you do not have SSH-access to a machine -you need to download a small version-independent standalone -`socketserver.py`_ script to provide a remote bootstrapping-point. -You do not need to install the execnet package remotely. +you need to download a small version-independent standalone +`socketserver.py`_ script to provide a remote bootstrapping-point. +You do not need to install the execnet package remotely. Simply run the script like this:: python socketserver.py :8888 # bind to all IPs, port 8888 -You can then instruct execnet on your local machine to bootstrap +You can then instruct execnet on your local machine to bootstrap itself into the remote socket endpoint:: import execnet gw = execnet.SocketGateway("TARGET-IP:8888") That's it, you can now use the gateway object just like -a popen- or ssh-based one. +a popen- or ssh-based one. .. include:: test_ssh_fileserver.txt --- a/doc/support.txt +++ b/doc/support.txt @@ -1,4 +1,4 @@ -Contact and Support channels +Contact and Support channels ------------------------------ If you have interest, questions, issues or suggestions you @@ -7,9 +7,9 @@ are welcome to: * join `execnet-dev`_ for general discussions * join `execnet-commit`_ to be notified of changes * clone the `bitbucket repository`_ and submit patches -* hang out on the irc.freenode.net #pylib channel +* hang out on the irc.freenode.net #pylib channel * follow the `tetamap blog`_ or `Holger's twitter presence`_. -* contact merlinux_ if you want to buy teaching or other support. +* contact merlinux_ if you want to buy teaching or other support. .. _`Holger's twitter presence`: http://twitter.com/hpk42 .. _merlinux: http://merlinux.eu --- a/doc/example/test_ssh_fileserver.txt +++ b/doc/example/test_ssh_fileserver.txt @@ -1,5 +1,5 @@ -Receive file contents from remote SSH account +Receive file contents from remote SSH account ----------------------------------------------------- Here is some small server code that you can use to retrieve @@ -16,7 +16,7 @@ And here is some code to use it to retri channel = gw.remote_exec(servefiles) for fn in ('/etc/passwd', '/etc/group'): - channel.send(fn) + channel.send(fn) content = channel.receive() print(fn) print(content) From commits-noreply at bitbucket.org Tue Jul 27 14:25:39 2010 From: commits-noreply at bitbucket.org (commits-noreply at bitbucket.org) Date: Tue, 27 Jul 2010 12:25:39 +0000 (UTC) Subject: [execnet-commit] execnet commit aebb1812a988: update apipkg usage Message-ID: <20100727122539.CE4467EE78@bitbucket.org> # HG changeset patch -- Bitbucket.org # Project execnet # URL http://bitbucket.org/hpk42/execnet/overview # User holger krekel # Date 1280233436 -7200 # Node ID aebb1812a988e28a1cbebc4f166bd60e9acddd70 # Parent 19148dc174d28e4086fe81f0549a27333673065a update apipkg usage --- a/execnet/apipkg.py +++ b/execnet/apipkg.py @@ -8,16 +8,17 @@ see http://pypi.python.org/pypi/apipkg import sys from types import ModuleType -__version__ = "1.0b4" +__version__ = "1.0b6" def initpkg(pkgname, exportdefs): """ initialize given package from the export definitions. """ mod = ApiModule(pkgname, exportdefs, implprefix=pkgname) oldmod = sys.modules[pkgname] mod.__file__ = getattr(oldmod, '__file__', None) - mod.__version__ = getattr(oldmod, '__version__', None) - mod.__path__ = getattr(oldmod, '__path__', None) - mod.__loader__ = getattr(oldmod, '__loader__', None) + mod.__version__ = getattr(oldmod, '__version__', '0') + for name in ('__path__', '__loader__'): + if hasattr(oldmod, name): + setattr(mod, name, getattr(oldmod, name)) sys.modules[pkgname] = mod def importobj(modpath, attrname): @@ -71,7 +72,10 @@ class ApiModule(ModuleType): else: result = importobj(modpath, attrname) setattr(self, name, result) - del self.__map__[name] + try: + del self.__map__[name] + except KeyError: + pass # in a recursive-import situation a double-del can happen return result __getattr__ = __makeattr --- a/CHANGELOG +++ b/CHANGELOG @@ -2,6 +2,7 @@ 1.0.8 -------------------------------- - removed all trailing whitespace from source files +- update usage of apipkg to 1.0b6 1.0.7 -------------------------------- From commits-noreply at bitbucket.org Tue Jul 27 16:56:30 2010 From: commits-noreply at bitbucket.org (commits-noreply at bitbucket.org) Date: Tue, 27 Jul 2010 14:56:30 +0000 (UTC) Subject: [execnet-commit] execnet commit 102af6badef9: fix rsyncing of symlinks Message-ID: <20100727145630.036EA47854@bitbucket.org> # HG changeset patch -- Bitbucket.org # Project execnet # URL http://bitbucket.org/hpk42/execnet/overview # User holger krekel # Date 1280241116 -7200 # Node ID 102af6badef9f9979469f192cdfefcc7d2040b12 # Parent aebb1812a988e28a1cbebc4f166bd60e9acddd70 fix rsyncing of symlinks --- a/execnet/rsync.py +++ b/execnet/rsync.py @@ -159,8 +159,8 @@ class RSync(object): for channel in self._channels: channel.send(msg) - def _send_link(self, basename, linkpoint): - self._links.append(("link", basename, linkpoint)) + def _send_link(self, linktype, basename, linkpoint): + self._links.append((linktype, basename, linkpoint)) def _send_directory(self, path): # dir: send a list of entries @@ -178,14 +178,12 @@ class RSync(object): def _send_link_structure(self, path): linkpoint = os.readlink(path) basename = path[len(self._sourcedir) + 1:] - if not linkpoint.startswith(os.sep): - # relative link, just send it - # XXX: do sth with ../ links - self._send_link(basename, linkpoint) - elif linkpoint.startswith(self._sourcedir): - self._send_link(basename, linkpoint[len(self._sourcedir) + 1:]) + if linkpoint.startswith(self._sourcedir): + self._send_link("linkbase", basename, + linkpoint[len(self._sourcedir) + 1:]) else: - self._send_link(basename, linkpoint) + # relative or absolute link, just send it + self._send_link("link", basename, linkpoint) self._broadcast(None) def _send_directory_structure(self, path): --- a/execnet/rsync_remote.py +++ b/execnet/rsync_remote.py @@ -82,14 +82,17 @@ def serve_rsync(channel): while msg != 42: # we get symlink _type, relpath, linkpoint = msg - assert _type == "link" path = os.path.join(destdir, relpath) try: remove(path) except OSError: pass - - os.symlink(os.path.join(destdir, linkpoint), path) + if _type == "linkbase": + src = os.path.join(destdir, linkpoint) + else: + assert _type == "link", _type + src = linkpoint + os.symlink(src, path) msg = channel.receive() channel.send(("done", None)) --- a/testing/test_rsync.py +++ b/testing/test_rsync.py @@ -93,22 +93,40 @@ class TestRSync: assert not out assert not err + @py.test.mark.skipif("not hasattr(os, 'symlink')") def test_symlink_rsync(self, dirs, gw1): - if (py.std.sys.platform == 'win32' or - getattr(py.std.os, '_name', '') == 'nt'): - py.test.skip("symlinks are unsupported on Windows.") source = dirs.source dest = dirs.dest1 - dirs.source.ensure("existant") - source.join("rellink").mksymlinkto(source.join("existant"), absolute=0) - source.join('abslink').mksymlinkto(source.join("existant")) + sourcefile = dirs.source.ensure("subdir", "existant") + source.join("rellink").mksymlinkto(sourcefile, absolute=0) + source.join('abslink').mksymlinkto(sourcefile) rsync = RSync(source) rsync.add_target(gw1, dest) rsync.send() - assert dest.join('rellink').readlink() == dest.join("existant") - assert dest.join('abslink').readlink() == dest.join("existant") + expected = dest.join(sourcefile.relto(dirs.source)) + assert dest.join('rellink').readlink() == "subdir/existant" + assert dest.join('abslink').readlink() == expected + + @py.test.mark.skipif("not hasattr(os, 'symlink')") + def test_symlink2_rsync(self, dirs, gw1): + source = dirs.source + dest = dirs.dest1 + subdir = dirs.source.ensure("subdir", dir=1) + sourcefile = subdir.ensure("somefile") + subdir.join("link1").mksymlinkto(subdir.join("link2"), absolute=0) + subdir.join("link2").mksymlinkto(sourcefile, absolute=1) + subdir.join("link3").mksymlinkto(source.dirpath(), absolute=1) + rsync = RSync(source) + rsync.add_target(gw1, dest) + rsync.send() + expected = dest.join(sourcefile.relto(dirs.source)) + destsub = dest.join("subdir") + assert destsub.check() + assert destsub.join('link1').readlink() == "link2" + assert destsub.join('link2').readlink() == expected + assert destsub.join('link3').readlink() == source.dirpath() def test_callback(self, dirs, gw1): dest = dirs.dest1 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,8 +1,10 @@ 1.0.8 -------------------------------- +- fix rsyncing of symlinks, thanks to Charles Solar + (should also resolve http://bitbucket.org/hpk42/py-trunk/issue/70/) +- update usage of apipkg to 1.0b6 - removed all trailing whitespace from source files -- update usage of apipkg to 1.0b6 1.0.7 -------------------------------- From commits-noreply at bitbucket.org Tue Jul 27 19:42:30 2010 From: commits-noreply at bitbucket.org (commits-noreply at bitbucket.org) Date: Tue, 27 Jul 2010 17:42:30 +0000 (UTC) Subject: [execnet-commit] execnet commit fcda30bac7e5: rsync path (dir and files) permission/mode information to the other side Message-ID: <20100727174230.8FC2F5D590@bitbucket.org> # HG changeset patch -- Bitbucket.org # Project execnet # URL http://bitbucket.org/hpk42/execnet/overview # User holger krekel # Date 1280252273 -7200 # Node ID fcda30bac7e50fabf3da1c2550636de8c2190d73 # Parent 102af6badef9f9979469f192cdfefcc7d2040b12 rsync path (dir and files) permission/mode information to the other side --- a/execnet/rsync.py +++ b/execnet/rsync.py @@ -49,7 +49,7 @@ class RSync(object): def _process_link(self, channel): for link in self._links: channel.send(link) - # completion marker, this host is done + # completion marker, this host is done channel.send(42) def _done(self, channel): @@ -58,6 +58,7 @@ class RSync(object): finishedcallback = self._channels.pop(channel) if finishedcallback: finishedcallback() + channel.waitclose() def _list_done(self, channel): # sum up all to send @@ -171,7 +172,8 @@ class RSync(object): if self.filter(p): names.append(name) subpaths.append(p) - self._broadcast(names) + mode = os.lstat(path).st_mode + self._broadcast([mode] + names) for p in subpaths: self._send_directory_structure(p) @@ -190,11 +192,11 @@ class RSync(object): try: st = os.lstat(path) except OSError: - self._broadcast((0, 0)) + self._broadcast((None, 0, 0)) return if stat.S_ISREG(st.st_mode): - # regular file: send a timestamp/size pair - self._broadcast((st.st_mtime, st.st_size)) + # regular file: send a mode/timestamp/size pair + self._broadcast((st.st_mode, st.st_mtime, st.st_size)) elif stat.S_ISDIR(st.st_mode): self._send_directory(path) elif stat.S_ISLNK(st.st_mode): --- a/execnet/rsync_remote.py +++ b/execnet/rsync_remote.py @@ -30,10 +30,13 @@ def serve_rsync(channel): st = None if not st: os.makedirs(path) + mode = msg.pop(0) + if mode: + os.chmod(path, mode) entrynames = {} for entryname in msg: - receive_directory_structure(os.path.join(path, entryname), - relcomponents + [entryname]) + destpath = os.path.join(path, entryname) + receive_directory_structure(destpath, relcomponents + [entryname]) entrynames[entryname] = True if options.get('delete'): for othername in os.listdir(path): @@ -41,16 +44,20 @@ def serve_rsync(channel): otherpath = os.path.join(path, othername) remove(otherpath) elif msg is not None: + assert isinstance(msg, tuple) checksum = None if st: if stat.S_ISREG(st.st_mode): - msg_mtime, msg_size = msg + msg_mode, msg_mtime, msg_size = msg if msg_size != st.st_size: pass elif msg_mtime != st.st_mtime: f = open(path, 'rb') checksum = md5(f.read()).digest() f.close() + elif msg_mode and msg_mode != st.st_mode: + os.chmod(path, msg_mode) + return else: return # already fine else: @@ -62,7 +69,7 @@ def serve_rsync(channel): STRICT_CHECK = False # seems most useful this way for py.test channel.send(("list_done", None)) - for path, (time, size) in modifiedfiles: + for path, (mode, time, size) in modifiedfiles: data = channel.receive() channel.send(("ack", path[len(destdir) + 1:])) if data is not None: @@ -72,6 +79,8 @@ def serve_rsync(channel): f.write(data) f.close() try: + if mode: + os.chmod(path, mode) os.utime(path, (time, time)) except OSError: pass --- a/testing/test_rsync.py +++ b/testing/test_rsync.py @@ -93,6 +93,44 @@ class TestRSync: assert not out assert not err + @py.test.mark.skipif( + "sys.platform == 'win32' or getattr(os, '_name', '') == 'nt'") + def test_permissions(self, dirs, gw1, gw2): + source = dirs.source + dest = dirs.dest1 + onedir = dirs.source.ensure("one", dir=1) + onedir.chmod(448) + onefile = dirs.source.ensure("file") + onefile.chmod(504) + onefile_mtime = onefile.stat().mtime + + print "status of gw1", gw1 + rsync = RSync(source) + rsync.add_target(gw1, dest) + rsync.send() + print "status of gw1 after rsync", gw1 + + destdir = dirs.dest1.join(onedir.basename) + destfile = dirs.dest1.join(onefile.basename) + assert destfile.stat().mode & 511 == 504 + mode = destdir.stat().mode + assert mode & 511 == 448 + + # transfer again with changed permissions + onedir.chmod(504) + onefile.chmod(448) + onefile.setmtime(onefile_mtime) + + rsync = RSync(source) + rsync.add_target(gw1, dest) + print "invoking second send", gw1 + rsync.send() + + mode = destfile.stat().mode + assert mode & 511 == 448, mode + mode = destdir.stat().mode + assert mode & 511 == 504 + @py.test.mark.skipif("not hasattr(os, 'symlink')") def test_symlink_rsync(self, dirs, gw1): source = dirs.source --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,8 @@ 1.0.8 -------------------------------- +- enhance rsyncing to also sync permissions (stat().st_mode) + of directories and files. - fix rsyncing of symlinks, thanks to Charles Solar (should also resolve http://bitbucket.org/hpk42/py-trunk/issue/70/) - update usage of apipkg to 1.0b6 From commits-noreply at bitbucket.org Tue Jul 27 19:48:39 2010 From: commits-noreply at bitbucket.org (commits-noreply at bitbucket.org) Date: Tue, 27 Jul 2010 17:48:39 +0000 (UTC) Subject: [execnet-commit] execnet commit 1629c46531f0: fix syntax for python3.1 Message-ID: <20100727174839.A15635D590@bitbucket.org> # HG changeset patch -- Bitbucket.org # Project execnet # URL http://bitbucket.org/hpk42/execnet/overview # User holger krekel # Date 1280252673 -7200 # Node ID 1629c46531f0c8a2786a0325060f6b43de78bb60 # Parent fcda30bac7e50fabf3da1c2550636de8c2190d73 fix syntax for python3.1 --- a/testing/test_rsync.py +++ b/testing/test_rsync.py @@ -104,11 +104,9 @@ class TestRSync: onefile.chmod(504) onefile_mtime = onefile.stat().mtime - print "status of gw1", gw1 rsync = RSync(source) rsync.add_target(gw1, dest) rsync.send() - print "status of gw1 after rsync", gw1 destdir = dirs.dest1.join(onedir.basename) destfile = dirs.dest1.join(onefile.basename) @@ -123,7 +121,6 @@ class TestRSync: rsync = RSync(source) rsync.add_target(gw1, dest) - print "invoking second send", gw1 rsync.send() mode = destfile.stat().mode From commits-noreply at bitbucket.org Tue Jul 27 20:09:52 2010 From: commits-noreply at bitbucket.org (commits-noreply at bitbucket.org) Date: Tue, 27 Jul 2010 18:09:52 +0000 (UTC) Subject: [execnet-commit] execnet commit 69122c9b1fc7: update linecache before reading module-source (thanks ronny, mattmtl) Message-ID: <20100727180952.5925E47835@bitbucket.org> # HG changeset patch -- Bitbucket.org # Project execnet # URL http://bitbucket.org/hpk42/execnet/overview # User holger krekel # Date 1280254138 -7200 # Node ID 69122c9b1fc7036a1188b4099c3972d7f9454137 # Parent 1629c46531f0c8a2786a0325060f6b43de78bb60 update linecache before reading module-source (thanks ronny, mattmtl) --- a/execnet/gateway.py +++ b/execnet/gateway.py @@ -3,7 +3,7 @@ gateway code for initiating popen, socke (c) 2004-2009, Holger Krekel and others """ -import sys, os, inspect, types +import sys, os, inspect, types, linecache import textwrap import execnet from execnet.gateway_base import Message, Popen2IO @@ -89,6 +89,7 @@ class Gateway(gateway_base.BaseGateway): namespace. """ if isinstance(source, types.ModuleType): + linecache.updatecache(inspect.getsourcefile(source)) source = inspect.getsource(source) else: source = textwrap.dedent(str(source)) --- a/testing/test_gateway.py +++ b/testing/test_gateway.py @@ -75,6 +75,10 @@ class TestBasicGateway: channel = gw.remote_exec(mod) name = channel.receive() assert name == 1 + p.write("channel.send(2)") + channel = gw.remote_exec(mod) + name = channel.receive() + assert name == 2 def test_correct_setup_no_py(self, gw): channel = gw.remote_exec(""" --- a/CHANGELOG +++ b/CHANGELOG @@ -3,9 +3,12 @@ 1.0.8 - enhance rsyncing to also sync permissions (stat().st_mode) of directories and files. + (should also resolve http://bitbucket.org/hpk42/py-trunk/issue/68/) - fix rsyncing of symlinks, thanks to Charles Solar (should also resolve http://bitbucket.org/hpk42/py-trunk/issue/70/) - update usage of apipkg to 1.0b6 +- remote_exec(module) now makes sure that the linecache is updated + before reading and sending the source. thanks Ronny, Matt. - removed all trailing whitespace from source files 1.0.7 From commits-noreply at bitbucket.org Thu Jul 29 21:56:29 2010 From: commits-noreply at bitbucket.org (commits-noreply at bitbucket.org) Date: Thu, 29 Jul 2010 19:56:29 +0000 (UTC) Subject: [execnet-commit] execnet commit a6967fecef38: use tuple slicing instead of list comprehension to ensure rinfo version_info is always a tuple Message-ID: <20100729195629.9F3094786D@bitbucket.org> # HG changeset patch -- Bitbucket.org # Project execnet # URL http://bitbucket.org/hpk42/execnet/overview # User Ronny Pfannschmidt # Date 1280432094 -7200 # Node ID a6967fecef38c44108658428528eb62054459c3a # Parent 366dc8f9884dd385cf8d0afcba4d29028a7aae9e use tuple slicing instead of list comprehension to ensure rinfo version_info is always a tuple --- a/execnet/gateway.py +++ b/execnet/gateway.py @@ -128,7 +128,7 @@ rinfo_source = """ import sys, os channel.send(dict( executable = sys.executable, - version_info = tuple([sys.version_info[i] for i in range(5)]), + version_info = sys.version_info[:5], platform = sys.platform, cwd = os.getcwd(), pid = os.getpid(), From commits-noreply at bitbucket.org Thu Jul 29 21:56:30 2010 From: commits-noreply at bitbucket.org (commits-noreply at bitbucket.org) Date: Thu, 29 Jul 2010 19:56:30 +0000 (UTC) Subject: [execnet-commit] execnet commit 2aadb3761ab7: put onre more dependent function under import-ast-skip protection Message-ID: <20100729195630.7970D4786F@bitbucket.org> # HG changeset patch -- Bitbucket.org # Project execnet # URL http://bitbucket.org/hpk42/execnet/overview # User holger krekel # Date 1280433310 -7200 # Node ID 2aadb3761ab70d908029646dcadbb645becd5ef2 # Parent 8955e7b959b14154a80cddac6b2932cac11ded08 put onre more dependent function under import-ast-skip protection --- a/testing/test_basics.py +++ b/testing/test_basics.py @@ -236,11 +236,6 @@ class TestSourceOfFunction(object): py.test.raises(ValueError, gateway._source_of_function, closure) - def test_function_with_global_fails(self): - def func(channel): - test - py.test.raises(ValueError, gateway._source_of_function, func) - def test_function_call_concat(self): def working(channel): @@ -253,7 +248,7 @@ class TestSourceOfFunction(object): class TestGlobalFinder(object): - def setup_method(self, method): + def setup_class(cls): py.test.importorskip('ast') def check(self, func): @@ -276,9 +271,16 @@ class TestGlobalFinder(object): assert self.check(f) == ['glob'] + def test_builtin(self): def f(): len assert self.check(f) == [] + def test_function_with_global_fails(self): + def func(channel): + test + py.test.raises(ValueError, gateway._source_of_function, func) + + From commits-noreply at bitbucket.org Thu Jul 29 21:56:29 2010 From: commits-noreply at bitbucket.org (commits-noreply at bitbucket.org) Date: Thu, 29 Jul 2010 19:56:29 +0000 (UTC) Subject: [execnet-commit] execnet commit 1aedf1fb7759: implement and test passing functions to remote_exec Message-ID: <20100729195629.2AC594786B@bitbucket.org> # HG changeset patch -- Bitbucket.org # Project execnet # URL http://bitbucket.org/hpk42/execnet/overview # User Ronny Pfannschmidt # Date 1280405486 -7200 # Node ID 1aedf1fb7759aa4b3c0aec457630b54d9cd7cfe3 # Parent 69122c9b1fc7036a1188b4099c3972d7f9454137 implement and test passing functions to remote_exec --- a/execnet/gateway.py +++ b/execnet/gateway.py @@ -91,6 +91,8 @@ class Gateway(gateway_base.BaseGateway): if isinstance(source, types.ModuleType): linecache.updatecache(inspect.getsourcefile(source)) source = inspect.getsource(source) + elif isinstance(source, types.FunctionType): + source = _source_of_function(source) else: source = textwrap.dedent(str(source)) channel = self.newchannel() @@ -133,6 +135,33 @@ channel.send(dict( )) """ + +def _source_of_function(function): + if function.__name__ == '': + raise ValueError("can't evaluate lambda functions'") + argspec = inspect.getargspec(function) + if argspec != (['channel'], None, None, None): + raise ValueError( + 'the expected function prototype is %s(channel)' % function.__name__ + ) + + if sys.version_info < (3,0): + closure = function.func_closure + else: + closure = function.__closure__ + + if closure is not None: + raise ValueError("functions with closures can't be passed") + + try: + source = inspect.getsource(function) + except IOError: + raise ValueError("can't find source file for %s" % function) + + source = textwrap.dedent(source) # just for inner functions + return '%s\n%s(channel)' % (source, function.__name__) + + class PopenCmdGateway(Gateway): _remotesetup = "io = init_popen_io()" def __init__(self, args, id): --- a/testing/test_basics.py +++ b/testing/test_basics.py @@ -211,3 +211,36 @@ class TestPureChannel: py.test.raises(ValueError, 'channel.makefile("rw")') +class TestSourceOfFunction(object): + + def test_lambda_unsupported(self): + py.test.raises(ValueError, gateway._source_of_function, lambda:1) + + def test_wrong_prototype_fails(self): + def prototype(wrong): + pass + py.test.raises(ValueError, gateway._source_of_function, prototype) + + def test_function_without_known_source_fails(self): + # this one wont be able to find the source + mess = {} + py.builtin.exec_('def fail(channel): pass', mess, mess) + import inspect + print(inspect.getsourcefile(mess['fail'])) + py.test.raises(ValueError, gateway._source_of_function, mess['fail']) + + def test_function_with_closure_fails(self): + mess = {} + def closure(channel): + print(mess) + + py.test.raises(ValueError, gateway._source_of_function, closure) + + def test_function_call_concat(self): + def working(channel): + pass + + send_source = gateway._source_of_function(working) + assert send_source.startswith('def working') + assert send_source.endswith('working(channel)') + From commits-noreply at bitbucket.org Thu Jul 29 21:56:29 2010 From: commits-noreply at bitbucket.org (commits-noreply at bitbucket.org) Date: Thu, 29 Jul 2010 19:56:29 +0000 (UTC) Subject: [execnet-commit] execnet commit 366dc8f9884d: integrate maciej's global finder for the remote_exec(function) sanity checks Message-ID: <20100729195629.487A04786C@bitbucket.org> # HG changeset patch -- Bitbucket.org # Project execnet # URL http://bitbucket.org/hpk42/execnet/overview # User Ronny Pfannschmidt # Date 1280417960 -7200 # Node ID 366dc8f9884dd385cf8d0afcba4d29028a7aae9e # Parent 1aedf1fb7759aa4b3c0aec457630b54d9cd7cfe3 integrate maciej's global finder for the remote_exec(function) sanity checks taken from http://codespeak.net/svn/user/fijal/glob/ --- a/execnet/gateway.py +++ b/execnet/gateway.py @@ -136,6 +136,25 @@ channel.send(dict( """ +def _find_non_builtin_globals(source, codeobj): + try: + import ast + except ImportError: + return None + try: + import __builtin__ + except ImportError: + import builtins as __builtin__ + + vars = dict.fromkeys(codeobj.co_varnames) + all = [] + for node in ast.walk(ast.parse(source)): + if (isinstance(node, ast.Name) and node.id not in vars and + node.id not in __builtin__.__dict__): + all.append(node.id) + return all + + def _source_of_function(function): if function.__name__ == '': raise ValueError("can't evaluate lambda functions'") @@ -147,8 +166,10 @@ def _source_of_function(function): if sys.version_info < (3,0): closure = function.func_closure + codeobj = function.func_code else: closure = function.__closure__ + codeobj = function.__code__ if closure is not None: raise ValueError("functions with closures can't be passed") @@ -159,6 +180,14 @@ def _source_of_function(function): raise ValueError("can't find source file for %s" % function) source = textwrap.dedent(source) # just for inner functions + + used_globals = _find_non_builtin_globals(source, codeobj) + if used_globals: + raise ValueError( + "the use of non-builtin globals isn't supported", + used_globals, + ) + return '%s\n%s(channel)' % (source, function.__name__) --- a/testing/test_basics.py +++ b/testing/test_basics.py @@ -236,6 +236,12 @@ class TestSourceOfFunction(object): py.test.raises(ValueError, gateway._source_of_function, closure) + def test_function_with_global_fails(self): + def func(channel): + test + py.test.raises(ValueError, gateway._source_of_function, func) + + def test_function_call_concat(self): def working(channel): pass @@ -244,3 +250,32 @@ class TestSourceOfFunction(object): assert send_source.startswith('def working') assert send_source.endswith('working(channel)') + +class TestGlobalFinder(object): + + def check(self, func): + src = py.code.Source(func) + code = py.code.Code(func) + return gateway._find_non_builtin_globals(str(src), code.raw) + + def test_local(self): + def f(a, b, c): + d = 3 + pass + + assert self.check(f) == [] + + def test_global(self): + def f(a, b): + c = 3 + glob + d = 4 + + assert self.check(f) == ['glob'] + + def test_builtin(self): + def f(): + len + + assert self.check(f) == [] + From commits-noreply at bitbucket.org Thu Jul 29 21:56:30 2010 From: commits-noreply at bitbucket.org (commits-noreply at bitbucket.org) Date: Thu, 29 Jul 2010 19:56:30 +0000 (UTC) Subject: [execnet-commit] execnet commit 8955e7b959b1: auto-skip tests for global finder if the ast module is missing Message-ID: <20100729195630.3D04A4786E@bitbucket.org> # HG changeset patch -- Bitbucket.org # Project execnet # URL http://bitbucket.org/hpk42/execnet/overview # User Ronny Pfannschmidt # Date 1280432734 -7200 # Node ID 8955e7b959b14154a80cddac6b2932cac11ded08 # Parent a6967fecef38c44108658428528eb62054459c3a auto-skip tests for global finder if the ast module is missing --- a/testing/test_basics.py +++ b/testing/test_basics.py @@ -253,6 +253,9 @@ class TestSourceOfFunction(object): class TestGlobalFinder(object): + def setup_method(self, method): + py.test.importorskip('ast') + def check(self, func): src = py.code.Source(func) code = py.code.Code(func) From commits-noreply at bitbucket.org Thu Jul 29 23:23:53 2010 From: commits-noreply at bitbucket.org (commits-noreply at bitbucket.org) Date: Thu, 29 Jul 2010 21:23:53 +0000 (UTC) Subject: [execnet-commit] execnet commit 0263548163b4: whitespace-fix Message-ID: <20100729212353.E4CA35D6F1@bitbucket.org> # HG changeset patch -- Bitbucket.org # Project execnet # URL http://bitbucket.org/hpk42/execnet/overview # User holger krekel # Date 1280438367 -7200 # Node ID 0263548163b4a7b9b937d6282c52aed0fb8903a9 # Parent 12bc646542073c59837b18ca5d4627430a1598b8 whitespace-fix --- a/tox.ini +++ b/tox.ini @@ -29,9 +29,9 @@ basepython=jython [testenv:docs] basepython=python -changedir=doc +changedir=doc deps=sphinx py commands= py.test --confcutdir=.. \ - -rsfxX -v --junitxml={envlogdir}/junit-{envname}.xml check_sphinx.py + -rsfxX -v --junitxml={envlogdir}/junit-{envname}.xml check_sphinx.py From commits-noreply at bitbucket.org Thu Jul 29 23:23:54 2010 From: commits-noreply at bitbucket.org (commits-noreply at bitbucket.org) Date: Thu, 29 Jul 2010 21:23:54 +0000 (UTC) Subject: [execnet-commit] execnet commit c61e461998eb: adding an example about calling with kwargs (pending some implementation from ronny) Message-ID: <20100729212354.2F8715D702@bitbucket.org> # HG changeset patch -- Bitbucket.org # Project execnet # URL http://bitbucket.org/hpk42/execnet/overview # User holger krekel # Date 1280438354 -7200 # Node ID c61e461998ebdf0be5591d56a1163ab5fb904319 # Parent 2aadb3761ab70d908029646dcadbb645becd5ef2 adding an example about calling with kwargs (pending some implementation from ronny) --- a/doc/example/test_info.txt +++ b/doc/example/test_info.txt @@ -1,8 +1,8 @@ basic local and remote communication ========================================= -Execute code in subprocess, communicate through a channel ---------------------------------------------------------- +Execute source code in subprocess, communicate through a channel +------------------------------------------------------------------- You can instantiate a subprocess gateway, execute code in it and exchange data dynamically:: @@ -14,6 +14,32 @@ in it and exchange data dynamically:: >>> channel.receive() 2 +Remote-Execute a function, communicate through a channel +------------------------------------------------------------------- + +You can send and execute parametrized functions remotely like this:: + + import execnet + gw = execnet.makegateway() + + def adder(channel, arg1, arg2): + channel.send(arg1+arg2) + + channel = gw.remote_exec(adder, arg1=1, arg2=15) + result = channel.receive() + assert result == 16 + +The ``adder`` function needs to be "pure", i.e. is not allowed +to access global state. Its source code will be send to +the remote side and called with the supplied ``arg1`` +and ``arg2`` parameters as well as with the channel that allows +it to communicate back. All arguments must be sendable over a +channel so can only contain instances of builtin types. + +Note that this example does not work on the Python prompt because +``remote_exec()`` uses ``inspect.getsource(func)`` and this does +not work for interactively defined functions. + Compare current working directories ---------------------------------------- From commits-noreply at bitbucket.org Thu Jul 29 23:23:54 2010 From: commits-noreply at bitbucket.org (commits-noreply at bitbucket.org) Date: Thu, 29 Jul 2010 21:23:54 +0000 (UTC) Subject: [execnet-commit] execnet commit 12bc64654207: adding a missing file Message-ID: <20100729212354.68CF15D72A@bitbucket.org> # HG changeset patch -- Bitbucket.org # Project execnet # URL http://bitbucket.org/hpk42/execnet/overview # User holger krekel # Date 1280438360 -7200 # Node ID 12bc646542073c59837b18ca5d4627430a1598b8 # Parent c61e461998ebdf0be5591d56a1163ab5fb904319 adding a missing file --- /dev/null +++ b/doc/example/servefiles.py @@ -0,0 +1,11 @@ + +# content of servefiles.py + +def servefiles(channel): + for fn in channel: + f = open(fn, 'rb') + channel.send(f.read()) + f.close() + +if __name__ == "__channelexec__": + servefiles(channel) From commits-noreply at bitbucket.org Fri Jul 30 13:19:12 2010 From: commits-noreply at bitbucket.org (commits-noreply at bitbucket.org) Date: Fri, 30 Jul 2010 11:19:12 +0000 (UTC) Subject: [execnet-commit] execnet commit 5ab79d17a84d: merge Message-ID: <20100730111912.3FC425D756@bitbucket.org> # HG changeset patch -- Bitbucket.org # Project execnet # URL http://bitbucket.org/hpk42/execnet/overview # User holger krekel # Date 1280479539 -7200 # Node ID 5ab79d17a84dfa9c661924017a486cbe37d60a80 # Parent 0263548163b4a7b9b937d6282c52aed0fb8903a9 # Parent 74cb0db0dea80bf0dfb5fd9029c685a699910ff3 merge From commits-noreply at bitbucket.org Fri Jul 30 13:19:12 2010 From: commits-noreply at bitbucket.org (commits-noreply at bitbucket.org) Date: Fri, 30 Jul 2010 11:19:12 +0000 (UTC) Subject: [execnet-commit] execnet commit 74cb0db0dea8: enable gw.remote_exec(function, **kwargs) Message-ID: <20100730111912.2C0AB5D755@bitbucket.org> # HG changeset patch -- Bitbucket.org # Project execnet # URL http://bitbucket.org/hpk42/execnet/overview # User Ronny Pfannschmidt # Date 1280475589 -7200 # Node ID 74cb0db0dea80bf0dfb5fd9029c685a699910ff3 # Parent 2aadb3761ab70d908029646dcadbb645becd5ef2 enable gw.remote_exec(function, **kwargs) this is implemented by extending the CHANNEL_EXEC message it now contains source, call_name and kwargs instead of just source this helps with failing early in case of non-serializable arguments and removes the need for argument retrival and calling in the source --- a/execnet/gateway.py +++ b/execnet/gateway.py @@ -82,21 +82,28 @@ class Gateway(gateway_base.BaseGateway): self._channelfactory._local_close(channel.id) return RemoteStatus(statusdict) - def remote_exec(self, source): + def remote_exec(self, source, **kwargs): """ return channel object and connect it to a remote execution thread where the given 'source' executes and has the sister 'channel' object in its global namespace. """ + call_name = None if isinstance(source, types.ModuleType): linecache.updatecache(inspect.getsourcefile(source)) source = inspect.getsource(source) elif isinstance(source, types.FunctionType): + call_name = source.__name__ source = _source_of_function(source) else: source = textwrap.dedent(str(source)) + + if call_name is None and kwargs: + raise TypeError("can't pass kwargs to non-function remote_exec") + channel = self.newchannel() - self._send(Message.CHANNEL_EXEC(channel.id, source)) + self._send(Message.CHANNEL_EXEC(channel.id, + (source, call_name, kwargs))) return channel def remote_init_threads(self, num=None): @@ -158,11 +165,11 @@ def _find_non_builtin_globals(source, co def _source_of_function(function): if function.__name__ == '': raise ValueError("can't evaluate lambda functions'") - argspec = inspect.getargspec(function) - if argspec != (['channel'], None, None, None): - raise ValueError( - 'the expected function prototype is %s(channel)' % function.__name__ - ) + #XXX: we dont check before remote instanciation + # if arguments are used propperly + args, varargs, keywords, defaults = inspect.getargspec(function) + if args[0] != 'channel': + raise ValueError('expected first function argument to be `channel`') if sys.version_info < (3,0): closure = function.func_closure @@ -188,8 +195,7 @@ def _source_of_function(function): used_globals, ) - return '%s\n%s(channel)' % (source, function.__name__) - + return source class PopenCmdGateway(Gateway): _remotesetup = "io = init_popen_io()" --- a/testing/test_basics.py +++ b/testing/test_basics.py @@ -167,7 +167,7 @@ def test_exectask(): io = py.io.BytesIO() gw = gateway_base.SlaveGateway(io, id="something") ch = PseudoChannel() - gw.executetask((ch, "raise ValueError()")) + gw.executetask((ch, ("raise ValueError()", None, {}))) assert "ValueError" in str(ch._closed[0]) @@ -237,13 +237,13 @@ class TestSourceOfFunction(object): py.test.raises(ValueError, gateway._source_of_function, closure) - def test_function_call_concat(self): + def test_source_of_nested_function(self): def working(channel): pass send_source = gateway._source_of_function(working) - assert send_source.startswith('def working') - assert send_source.endswith('working(channel)') + expected = 'def working(channel):\n pass\n' + assert send_source == expected class TestGlobalFinder(object): @@ -284,3 +284,22 @@ class TestGlobalFinder(object): py.test.raises(ValueError, gateway._source_of_function, func) +def test_remote_exec_function_with_kwargs(anypython): + def func(channel, data): + channel.send(data) + group = execnet.Group() + gw = group.makegateway('popen//python=%s' % anypython) + ch = gw.remote_exec(func, data=1) + result = ch.receive() + assert result == 1 + + + +def test_remote_exc_module_takes_no_kwargs(): + gw = execnet.makegateway() + py.test.raises(TypeError, gw.remote_exec, gateway_base, kwarg=1) + +def test_remote_exec_string_takes_no_kwargs(): + gw = execnet.makegateway() + py.test.raises(TypeError, gw.remote_exec, 'pass', kwarg=1) + --- a/execnet/gateway_base.py +++ b/execnet/gateway_base.py @@ -715,7 +715,7 @@ class SlaveGateway(BaseGateway): self._trace("swallowing keyboardinterrupt in main-thread") def executetask(self, item): - channel, source = item + channel, (source, call_name, kwargs) = item try: loc = {'channel' : channel, '__name__': '__channelexec__'} self._trace("execution starts[%s]: %s" % (channel.id, repr(source)[:50])) @@ -723,6 +723,10 @@ class SlaveGateway(BaseGateway): try: co = compile(source+'\n', '', 'exec') do_exec(co, loc) + if call_name: + self._trace('calling function [%s] with %s'%(call_name, repr(kwargs)[:50])) + function = loc[call_name] + function(channel, **kwargs) finally: channel._executing = False self._trace("execution finished") From commits-noreply at bitbucket.org Fri Jul 30 13:19:12 2010 From: commits-noreply at bitbucket.org (commits-noreply at bitbucket.org) Date: Fri, 30 Jul 2010 11:19:12 +0000 (UTC) Subject: [execnet-commit] execnet commit 334f92823ec3: unify writeto/readfrom signatures a bit Message-ID: <20100730111912.54F955D757@bitbucket.org> # HG changeset patch -- Bitbucket.org # Project execnet # URL http://bitbucket.org/hpk42/execnet/overview # User holger krekel # Date 1280485925 -7200 # Node ID 334f92823ec3627672bde77f12e08500ef3af7cc # Parent 5ab79d17a84dfa9c661924017a486cbe37d60a80 unify writeto/readfrom signatures a bit --- a/testing/test_basics.py +++ b/testing/test_basics.py @@ -78,7 +78,7 @@ def test_io_message(anypython, tmpdir): io.infile.seek(0) io.infile.write(x) io.infile.seek(0) - msg2 = Message.readfrom(unserializer) + msg2 = Message.readfrom(io, None) assert msg1.channelid == msg2.channelid, (msg1, msg2) assert msg1.data == msg2.data print ("all passed") @@ -178,8 +178,7 @@ class TestMessage: data = '23'.encode('ascii') cls(42, data).writeto(one) two = py.io.BytesIO(one.getvalue()) - unserializer = gateway_base.Unserializer(two) - msg = Message.readfrom(unserializer) + msg = Message.readfrom(two, None) assert isinstance(msg, cls) assert msg.channelid == 42 assert msg.data == data --- a/execnet/gateway_base.py +++ b/execnet/gateway_base.py @@ -111,9 +111,11 @@ class Message: def writeto(self, io): serialize(io, (self.msgtype, self.channelid, self.data)) - def readfrom(cls, unserializer): + def readfrom(cls, io, channelfactory): + unserializer = Unserializer(io, channelfactory) msgtype, senderid, data = unserializer.load() return cls._types[msgtype](senderid, data) + readfrom = classmethod(readfrom) def __repr__(self): @@ -611,9 +613,8 @@ class BaseGateway(object): eof = False try: try: - unserializer = Unserializer(self._io, self._channelfactory) while 1: - msg = Message.readfrom(unserializer) + msg = Message.readfrom(self._io, self._channelfactory) self._trace("received", msg) _receivelock = self._receivelock _receivelock.acquire() From commits-noreply at bitbucket.org Fri Jul 30 13:19:12 2010 From: commits-noreply at bitbucket.org (commits-noreply at bitbucket.org) Date: Fri, 30 Jul 2010 11:19:12 +0000 (UTC) Subject: [execnet-commit] execnet commit 235d6213891e: * adding docs and changelog for remote_exec(func, **kwargs) Message-ID: <20100730111912.91FF75D758@bitbucket.org> # HG changeset patch -- Bitbucket.org # Project execnet # URL http://bitbucket.org/hpk42/execnet/overview # User holger krekel # Date 1280488466 -7200 # Node ID 235d6213891ea595ea069ae171675a7f39d118f9 # Parent 334f92823ec3627672bde77f12e08500ef3af7cc * adding docs and changelog for remote_exec(func, **kwargs) * fixing a py2/py3 interaction bug, turning up in my tox run --- /dev/null +++ b/doc/example/test_funcmultiplier.py @@ -0,0 +1,3 @@ + +def test_function(): + import funcmultiplier --- a/execnet/gateway_base.py +++ b/execnet/gateway_base.py @@ -12,7 +12,7 @@ try: except ImportError: import Queue as queue -ISPY3 = sys.version_info > (3, 0) +ISPY3 = sys.version_info >= (3, 0) if ISPY3: exec("def do_exec(co, loc): exec(co, loc)\n" "def reraise(cls, val, tb): raise val\n") @@ -716,16 +716,26 @@ class SlaveGateway(BaseGateway): self._trace("swallowing keyboardinterrupt in main-thread") def executetask(self, item): - channel, (source, call_name, kwargs) = item try: + channel, (source, call_name, kwargs) = item + if not ISPY3 and kwargs: + # some python2 versions do not accept unicode keyword params + # note: Unserializer generally turns py2-str to py3-str objects + newkwargs = {} + for name, value in kwargs.items(): + if isinstance(name, unicode): + name = name.encode('ascii') + newkwargs[name] = value + kwargs = newkwargs loc = {'channel' : channel, '__name__': '__channelexec__'} - self._trace("execution starts[%s]: %s" % (channel.id, repr(source)[:50])) + self._trace("execution starts[%s]: %s" % + (channel.id, repr(source)[:50])) channel._executing = True try: co = compile(source+'\n', '', 'exec') do_exec(co, loc) if call_name: - self._trace('calling function [%s] with %s'%(call_name, repr(kwargs)[:50])) + self._trace('calling %s(**%60r)' % (call_name, kwargs)) function = loc[call_name] function(channel, **kwargs) finally: --- /dev/null +++ b/doc/example/funcmultiplier.py @@ -0,0 +1,17 @@ + +import execnet + +def multiplier(channel, factor): + while not channel.isclosed(): + param = channel.receive() + channel.send(param * factor) + +gw = execnet.makegateway() +channel = gw.remote_exec(multiplier, factor=10) + +for i in range(5): + channel.send(i) + result = channel.receive() + assert result == i * 10 + +gw.exit() --- a/doc/index.txt +++ b/doc/index.txt @@ -6,13 +6,18 @@ Python_ is a mature dynamic language whose interpreters can interact with all major computing platforms today. -**execnet** provides carefully tested means to easily interact with Python -interpreters across version, platform and network barriers. It has +**execnet** provides a `share-nothing model`_ with `channel-send/receive`_ +communication for distributing execution across many Python interpreters +across version, platform and network barriers. It has a minimal and fast API targetting the following uses: -* distribute tasks to local or remote CPUs +* distribute tasks to (many) local or remote CPUs * write and deploy hybrid multi-process applications -* write scripts to administer a bunch of exec environments +* write scripts to administer multiple environments + +.. _`channel-send/receive`: http://en.wikipedia.org/wiki/Channel_(programming) +.. _`share-nothing model`: http://en.wikipedia.org/wiki/Shared_nothing_architecture + .. _Python: http://www.python.org @@ -22,6 +27,7 @@ Features * automatic bootstrapping: no manual remote installation. * safe and simple serialization of python builtin types (no pickle) + for sending/receiving structured data messages. * flexible communication: synchronous send/receive as well as callback/queue mechanisms supported --- a/doc/conf.py +++ b/doc/conf.py @@ -46,7 +46,7 @@ copyright = '2009, holger krekel and oth # # The short X.Y version. import execnet -version = "1.0.7" +version = "1.0.8" # The full version, including alpha/beta/rc tags. release = version --- a/CHANGELOG +++ b/CHANGELOG @@ -1,14 +1,26 @@ 1.0.8 -------------------------------- +- new ``gateway.remote_exec(func, **kwargs)`` style fo executing + a pure function with parameters. The function on the remote + side also needs to accept a ``channel`` which allows it to + communicate back and forth. Thanks to Ronny Pfannschmidt + for implementing it with special kudos to Maciej Fijalkowski + for writing a "pure-function" checker so that on Python2.6 + onwards non-pure functions will be rejected. + - enhance rsyncing to also sync permissions (stat().st_mode) of directories and files. (should also resolve http://bitbucket.org/hpk42/py-trunk/issue/68/) + - fix rsyncing of symlinks, thanks to Charles Solar (should also resolve http://bitbucket.org/hpk42/py-trunk/issue/70/) -- update usage of apipkg to 1.0b6 + +- update internal usage of apipkg to 1.0b6 + - remote_exec(module) now makes sure that the linecache is updated before reading and sending the source. thanks Ronny, Matt. + - removed all trailing whitespace from source files 1.0.7 --- a/doc/example/test_info.txt +++ b/doc/example/test_info.txt @@ -5,7 +5,7 @@ Execute source code in subprocess, commu ------------------------------------------------------------------- You can instantiate a subprocess gateway, execute code -in it and exchange data dynamically:: +in it and bidirectionally send messages:: >>> import execnet >>> gw = execnet.makegateway() @@ -14,31 +14,64 @@ in it and exchange data dynamically:: >>> channel.receive() 2 -Remote-Execute a function, communicate through a channel +The initiating and the remote execution happen concurrently. +``channel.receive()`` operations return when input is available. +``channel.send(data)`` operations return when the message could +be delivered to the IO system. + +The initiating and the "other" process work use a `share-nothing +model`_ and ``channel.send|receive`` are means to pass basic data +messages between two processes. + +.. _`share-nothing model`: http://en.wikipedia.org/wiki/Shared_nothing_architecture + +remote-exec a function (avoiding inlined source part I) ------------------------------------------------------------------- -You can send and execute parametrized functions remotely like this:: +You can send and remote execute parametrized pure functions like this: - import execnet - gw = execnet.makegateway() +.. include:: funcmultiplier.py + :literal: - def adder(channel, arg1, arg2): - channel.send(arg1+arg2) +The ``multiplier`` function executes remotely and establishes +a loop multipliying incoming data with a constant factor passed +in via keyword arguments to ``remote_exec``. - channel = gw.remote_exec(adder, arg1=1, arg2=15) - result = channel.receive() - assert result == 16 +Notes: -The ``adder`` function needs to be "pure", i.e. is not allowed -to access global state. Its source code will be send to -the remote side and called with the supplied ``arg1`` -and ``arg2`` parameters as well as with the channel that allows -it to communicate back. All arguments must be sendable over a -channel so can only contain instances of builtin types. +* unfortunately, you can not type this example interactively because + ``inspect.getsource(func)`` fails for interactively defined + functions. -Note that this example does not work on the Python prompt because -``remote_exec()`` uses ``inspect.getsource(func)`` and this does -not work for interactively defined functions. +* on Python2.6 and onwards you will get an explicit error + if you try to execute non-pure functions, i.e. functions + that access any global state (which will not be available + remotely as we have a share-nothing model between the nodes). + + +remote-exec a module (avoiding inlined source part II) +-------------------------------------------------------------- + +You can pass a module object to ``remote_exec`` in which case +its source code will be sent. No dependencies will be transferred +so the module must be self-contained or only use modules that are +installed on the "other" side. Module code can detect if it is +running in a remote_exec situation by checking for the special +``__name__`` attribute. + +.. include:: remote1.py + :literal: + +You can now remote-execute the module like this:: + + >>> import execnet, remote1 + >>> gw = execnet.makegateway() + >>> ch = gw.remote_exec(remote1) + >>> print (ch.receive()) + initialization complete + +which will print the 'initialization complete' string. + Compare current working directories ---------------------------------------- @@ -108,28 +141,7 @@ and use it to transfer information:: >>> c2.receive() 'world' -Avoid "inlined" source strings with remote_exec --------------------------------------------------------------- -You can pass a module object to ``remote_exec`` in which case -its source code will be sent. No dependencies will be transferred -so the module must be self-contained or only use modules that are -installed on the "other" side. Module code can detect if it is -running in a remote_exec situation by checking for the special -``__name__`` attribute. - -.. include:: remote1.py - :literal: - -You can now remote-execute the module like this:: - - >>> import execnet, remote1 - >>> gw = execnet.makegateway() - >>> ch = gw.remote_exec(remote1) - >>> print (ch.receive()) - initialization complete - -which will print the 'initialization complete' string. a simple command loop pattern -------------------------------------------------------------- --- a/execnet/gateway.py +++ b/execnet/gateway.py @@ -84,9 +84,19 @@ class Gateway(gateway_base.BaseGateway): def remote_exec(self, source, **kwargs): """ return channel object and connect it to a remote - execution thread where the given 'source' executes - and has the sister 'channel' object in its global - namespace. + execution thread where the given ``source`` executes. + + * ``source`` is a string: execute source string remotely + with a ``channel`` put into the global namespace. + * ``source`` is a pure function: serialize source and + call function with ``**kwargs``, adding a + ``channel`` object to the keyword arguments. + * ``source`` is a pure module: execute source of module + with a ``channel`` in its global namespace + + In all cases the binding ``__name__='__channelexec__'`` + will be available in the global namespace of the remotely + executing code. """ call_name = None if isinstance(source, types.ModuleType): --- a/execnet/__init__.py +++ b/execnet/__init__.py @@ -3,7 +3,7 @@ execnet: pure python lib for connecting (c) 2010, Holger Krekel and others """ -__version__ = "1.0.8a1" +__version__ = "1.0.8" import execnet.apipkg --- a/testing/test_basics.py +++ b/testing/test_basics.py @@ -284,10 +284,13 @@ class TestGlobalFinder(object): def test_remote_exec_function_with_kwargs(anypython): + import sys def func(channel, data): channel.send(data) group = execnet.Group() gw = group.makegateway('popen//python=%s' % anypython) + print ("local version_info %r" %(sys.version_info,)) + print ("remote info: %s" % (gw._rinfo(),)) ch = gw.remote_exec(func, data=1) result = ch.receive() assert result == 1 From commits-noreply at bitbucket.org Sat Jul 31 00:15:09 2010 From: commits-noreply at bitbucket.org (commits-noreply at bitbucket.org) Date: Fri, 30 Jul 2010 22:15:09 +0000 (UTC) Subject: [execnet-commit] execnet commit 988e10cbd44d: Added tag 1.0.8 for changeset 235d6213891e Message-ID: <20100730221509.610105D75B@bitbucket.org> # HG changeset patch -- Bitbucket.org # Project execnet # URL http://bitbucket.org/hpk42/execnet/overview # User holger krekel # Date 1280527956 -7200 # Node ID 988e10cbd44d9666784ad0d64acd44628f4aa757 # Parent 235d6213891ea595ea069ae171675a7f39d118f9 Added tag 1.0.8 for changeset 235d6213891e --- a/.hgtags +++ b/.hgtags @@ -11,3 +11,4 @@ 61ae5d8a32582d48871672ba894b2761fcaeec43 61ae5d8a32582d48871672ba894b2761fcaeec43 1.0.6 f4f785cd543ec95455907841840ba2b965f1400b 1.0.6 e25a7d3c706f61911ae34d7828e07ad8ff316365 1.0.7 +235d6213891ea595ea069ae171675a7f39d118f9 1.0.8