#!/usr/bin/env python
#
# This program takes something that resembles a shell script and runs
# it, spitting input (commands from the script) and output into text
# files, for use in examples.
#
# The syntax used here is valid on both Python 2.6+ and Python 3.  All
# traffic with the shell's pty is handled as bytes and converted to the
# native string type at the boundary (see _to_bytes / _to_text).

import errno
import getopt
import io
import os
import pty
import re
import select
import shutil
import signal
import stat
import sys
import tempfile
import time

try:
    from cStringIO import StringIO  # Python 2
except ImportError:
    from io import StringIO  # Python 3

# Characters that must be escaped before text is embedded in TeX output.
tex_subs = {
    '\\': '\\textbackslash{}',
    '{': '\\{',
    '}': '\\}',
    }


def _to_bytes(s):
    '''return s as a byte string, encoding text as UTF-8 if needed.'''
    if isinstance(s, bytes):
        return s
    return s.encode('utf-8')


def _to_text(b):
    '''return b as a native str, decoding bytes as UTF-8 if needed.

    Undecodable bytes are replaced rather than raising, because the
    shell's output is arbitrary.'''
    if isinstance(b, str):
        return b
    return b.decode('utf-8', 'replace')


def _is_runnable(st):
    '''true if the stat result st is a regular file with an execute bit.'''
    return stat.S_ISREG(st.st_mode) and st.st_mode & 0o111


def gensubs(s):
    '''yield the pieces of s, with each character found in tex_subs
    replaced by its TeX escape.

    The final piece (possibly empty) is always yielded.'''
    start = 0
    for i, c in enumerate(s):
        sub = tex_subs.get(c)
        if sub:
            yield s[start:i]
            start = i + 1
            yield sub
    yield s[start:]


def tex_escape(s):
    '''return s with backslashes and braces escaped for TeX.'''
    return ''.join(gensubs(s))


