Distributed remote tasks
14 June 2008 10:54 — Following on from an earlier post investigating potential ways of distributing tasks across multiple hosts using py.execnet, here is a simple working example. It requires py.execnet's socketserver.py script to be running on each of the host/port pairs specified.
# (address, port) pairs where py.execnet's socketserver.py must already be listening.
hosts = [('192.168.116.128',8888), ('127.0.0.1',8888)]
# Python 2 source executed on the remote host via gateway.remote_exec().
# The `channel` name is injected into the remote namespace by py.execnet.
# It starts a placeholder shell task ('sleep 3'), sends None over the channel
# as a keepalive every 0.1s while the task runs, then sends 'Data' when done.
remoteCode = """
from time import sleep
from subprocess import Popen, PIPE
task = Popen('sleep 3', shell=True, stdout=PIPE)
print "Task started"
while task.poll() is None:
channel.send(None)
sleep(0.1)
channel.send('Data')
"""
from threading import Thread
from Queue import Queue
from time import sleep
from random import random
from py.execnet import SocketGateway
class Host:
    """One remote execution endpoint, reached through a py.execnet SocketGateway.

    The dispatcher reads/writes `address`, `busy`, `gateway` and (once a job
    is running) `channel` directly, so those attribute names are fixed.
    """

    def __init__(self, addr, portnum):
        # A freshly created host is idle; connecting happens immediately,
        # so construction fails fast if the remote socketserver is down.
        self.busy = False
        self.address = addr
        self.gateway = SocketGateway(addr, portnum)
class DistributedTaskDispatcher(Thread):
def __init__(self, hosts, task):
self.task = task
self.hosts = [Host(*h) for h in hosts]
self.jobs = Queue()
self.die = False
Thread.__init__(self)
self.start()
def run(self):
while not self.die:
for h in self.hosts:
if not h.busy:
if not self.jobs.empty():
print "Sending task %s to %s" % (self.jobs.get(), h.address)
h.busy = True
h.channel = h.gateway.remote_exec(self.task)
else:
data = h.channel.receive()
if data is not None:
h.busy = False
h.channel.close()
print "Received data from",h.address,data
sleep(0.1)
d = DistributedTaskDispatcher(hosts, remoteCode)
for i in range(10):
sleep(random() * 2)
print "Adding task %s to queue" % i
d.jobs.put(i)
sleep(20)
d.die = True
To make the servers resume listening after a socket connection is closed, you will need to set the relevant reconnect option to true in socketserver.py.
Serious health warning: by starting the socketserver script on your machine, you are allowing anyone who has access to that port to do anything they like. Be careful! Connecting over SSH with py.execnet.SshGateway is a much better option for many tasks.
Leave a comment