#!/usr/bin/env python
#
# This program takes something that resembles a shell script and runs
# it, spitting input (commands from the script) and output into text
# files, for use in examples.
#
# The syntax used here is accepted by Python 2.6+ and by Python 3; the
# only bytes/text conversions happen at the pty boundary (see
# example.send and example.read).

import codecs
import errno
import getopt
import os
import pty
import re
import select
import shutil
import signal
import stat
import sys
import tempfile
import time

try:
    from cStringIO import StringIO
except ImportError:
    # Python 3 has no cStringIO; io.StringIO offers the same
    # write/getvalue/seek/truncate calls used below.
    from io import StringIO

# Characters that must be escaped before being written to a TeX file.
tex_subs = {
    '\\': '\\textbackslash{}',
    '{': '\\{',
    '}': '\\}',
}


def gensubs(s):
    '''Yield the pieces of s, with each character listed in tex_subs
    replaced by its substitution.

    Joining the yielded pieces gives the escaped string.  The final
    piece (possibly empty) is always yielded.'''
    start = 0
    for i, c in enumerate(s):
        sub = tex_subs.get(c)
        if sub:
            yield s[start:i]
            start = i + 1
            yield sub
    yield s[start:]


def tex_escape(s):
    '''Return s with the characters in tex_subs escaped.'''
    return ''.join(gensubs(s))


