Distributed remote tasks

14 June 2008 10:54

Following on from an earlier post investigating potential ways of distributing tasks across multiple hosts using py.execnet, here is a simple working example. It requires py.execnet's socketserver.py script to be running on each host and port listed in hosts.

# (address, port) pairs, one per listening socketserver.py instance. Each pair
# is unpacked into Host(address, port) by DistributedTaskDispatcher.
# NOTE(review): both entries are identical and have an empty address; the real
# host names look like they were stripped from the listing -- confirm.
hosts = [
    ("", 8888),
    ("", 8888),
]

remoteCode = """
from time import sleep
from subprocess import Popen, PIPE
task = Popen('sleep 3', shell=True, stdout=PIPE)
print "Task started"
while task.poll() is None:

from threading import Thread
from Queue import Queue
from time import sleep
from random import random

from py.execnet import SocketGateway

class Host:
    """Bookkeeping record for one remote machine.

    Attributes:
        address: the address string the gateway was opened with; used by the
            dispatcher when logging.
        busy: starts as False; the dispatcher flips it while a job is
            outstanding on this host.
        gateway: the SocketGateway built from (address, port).

    The port is only passed to SocketGateway and is not kept on the instance.
    The dispatcher attaches a ``channel`` attribute once it has sent a job.
    """

    def __init__(self, address, port):
        # Open the gateway first; if it fails no half-initialised Host exists.
        self.gateway = SocketGateway(address, port)
        self.address, self.busy = address, False

class DistributedTaskDispatcher(Thread):
    """Background thread that hands queued jobs to a pool of remote hosts.

    Every job executes the same remote source (``task``); the values placed on
    ``jobs`` are only used to label the job in the log output.

    Attributes:
        task: source text passed to ``gateway.remote_exec`` for every job.
        hosts: one ``Host`` per ``(address, port)`` pair given to ``__init__``.
        jobs: ``Queue`` of pending job labels; producers call ``jobs.put``.
        die: set to True to make ``run`` return after its current pass.
    """

    # Seconds to pause after a pass in which no host needed attention, so an
    # empty queue does not turn the loop into a busy spin.
    IDLE_DELAY = 0.05

    def __init__(self, hosts, task):
        """Build a Host for every pair and prepare an empty job queue.

        Args:
            hosts: iterable of ``(address, port)`` pairs, unpacked into Host.
            task: source text to execute remotely for each job.
        """
        # Thread.start() refuses to run unless the base class was initialised.
        Thread.__init__(self)
        self.task = task
        self.hosts = [Host(*h) for h in hosts]
        self.jobs = Queue()
        self.die = False

    def run(self):
        """Dispatch jobs to idle hosts and collect results from busy ones.

        Each pass visits every host once: an idle host is given the next
        queued job (if any); a busy host is asked for its result. The loop
        ends once ``die`` is true.
        """
        while not self.die:
            did_work = False
            for h in self.hosts:
                if not h.busy:
                    # This thread is the only consumer of the queue, so the
                    # empty() check followed by get() cannot block.
                    if not self.jobs.empty():
                        job = self.jobs.get()
                        print("Sending task %s to %s" % (job, h.address))
                        h.busy = True
                        h.channel = h.gateway.remote_exec(self.task)
                        did_work = True
                else:
                    # Only busy hosts have a channel to read from. Reading in
                    # this branch lets every idle host receive its job before
                    # the loop waits on any result.
                    # NOTE(review): receive() presumably blocks until the
                    # remote side sends something -- confirm in execnet docs.
                    data = h.channel.receive()
                    did_work = True
                    if data is not None:
                        h.busy = False
                        print("Received data from %s %s" % (h.address, data))
            if not did_work:
                sleep(self.IDLE_DELAY)


if __name__ == "__main__":
    d = DistributedTaskDispatcher(hosts, remoteCode)
    # Start the dispatcher thread so queued jobs are actually sent.
    d.start()

    for i in range(10):
        sleep(random() * 2)
        print("Adding task %s to queue" % i)
        d.jobs.put(i)

    # NOTE(review): this stops the dispatcher right after the last job is
    # queued; jobs still waiting in d.jobs at that point are never sent.
    d.die = True

To make the servers start listening again after the socket connection is closed, you will need to set this option to true in socketserver.py.

Serious health warning: by starting the socketserver script on your machine, you are allowing anyone who has access to that port to do anything they like. Be careful! Connecting over SSH with py.execnet.SshGateway is a much better option for many tasks.

Leave a comment