Tuesday, June 30, 2009

tee functionality with Python subprocess PIPEs

Sometimes we may want the output of a process to go to more than one place. In a shell, we would probably use tee(1).

One example application is dumping large databases from a storage engine that can't give you a time of last modification, e.g. MySQL with InnoDB. I want the checksum of each dump, and if it matches the checksum of the previous dump, I'll unlink the older one and symlink it to the new dump. I also want to check the dump stream to make sure it finished properly. And for good measure, let's compress the stream on the fly. Sure, you could do all this after the dump completes, but then you have to wait for a lot of disk I/O that the pipes avoid.
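
The unlink-and-symlink step could look roughly like the sketch below. The function name link_if_unchanged and its four path arguments are made up for illustration, and it assumes each digest file holds nothing but the checksum text (as `md5 -q` would write it).

```python
import os


def link_if_unchanged(old_dump, old_digest, new_dump, new_digest):
    """Replace old_dump with a symlink to new_dump when the digests match.

    Returns True if the old dump was replaced, False otherwise.
    (Hypothetical helper; all four arguments are file paths.)
    """
    with open(old_digest) as f:
        old_sum = f.read().strip()
    with open(new_digest) as f:
        new_sum = f.read().strip()

    # An empty digest means something went wrong earlier; never link on that.
    if not old_sum or old_sum != new_sum:
        return False

    os.unlink(old_dump)
    # Use an absolute target so the link works from any directory.
    os.symlink(os.path.abspath(new_dump), old_dump)
    return True
```

Comparing the small digest files means the old and new dumps never have to be read back from disk.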

If you are like me, you'll want to use Python. Note that if you take the stdout from one pipe and read it with more than one (n) other process, each process will only get a fraction of the data, ~1/n. So read the data into a buffer and write that buffer to each process that needs it.
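
Stripped of the mysqldump details, the buffer idea can be sketched as a small helper. This is a Python 3 sketch, not the production script: the name tee and the chunk_size parameter are my own, and it assumes the source was started with stdout=subprocess.PIPE and every sink with stdin=subprocess.PIPE.

```python
import os


def tee(source, sinks, chunk_size=5000):
    """Copy everything from source.stdout to the stdin of every sink.

    source and sinks are subprocess.Popen objects.
    Returns the list of exit codes: source first, then each sink in order.
    """
    fd = source.stdout.fileno()
    while True:
        # os.read returns as soon as some data is available
        buf = os.read(fd, chunk_size)
        if not buf:
            # empty read: the source closed its stdout
            break
        # every sink gets the whole buffer, not a 1/n share
        for sink in sinks:
            sink.stdin.write(buf)
    source.stdout.close()
    # closing stdin flushes it and lets each sink see end-of-file
    for sink in sinks:
        sink.stdin.close()
    return [source.wait()] + [sink.wait() for sink in sinks]
```

One caveat: if a sink exits early, the next write to its stdin raises a broken pipe error, so a real script would want to catch that.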


__version__ = [int(x) for x in "$Revision: 1.2 $".split()[1].split('.')]
__author__ = "Kael Fischer <kael.fischer@gmail.com>"

import os
import subprocess
import sys
import syslog
import time

.
.
.


syslog.syslog(syslog.LOG_INFO, "INFO: Starting mysqldump of %s on %s" % (db, host))
try:
    # make output files (binary mode: the streams are raw bytes)
    outFile = open(outfileName, 'wb')
    digFile = open(digestName, 'wb')

    # make pipes
    dumper = subprocess.Popen(["mysqldump", "--opt", "--skip-dump-date",
                               "--single-transaction", "--quick", db],
                              stdout=subprocess.PIPE)
    grepper = subprocess.Popen(["grep", "-q", "^-- Dump completed$"],
                               stdin=subprocess.PIPE, stdout=sys.stdout)
    digester = subprocess.Popen(["md5", "-q"], stdin=subprocess.PIPE, stdout=digFile)
    bziper = subprocess.Popen(["bzip2"],
                              stdin=subprocess.PIPE, stdout=outFile)

    while True:
        # use os.read NOT file.read
        # file.read blocks until the full count arrives;
        # os.read returns whatever is available.
        # copy output from mysqldump to a buffer
        buf = os.read(dumper.stdout.fileno(), 5000)
        if not buf:
            # empty read: mysqldump closed its stdout.
            # Looping on dumper.poll() instead can drop
            # data still sitting in the pipe at exit.
            break
        # write buffer contents to 3 different processes
        grepper.stdin.write(buf)
        digester.stdin.write(buf)
        bziper.stdin.write(buf)

    # after dumper finishes,
    # explicitly shut down input streams
    # to other jobs
    grepper.stdin.close()
    digester.stdin.close()
    bziper.stdin.close()

    if dumper.wait() != 0:
        raise RuntimeError("mysqldump returned - %s" % dumper.returncode)

    while grepper.poll() is None:
        # wait if needed (shouldn't be)
        time.sleep(1)

    if grepper.returncode != 0:
        raise RuntimeError("End of dump not found: grep returned - %s" %
                           grepper.returncode)

    # let md5 and bzip2 finish writing before closing the files
    digester.wait()
    bziper.wait()
    outFile.close()
    digFile.close()

except BaseException as e:
    # dump's no good, KeyboardInterrupt, whatever
    syslog.syslog(syslog.LOG_ERR, "ERROR: %s : %s" % (type(e), e))
    syslog.syslog(syslog.LOG_ERR, "ERROR: mysqldump of %s on %s failed" % (db, host))

    # unsuccessful
    # put files back where they were
    # and exit

    # exercise for reader


syslog.syslog(syslog.LOG_INFO, "INFO: mysqldump of %s on %s finished" % (db, host))
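
The os.read comment in the loop above is easy to check with a bare pipe and no subprocesses. This is a small Python 3 experiment of my own, not part of the dump script; the names read_end, write_end, first and second are just for illustration.

```python
import os

read_end, write_end = os.pipe()

# Put a few bytes in the pipe, far fewer than we are about to ask for.
os.write(write_end, b"partial line")

# os.read hands back what is there instead of waiting for 5000 bytes.
first = os.read(read_end, 5000)

# Once the last writer closes, an empty result marks end of stream.
os.close(write_end)
second = os.read(read_end, 5000)
os.close(read_end)

print(first, second)
```

A buffered file object's read(5000) on the same pipe would have kept waiting for either 5000 bytes or end-of-file, which is the blocking the comment warns about.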
