foozy@708: #!/usr/bin/env python foozy@708: # foozy@708: # This program takes something that resembles a shell script and runs foozy@708: # it, spitting input (commands from the script) and output into text foozy@708: # files, for use in examples. foozy@708: foozy@708: import cStringIO foozy@708: import errno foozy@708: import getopt foozy@708: import os foozy@708: import pty foozy@708: import re foozy@708: import select foozy@708: import shutil foozy@708: import signal foozy@708: import stat foozy@708: import sys foozy@708: import tempfile foozy@708: import time foozy@708: foozy@708: tex_subs = { foozy@708: '\\': '\\textbackslash{}', foozy@708: '{': '\\{', foozy@708: '}': '\\}', foozy@708: } foozy@708: foozy@708: def gensubs(s): foozy@708: start = 0 foozy@708: for i, c in enumerate(s): foozy@708: sub = tex_subs.get(c) foozy@708: if sub: foozy@708: yield s[start:i] foozy@708: start = i + 1 foozy@708: yield sub foozy@708: yield s[start:] foozy@708: foozy@708: def tex_escape(s): foozy@708: return ''.join(gensubs(s)) foozy@708: foozy@708: def maybe_unlink(name): foozy@708: try: foozy@708: os.unlink(name) foozy@708: return True foozy@708: except OSError, err: foozy@708: if err.errno != errno.ENOENT: foozy@708: raise foozy@708: return False foozy@708: foozy@708: def find_path_to(program): foozy@708: for p in os.environ.get('PATH', os.defpath).split(os.pathsep): foozy@708: name = os.path.join(p, program) foozy@708: if os.access(name, os.X_OK): foozy@708: return p foozy@708: return None foozy@708: foozy@708: class example: foozy@708: shell = '/usr/bin/env bash' foozy@708: ps1 = '__run_example_ps1__ ' foozy@708: ps2 = '__run_example_ps2__ ' foozy@708: pi_re = re.compile(r'#\$\s*(name|ignore):\s*(.*)$') foozy@708: foozy@708: timeout = 10 foozy@708: foozy@708: def __init__(self, name, verbose): foozy@708: self.name = name foozy@708: self.verbose = verbose foozy@708: self.poll = select.poll() foozy@708: foozy@708: def parse(self): foozy@708: '''yield each hunk of input from the file.''' foozy@708: fp = open(self.name) foozy@708: cfp = cStringIO.StringIO() foozy@708: for line in fp: foozy@708: cfp.write(line) foozy@708: if not line.rstrip().endswith('\\'): foozy@708: yield cfp.getvalue() foozy@708: cfp.seek(0) foozy@708: cfp.truncate() foozy@708: foozy@708: def status(self, s): foozy@708: sys.stdout.write(s) foozy@708: if not s.endswith('\n'): foozy@708: sys.stdout.flush() foozy@708: foozy@708: def send(self, s): foozy@708: if self.verbose: foozy@708: print >> sys.stderr, '>', self.debugrepr(s) foozy@708: while s: foozy@708: count = os.write(self.cfd, s) foozy@708: s = s[count:] foozy@708: foozy@708: def debugrepr(self, s): foozy@708: rs = repr(s) foozy@708: limit = 60 foozy@708: if len(rs) > limit: foozy@708: return ('%s%s ... [%d bytes]' % (rs[:limit], rs[0], len(s))) foozy@708: else: foozy@708: return rs foozy@708: foozy@708: timeout = 5 foozy@708: foozy@708: def read(self, hint): foozy@708: events = self.poll.poll(self.timeout * 1000) foozy@708: if not events: foozy@708: print >> sys.stderr, ('[%stimed out after %d seconds]' % foozy@708: (hint, self.timeout)) foozy@708: os.kill(self.pid, signal.SIGHUP) foozy@708: return '' foozy@708: return os.read(self.cfd, 1024) foozy@708: foozy@708: def receive(self, hint): foozy@708: out = cStringIO.StringIO() foozy@708: while True: foozy@708: try: foozy@708: if self.verbose: foozy@708: sys.stderr.write('< ') foozy@708: s = self.read(hint) foozy@708: except OSError, err: foozy@708: if err.errno == errno.EIO: foozy@708: return '', '' foozy@708: raise foozy@708: if self.verbose: foozy@708: print >> sys.stderr, self.debugrepr(s) foozy@708: out.write(s) foozy@708: s = out.getvalue() foozy@708: if s.endswith(self.ps1): foozy@708: return self.ps1, s.replace('\r\n', '\n')[:-len(self.ps1)] foozy@708: if s.endswith(self.ps2): foozy@708: return self.ps2, s.replace('\r\n', '\n')[:-len(self.ps2)] foozy@708: foozy@708: def sendreceive(self, s, hint): foozy@708: self.send(s) foozy@708: ps, r = self.receive(hint) foozy@708: if r.startswith(s): foozy@708: r = r[len(s):] foozy@708: return ps, r foozy@708: foozy@708: def run(self): foozy@708: ofp = None foozy@708: basename = os.path.basename(self.name) foozy@708: self.status('running %s ' % basename) foozy@708: tmpdir = tempfile.mkdtemp(prefix=basename) foozy@708: foozy@708: # remove the marker file that we tell make to use to see if foozy@708: # this run succeeded foozy@708: maybe_unlink(self.name + '.run') foozy@708: foozy@708: rcfile = os.path.join(tmpdir, '.hgrc') foozy@708: rcfp = open(rcfile, 'w') foozy@708: print >> rcfp, '[ui]' foozy@708: print >> rcfp, "username = Bryan O'Sullivan " foozy@708: foozy@708: rcfile = os.path.join(tmpdir, '.bashrc') foozy@708: rcfp = open(rcfile, 'w') foozy@708: print >> rcfp, 'PS1="%s"' % self.ps1 foozy@708: print >> rcfp, 'PS2="%s"' % self.ps2 foozy@708: print >> rcfp, 'unset HISTFILE' foozy@708: path = ['/usr/bin', '/bin'] foozy@708: hg = find_path_to('hg') foozy@708: if hg and hg not in path: foozy@708: path.append(hg) foozy@708: def re_export(envar): foozy@708: v = os.getenv(envar) foozy@708: if v is not None: foozy@708: print >> rcfp, 'export ' + envar + '=' + v foozy@708: print >> rcfp, 'export PATH=' + ':'.join(path) foozy@708: re_export('PYTHONPATH') foozy@708: print >> rcfp, 'export EXAMPLE_DIR="%s"' % os.getcwd() foozy@708: print >> rcfp, 'export HGMERGE=merge' foozy@708: print >> rcfp, 'export LANG=C' foozy@708: print >> rcfp, 'export LC_ALL=C' foozy@708: print >> rcfp, 'export TZ=GMT' foozy@708: print >> rcfp, 'export HGRC="%s/.hgrc"' % tmpdir foozy@708: print >> rcfp, 'export HGRCPATH=$HGRC' foozy@708: print >> rcfp, 'cd %s' % tmpdir foozy@708: rcfp.close() foozy@708: sys.stdout.flush() foozy@708: sys.stderr.flush() foozy@708: self.pid, self.cfd = pty.fork() foozy@708: if self.pid == 0: foozy@708: cmdline = ['/usr/bin/env', '-i', 'bash', '--noediting', foozy@708: '--noprofile', '--norc'] foozy@708: try: foozy@708: os.execv(cmdline[0], cmdline) foozy@708: except OSError, err: foozy@708: print >> sys.stderr, '%s: %s' % (cmdline[0], err.strerror) foozy@708: sys.stderr.flush() foozy@708: os._exit(0) foozy@708: self.poll.register(self.cfd, select.POLLIN | select.POLLERR | foozy@708: select.POLLHUP) foozy@708: foozy@708: prompts = { foozy@708: '': '', foozy@708: self.ps1: '$', foozy@708: self.ps2: '>', foozy@708: } foozy@708: foozy@708: ignore = [ foozy@708: r'\d+:[0-9a-f]{12}', # changeset number:hash foozy@708: r'[0-9a-f]{40}', # long changeset hash foozy@708: r'[0-9a-f]{12}', # short changeset hash foozy@708: r'^(?:---|\+\+\+) .*', # diff header with dates foozy@708: r'^date:.*', # date foozy@708: #r'^diff -r.*', # "diff -r" is followed by hash foozy@708: r'^# Date \d+ \d+', # hg patch header foozy@708: ] foozy@708: foozy@708: err = False foozy@708: read_hint = '' foozy@708: foozy@708: try: foozy@708: try: foozy@708: # eat first prompt string from shell foozy@708: self.read(read_hint) foozy@708: # setup env and prompt foozy@708: ps, output = self.sendreceive('source %s\n' % rcfile, foozy@708: read_hint) foozy@708: for hunk in self.parse(): foozy@708: # is this line a processing instruction? foozy@708: m = self.pi_re.match(hunk) foozy@708: if m: foozy@708: pi, rest = m.groups() foozy@708: if pi == 'name': foozy@708: self.status('.') foozy@708: out = rest foozy@708: if out in ('err', 'lxo', 'out', 'run', 'tmp'): foozy@708: print >> sys.stderr, ('%s: illegal section ' foozy@708: 'name %r' % foozy@708: (self.name, out)) foozy@708: return 1 foozy@708: assert os.sep not in out foozy@708: if ofp is not None: foozy@708: ofp.close() foozy@708: err |= self.rename_output(ofp_basename, ignore) foozy@708: if out: foozy@708: ofp_basename = '%s.%s' % (self.name, out) foozy@708: read_hint = ofp_basename + ' ' foozy@708: ofp = open(ofp_basename + '.tmp', 'w') foozy@708: else: foozy@708: ofp = None foozy@708: elif pi == 'ignore': foozy@708: ignore.append(rest) foozy@708: elif hunk.strip(): foozy@708: # it's something we should execute foozy@708: newps, output = self.sendreceive(hunk, read_hint) foozy@708: if not ofp: foozy@708: continue foozy@708: # first, print the command we ran foozy@708: if not hunk.startswith('#'): foozy@708: nl = hunk.endswith('\n') foozy@708: hunk = ('%s \\textbf{%s}' % foozy@708: (prompts[ps], foozy@708: tex_escape(hunk.rstrip('\n')))) foozy@708: if nl: hunk += '\n' foozy@708: ofp.write(hunk) foozy@708: # then its output foozy@708: ofp.write(tex_escape(output)) foozy@708: ps = newps foozy@708: self.status('\n') foozy@708: except: foozy@708: print >> sys.stderr, '(killed)' foozy@708: os.kill(self.pid, signal.SIGKILL) foozy@708: pid, rc = os.wait() foozy@708: raise foozy@708: else: foozy@708: try: foozy@708: ps, output = self.sendreceive('exit\n', read_hint) foozy@708: if ofp is not None: foozy@708: ofp.write(output) foozy@708: ofp.close() foozy@708: err |= self.rename_output(ofp_basename, ignore) foozy@708: os.close(self.cfd) foozy@708: except IOError: foozy@708: pass foozy@708: os.kill(self.pid, signal.SIGTERM) foozy@708: pid, rc = os.wait() foozy@708: err = err or rc foozy@708: if err: foozy@708: if os.WIFEXITED(rc): foozy@708: print >> sys.stderr, '(exit %s)' % os.WEXITSTATUS(rc) foozy@708: elif os.WIFSIGNALED(rc): foozy@708: print >> sys.stderr, '(signal %s)' % os.WTERMSIG(rc) foozy@708: else: foozy@708: open(self.name + '.run', 'w') foozy@708: return err foozy@708: finally: foozy@708: shutil.rmtree(tmpdir) foozy@708: foozy@708: def rename_output(self, base, ignore): foozy@708: mangle_re = re.compile('(?:' + '|'.join(ignore) + ')') foozy@708: def mangle(s): foozy@708: return mangle_re.sub('', s) foozy@708: def matchfp(fp1, fp2): foozy@708: while True: foozy@708: s1 = mangle(fp1.readline()) foozy@708: s2 = mangle(fp2.readline()) foozy@708: if cmp(s1, s2): foozy@708: break foozy@708: if not s1: foozy@708: return True foozy@708: return False foozy@708: foozy@708: oldname = base + '.out' foozy@708: tmpname = base + '.tmp' foozy@708: errname = base + '.err' foozy@708: errfp = open(errname, 'w+') foozy@708: for line in open(tmpname): foozy@708: errfp.write(mangle_re.sub('', line)) foozy@708: os.rename(tmpname, base + '.lxo') foozy@708: errfp.seek(0) foozy@708: try: foozy@708: oldfp = open(oldname) foozy@708: except IOError, err: foozy@708: if err.errno != errno.ENOENT: foozy@708: raise foozy@708: os.rename(errname, oldname) foozy@708: return False foozy@708: if matchfp(oldfp, errfp): foozy@708: os.unlink(errname) foozy@708: return False foozy@708: else: foozy@708: print >> sys.stderr, '\nOutput of %s has changed!' % base foozy@708: os.system('diff -u %s %s 1>&2' % (oldname, errname)) foozy@708: return True foozy@708: foozy@708: def print_help(exit, msg=None): foozy@708: if msg: foozy@708: print >> sys.stderr, 'Error:', msg foozy@708: print >> sys.stderr, 'Usage: run-example [options] [test...]' foozy@708: print >> sys.stderr, 'Options:' foozy@708: print >> sys.stderr, ' -a --all run all tests in this directory' foozy@708: print >> sys.stderr, ' -h --help print this help message' foozy@708: print >> sys.stderr, ' -v --verbose display extra debug output' foozy@708: sys.exit(exit) foozy@708: foozy@708: def main(path='.'): foozy@708: opts, args = getopt.getopt(sys.argv[1:], '?ahv', foozy@708: ['all', 'help', 'verbose']) foozy@708: verbose = False foozy@708: run_all = False foozy@708: for o, a in opts: foozy@708: if o in ('-h', '-?', '--help'): foozy@708: print_help(0) foozy@708: if o in ('-a', '--all'): foozy@708: run_all = True foozy@708: if o in ('-v', '--verbose'): foozy@708: verbose = True foozy@708: errs = 0 foozy@708: if args: foozy@708: for a in args: foozy@708: try: foozy@708: st = os.lstat(a) foozy@708: except OSError, err: foozy@708: print >> sys.stderr, '%s: %s' % (a, err.strerror) foozy@708: errs += 1 foozy@708: continue foozy@708: if stat.S_ISREG(st.st_mode) and st.st_mode & 0111: foozy@708: if example(a, verbose).run(): foozy@708: errs += 1 foozy@708: else: foozy@708: print >> sys.stderr, '%s: not a file, or not executable' % a foozy@708: errs += 1 foozy@708: elif run_all: foozy@708: names = os.listdir(path) foozy@708: names.sort() foozy@708: for name in names: foozy@708: if name == 'run-example' or name.startswith('.'): continue foozy@708: if name.endswith('.out') or name.endswith('~'): continue foozy@708: if name.endswith('.run'): continue foozy@708: pathname = os.path.join(path, name) foozy@708: try: foozy@708: st = os.lstat(pathname) foozy@708: except OSError, err: foozy@708: # could be an output file that was removed while we ran foozy@708: if err.errno != errno.ENOENT: foozy@708: raise foozy@708: continue foozy@708: if stat.S_ISREG(st.st_mode) and st.st_mode & 0111: foozy@708: if example(pathname, verbose).run(): foozy@708: errs += 1 foozy@708: print >> open(os.path.join(path, '.run'), 'w'), time.asctime() foozy@708: else: foozy@708: print_help(1, msg='no test names given, and --all not provided') foozy@708: return errs foozy@708: foozy@708: if __name__ == '__main__': foozy@708: try: foozy@708: sys.exit(main()) foozy@708: except KeyboardInterrupt: foozy@708: print >> sys.stderr, 'interrupted!' foozy@708: sys.exit(1)