#!/usr/bin/env python
#
# This program takes something that resembles a shell script and runs
# it, spitting input (commands from the script) and output into text
# files, for use in examples.
#
# The syntax used here is valid on Python 2.6+ and on Python 3.

import errno
import getopt
import io
import os
import pty
import re
import select
import shutil
import signal
import stat
import subprocess
import sys
import tempfile
import time

# Characters that are rewritten before text is stored in an output file.
tex_subs = {
    '\\': '\\textbackslash{}',
    '{': '\\{',
    '}': '\\}',
    }

# "Executable by anyone" permission mask (same value as octal 111).
_EXEC_BITS = stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH


def _warn(msg):
    '''write msg and a newline to stderr.'''
    sys.stderr.write(msg + '\n')


def _to_bytes(s):
    '''return s as bytes, suitable for os.write.

    A Python 2 str already is bytes and is returned unchanged.'''
    if isinstance(s, bytes):
        return s
    return s.encode('utf-8')


def _to_text(b):
    '''return b as a native str.

    A Python 2 str is returned unchanged; Python 3 bytes are decoded
    as UTF-8, with undecodable bytes replaced.'''
    if isinstance(b, str):
        return b
    return b.decode('utf-8', 'replace')


def gensubs(s):
    '''yield the pieces of s, with each character found in tex_subs
    replaced by its substitution.'''
    start = 0
    for i, c in enumerate(s):
        sub = tex_subs.get(c)
        if sub:
            yield s[start:i]
            start = i + 1
            yield sub
    yield s[start:]


def tex_escape(s):
    '''return s with every character in tex_subs escaped.'''
    return ''.join(gensubs(s))


def maybe_unlink(name):
    '''remove the file called name.

    Returns True if it was removed and False if it did not exist.
    Any other OSError is propagated.'''
    try:
        os.unlink(name)
        return True
    except OSError as err:
        if err.errno != errno.ENOENT:
            raise
    return False


