[py-dev] distributed computing with py.execnet

Carl Friedrich Bolz cfbolz at gmx.de
Tue Jun 21 02:16:19 CEST 2005


Hi Holger!

holger krekel wrote:
> In theory and probably even in practice you should not 
> have to use a separate socket. What keeps you from (re)using 
> a channel there as well?

Hmm. I confess that until now I had only read the docs, not the source. The 
docs mention no channels with callbacks, only the blocking 
channel.receive (which isn't enough for my use case). I even thought 
that there had to be an easier way, but well...

> 
> For example, you may do: 
> 
>     # client (which manages the cluster) 
> 
>     def receivernotificationcallback((hostid, result)): 
>         # process result coming from hostid (be careful because
>         # this callback executes directly in the IO receive
>         # thread, you may want to just put things in a queue
>         # and process it in some collecting-result thread)
> 
>     channel = gw.newchannel(receivernotificationcallback) 
> 
>     for hostid in ...: 
>         gw.remote_exec(..., channel=channel)  # use preconstructed channel
>                                               # and the callback resp.
> so the other side's stuff will go to the callback function directly. 
>

Ah, everything is clearer now! With a callback my toy example 
is even shorter. See below.

> If you want to have a channel for control-messages and 
> one for results you could do: 
> 
>     for hostid in ...: 
>         notify_channel = gw.newchannel(receivernotificationcallback) 
>         channel = gw.remote_exec(...) 
>         channel.send(notify_channel)  
>         
> the other side can then receive the result channel and send 
> the result when it is ready.  This is probably a cleaner
> solution than the former. 

I didn't even know that sending channels is possible.

Thanks a lot for the explanations!

Carl Friedrich




P.S.: did you know that

285542542228279613901563566102164008326164238644702889199247456602284400390600653875954571505539843239754513915896150297878399377056071435169747221107988791198200988477531339214282772016059009904586686254989084815735422480409022344297588352526004383890632616124076317387416881148592486188361873904175783145696016919574390765598280188599035578448591077683677175520434074287726578006266759615970759521327828555662781678385691581844436444812511562428136742490459363212810180276096088111401003377570363545725120924073646921576797146199387619296560302680261790118132925012323046444438622308877924609373773012481681672424493674474488537770155783006880852648161513067144814790288366664062257274665275787127374649231096375001170901890786263324619578795731425693805073056119677580338084333381987500902968831935913095269821311141322393356490178488728982288156282600813831296143663845945431144043753821542871277745606447858564159213328443580206422714694913091762716447041689678070096773590429808909616
750452927258000843500344831628297089902728649981994387647234574276263729694848304750917174186181130688518792748622612293341368928056634384466646326572476167275660839105650528975713899320211121495795311427946254553305387067821067601768750977866100460014602138408448021225053689054793742003095722096732954750721718115531871310231057902608580607L

is a prime?


#-------------server.py---------------

import py
import sys
# Number of consecutive exponents handed to a node per request.
chunk_size = 50
# Source of task.py, shipped to every node via gw.remote_exec().  It is
# read from py.path.local(), i.e. the current working directory.
source = py.code.Source(py.path.local().join("task.py").read())
# ssh destinations for py.execnet.SshGateway, in user@host form.  (The
# mailing-list archive had obfuscated "@" as " at ", which ssh rejects.)
nodes = ["cfbolz@linux", "jana@albert"]

class TaskDistributor(object):
    """Farm out consecutive chunks of exponents to remote worker nodes.

    For every node an SSH gateway is opened and task.py (``source``) is
    started on it with a channel whose callback is ``new_task``.  The node is
    sent its index followed by a first chunk of exponents.  Whenever a node
    reports results, the callback immediately hands it the next chunk.
    """

    def __init__(self, nodes, chunk_size=1):
        """Connect to every node in *nodes* and give each a first chunk.

        :param nodes: destinations passed to ``py.execnet.SshGateway``.
        :param chunk_size: number of consecutive exponents per chunk.
        """
        import threading
        # new_task runs in each gateway's IO receive thread, so with several
        # nodes it can run concurrently with itself and with this loop.  The
        # lock makes "read self.i, then advance it" atomic so no chunk is
        # handed out twice.
        self._lock = threading.Lock()
        self.i = 1  # first exponent not yet handed out
        self.nodes = nodes
        self.chunk_size = chunk_size
        self.channels = []
        for node in nodes:
            print(node)
            gw = py.execnet.SshGateway(node)
            channel = gw.newchannel(self.new_task)
            gw.remote_exec(source, channel=channel)
            self.channels.append(channel)
            # The node echoes this index back so new_task can find its
            # channel in self.channels.
            channel.send(len(self.channels) - 1)
            channel.send(self._next_chunk())

    def _next_chunk(self):
        """Atomically reserve the next ``chunk_size`` exponents as a list."""
        with self._lock:
            start = self.i
            self.i += self.chunk_size
        return list(range(start, start + self.chunk_size))

    def new_task(self, message):
        """Channel callback: give the sender new work, then report results.

        :param message: ``(number, results)`` as sent by task.py, where
            ``number`` is the node index and ``results`` is a list of
            ``(exponent, is_prime)`` pairs.
        """
        number, results = message
        channel = self.channels[number]
        # Keep the remote worker busy: send the next chunk before printing.
        channel.send("ok")
        channel.send(self._next_chunk())
        for exponent, is_prime in results:
            if is_prime:
                print("%s = 2**%s - 1 is a prime! result from node %s"
                      % (2 ** exponent - 1, exponent, number))
        # Progress line for the last exponent of the chunk (primes were
        # already reported above).  Guarded so an empty result list is fine.
        if results and not results[-1][1]:
            print("2**%s - 1 is not prime" % (results[-1][0], ))

if __name__ == '__main__':
    import time
    # Bind the distributor to a name so it is held for the process lifetime.
    distributor = TaskDistributor(nodes, chunk_size)
    # All further work happens in the gateways' IO threads via the
    # new_task callback; the main thread only has to stay alive.  Sleeping
    # avoids the CPU-pinning busy loop (`while 1: pass`).
    while True:
        time.sleep(1)



#-------------task.py---------------

import random

def fermattest(p, tests=30):
    """Probabilistic Fermat primality test for *p*.

    Draws *tests* random bases ``a`` in ``[1, p - 1]`` and checks
    ``a ** p == a (mod p)``.  If any base fails, ``p`` is certainly composite
    and False is returned.  If all pass, True is returned (``p`` is probably
    prime; Carmichael numbers can fool this test).

    Values below 2 are not prime and return False without testing.
    """
    if p < 2:
        return False
    for _ in range(tests):
        # randint works on exact integers, so every base stays reachable
        # even when p is too large to be represented exactly as a float.
        a = random.randint(1, p - 1)
        if pow(a, p, p) != a:
            return False
    return True

def M(p):
    """Return the Mersenne number for exponent *p*, i.e. 2**p - 1."""
    return pow(2, p) - 1

def lucastest(p):
    """Lucas-Lehmer test: return True if the Mersenne number M(p) is prime.

    Iterates s = 4, s -> s*s - 2 (mod M(p)) for p - 2 steps; M(p) is prime
    exactly when the final value is 0.  The recurrence is meant for odd
    prime exponents.  p is expected to be prime already; process_task only
    calls this after fermattest(p) passed.
    """
    if p == 2:
        # The recurrence does not apply to p == 2 (it would report
        # M(2) = 3 as composite), so special-case the only even prime.
        return True
    m = M(p)
    curr = 4
    for _ in range(p - 2):
        curr = (curr * curr - 2) % m
    return curr == 0

def process_task(task_data):
    """Classify every exponent in *task_data*.

    Returns a list of ``(p, is_prime)`` pairs in input order.  The cheap
    fermattest(p) screens each exponent first; only exponents that pass it
    are given to lucastest(p), the others are recorded as False.
    """
    return [(p, lucastest(p) if fermattest(p) else False) for p in task_data]

# ``channel`` is not defined in this file; it is supplied by py.execnet when
# the server runs this source via gw.remote_exec(source, channel=channel).
# Protocol: receive our node index and a first chunk, then repeatedly send
# (index, results), wait for the "ok" acknowledgement and the next chunk.
mynumber = channel.receive()
task_data = channel.receive()

while 1:
    result = process_task(task_data)
    channel.send((mynumber, result))
    # Check explicitly rather than with `assert`: under `python -O` the whole
    # assert statement, including its receive(), is removed, which would
    # make the next receive() return "ok" instead of the task data.
    ack = channel.receive()
    if ack != "ok":
        raise ValueError("expected 'ok' from distributor, got %r" % (ack,))
    task_data = channel.receive()



More information about the py-dev mailing list