class example:
    '''Run one example script in a bash child attached to a pty and
    record the commands and their output in "<script>.<name>.out"
    files.'''

    shell = '/usr/bin/env bash'
    # Distinctive prompts, so the end of a command's output can be
    # recognised in the byte stream coming back from the shell.
    ps1 = '__run_example_ps1__ '
    ps2 = '__run_example_ps2__ '

    # A "#$ name: <section>" line is a processing instruction that
    # selects the output file for the commands that follow it.
    pi_re = re.compile(r'#\$\s*(name):\s*(.*)$')

    # Seconds to wait for shell output before giving up on it.
    timeout = 5

    def __init__(self, name, verbose):
        '''name is the path of the script to run; verbose enables a
        trace of the pty traffic on stderr.'''
        self.name = name
        self.verbose = verbose
        self.poll = select.poll()
        # Only used when os.read returns bytes that are not str (Python
        # 3): decoding incrementally keeps a multi-byte character that
        # is split across two reads intact.
        self._decoder = codecs.getincrementaldecoder('utf-8')('replace')

    def parse(self):
        '''yield each hunk of input from the file.

        A hunk is one line, or several lines joined by trailing
        backslash continuations.  A final hunk that still ends in a
        continuation when the file ends is not yielded.'''
        with open(self.name) as fp:
            cfp = StringIO()
            for line in fp:
                cfp.write(line)
                if not line.rstrip().endswith('\\'):
                    yield cfp.getvalue()
                    cfp.seek(0)
                    cfp.truncate()

    def status(self, s):
        '''Write progress text to stdout, flushing partial lines so
        they show up immediately.'''
        sys.stdout.write(s)
        if not s.endswith('\n'):
            sys.stdout.flush()

    def send(self, s):
        '''Write all of s to the shell's pty.'''
        if self.verbose:
            sys.stderr.write('> %s\n' % self.debugrepr(s))
        # os.write needs bytes.  On Python 2 str already is bytes and
        # is passed through untouched.
        data = s if isinstance(s, bytes) else s.encode('utf-8')
        while data:
            count = os.write(self.cfd, data)
            data = data[count:]

    def debugrepr(self, s):
        '''Return repr(s), truncated to 60 characters (plus a closing
        quote and the total length) when it is longer than that.'''
        rs = repr(s)
        limit = 60
        if len(rs) > limit:
            # rs[0] is the opening quote; reuse it to close the
            # truncated literal.
            return '%s%s ... [%d bytes]' % (rs[:limit], rs[0], len(s))
        return rs

    def read(self):
        '''Return the next chunk of shell output (at most 1024 bytes).

        If nothing arrives within self.timeout seconds, a message is
        printed, the shell is sent SIGHUP and '' is returned.'''
        events = self.poll.poll(self.timeout * 1000)
        if not events:
            sys.stderr.write('[timed out after %d seconds]\n' %
                             self.timeout)
            os.kill(self.pid, signal.SIGHUP)
            return ''
        data = os.read(self.cfd, 1024)
        if isinstance(data, str):
            return data
        return self._decoder.decode(data)

    def receive(self):
        '''Collect shell output until it ends with one of our prompts.

        Returns (prompt, output) with the prompt stripped from the
        output and CRLF turned into LF.  Returns ('', '') if reading
        the pty fails with EIO; anything collected so far is then
        discarded.'''
        out = StringIO()
        while True:
            try:
                if self.verbose:
                    sys.stderr.write('< ')
                s = self.read()
            except OSError as err:
                if err.errno == errno.EIO:
                    return '', ''
                raise
            if self.verbose:
                sys.stderr.write('%s\n' % self.debugrepr(s))
            out.write(s)
            s = out.getvalue()
            for prompt in (self.ps1, self.ps2):
                if s.endswith(prompt):
                    return prompt, s.replace('\r\n', '\n')[:-len(prompt)]

    def sendreceive(self, s):
        '''Send s to the shell and return (prompt, output), dropping
        the echo of s from the start of the output if present.'''
        self.send(s)
        ps, r = self.receive()
        if r.startswith(s):
            r = r[len(s):]
        return ps, r

    def _write_rcfiles(self, tmpdir):
        '''Create .hgrc and .bashrc in tmpdir; return the .bashrc path.'''
        with open(os.path.join(tmpdir, '.hgrc'), 'w') as fp:
            fp.write('[ui]\n')
            fp.write("username = Bryan O'Sullivan \n")

        bashrc = os.path.join(tmpdir, '.bashrc')
        lines = [
            'PS1="%s"' % self.ps1,
            'PS2="%s"' % self.ps2,
            'unset HISTFILE',
            'export EXAMPLE_DIR="%s"' % os.getcwd(),
            'export LANG=C',
            'export LC_ALL=C',
            'export TZ=GMT',
            'export HGRC="%s/.hgrc"' % tmpdir,
            'export HGRCPATH=$HGRC',
            'cd %s' % tmpdir,
        ]
        with open(bashrc, 'w') as fp:
            fp.write('\n'.join(lines) + '\n')
        return bashrc

    def _spawn_shell(self):
        '''Fork bash on a new pty, setting self.pid and self.cfd, and
        register the pty with self.poll.'''
        # Flush first so buffered text is not written twice, once by
        # each process.
        sys.stdout.flush()
        sys.stderr.flush()
        self.pid, self.cfd = pty.fork()
        if self.pid == 0:
            cmdline = ['/usr/bin/env', 'bash', '--noediting',
                       '--noprofile', '--norc']
            try:
                os.execv(cmdline[0], cmdline)
            except OSError as err:
                sys.stderr.write('%s: %s\n' % (cmdline[0], err.strerror))
                sys.stderr.flush()
            # Only reached when exec failed.  Exit with a failure
            # status, and never fall through into the parent's code.
            os._exit(127)
        self.poll.register(self.cfd, select.POLLIN | select.POLLERR |
                           select.POLLHUP)

    def _report_status(self, rc):
        '''Describe a non-zero wait status on stderr.'''
        if os.WIFEXITED(rc):
            sys.stderr.write('(exit %s)\n' % os.WEXITSTATUS(rc))
        elif os.WIFSIGNALED(rc):
            sys.stderr.write('(signal %s)\n' % os.WTERMSIG(rc))

    def run(self):
        '''Run the script and write its output files.

        Returns the shell's wait status (0 on success).  On any error
        the shell is killed and the exception is re-raised.  The
        temporary working directory is always removed.'''
        ofp = None
        basename = os.path.basename(self.name)
        self.status('running %s ' % basename)
        tmpdir = tempfile.mkdtemp(prefix=basename)
        # Text shown in the output in place of each real shell prompt.
        prompts = {
            '': '',
            self.ps1: '$',
            self.ps2: '>',
        }
        try:
            rcfile = self._write_rcfiles(tmpdir)
            self._spawn_shell()
            try:
                # eat first prompt string from shell
                self.read()
                # setup env and prompt
                ps, output = self.sendreceive('source %s\n' % rcfile)
                for hunk in self.parse():
                    # is this line a processing instruction?
                    m = self.pi_re.match(hunk)
                    if m:
                        pi, rest = m.groups()
                        if pi == 'name':
                            self.status('.')
                            assert os.sep not in rest
                            # Finish the previous section's file before
                            # starting the next one.
                            if ofp is not None:
                                ofp.close()
                                ofp = None
                            if rest:
                                ofp = open('%s.%s.out' % (self.name, rest),
                                           'w')
                        continue
                    if not hunk.strip():
                        continue
                    # it's something we should execute
                    shown_ps = ps
                    # Track the new prompt right away, so it stays
                    # accurate for commands whose output is not being
                    # recorded.
                    ps, output = self.sendreceive(hunk)
                    if ofp is None:
                        continue
                    # first, print the command we ran
                    if not hunk.startswith('#'):
                        nl = hunk.endswith('\n')
                        echoed = ('%s \\textbf{%s}' %
                                  (prompts[shown_ps],
                                   tex_escape(hunk.rstrip('\n'))))
                        if nl:
                            echoed += '\n'
                        ofp.write(echoed)
                    # then its output
                    ofp.write(tex_escape(output))
                self.status('\n')
                # Create (or empty) the "<script>.run" stamp file.
                open(self.name + '.run', 'w').close()
            except:
                # Deliberately catches everything, KeyboardInterrupt
                # included: the shell must not outlive us.  The
                # exception is re-raised below.
                sys.stderr.write('(killed)\n')
                os.kill(self.pid, signal.SIGKILL)
                pid, rc = os.wait()
                raise
            else:
                try:
                    ps, output = self.sendreceive('exit\n')
                    if ofp is not None:
                        ofp.write(output)
                    os.close(self.cfd)
                except (IOError, OSError):
                    # Best effort only: the shell may already be gone,
                    # in which case os.write/os.close raise OSError.
                    pass
                os.kill(self.pid, signal.SIGTERM)
                pid, rc = os.wait()
                if rc:
                    self._report_status(rc)
                return rc
        finally:
            if ofp is not None:
                ofp.close()
            shutil.rmtree(tmpdir)


