Monday, June 29, 2009

python subprocess based parallel processing

The Python threading module is cool, and when combined with rpyc and the Sun Grid Engine you can get a lot done really fast on a cluster. I will blog about that later, but with one of the tricks I use with rpyc applied to the newish 'subprocess' module in the Python standard library, multi-process parallel processing now seems simpler than ever.

Here is a quick little script that runs a shell command on a number of hosts.



#!/usr/local/bin/python -u
#
# runAllOver.py
# Run a shell command on several machines using ssh
#
__version__ = tuple([int(x) for x in
                     '$Revision: 1.2 $'.split()[1].split('.')])
__author__ = "Kael Fischer"

import sys
import time
import optparse
from subprocess import Popen, PIPE


HOSTS = ["nfs1","nfs2","compute1","compute2","db1" ]

def main(sysargs):

    oneLineUsage = "Usage: %prog [options] '<remote command>'"

    op = optparse.OptionParser(
        oneLineUsage,
        version="%prog " + '.'.join([str(x) for x in __version__]))

    (opts,args) = op.parse_args(sysargs)

    try:
        if len(args) == 0:
            raise RuntimeError, "No remote command specified."
    except Exception, eData:
        print >> sys.stderr, ("\nUsage Error: %s\n" % eData.message)
        print >> sys.stderr, op.format_help()
        return 1

    cmd = ' '.join(args)
    print cmd

    # make one running pipe object per host
    pipes = [remotePipe(h,cmd) for h in HOSTS]

    # report the results in turn
    for i,p in enumerate(pipes):
        print HOSTS[i] + ':'
        while p.poll() == None:
            time.sleep(0.5)
        print p.stdout.read()

    return(0) # we did it!

def remotePipe(host,cmd,block=False):
    p = Popen("ssh %s '%s'" % (host, cmd), shell=True, stdout=PIPE)
    if block:
        while p.poll() == None:
            time.sleep(1)
    return p

if __name__ == "__main__":
    sys.exit(main(sys.argv[1:]))
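Save that as runAllOver.py and make it executable; the invocation is then just the script name plus the quoted remote command, e.g. ./runAllOver.py 'uptime' (uptime here is only an example), and each host's output is printed under its name in turn.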


What's cool about that? Well, all the processes are running on their hosts simultaneously, and that is what makes it fast: Popen returns as soon as each ssh process is launched, so every host is already working before we start waiting for output.
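Stripped of the option parsing, the pattern is just "launch everything, then collect the output". Here is a minimal sketch of the same idea (the host names and command are placeholders, and it uses communicate() instead of the poll()/read() loop above, which sidesteps the pipe buffer filling up on a very chatty command):


from subprocess import Popen, PIPE

hosts = ["nfs1", "nfs2"]   # placeholder host names
cmd = "uptime"             # placeholder remote command

# Popen returns immediately, so all the ssh sessions start in parallel
procs = [Popen("ssh %s '%s'" % (h, cmd), shell=True, stdout=PIPE)
         for h in hosts]

# communicate() waits for each process to exit and drains its stdout
for host, p in zip(hosts, procs):
    out, _ = p.communicate()
    print host + ':'
    print out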

Isn't that insecure? It could be, depending on the context. For ways to lock that kind of thing down, read this article by Brian Hatch: http://www.hackinglinuxexposed.com/articles/20021211.html. It was the basis for the intermachine communication in PHABRIX; most especially, prun was built using his authprogs as a starting concept (with greater flexibility and extra security layers added).
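The approach there centers on the SSH forced command: in ~/.ssh/authorized_keys you prefix the key with options so the remote account will only ever run the command you pinned down, no matter what the client asked for. Roughly like this (the key material, path, and comment are placeholders):

command="/usr/local/bin/authprogs",no-port-forwarding,no-X11-forwarding,no-pty ssh-rsa AAAA...keydata... automation@controlhost

The forced command (authprogs, in Hatch's article) can then inspect the SSH_ORIGINAL_COMMAND environment variable and decide whether the requested command is on its allowed list before running it.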

2 comments:

  1. That is cool.

    The obvious next step is to make it a service, so that way you can implement job scheduling and monitoring. Who needs SGE?

    PS. You remember how SGE would often let jobs die silently, requiring us to report and record status at the end of our BLAST jobs, etc? Happens here too, had to use a similar workaround.

  2. Indeed, what would you think about using rpyc for that? Recall that at UCSF I was using SGE to spawn rpyc jobs and then controlling them all from a coordinating python process. Of course this master job also made the SGE calls and cleaned up the spawned jobs.

    A problem with the BLAST case is that the NCBI toolbox code does some weird buffering. Before "subprocess" I found that using select on the pipes reading from BLAST jobs was unreliable. Really, I think the problem with BLAST not finishing (in a non-obvious way) is an NCBI-BLAST problem.
