Distributed remote tasks14 June 2008 10:54
Following on from an earlier post investigating potential ways of distributing tasks across multiple hosts using py.execnet, here is a simple working example. It requires py.execnet's socketserver.py script to be running on the host/ports specified.
hosts = [('192.168.116.128',8888), ('127.0.0.1',8888)] remoteCode = """ from time import sleep from subprocess import Popen, PIPE task = Popen('sleep 3', shell=True, stdout=PIPE) print "Task started" while task.poll() is None: channel.send(None) sleep(0.1) channel.send('Data') """ from threading import Thread from Queue import Queue from time import sleep from random import random from py.execnet import SocketGateway class Host: def __init__(self, address, port): self.address = address self.busy = False self.gateway = SocketGateway(address, port) class DistributedTaskDispatcher(Thread): def __init__(self, hosts, task): self.task = task self.hosts = [Host(*h) for h in hosts] self.jobs = Queue() self.die = False Thread.__init__(self) self.start() def run(self): while not self.die: for h in self.hosts: if not h.busy: if not self.jobs.empty(): print "Sending task %s to %s" % (self.jobs.get(), h.address) h.busy = True h.channel = h.gateway.remote_exec(self.task) else: data = h.channel.receive() if data is not None: h.busy = False h.channel.close() print "Received data from",h.address,data sleep(0.1) d = DistributedTaskDispatcher(hosts, remoteCode) for i in range(10): sleep(random() * 2) print "Adding task %s to queue" % i d.jobs.put(i) sleep(20) d.die = True
To make the servers start listening again after the socket connection is closed, you will need to set this option to true in socketserver.py.
Serious health warning: by starting the socketserver script on your machine, you are allowing anyone who has access to that port to do anything they like. Be careful! Connecting over SSH with py.execnet.SshGateway is a much better option for many tasks.
Leave a comment