class example(object):
    '''run one example script in a bash child attached to a pty, and
    record the commands and their output in per-section files.'''

    # not referenced anywhere else in this file
    shell = '/usr/bin/env bash'
    # distinctive prompt strings, so the end of a command's output can
    # be recognised in the byte stream read from the pty
    ps1 = '__run_example_ps1__ '
    ps2 = '__run_example_ps2__ '
    # processing instruction: "#$ name: <section>" or "#$ ignore: <regex>"
    pi_re = re.compile(r'#\$\s*(name|ignore):\s*(.*)$')
    # seconds to wait for shell output before giving up
    timeout = 5

    def __init__(self, name, verbose):
        '''name is the path of the script to run; verbose turns on a
        trace of the pty traffic on stderr.'''
        self.name = name
        self.verbose = verbose
        self.poll = select.poll()

    def parse(self):
        '''yield each hunk of input from the file.

        A hunk is a single line, or a run of lines in which every line
        but the last ends in a backslash (ignoring trailing
        whitespace).  A run that is still unterminated at end of file
        is not yielded.'''
        pending = []
        with open(self.name) as fp:
            for line in fp:
                pending.append(line)
                if not line.rstrip().endswith('\\'):
                    yield ''.join(pending)
                    pending = []

    def status(self, s):
        '''write progress text s to stdout, flushing when s does not
        end a line.'''
        sys.stdout.write(s)
        if not s.endswith('\n'):
            sys.stdout.flush()

    def send(self, s):
        '''write all of s to the shell's pty.'''
        if self.verbose:
            _warn('> ' + self.debugrepr(s))
        data = _to_bytes(s)
        while data:
            # os.write may accept only part of the data
            count = os.write(self.cfd, data)
            data = data[count:]

    def debugrepr(self, s):
        '''return repr(s), cut down to about 60 characters if longer.'''
        rs = repr(s)
        limit = 60
        if len(rs) > limit:
            # rs[-1] is the closing quote, for both '...' and b'...'
            return ('%s%s ... [%d bytes]' % (rs[:limit], rs[-1], len(s)))
        return rs

    def read(self):
        '''read up to 1024 bytes from the shell.

        If nothing arrives within self.timeout seconds, the shell is
        sent SIGHUP and an empty byte string is returned.'''
        events = self.poll.poll(self.timeout * 1000)
        if not events:
            _warn('[timed out after %d seconds]' % self.timeout)
            os.kill(self.pid, signal.SIGHUP)
            return b''
        return os.read(self.cfd, 1024)

    def receive(self):
        '''collect shell output until it ends with one of the prompts.

        Returns (prompt, output): the prompt string that was seen and
        the text before it, with '\\r\\n' turned into '\\n'.  Returns
        ('', '') if reading fails with EIO; other OSErrors are
        propagated.'''
        out = io.BytesIO()
        while True:
            try:
                if self.verbose:
                    sys.stderr.write('< ')
                chunk = self.read()
            except OSError as err:
                if err.errno == errno.EIO:
                    return '', ''
                raise
            if self.verbose:
                _warn(self.debugrepr(chunk))
            out.write(chunk)
            raw = out.getvalue()
            for prompt in (self.ps1, self.ps2):
                if raw.endswith(_to_bytes(prompt)):
                    # decode the whole buffer at once so a multi-byte
                    # character split across reads is not damaged
                    text = _to_text(raw).replace('\r\n', '\n')
                    return prompt, text[:-len(prompt)]

    def sendreceive(self, s):
        '''send s to the shell and return (prompt, output), with the
        echo of s removed from the front of the output if present.'''
        self.send(s)
        ps, r = self.receive()
        if r.startswith(s):
            r = r[len(s):]
        return ps, r

    def _write_rcfiles(self, tmpdir):
        '''write .hgrc and .bashrc into tmpdir; return the .bashrc path.'''
        with open(os.path.join(tmpdir, '.hgrc'), 'w') as rcfp:
            rcfp.write('[ui]\n')
            rcfp.write("username = Bryan O'Sullivan \n")

        rcfile = os.path.join(tmpdir, '.bashrc')
        with open(rcfile, 'w') as rcfp:
            rcfp.write('PS1="%s"\n' % self.ps1)
            rcfp.write('PS2="%s"\n' % self.ps2)
            rcfp.write('unset HISTFILE\n')
            rcfp.write('export EXAMPLE_DIR="%s"\n' % os.getcwd())
            rcfp.write('export HGMERGE=merge\n')
            rcfp.write('export LANG=C\n')
            rcfp.write('export LC_ALL=C\n')
            rcfp.write('export TZ=GMT\n')
            rcfp.write('export HGRC="%s/.hgrc"\n' % tmpdir)
            rcfp.write('export HGRCPATH=$HGRC\n')
            rcfp.write('cd %s\n' % tmpdir)
        return rcfile

    def _spawn_shell(self):
        '''fork a bash child on a new pty; set self.pid and self.cfd and
        register the pty with self.poll.'''
        # flush first, so buffered text is not duplicated in the child
        sys.stdout.flush()
        sys.stderr.flush()

        self.pid, self.cfd = pty.fork()
        if self.pid == 0:
            cmdline = ['/usr/bin/env', 'bash', '--noediting',
                       '--noprofile', '--norc']
            try:
                os.execv(cmdline[0], cmdline)
            except OSError as err:
                _warn('%s: %s' % (cmdline[0], err.strerror))
                sys.stderr.flush()
                os._exit(0)
        self.poll.register(self.cfd, select.POLLIN | select.POLLERR |
                           select.POLLHUP)

    def run(self):
        '''run the script and write its output files.

        Returns a true value if the shell's wait status was non-zero or
        if any section's output differs from its saved .out file.  An
        empty <name>.run marker file is created when the wait status is
        zero.  Any exception raised while feeding the script kills the
        shell and is re-raised.'''
        ofp = None
        ofp_basename = None
        basename = os.path.basename(self.name)
        self.status('running %s ' % basename)
        tmpdir = tempfile.mkdtemp(prefix=basename)
        err = False

        try:
            # remove the marker file that we tell make to use to see if
            # this run succeeded
            maybe_unlink(self.name + '.run')

            rcfile = self._write_rcfiles(tmpdir)
            self._spawn_shell()

            # what to print in place of each prompt the shell showed
            prompts = {
                '': '',
                self.ps1: '$',
                self.ps2: '>',
                }

            # patterns deleted from output before it is compared
            ignore = [
                r'\d+:[0-9a-f]{12}',        # changeset number:hash
                r'[0-9a-f]{40}',            # long changeset hash
                r'[0-9a-f]{12}',            # short changeset hash
                r'^(?:---|\+\+\+) .*',      # diff header with dates
                r'^date:.*',                # date
                #r'^diff -r.*',             # "diff -r" is followed by hash
                r'^# Date \d+ \d+',         # hg patch header
                ]

            try:
                # eat first prompt string from shell
                self.read()
                # setup env and prompt
                ps, output = self.sendreceive('source %s\n' % rcfile)
                for hunk in self.parse():
                    # is this line a processing instruction?
                    m = self.pi_re.match(hunk)
                    if m:
                        pi, rest = m.groups()
                        if pi == 'name':
                            self.status('.')
                            out = rest
                            if os.sep in out:
                                raise AssertionError(
                                    'section name contains %r: %r'
                                    % (os.sep, out))
                            if ofp is not None:
                                ofp.close()
                                # accumulate: a later section that
                                # matches must not hide this failure
                                if self.rename_output(ofp_basename, ignore):
                                    err = True
                            if out:
                                ofp_basename = '%s.%s' % (self.name, out)
                                ofp = open(ofp_basename + '.tmp', 'w')
                            else:
                                ofp = None
                        elif pi == 'ignore':
                            ignore.append(rest)
                    elif hunk.strip():
                        # it's something we should execute
                        newps, output = self.sendreceive(hunk)
                        if ofp is not None:
                            # first, print the command we ran
                            if not hunk.startswith('#'):
                                nl = hunk.endswith('\n')
                                hunk = ('%s \\textbf{%s}' %
                                        (prompts[ps],
                                         tex_escape(hunk.rstrip('\n'))))
                                if nl:
                                    hunk += '\n'
                            ofp.write(hunk)
                            # then its output
                            ofp.write(tex_escape(output))
                        # track the prompt even when nothing is being
                        # recorded, so the next recorded command is
                        # shown with the prompt the shell really gave
                        ps = newps
                self.status('\n')
            except:
                # deliberately bare: whatever went wrong (including an
                # interrupt), stop the shell, then re-raise
                _warn('(killed)')
                os.kill(self.pid, signal.SIGKILL)
                pid, rc = os.wait()
                raise
            else:
                try:
                    ps, output = self.sendreceive('exit\n')
                    if ofp is not None:
                        ofp.write(output)
                        ofp.close()
                        if self.rename_output(ofp_basename, ignore):
                            err = True
                    os.close(self.cfd)
                except IOError:
                    # best effort, as before.  NOTE: on Python 3 IOError
                    # is an alias of OSError, so more is ignored here.
                    pass
                os.kill(self.pid, signal.SIGTERM)
                pid, rc = os.wait()
                if rc:
                    if os.WIFEXITED(rc):
                        _warn('(exit %s)' % os.WEXITSTATUS(rc))
                    elif os.WIFSIGNALED(rc):
                        _warn('(signal %s)' % os.WTERMSIG(rc))
                else:
                    # create the empty marker file
                    with open(self.name + '.run', 'w'):
                        pass
                return rc or err
        finally:
            if ofp is not None and not ofp.closed:
                ofp.close()
            shutil.rmtree(tmpdir)

    def rename_output(self, base, ignore):
        '''file away base.tmp and compare it with the saved base.out.

        ignore is a list of regular expressions; text matching any of
        them is deleted before comparison.  base.tmp is renamed to
        base.lxo and a filtered copy is written to base.err.  If
        base.out does not exist, base.err becomes base.out.  If the
        two match, base.err is removed.  Otherwise base.err is kept, a
        unified diff is sent to stderr, and True is returned.  Returns
        False in the other two cases.'''
        mangle_re = re.compile('(?:' + '|'.join(ignore) + ')')

        def mangle(s):
            return mangle_re.sub('', s)

        def matchfp(fp1, fp2):
            '''compare two files line by line, after filtering.'''
            while True:
                s1 = mangle(fp1.readline())
                s2 = mangle(fp2.readline())
                if s1 != s2:
                    return False
                if not s1:
                    # both files ended at the same point
                    return True

        oldname = base + '.out'
        tmpname = base + '.tmp'
        errname = base + '.err'

        with open(errname, 'w') as errfp:
            with open(tmpname) as tmpfp:
                for line in tmpfp:
                    errfp.write(mangle(line))
        os.rename(tmpname, base + '.lxo')

        try:
            oldfp = open(oldname)
        except IOError as err:
            if err.errno != errno.ENOENT:
                raise
            # first run for this section: the new output is the baseline
            os.rename(errname, oldname)
            return False

        with oldfp:
            with open(errname) as errfp:
                same = matchfp(oldfp, errfp)
        if same:
            os.unlink(errname)
            return False

        _warn('\nOutput of %s has changed!' % base)
        sys.stderr.flush()
        try:
            # argv list instead of a shell string, so odd characters in
            # the paths are harmless; stdout=2 keeps the diff on stderr
            subprocess.call(['diff', '-u', oldname, errname], stdout=2)
        except OSError as err:
            _warn('diff: %s' % err.strerror)
        return True