class example(object):
    '''run one example script in a bash session attached to a pty,
    capturing the commands and their output into <name>.<section>.out
    files.'''

    # NOTE: not consulted by run(), which hard-codes the same command.
    shell = '/usr/bin/env bash'
    # Unlikely-to-occur prompt, used to detect the end of command output.
    prompt = '__run_example_prompt__ '
    # Matches a processing instruction such as "#$ name: foo".
    pi_re = re.compile(r'#\$\s*(name):\s*(.*)$')
    # Seconds to wait for shell output before giving up.
    timeout = 5

    def __init__(self, name, verbose):
        self.name = name
        self.verbose = verbose
        self.poll = select.poll()

    def parse(self):
        '''yield each hunk of input from the file.

        A hunk is one line, or several lines joined by trailing
        backslash continuations.'''
        with open(self.name) as fp:
            cfp = StringIO()
            for line in fp:
                cfp.write(line)
                if not line.rstrip().endswith('\\'):
                    yield cfp.getvalue()
                    cfp.seek(0)
                    cfp.truncate()

    def status(self, s):
        '''write progress text to stdout, flushing partial lines.'''
        sys.stdout.write(s)
        if not s.endswith('\n'):
            sys.stdout.flush()

    def send(self, s):
        '''write all of s to the shell, retrying on short writes.'''
        if self.verbose:
            sys.stderr.write('> %s\n' % self.debugrepr(s))
        data = _to_bytes(s)
        while data:
            count = os.write(self.cfd, data)
            data = data[count:]

    def debugrepr(self, s):
        '''return repr(s), abbreviated if it is longer than 60 chars.'''
        rs = repr(s)
        limit = 60
        if len(rs) > limit:
            # rs[-1] is always the closing quote, even when the repr
            # carries a prefix such as b'...' or u'...'.
            return ('%s%s ... [%d bytes]' % (rs[:limit], rs[-1], len(s)))
        else:
            return rs

    def read(self):
        '''return the next chunk of raw bytes from the shell.

        If nothing arrives within self.timeout seconds, the shell is
        sent SIGHUP and an empty byte string is returned.'''
        events = self.poll.poll(self.timeout * 1000)
        if not events:
            sys.stderr.write('[timed out after %d seconds]\n' % self.timeout)
            os.kill(self.pid, signal.SIGHUP)
            return b''
        return os.read(self.cfd, 1024)

    def receive(self):
        '''return the shell's output up to (not including) the next
        prompt, with CRLF line endings normalised to LF.

        Returns '' if the pty reports EIO before a prompt is seen.'''
        prompt = _to_bytes(self.prompt)
        out = io.BytesIO()
        while True:
            try:
                if self.verbose:
                    sys.stderr.write('< ')
                chunk = self.read()
            except OSError as err:
                if err.errno == errno.EIO:
                    return ''
                raise
            if self.verbose:
                sys.stderr.write('%s\n' % self.debugrepr(chunk))
            out.write(chunk)
            data = out.getvalue()
            if data.endswith(prompt):
                text = _to_text(data).replace('\r\n', '\n')
                return text[:-len(self.prompt)]

    def sendreceive(self, s):
        '''send s to the shell and return its output, minus the
        terminal's echo of s.'''
        self.send(s)
        r = self.receive()
        if r.startswith(s):
            r = r[len(s):]
        return r

    def _write_rcfiles(self, tmpdir):
        '''create .hgrc and .bashrc in tmpdir; return the .bashrc path.'''
        with open(os.path.join(tmpdir, '.hgrc'), 'w') as rcfp:
            rcfp.write('[ui]\n')
            rcfp.write("username = Bryan O'Sullivan \n")

        rcfile = os.path.join(tmpdir, '.bashrc')
        lines = [
            'PS1="%s"' % self.prompt,
            'unset HISTFILE',
            'export EXAMPLE_DIR="%s"' % os.getcwd(),
            'export LANG=C',
            'export LC_ALL=C',
            'export TZ=GMT',
            'export HGRC="%s/.hgrc"' % tmpdir,
            'export HGRCPATH=$HGRC',
            'cd %s' % tmpdir,
            ]
        with open(rcfile, 'w') as rcfp:
            rcfp.write('\n'.join(lines) + '\n')
        return rcfile

    def _spawn_shell(self):
        '''fork bash on a new pty, setting self.pid and self.cfd.

        Only the parent returns from this method.'''
        self.pid, self.cfd = pty.fork()
        if self.pid == 0:
            cmdline = ['/usr/bin/env', 'bash', '--noediting',
                       '--noprofile', '--norc']
            try:
                os.execv(cmdline[0], cmdline)
            except OSError as err:
                sys.stderr.write('%s: %s\n' % (cmdline[0], err.strerror))
                sys.stderr.flush()
            finally:
                # Reached only if exec failed.  Exit non-zero so the
                # parent reports the failure, and never fall through
                # into the parent's code path.
                os._exit(127)

    def run(self):
        '''run the script; return the shell's wait status (0 on
        success).

        Any exception kills the shell and is re-raised.'''
        ofp = None
        basename = os.path.basename(self.name)
        self.status('running %s ' % basename)
        tmpdir = tempfile.mkdtemp(prefix=basename)
        try:
            rcfile = self._write_rcfiles(tmpdir)
            # Flush before forking so buffered output is not duplicated
            # in the child.
            sys.stdout.flush()
            sys.stderr.flush()
            self._spawn_shell()
            self.poll.register(self.cfd, select.POLLIN | select.POLLERR |
                               select.POLLHUP)
            try:
                # eat first prompt string from shell
                self.read()
                # setup env and prompt
                self.sendreceive('source %s\n' % rcfile)
                for hunk in self.parse():
                    # is this line a processing instruction?
                    m = self.pi_re.match(hunk)
                    if m:
                        pi, rest = m.groups()
                        if pi == 'name':
                            self.status('.')
                            out = rest
                            assert os.sep not in out
                            # finish the previous section's file before
                            # starting (or disabling) the next one
                            if ofp:
                                ofp.close()
                                ofp = None
                            if out:
                                ofp = open('%s.%s.out' % (self.name, out),
                                           'w')
                    elif hunk.strip():
                        # it's something we should execute
                        output = self.sendreceive(hunk)
                        if not ofp:
                            continue
                        # first, print the command we ran
                        if not hunk.startswith('#'):
                            nl = hunk.endswith('\n')
                            hunk = ('$ \\textbf{%s}' %
                                    tex_escape(hunk.rstrip('\n')))
                            if nl:
                                hunk += '\n'
                        ofp.write(hunk)
                        # then its output
                        ofp.write(tex_escape(output))
                self.status('\n')
                # touch the stamp file that marks a completed run
                open(self.name + '.run', 'w').close()
            except:
                # Deliberately bare: whatever went wrong (including
                # KeyboardInterrupt), do not leave the shell running.
                # The exception is always re-raised.
                sys.stderr.write('(killed)\n')
                os.kill(self.pid, signal.SIGKILL)
                os.wait()
                raise
            else:
                try:
                    output = self.sendreceive('exit\n')
                    if ofp:
                        ofp.write(output)
                    os.close(self.cfd)
                except EnvironmentError:
                    # Best effort: the shell may already be gone.
                    # os.write/os.close raise OSError, file writes raise
                    # IOError; EnvironmentError covers both on Python 2.
                    pass
                os.kill(self.pid, signal.SIGTERM)
                pid, rc = os.wait()
                if rc:
                    if os.WIFEXITED(rc):
                        sys.stderr.write('(exit %s)\n' % os.WEXITSTATUS(rc))
                    elif os.WIFSIGNALED(rc):
                        sys.stderr.write('(signal %s)\n' % os.WTERMSIG(rc))
                return rc
        finally:
            try:
                if ofp is not None:
                    ofp.close()
            finally:
                shutil.rmtree(tmpdir)


def main(path='.'):
    '''run the examples named on the command line, or every executable
    regular file in path if none are named.

    Returns the number of failures, for use as the exit status.'''
    opts, args = getopt.getopt(sys.argv[1:], 'v', ['verbose'])
    verbose = False
    for o, a in opts:
        if o in ('-v', '--verbose'):
            verbose = True
    errs = 0
    if args:
        for a in args:
            try:
                st = os.lstat(a)
            except OSError as err:
                sys.stderr.write('%s: %s\n' % (a, err.strerror))
                errs += 1
                continue
            if _is_runnable(st):
                if example(a, verbose).run():
                    errs += 1
            else:
                sys.stderr.write('%s: not a file, or not executable\n' % a)
                errs += 1
        return errs
    for name in os.listdir(path):
        # skip this script itself, hidden files, and generated files
        if name == 'run-example' or name.startswith('.'):
            continue
        if name.endswith(('.out', '~', '.run')):
            continue
        pathname = os.path.join(path, name)
        st = os.lstat(pathname)
        if _is_runnable(st):
            if example(pathname, verbose).run():
                errs += 1
    # record when the whole directory was last run
    with open(os.path.join(path, '.run'), 'w') as stamp:
        stamp.write(time.asctime() + '\n')
    return errs


if __name__ == '__main__':
    sys.exit(main())