igor@333: #!/usr/bin/env python igor@333: # igor@333: # This program takes something that resembles a shell script and runs igor@333: # it, spitting input (commands from the script) and output into text igor@333: # files, for use in examples. igor@333: igor@333: import cStringIO igor@333: import errno igor@333: import getopt igor@333: import os igor@333: import pty igor@333: import re igor@333: import select igor@333: import shutil igor@333: import signal igor@333: import stat igor@333: import sys igor@333: import tempfile igor@333: import time igor@333: igor@333: tex_subs = { igor@333: '\\': '\\textbackslash{}', igor@333: '{': '\\{', igor@333: '}': '\\}', igor@333: } igor@333: igor@333: def gensubs(s): igor@333: start = 0 igor@333: for i, c in enumerate(s): igor@333: sub = tex_subs.get(c) igor@333: if sub: igor@333: yield s[start:i] igor@333: start = i + 1 igor@333: yield sub igor@333: yield s[start:] igor@333: igor@333: def tex_escape(s): igor@333: return ''.join(gensubs(s)) igor@333: igor@333: def maybe_unlink(name): igor@333: try: igor@333: os.unlink(name) igor@333: return True igor@333: except OSError, err: igor@333: if err.errno != errno.ENOENT: igor@333: raise igor@333: return False igor@333: igor@333: def find_path_to(program): igor@333: for p in os.environ.get('PATH', os.defpath).split(os.pathsep): igor@333: name = os.path.join(p, program) igor@333: if os.access(name, os.X_OK): igor@333: return p igor@333: return None igor@333: igor@333: class example: igor@333: shell = '/usr/bin/env bash' igor@333: ps1 = '__run_example_ps1__ ' igor@333: ps2 = '__run_example_ps2__ ' igor@333: pi_re = re.compile(r'#\$\s*(name|ignore):\s*(.*)$') igor@333: igor@333: timeout = 10 igor@333: igor@333: def __init__(self, name, verbose): igor@333: self.name = name igor@333: self.verbose = verbose igor@333: self.poll = select.poll() igor@333: igor@333: def parse(self): igor@333: '''yield each hunk of input from the file.''' igor@333: fp = open(self.name) igor@333: cfp = cStringIO.StringIO() igor@333: for line in fp: igor@333: cfp.write(line) igor@333: if not line.rstrip().endswith('\\'): igor@333: yield cfp.getvalue() igor@333: cfp.seek(0) igor@333: cfp.truncate() igor@333: igor@333: def status(self, s): igor@333: sys.stdout.write(s) igor@333: if not s.endswith('\n'): igor@333: sys.stdout.flush() igor@333: igor@333: def send(self, s): igor@333: if self.verbose: igor@333: print >> sys.stderr, '>', self.debugrepr(s) igor@333: while s: igor@333: count = os.write(self.cfd, s) igor@333: s = s[count:] igor@333: igor@333: def debugrepr(self, s): igor@333: rs = repr(s) igor@333: limit = 60 igor@333: if len(rs) > limit: igor@333: return ('%s%s ... [%d bytes]' % (rs[:limit], rs[0], len(s))) igor@333: else: igor@333: return rs igor@333: igor@333: timeout = 5 igor@333: igor@333: def read(self, hint): igor@333: events = self.poll.poll(self.timeout * 1000) igor@333: if not events: igor@333: print >> sys.stderr, ('[%stimed out after %d seconds]' % igor@333: (hint, self.timeout)) igor@333: os.kill(self.pid, signal.SIGHUP) igor@333: return '' igor@333: return os.read(self.cfd, 1024) igor@333: igor@333: def receive(self, hint): igor@333: out = cStringIO.StringIO() igor@333: while True: igor@333: try: igor@333: if self.verbose: igor@333: sys.stderr.write('< ') igor@333: s = self.read(hint) igor@333: except OSError, err: igor@333: if err.errno == errno.EIO: igor@333: return '', '' igor@333: raise igor@333: if self.verbose: igor@333: print >> sys.stderr, self.debugrepr(s) igor@333: out.write(s) igor@333: s = out.getvalue() igor@333: if s.endswith(self.ps1): igor@333: return self.ps1, s.replace('\r\n', '\n')[:-len(self.ps1)] igor@333: if s.endswith(self.ps2): igor@333: return self.ps2, s.replace('\r\n', '\n')[:-len(self.ps2)] igor@333: igor@333: def sendreceive(self, s, hint): igor@333: self.send(s) igor@333: ps, r = self.receive(hint) igor@333: if r.startswith(s): igor@333: r = r[len(s):] igor@333: return ps, r igor@333: igor@333: def run(self): igor@333: ofp = None igor@333: basename = os.path.basename(self.name) igor@333: self.status('running %s ' % basename) igor@333: tmpdir = tempfile.mkdtemp(prefix=basename) igor@333: igor@333: # remove the marker file that we tell make to use to see if igor@333: # this run succeeded igor@333: maybe_unlink(self.name + '.run') igor@333: igor@333: rcfile = os.path.join(tmpdir, '.hgrc') igor@333: rcfp = open(rcfile, 'w') igor@333: print >> rcfp, '[ui]' igor@333: print >> rcfp, "username = Bryan O'Sullivan " igor@333: igor@333: rcfile = os.path.join(tmpdir, '.bashrc') igor@333: rcfp = open(rcfile, 'w') igor@333: print >> rcfp, 'PS1="%s"' % self.ps1 igor@333: print >> rcfp, 'PS2="%s"' % self.ps2 igor@333: print >> rcfp, 'unset HISTFILE' igor@333: path = ['/usr/bin', '/bin'] igor@333: hg = find_path_to('hg') igor@333: if hg and hg not in path: igor@333: path.append(hg) igor@333: def re_export(envar): igor@333: v = os.getenv(envar) igor@333: if v is not None: igor@333: print >> rcfp, 'export ' + envar + '=' + v igor@333: print >> rcfp, 'export PATH=' + ':'.join(path) igor@333: re_export('PYTHONPATH') igor@333: print >> rcfp, 'export EXAMPLE_DIR="%s"' % os.getcwd() igor@333: print >> rcfp, 'export HGMERGE=merge' igor@333: print >> rcfp, 'export LANG=C' igor@333: print >> rcfp, 'export LC_ALL=C' igor@333: print >> rcfp, 'export TZ=GMT' igor@333: print >> rcfp, 'export HGRC="%s/.hgrc"' % tmpdir igor@333: print >> rcfp, 'export HGRCPATH=$HGRC' igor@333: print >> rcfp, 'cd %s' % tmpdir igor@333: rcfp.close() igor@333: sys.stdout.flush() igor@333: sys.stderr.flush() igor@333: self.pid, self.cfd = pty.fork() igor@333: if self.pid == 0: igor@333: cmdline = ['/usr/bin/env', '-i', 'bash', '--noediting', igor@333: '--noprofile', '--norc'] igor@333: try: igor@333: os.execv(cmdline[0], cmdline) igor@333: except OSError, err: igor@333: print >> sys.stderr, '%s: %s' % (cmdline[0], err.strerror) igor@333: sys.stderr.flush() igor@333: os._exit(0) igor@333: self.poll.register(self.cfd, select.POLLIN | select.POLLERR | igor@333: select.POLLHUP) igor@333: igor@333: prompts = { igor@333: '': '', igor@333: self.ps1: '$', igor@333: self.ps2: '>', igor@333: } igor@333: igor@333: ignore = [ igor@333: r'\d+:[0-9a-f]{12}', # changeset number:hash igor@333: r'[0-9a-f]{40}', # long changeset hash igor@333: r'[0-9a-f]{12}', # short changeset hash igor@333: r'^(?:---|\+\+\+) .*', # diff header with dates igor@333: r'^date:.*', # date igor@333: #r'^diff -r.*', # "diff -r" is followed by hash igor@333: r'^# Date \d+ \d+', # hg patch header igor@333: ] igor@333: igor@333: err = False igor@333: read_hint = '' igor@333: igor@333: try: igor@333: try: igor@333: # eat first prompt string from shell igor@333: self.read(read_hint) igor@333: # setup env and prompt igor@333: ps, output = self.sendreceive('source %s\n' % rcfile, igor@333: read_hint) igor@333: for hunk in self.parse(): igor@333: # is this line a processing instruction? igor@333: m = self.pi_re.match(hunk) igor@333: if m: igor@333: pi, rest = m.groups() igor@333: if pi == 'name': igor@333: self.status('.') igor@333: out = rest igor@333: if out in ('err', 'lxo', 'out', 'run', 'tmp'): igor@333: print >> sys.stderr, ('%s: illegal section ' igor@333: 'name %r' % igor@333: (self.name, out)) igor@333: return 1 igor@333: assert os.sep not in out igor@333: if ofp is not None: igor@333: ofp.close() igor@333: err |= self.rename_output(ofp_basename, ignore) igor@333: if out: igor@333: ofp_basename = '%s.%s' % (self.name, out) igor@333: read_hint = ofp_basename + ' ' igor@333: ofp = open(ofp_basename + '.tmp', 'w') igor@333: else: igor@333: ofp = None igor@333: elif pi == 'ignore': igor@333: ignore.append(rest) igor@333: elif hunk.strip(): igor@333: # it's something we should execute igor@333: newps, output = self.sendreceive(hunk, read_hint) igor@333: if not ofp: igor@333: continue igor@333: # first, print the command we ran igor@333: if not hunk.startswith('#'): igor@333: nl = hunk.endswith('\n') igor@333: hunk = ('%s \\textbf{%s}' % igor@333: (prompts[ps], igor@333: tex_escape(hunk.rstrip('\n')))) igor@333: if nl: hunk += '\n' igor@333: ofp.write(hunk) igor@333: # then its output igor@333: ofp.write(tex_escape(output)) igor@333: ps = newps igor@333: self.status('\n') igor@333: except: igor@333: print >> sys.stderr, '(killed)' igor@333: os.kill(self.pid, signal.SIGKILL) igor@333: pid, rc = os.wait() igor@333: raise igor@333: else: igor@333: try: igor@333: ps, output = self.sendreceive('exit\n', read_hint) igor@333: if ofp is not None: igor@333: ofp.write(output) igor@333: ofp.close() igor@333: err |= self.rename_output(ofp_basename, ignore) igor@333: os.close(self.cfd) igor@333: except IOError: igor@333: pass igor@333: os.kill(self.pid, signal.SIGTERM) igor@333: pid, rc = os.wait() igor@333: err = err or rc igor@333: if err: igor@333: if os.WIFEXITED(rc): igor@333: print >> sys.stderr, '(exit %s)' % os.WEXITSTATUS(rc) igor@333: elif os.WIFSIGNALED(rc): igor@333: print >> sys.stderr, '(signal %s)' % os.WTERMSIG(rc) igor@333: else: igor@333: open(self.name + '.run', 'w') igor@333: return err igor@333: finally: igor@333: shutil.rmtree(tmpdir) igor@333: igor@333: def rename_output(self, base, ignore): igor@333: mangle_re = re.compile('(?:' + '|'.join(ignore) + ')') igor@333: def mangle(s): igor@333: return mangle_re.sub('', s) igor@333: def matchfp(fp1, fp2): igor@333: while True: igor@333: s1 = mangle(fp1.readline()) igor@333: s2 = mangle(fp2.readline()) igor@333: if cmp(s1, s2): igor@333: break igor@333: if not s1: igor@333: return True igor@333: return False igor@333: igor@333: oldname = base + '.out' igor@333: tmpname = base + '.tmp' igor@333: errname = base + '.err' igor@333: errfp = open(errname, 'w+') igor@333: for line in open(tmpname): igor@333: errfp.write(mangle_re.sub('', line)) igor@333: os.rename(tmpname, base + '.lxo') igor@333: errfp.seek(0) igor@333: try: igor@333: oldfp = open(oldname) igor@333: except IOError, err: igor@333: if err.errno != errno.ENOENT: igor@333: raise igor@333: os.rename(errname, oldname) igor@333: return False igor@333: if matchfp(oldfp, errfp): igor@333: os.unlink(errname) igor@333: return False igor@333: else: igor@333: print >> sys.stderr, '\nOutput of %s has changed!' % base igor@333: os.system('diff -u %s %s 1>&2' % (oldname, errname)) igor@333: return True igor@333: igor@333: def print_help(exit, msg=None): igor@333: if msg: igor@333: print >> sys.stderr, 'Error:', msg igor@333: print >> sys.stderr, 'Usage: run-example [options] [test...]' igor@333: print >> sys.stderr, 'Options:' igor@333: print >> sys.stderr, ' -a --all run all tests in this directory' igor@333: print >> sys.stderr, ' -h --help print this help message' igor@333: print >> sys.stderr, ' -v --verbose display extra debug output' igor@333: sys.exit(exit) igor@333: igor@333: def main(path='.'): igor@333: opts, args = getopt.getopt(sys.argv[1:], '?ahv', igor@333: ['all', 'help', 'verbose']) igor@333: verbose = False igor@333: run_all = False igor@333: for o, a in opts: igor@333: if o in ('-h', '-?', '--help'): igor@333: print_help(0) igor@333: if o in ('-a', '--all'): igor@333: run_all = True igor@333: if o in ('-v', '--verbose'): igor@333: verbose = True igor@333: errs = 0 igor@333: if args: igor@333: for a in args: igor@333: try: igor@333: st = os.lstat(a) igor@333: except OSError, err: igor@333: print >> sys.stderr, '%s: %s' % (a, err.strerror) igor@333: errs += 1 igor@333: continue igor@333: if stat.S_ISREG(st.st_mode) and st.st_mode & 0111: igor@333: if example(a, verbose).run(): igor@333: errs += 1 igor@333: else: igor@333: print >> sys.stderr, '%s: not a file, or not executable' % a igor@333: errs += 1 igor@333: elif run_all: igor@333: names = os.listdir(path) igor@333: names.sort() igor@333: for name in names: igor@333: if name == 'run-example' or name.startswith('.'): continue igor@333: if name.endswith('.out') or name.endswith('~'): continue igor@333: if name.endswith('.run'): continue igor@333: pathname = os.path.join(path, name) igor@333: try: igor@333: st = os.lstat(pathname) igor@333: except OSError, err: igor@333: # could be an output file that was removed while we ran igor@333: if err.errno != errno.ENOENT: igor@333: raise igor@333: continue igor@333: if stat.S_ISREG(st.st_mode) and st.st_mode & 0111: igor@333: if example(pathname, verbose).run(): igor@333: errs += 1 igor@333: print >> open(os.path.join(path, '.run'), 'w'), time.asctime() igor@333: else: igor@333: print_help(1, msg='no test names given, and --all not provided') igor@333: return errs igor@333: igor@333: if __name__ == '__main__': igor@333: try: igor@333: sys.exit(main()) igor@333: except KeyboardInterrupt: igor@333: print >> sys.stderr, 'interrupted!' igor@333: sys.exit(1)