#!/usr/bin/env python # # This program takes something that resembles a shell script and runs # it, spitting input (commands from the script) and output into text # files, for use in examples. import cStringIO import errno import getopt import os import pty import re import select import shutil import signal import stat import sys import tempfile import time tex_subs = { '\\': '\\textbackslash{}', '{': '\\{', '}': '\\}', } def gensubs(s): start = 0 for i, c in enumerate(s): sub = tex_subs.get(c) if sub: yield s[start:i] start = i + 1 yield sub yield s[start:] def tex_escape(s): return ''.join(gensubs(s)) def maybe_unlink(name): try: os.unlink(name) return True except OSError, err: if err.errno != errno.ENOENT: raise return False def find_path_to(program): for p in os.environ.get('PATH', os.defpath).split(os.pathsep): name = os.path.join(p, program) if os.access(name, os.X_OK): return p return None class example: shell = '/usr/bin/env bash' ps1 = '__run_example_ps1__ ' ps2 = '__run_example_ps2__ ' pi_re = re.compile(r'#\$\s*(name|ignore):\s*(.*)$') timeout = 10 def __init__(self, name, verbose): self.name = name self.verbose = verbose self.poll = select.poll() def parse(self): '''yield each hunk of input from the file.''' fp = open(self.name) cfp = cStringIO.StringIO() for line in fp: cfp.write(line) if not line.rstrip().endswith('\\'): yield cfp.getvalue() cfp.seek(0) cfp.truncate() def status(self, s): sys.stdout.write(s) if not s.endswith('\n'): sys.stdout.flush() def send(self, s): if self.verbose: print >> sys.stderr, '>', self.debugrepr(s) while s: count = os.write(self.cfd, s) s = s[count:] def debugrepr(self, s): rs = repr(s) limit = 60 if len(rs) > limit: return ('%s%s ... [%d bytes]' % (rs[:limit], rs[0], len(s))) else: return rs timeout = 5 def read(self, hint): events = self.poll.poll(self.timeout * 1000) if not events: print >> sys.stderr, ('[%stimed out after %d seconds]' % (hint, self.timeout)) os.kill(self.pid, signal.SIGHUP) return '' return os.read(self.cfd, 1024) def receive(self, hint): out = cStringIO.StringIO() while True: try: if self.verbose: sys.stderr.write('< ') s = self.read(hint) except OSError, err: if err.errno == errno.EIO: return '', '' raise if self.verbose: print >> sys.stderr, self.debugrepr(s) out.write(s) s = out.getvalue() if s.endswith(self.ps1): return self.ps1, s.replace('\r\n', '\n')[:-len(self.ps1)] if s.endswith(self.ps2): return self.ps2, s.replace('\r\n', '\n')[:-len(self.ps2)] def sendreceive(self, s, hint): self.send(s) ps, r = self.receive(hint) if r.startswith(s): r = r[len(s):] return ps, r def run(self): ofp = None basename = os.path.basename(self.name) self.status('running %s ' % basename) tmpdir = tempfile.mkdtemp(prefix=basename) # remove the marker file that we tell make to use to see if # this run succeeded maybe_unlink(self.name + '.run') rcfile = os.path.join(tmpdir, '.hgrc') rcfp = open(rcfile, 'w') print >> rcfp, '[ui]' print >> rcfp, "username = Bryan O'Sullivan " rcfile = os.path.join(tmpdir, '.bashrc') rcfp = open(rcfile, 'w') print >> rcfp, 'PS1="%s"' % self.ps1 print >> rcfp, 'PS2="%s"' % self.ps2 print >> rcfp, 'unset HISTFILE' path = ['/usr/bin', '/bin'] hg = find_path_to('hg') if hg and hg not in path: path.append(hg) def re_export(envar): v = os.getenv(envar) if v is not None: print >> rcfp, 'export ' + envar + '=' + v print >> rcfp, 'export PATH=' + ':'.join(path) re_export('PYTHONPATH') print >> rcfp, 'export EXAMPLE_DIR="%s"' % os.getcwd() print >> rcfp, 'export HGMERGE=merge' print >> rcfp, 'export LANG=C' print >> rcfp, 'export LC_ALL=C' print >> rcfp, 'export TZ=GMT' print >> rcfp, 'export HGRC="%s/.hgrc"' % tmpdir print >> rcfp, 'export HGRCPATH=$HGRC' print >> rcfp, 'cd %s' % tmpdir rcfp.close() sys.stdout.flush() sys.stderr.flush() self.pid, self.cfd = pty.fork() if self.pid == 0: cmdline = ['/usr/bin/env', '-i', 'bash', '--noediting', '--noprofile', '--norc'] try: os.execv(cmdline[0], cmdline) except OSError, err: print >> sys.stderr, '%s: %s' % (cmdline[0], err.strerror) sys.stderr.flush() os._exit(0) self.poll.register(self.cfd, select.POLLIN | select.POLLERR | select.POLLHUP) prompts = { '': '', self.ps1: '$', self.ps2: '>', } ignore = [ r'\d+:[0-9a-f]{12}', # changeset number:hash r'[0-9a-f]{40}', # long changeset hash r'[0-9a-f]{12}', # short changeset hash r'^(?:---|\+\+\+) .*', # diff header with dates r'^date:.*', # date #r'^diff -r.*', # "diff -r" is followed by hash r'^# Date \d+ \d+', # hg patch header ] err = False read_hint = '' try: try: # eat first prompt string from shell self.read(read_hint) # setup env and prompt ps, output = self.sendreceive('source %s\n' % rcfile, read_hint) for hunk in self.parse(): # is this line a processing instruction? m = self.pi_re.match(hunk) if m: pi, rest = m.groups() if pi == 'name': self.status('.') out = rest if out in ('err', 'lxo', 'out', 'run', 'tmp'): print >> sys.stderr, ('%s: illegal section ' 'name %r' % (self.name, out)) return 1 assert os.sep not in out if ofp is not None: ofp.close() err |= self.rename_output(ofp_basename, ignore) if out: ofp_basename = '%s.%s' % (self.name, out) read_hint = ofp_basename + ' ' ofp = open(ofp_basename + '.tmp', 'w') else: ofp = None elif pi == 'ignore': ignore.append(rest) elif hunk.strip(): # it's something we should execute newps, output = self.sendreceive(hunk, read_hint) if not ofp: continue # first, print the command we ran if not hunk.startswith('#'): nl = hunk.endswith('\n') hunk = ('%s \\textbf{%s}' % (prompts[ps], tex_escape(hunk.rstrip('\n')))) if nl: hunk += '\n' ofp.write(hunk) # then its output ofp.write(tex_escape(output)) ps = newps self.status('\n') except: print >> sys.stderr, '(killed)' os.kill(self.pid, signal.SIGKILL) pid, rc = os.wait() raise else: try: ps, output = self.sendreceive('exit\n', read_hint) if ofp is not None: ofp.write(output) ofp.close() err |= self.rename_output(ofp_basename, ignore) os.close(self.cfd) except IOError: pass os.kill(self.pid, signal.SIGTERM) pid, rc = os.wait() err = err or rc if err: if os.WIFEXITED(rc): print >> sys.stderr, '(exit %s)' % os.WEXITSTATUS(rc) elif os.WIFSIGNALED(rc): print >> sys.stderr, '(signal %s)' % os.WTERMSIG(rc) else: open(self.name + '.run', 'w') return err finally: shutil.rmtree(tmpdir) def rename_output(self, base, ignore): mangle_re = re.compile('(?:' + '|'.join(ignore) + ')') def mangle(s): return mangle_re.sub('', s) def matchfp(fp1, fp2): while True: s1 = mangle(fp1.readline()) s2 = mangle(fp2.readline()) if cmp(s1, s2): break if not s1: return True return False oldname = base + '.out' tmpname = base + '.tmp' errname = base + '.err' errfp = open(errname, 'w+') for line in open(tmpname): errfp.write(mangle_re.sub('', line)) os.rename(tmpname, base + '.lxo') errfp.seek(0) try: oldfp = open(oldname) except IOError, err: if err.errno != errno.ENOENT: raise os.rename(errname, oldname) return False if matchfp(oldfp, errfp): os.unlink(errname) return False else: print >> sys.stderr, '\nOutput of %s has changed!' % base os.system('diff -u %s %s 1>&2' % (oldname, errname)) return True def print_help(exit, msg=None): if msg: print >> sys.stderr, 'Error:', msg print >> sys.stderr, 'Usage: run-example [options] [test...]' print >> sys.stderr, 'Options:' print >> sys.stderr, ' -a --all run all tests in this directory' print >> sys.stderr, ' -h --help print this help message' print >> sys.stderr, ' -v --verbose display extra debug output' sys.exit(exit) def main(path='.'): opts, args = getopt.getopt(sys.argv[1:], '?ahv', ['all', 'help', 'verbose']) verbose = False run_all = False for o, a in opts: if o in ('-h', '-?', '--help'): print_help(0) if o in ('-a', '--all'): run_all = True if o in ('-v', '--verbose'): verbose = True errs = 0 if args: for a in args: try: st = os.lstat(a) except OSError, err: print >> sys.stderr, '%s: %s' % (a, err.strerror) errs += 1 continue if stat.S_ISREG(st.st_mode) and st.st_mode & 0111: if example(a, verbose).run(): errs += 1 else: print >> sys.stderr, '%s: not a file, or not executable' % a errs += 1 elif run_all: names = os.listdir(path) names.sort() for name in names: if name == 'run-example' or name.startswith('.'): continue if name.endswith('.out') or name.endswith('~'): continue if name.endswith('.run'): continue pathname = os.path.join(path, name) try: st = os.lstat(pathname) except OSError, err: # could be an output file that was removed while we ran if err.errno != errno.ENOENT: raise continue if stat.S_ISREG(st.st_mode) and st.st_mode & 0111: if example(pathname, verbose).run(): errs += 1 print >> open(os.path.join(path, '.run'), 'w'), time.asctime() else: print_help(1, msg='no test names given, and --all not provided') return errs if __name__ == '__main__': try: sys.exit(main()) except KeyboardInterrupt: print >> sys.stderr, 'interrupted!' sys.exit(1)