Distributed remote tasks

14 June 2008 10:54

Following on from an earlier post investigating potential ways of distributing tasks across multiple hosts using py.execnet, here is a simple working example. It requires py.execnet's socketserver.py script to be running on the host/ports specified.

# (address, port) pairs; each one is unpacked into Host(address, port) by the
# dispatcher, so a py.execnet socket server must be reachable on every entry.
hosts = [
    ('192.168.116.128', 8888),
    ('127.0.0.1', 8888),
]

# Source text shipped to each host via gateway.remote_exec(); it is never
# executed in this process.
#
# The snippet starts a `sleep 3` subprocess as a stand-in for real work and
# polls it every 0.1s. While the subprocess is still running it sends None,
# which the dispatcher treats as "not finished yet"; once the subprocess has
# exited it sends the string 'Data', which the dispatcher treats as the result.
#
# NOTE(review): `channel` is not defined inside the snippet -- it is expected
# to be supplied by py.execnet in the remote namespace; confirm against the
# py.execnet documentation.
remoteCode = """
from time import sleep
from subprocess import Popen, PIPE
task = Popen('sleep 3', shell=True, stdout=PIPE)
print "Task started"
while task.poll() is None:
     channel.send(None)
     sleep(0.1)
channel.send('Data')
"""

from threading import Thread
from Queue import Queue
from time import sleep
from random import random

from py.execnet import SocketGateway

class Host:
    """One remote worker: its address, its gateway and its current state.

    Constructing a Host immediately opens a ``SocketGateway`` to
    ``(address, port)``, so any connection error surfaces here.
    """

    def __init__(self, address, port):
        """Connect to the socket server at *address*:*port*.

        address -- host name or IP address, also used in log output.
        port    -- TCP port the remote socket server is listening on.
        """
        self.address = address
        # Keep the port as well; previously it was dropped after connecting,
        # so two servers on the same address could not be told apart.
        self.port = port
        # True while a task dispatched to this host has not reported a result.
        self.busy = False
        # Channel of the task currently running on this host. Declared here
        # (instead of being created ad hoc by the dispatcher) so the attribute
        # always exists; None means no task has been dispatched yet.
        self.channel = None
        self.gateway = SocketGateway(address, port)

class DistributedTaskDispatcher(Thread):
    def __init__(self, hosts, task):
        self.task = task
        self.hosts = [Host(*h) for h in hosts]
        self.jobs = Queue()
        self.die = False
        Thread.__init__(self)
        self.start()

    def run(self):
        while not self.die:
            for h in self.hosts:
                if not h.busy:
                    if not self.jobs.empty():
                        print "Sending task %s to %s" % (self.jobs.get(), h.address)
                        h.busy = True
                        h.channel = h.gateway.remote_exec(self.task)
                else:
                    data = h.channel.receive()
                    if data is not None:
                        h.busy = False
                        h.channel.close()
                        print "Received data from",h.address,data
            sleep(0.1)

d = DistributedTaskDispatcher(hosts, remoteCode)

for i in range(10):
    sleep(random() * 2)
    print "Adding task %s to queue" % i
    d.jobs.put(i)

sleep(20)
d.die = True

To make the servers start listening again after the socket connection is closed, you will need to set the looping option in socketserver.py to true; otherwise each server exits once its first connection is closed.

Serious health warning: by starting the socketserver script on your machine, you are allowing anyone who has access to that port to do anything they like. Be careful! Connecting over SSH with py.execnet.SshGateway is a much better option for many tasks.

Leave a comment