def _is_runnable(st):
    '''Return true if the stat result st describes a regular file with
    at least one execute bit set.'''
    return stat.S_ISREG(st.st_mode) and st.st_mode & 0o111


def main(path='.'):
    '''Run the examples named on the command line, or every executable
    regular file in path when none are named.

    Accepts -v/--verbose.  Returns the number of failures.  When
    scanning path, a ".run" file holding the current time is written
    there afterwards.'''
    opts, args = getopt.getopt(sys.argv[1:], 'v', ['verbose'])
    verbose = False
    for o, a in opts:
        if o in ('-v', '--verbose'):
            verbose = True
    errs = 0
    if args:
        for a in args:
            try:
                st = os.lstat(a)
            except OSError as err:
                sys.stderr.write('%s: %s\n' % (a, err.strerror))
                errs += 1
                continue
            if _is_runnable(st):
                if example(a, verbose).run():
                    errs += 1
            else:
                sys.stderr.write('%s: not a file, or not executable\n' % a)
                errs += 1
        return errs
    for name in os.listdir(path):
        # Skip this program, dot files, and files produced by
        # earlier runs or by editors.
        if name == 'run-example' or name.startswith('.'):
            continue
        if name.endswith(('.out', '~', '.run')):
            continue
        pathname = os.path.join(path, name)
        st = os.lstat(pathname)
        if _is_runnable(st):
            if example(pathname, verbose).run():
                errs += 1
    with open(os.path.join(path, '.run'), 'w') as fp:
        fp.write(time.asctime() + '\n')
    return errs


if __name__ == '__main__':
    sys.exit(main())