[py-dev] distributed computing with py.execnet
Carl Friedrich Bolz
cfbolz at gmx.de
Tue Jun 21 02:16:19 CEST 2005
Hi Holger!
holger krekel wrote:
> In theory and probably even in practice you should not
> have to use a separate socket. What keeps you from (re)using
> a channel there as well?
Hum. I confess that I had only read the docs, not the source, until now. The
docs mention no channels with callbacks, only the blocking
channel.receive (which isn't enough for my use case). I even thought
that there had to be an easier way, but well...
>
> For example, you may do:
>
> # client (which manages the cluster)
>
> def receivernotificationcallback((hostid, result)):
> # process result coming from hostid (be careful because
> # this callback executes directly in the IO receive
> # thread, you may want to just put things in a queue
> # and process it in some collecting-result thread)
>
> channel = gw.newchannel(receivernotificationcallback)
>
> for hostid in ...:
> gw.remote_exec(..., channel=channel) # use preconstructed channel
> # and the callback resp.
> so the other side's stuff will go to the callback function directly.
>
Ah. Everything becomes clearer now! With a callback, my toy example
is even shorter. See below.
> If you want to have a channel for control-messages and
> one for results you could do:
>
> for hostid in ...:
> notify_channel = gw.newchannel(receivernotificationcallback)
> channel = gw.remote_exec(...)
> channel.send(notify_channel)
>
> the other side can then receive the result channel and send
> the result when it is ready. This is probably a cleaner
> solution than the former.
I didn't even know that sending channels is possible.
Thanks a lot for the explanations!
Carl Friedrich
P.S.: did you know that
285542542228279613901563566102164008326164238644702889199247456602284400390600653875954571505539843239754513915896150297878399377056071435169747221107988791198200988477531339214282772016059009904586686254989084815735422480409022344297588352526004383890632616124076317387416881148592486188361873904175783145696016919574390765598280188599035578448591077683677175520434074287726578006266759615970759521327828555662781678385691581844436444812511562428136742490459363212810180276096088111401003377570363545725120924073646921576797146199387619296560302680261790118132925012323046444438622308877924609373773012481681672424493674474488537770155783006880852648161513067144814790288366664062257274665275787127374649231096375001170901890786263324619578795731425693805073056119677580338084333381987500902968831935913095269821311141322393356490178488728982288156282600813831296143663845945431144043753821542871277745606447858564159213328443580206422714694913091762716447041689678070096773590429808909616
750452927258000843500344831628297089902728649981994387647234574276263729694848304750917174186181130688518792748622612293341368928056634384466646326572476167275660839105650528975713899320211121495795311427946254553305387067821067601768750977866100460014602138408448021225053689054793742003095722096732954750721718115531871310231057902608580607L
is a prime?
#-------------server.py---------------
import py
import sys
# Number of consecutive exponents handed to a node per request; passed to
# TaskDistributor in the __main__ section.
chunk_size = 50

# Worker code that is shipped to every node via remote_exec().  It is read
# from a file named "task.py" relative to py.path.local() (presumably the
# current working directory -- confirm against the py lib docs).
source = py.code.Source(py.path.local().join("task.py").read())

# SSH targets handed to py.execnet.SshGateway, in "user@host" form.
nodes = ["cfbolz@linux", "jana@albert"]
class TaskDistributor(object):
    """Hand out chunks of consecutive integers to remote nodes and print results.

    For every node an SSH gateway is opened, the module-level ``source`` is
    executed remotely, and a channel with :meth:`new_task` as its callback is
    used for both directions of the conversation.  Each node first receives
    its index into ``self.channels`` and then its first chunk of work.

    Attributes:
        i: first integer that has not yet been handed out.
        nodes: the node specifications passed to the constructor.
        chunk_size: how many consecutive integers make up one task.
        channels: one channel per node, indexed by the node's number.
    """

    def __init__(self, nodes, chunk_size=1):
        """Connect to every entry of *nodes* and send each its first task.

        Args:
            nodes: iterable of gateway specifications for
                ``py.execnet.SshGateway``.
            chunk_size: number of consecutive integers per task.
        """
        self.i = 1
        self.nodes = nodes
        self.chunk_size = chunk_size
        self.channels = []
        for node in nodes:
            print(node)
            gw = py.execnet.SshGateway(node)
            channel = gw.newchannel(self.new_task)
            gw.remote_exec(source, channel=channel)
            self.channels.append(channel)
            # The remote side echoes this index back with every result so
            # that new_task() can find the right channel again.
            channel.send(len(self.channels) - 1)
            channel.send(self._next_chunk())

    def _next_chunk(self):
        """Return the next ``chunk_size`` integers and advance ``self.i``."""
        start = self.i
        self.i = start + self.chunk_size
        # Explicit list so the payload is a plain list on any Python version.
        return list(range(start, self.i))

    def new_task(self, message):
        """Channel callback: acknowledge a result, send new work, print results.

        Args:
            message: ``(number, results)`` where *number* is the index that
                was sent to the node in ``__init__`` and *results* is a
                sequence of ``(exponent, is_prime)`` pairs.
        """
        # NOTE(review): execnet reportedly runs channel callbacks in its IO
        # receive thread; ``self.i`` is updated here without a lock --
        # confirm whether callbacks of different gateways can overlap.
        number, results = message
        channel = self.channels[number]
        channel.send("ok")
        channel.send(self._next_chunk())
        for exponent, is_prime in results:
            if is_prime:
                print("%s = 2**%s - 1 is a prime! result from node %s"
                      % (2 ** exponent - 1, exponent, number))
            else:
                # Report the exponent that was tested, not the distributor's
                # running counter.
                print("2**%s - 1 is not prime" % (exponent,))
if __name__ == '__main__':
    import time

    # Keep a reference so the distributor (and its channels) stay alive for
    # the lifetime of the script.
    distributor = TaskDistributor(nodes, chunk_size)
    # All further work happens in the channel callbacks; the main thread only
    # has to stay alive.  Sleep instead of spinning so it does not burn a CPU
    # core while waiting.
    while True:
        time.sleep(1)
#-------------task.py---------------
import random
def fermattest(p, tests=30):
    """Run a probabilistic Fermat primality check on *p*.

    Draws *tests* random bases ``a`` with ``1 <= a < p`` and checks
    ``a**p == a (mod p)`` for each one.

    Args:
        p: integer to test.
        tests: number of random bases to try.

    Returns:
        False if ``p < 2`` or any base violates the congruence, True
        otherwise (``p`` is then prime or a Fermat pseudoprime for all
        bases that were drawn).
    """
    if p < 2:
        return False
    for _ in range(tests):
        # randrange works on exact integers, so arbitrarily large p neither
        # loses precision nor overflows a float.
        a = random.randrange(1, p)
        if pow(a, p, p) != a:
            return False
    return True
def M(p):
    """Return the Mersenne number ``2**p - 1``."""
    return 2 ** p - 1


def lucastest(p):
    """Lucas-Lehmer test: return True if ``M(p) = 2**p - 1`` is prime.

    Starting from 4, the sequence ``s -> s*s - 2 (mod M(p))`` is iterated
    ``p - 2`` times; ``M(p)`` is prime exactly when the final value is 0.

    Args:
        p: exponent of the Mersenne number to test.

    Returns:
        True if ``2**p - 1`` is prime, False otherwise (including ``p < 2``).
    """
    if p < 2:
        return False
    if p == 2:
        # The iteration below runs zero times for p == 2 and would leave the
        # start value 4 untouched, so 2**2 - 1 = 3 is handled explicitly.
        return True
    m = M(p)
    curr = 4
    for _ in range(p - 2):
        curr = (curr * curr - 2) % m
    # Only the residue after exactly p - 2 steps decides primality.
    return curr == 0
def process_task(task_data):
    """Test every exponent in *task_data* and return ``(p, is_prime)`` pairs.

    An exponent that fails ``fermattest`` is reported as False without
    running ``lucastest``; otherwise the result of ``lucastest(p)`` is
    reported.
    """
    # ``and`` short-circuits, so lucastest only runs for exponents that
    # passed the cheaper Fermat check.
    return [(p, fermattest(p) and lucastest(p)) for p in task_data]
# ``channel`` is not defined in this script; it is expected to be provided by
# py.execnet when the source is run through gateway.remote_exec().
# Protocol: first our node number, then one chunk of exponents at a time.
mynumber = channel.receive()
task_data = channel.receive()
while True:
    channel.send((mynumber, process_task(task_data)))
    # Receive the acknowledgement outside of an ``assert``: under ``python -O``
    # an assert statement (including the receive inside it) is removed, which
    # would make the next receive read "ok" as task data.
    ack = channel.receive()
    if ack != "ok":
        raise RuntimeError("expected 'ok' acknowledgement, got %r" % (ack,))
    task_data = channel.receive()
More information about the py-dev
mailing list