Tuesday, June 30, 2009

tee functionality with python subprocess PIPEs

Sometimes we may want the output of a process to go to more than one place. In a shell, we would probably use tee(1).

One example application is dumping large databases with a storage engine that can't give you a time of last modification, e.g. MySQL using InnoDB. Then I want the checksum of the dump, and if that is the same as the checksum of the dump before, I'll unlink the older one and symlink it to the new dump. I also want to check the dump stream to make sure it has finished properly. And for good measure, let's compress the stream on the fly. Sure, you could do all this after the dump completes, but then you have to wait for a lot of disk I/O that the pipes avoid.
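The unlink-and-symlink step could look something like the sketch below. It is only an illustration, written for Python 3: the function name link_if_unchanged and its arguments are made up for this example, and it assumes the two digests have already been read from their digest files.

```python
import os


def link_if_unchanged(old_dump, new_dump, old_digest, new_digest):
    """Replace old_dump with a symlink to new_dump when the digests match.

    Hypothetical helper: the names are illustrative only.
    Returns True if the old dump was replaced, False otherwise.
    """
    # digest files usually end with a newline, so compare stripped text
    if old_digest.strip() != new_digest.strip():
        return False
    # identical content: keep one real copy and point the old name at it
    os.unlink(old_dump)
    os.symlink(os.path.abspath(new_dump), old_dump)
    return True
```

The old name keeps working for anything that reads it, but the disk only holds one copy of the data.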

If you are like me, you'll want to use python. Note that if you take the stdout from one pipe and read it with more than one (n) other processes, each process will only get a fraction of the data, roughly 1/n, because every byte in a pipe is delivered to exactly one reader. So read the data into a buffer once and write that buffer to each process that needs it.
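Stripped of the database details, the buffer-and-copy idea can be sketched like this. This is a Python 3 illustration, not the production script: fan_out and run_tee_demo are names invented here, and the demo uses small child python processes as stand-ins for the real dump and filter commands.

```python
import os
import subprocess
import sys


def fan_out(read_fd, sinks, chunk_size=65536):
    """Copy everything readable from read_fd to every sink.

    Returns the number of bytes copied. The sinks are left open;
    the caller closes them to signal end of input.
    """
    total = 0
    while True:
        buf = os.read(read_fd, chunk_size)
        if not buf:  # empty read means EOF: the writer closed its end
            break
        for sink in sinks:
            sink.write(buf)
        total += len(buf)
    return total


def run_tee_demo():
    """Feed one producer's output to two consumers; return their line counts."""
    producer = subprocess.Popen(
        [sys.executable, "-c", "print('a\\nb\\nc')"],
        stdout=subprocess.PIPE)
    counter = "import sys; print(len(sys.stdin.read().splitlines()))"
    consumers = [subprocess.Popen([sys.executable, "-c", counter],
                                  stdin=subprocess.PIPE,
                                  stdout=subprocess.PIPE)
                 for _ in range(2)]
    fan_out(producer.stdout.fileno(), [c.stdin for c in consumers])
    producer.wait()
    counts = []
    for c in consumers:
        c.stdin.close()  # flushes, then tells the consumer input is done
        counts.append(int(c.stdout.read().decode().strip()))
        c.wait()
    return counts
```

Each consumer sees the whole stream because the parent reads every chunk once and writes it to each of them, instead of letting them compete for the same pipe.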


__version__ = [int(x) for x in "$Revision: 1.2 $".split()[1].split('.')]
__author__ = "Kael Fischer <kael.fischer@gmail.com>"

.
.
.


# needs os, subprocess, sys and syslog imported (elided above)
syslog.syslog(syslog.LOG_ERR,"INFO: Starting mysqldump of %s on %s" % (db,host))
try:
    # make output files (binary mode: the children write raw bytes)
    outFile = open(outfileName,'wb')
    digFile = open(digestName,'wb')

    # make pipes
    dumper = subprocess.Popen(["mysqldump","--opt","--skip-dump-date",
                               "--single-transaction","--quick",db],
                              stdout=subprocess.PIPE)
    grepper = subprocess.Popen(["grep", '-q','^-- Dump completed$'],
                               stdin=subprocess.PIPE,stdout=sys.stdout)
    digester = subprocess.Popen(["md5", "-q"],stdin=subprocess.PIPE,stdout=digFile)
    bziper = subprocess.Popen(["bzip2"],
                              stdin=subprocess.PIPE,stdout=outFile)

    while True:
        # use os.read NOT file.read
        # file.read blocks until the whole count has arrived;
        # os.read returns whatever is available.
        # copy output from mysqldump to a buffer
        buf = os.read(dumper.stdout.fileno(),5000)
        if not buf:
            # an empty read is EOF. Looping on dumper.poll() instead
            # could exit with unread data still in the pipe.
            break
        # write buffer contents to 3 different processes
        grepper.stdin.write(buf)
        digester.stdin.write(buf)
        bziper.stdin.write(buf)

    # after dumper finishes,
    # explicitly shut down input streams
    # to other jobs
    grepper.stdin.close()
    digester.stdin.close()
    bziper.stdin.close()

    # wait for every child so the output files are complete
    for proc in (dumper, grepper, digester, bziper):
        proc.wait()
    outFile.close()
    digFile.close()

    if dumper.returncode != 0:
        raise RuntimeError("mysqldump returned - %s" % dumper.returncode)

    if grepper.returncode != 0:
        raise RuntimeError("End of dump not found: grep returned - %s" %
                           grepper.returncode)

except BaseException as e:
    # dump's no good, KeyboardInterrupt, whatever
    syslog.syslog(syslog.LOG_ERR,"ERROR: %s : %s" % (type(e),e))
    syslog.syslog(syslog.LOG_ERR,"ERROR: mysqldump of %s on %s failed" % (db,host))

    # unsuccessful
    # put files back where they were
    # and exit

    # exercise for reader


syslog.syslog(syslog.LOG_ERR,"INFO: mysqldump of %s on %s finished" % (db,host))
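
One possible take on the "put files back where they were" exercise is to write each output under a scratch name and only move it into place once everything has succeeded. The sketch below is a Python 3 illustration of that idea, not what the script above does: replace_on_success and the ".partial" suffix are invented for this example.

```python
import contextlib
import os


@contextlib.contextmanager
def replace_on_success(final_path, suffix=".partial"):
    """Yield a scratch path; promote it to final_path only on success.

    Hypothetical helper. If the body raises (including
    KeyboardInterrupt), the scratch file is removed and whatever
    was at final_path before is left untouched.
    """
    scratch = final_path + suffix
    try:
        yield scratch
    except BaseException:
        # failed or interrupted: discard the partial output
        if os.path.exists(scratch):
            os.remove(scratch)
        raise
    else:
        # rename over the old file; both paths share a directory
        os.replace(scratch, final_path)
```

With this, the dump would open the scratch path for writing inside the with block, and a failed dump never overwrites the previous good one.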

Monday, June 29, 2009

python subprocess based parallel processing

The python threading module is cool, and when combined with rpyc and the Sun Grid Engine you can get a lot done really fast on a cluster. I will blog about that later. In the meantime, one of the tricks I use with rpyc also works with the newish 'subprocess' module in the python standard library, and it makes multi-process parallel processing seem simpler than ever.

This is a jiffy to run a shell command on a number of hosts.



#!/usr/local/bin/python -u
#
# runAllOver.py
# Run a shell command on several machines using ssh
#
__version__ = tuple([int(x) for x in
                     '$Revision: 1.2 $'.split()[1].split('.')])
__author__ = "Kael Fischer"

import sys
import optparse
from subprocess import Popen, PIPE


HOSTS = ["nfs1","nfs2","compute1","compute2","db1" ]

def main(sysargs):

    oneLineUsage = "Usage: %prog [options] '<remote command>'"

    op = optparse.OptionParser(
        oneLineUsage,
        version="%prog " + '.'.join([str(x) for x in __version__]))

    (opts,args) = op.parse_args(sysargs)

    if len(args) == 0:
        sys.stderr.write("\nUsage Error: No remote command specified.\n\n")
        sys.stderr.write(op.format_help() + "\n")
        return 1

    cmd = ' '.join(args)
    print(cmd)

    # make one running pipe object per host;
    # every ssh is started before any output is read
    pipes = [remotePipe(h,cmd) for h in HOSTS]

    # report the results in turn
    for host,p in zip(HOSTS,pipes):
        print(host + ':')
        # communicate() drains stdout while it waits, so a chatty
        # command can't fill the pipe and stall
        out = p.communicate()[0]
        print(out)

    return 0 # we did it!

def remotePipe(host,cmd,block=False):
    # argument list, no local shell: quotes in cmd reach ssh untouched
    p = Popen(["ssh", host, cmd], stdout=PIPE, universal_newlines=True)
    if block:
        # only safe for small outputs: nothing reads the pipe while we wait
        p.wait()
    return p

if __name__ == "__main__":
    sys.exit(main(sys.argv[1:]))


What's cool about that? Well, every ssh process is started before the script waits on any of them, so the command runs on all the hosts simultaneously, and that makes it go fast.
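
The start-everything-then-collect pattern is not specific to ssh. Here is a minimal Python 3 sketch of it; run_all is a name invented for this example, and in the usage lines the child python processes are just stand-ins for the ssh commands.

```python
import subprocess
import sys


def run_all(commands):
    """Start every command before waiting on any of them.

    commands is a list of (label, argv) pairs.
    Returns {label: (returncode, output_text)}.
    """
    # launch phase: all children are running once this list is built
    procs = [(label, subprocess.Popen(argv, stdout=subprocess.PIPE,
                                      universal_newlines=True))
             for label, argv in commands]
    results = {}
    # collect phase: communicate() reads the output while it waits
    for label, proc in procs:
        out, _ = proc.communicate()
        results[label] = (proc.returncode, out)
    return results


if __name__ == "__main__":
    jobs = [(name, [sys.executable, "-c", "print('hello from %s')" % name])
            for name in ("a", "b", "c")]
    for label, (code, out) in sorted(run_all(jobs).items()):
        print(label, code, out.strip())
```

Because the children overlap, the total wall time is roughly that of the slowest one, not the sum of all of them.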

Isn't that insecure? Could be, depending on the context. For ways to make that kind of thing more secure, read this article by Brian Hatch: http://www.hackinglinuxexposed.com/articles/20021211.html. It was the basis for the intermachine communication in the PHABRIX, and prun in particular was built using his authprogs as a starting concept (with greater flexibility and extra security layers added).
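
A smaller, purely local precaution is to be careful about how the remote command line gets built. The Python 3 sketch below is an assumption-laden illustration and has nothing to do with authprogs: build_ssh_argv is a hypothetical helper that takes the remote command as a list of arguments, quotes each one for the remote shell with shlex.quote, and refuses host names that look like ssh options.

```python
import shlex


def build_ssh_argv(host, remote_args):
    """Return an argv list for running remote_args on host via ssh.

    Hypothetical helper. Each remote argument is quoted so the
    remote shell treats it as a single word, not as shell syntax.
    """
    if host.startswith("-"):
        # a host like "-oProxyCommand=..." would be parsed as an option
        raise ValueError("suspicious host name: %r" % host)
    remote_cmd = " ".join(shlex.quote(arg) for arg in remote_args)
    return ["ssh", host, remote_cmd]
```

The trade-off is that the remote side no longer sees pipes or redirections as shell syntax, so this only fits when you want to run one plain command with arguments.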