def main(path='.'):
    '''run the examples named on the command line, or, if none are
    named, every executable regular file in path.

    Accepts -v/--verbose.  Returns the number of failures.'''
    opts, args = getopt.getopt(sys.argv[1:], 'v', ['verbose'])
    verbose = False
    for o, a in opts:
        if o in ('-v', '--verbose'):
            verbose = True
    errs = 0
    if args:
        for a in args:
            try:
                st = os.lstat(a)
            except OSError as err:
                _warn('%s: %s' % (a, err.strerror))
                errs += 1
                continue
            if stat.S_ISREG(st.st_mode) and st.st_mode & _EXEC_BITS:
                if example(a, verbose).run():
                    errs += 1
            else:
                _warn('%s: not a file, or not executable' % a)
                errs += 1
        return errs
    for name in os.listdir(path):
        if name == 'run-example' or name.startswith('.'):
            continue
        if name.endswith(('.out', '~', '.run')):
            continue
        pathname = os.path.join(path, name)
        try:
            st = os.lstat(pathname)
        except OSError as err:
            # could be an output file that was removed while we ran
            if err.errno != errno.ENOENT:
                raise
            continue
        if stat.S_ISREG(st.st_mode) and st.st_mode & _EXEC_BITS:
            if example(pathname, verbose).run():
                errs += 1
    # record when the whole directory was last processed
    with open(os.path.join(path, '.run'), 'w') as fp:
        fp.write(time.asctime() + '\n')
    return errs


if __name__ == '__main__':
    sys.exit(main())