#!/usr/bin/env python
#
# This program takes something that resembles a shell script and runs
# it, spitting input (commands from the script) and output into text
# files, for use in examples.
import cStringIO
import errno
import getopt
import glob
import os
import pty
import re
import select
import shutil
import signal
import stat
import sys
import tempfile
import time
# Map each character that is special in XML text to its predefined
# entity reference.  gensubs() looks characters up in this table.
xml_subs = {
    '<': '&lt;',
    '>': '&gt;',
    '&': '&amp;',
}
def gensubs(s):
    """Yield the pieces of *s* with substituted characters swapped out.

    Walks *s* once.  Whenever a character has a truthy entry in
    xml_subs, the untouched text before it is yielded, followed by the
    substitute.  Whatever remains after the last substitution (possibly
    the empty string) is yielded last.
    """
    start = 0
    for pos, ch in enumerate(s):
        replacement = xml_subs.get(ch)
        if not replacement:
            continue
        yield s[start:pos]
        yield replacement
        start = pos + 1
    yield s[start:]
def xml_escape(s):
    """Return *s* with the substitutions produced by gensubs() applied."""
    pieces = gensubs(s)
    return ''.join(pieces)
def maybe_unlink(name):
    """Remove the file *name* if it exists.

    Returns True when the file was removed and False when it did not
    exist.  Any OSError other than ENOENT is propagated to the caller.
    """
    try:
        os.unlink(name)
    except OSError as err:
        if err.errno == errno.ENOENT:
            return False
        raise
    return True
def find_path_to(program):
    """Return the first PATH directory in which *program* is executable.

    The search path comes from the PATH environment variable, falling
    back to os.defpath.  Note that the directory is returned, not the
    full path of the program.  Returns None when nothing is found.
    """
    search_path = os.environ.get('PATH', os.defpath)
    for directory in search_path.split(os.pathsep):
        candidate = os.path.join(directory, program)
        if os.access(candidate, os.X_OK):
            return directory
    return None
def result_name(name):
    """Return the path under 'results' that corresponds to *name*.

    Path separators in *name* are replaced by '-' so that every result
    lands directly inside the 'results' directory.
    """
    flattened = '-'.join(name.split(os.sep))
    return os.path.normpath(os.path.join('results', flattened))
def wopen(name):
    """Open *name* for writing, creating its parent directories first.

    An already existing parent directory is not an error; any other
    failure to create it is propagated.  Returns the open file object.
    """
    parent = os.path.dirname(name)
    if parent:
        try:
            os.makedirs(parent)
        except OSError as err:
            already_there = err.errno == errno.EEXIST
            if not already_there:
                raise
    return open(name, 'w')
def _load_entities(path='auto-snippets.xml'):
    """Return the entity declarations recorded in *path*.

    Every line of the file, stripped of trailing whitespace, becomes a
    key of the returned dict (mapped to None).  A missing file yields
    an empty dict instead of failing while the class is being defined,
    so that main() still gets the chance to report its own errors.
    """
    try:
        fp = open(path)
    except IOError as err:
        if err.errno != errno.ENOENT:
            raise
        return {}
    try:
        return dict.fromkeys(line.rstrip() for line in fp)
    finally:
        fp.close()


class example:
    """Common behaviour of all examples: naming, status and result files."""

    # Entity declarations known so far, keyed by the declaration text.
    # Subclasses add keys as they produce output sections.
    entities = _load_entities()

    def __init__(self, name, verbose, keep_change):
        """Remember the normalised example path and the run options."""
        self.name = os.path.normpath(name)
        self.verbose = verbose
        self.keep_change = keep_change

    def status(self, s):
        """Write progress text to stdout, flushing partial lines."""
        sys.stdout.write(s)
        if not s.endswith('\n'):
            sys.stdout.flush()

    def rename_output(self, base, ignore=None):
        """Install freshly generated output and compare it with the old one.

        Reads results/<base>.tmp, writes a copy with every match of the
        *ignore* regular expressions removed to results/<base>.err, and
        renames the .tmp file to results/<base>.lxo.  The masked copy is
        then compared with results/<base>.out:

        * no .out file yet: the .err file becomes the .out file;
        * contents match: the .err file is deleted;
        * contents differ: a message is printed; with keep_change the
          .err file replaces the .out file, otherwise ``diff -u`` is run
          and the .err file is left in place.

        Returns True only in the last case (changed and not kept).
        """
        # Lines are masked one at a time, so '^' in a pattern anchors at
        # the start of each line even without re.MULTILINE.
        mangle_re = re.compile('(?:' + '|'.join(ignore or ()) + ')')

        def mangle(s):
            return mangle_re.sub('', s)

        def matchfp(fp1, fp2):
            """Return True if both files have the same masked lines."""
            while True:
                s1 = mangle(fp1.readline())
                s2 = mangle(fp2.readline())
                if s1 != s2:
                    return False
                if not s1:
                    # both files ended at the same time
                    return True

        oldname = result_name(base + '.out')
        tmpname = result_name(base + '.tmp')
        errname = result_name(base + '.err')

        unchanged = False
        with open(errname, 'w+') as errfp:
            with open(tmpname) as tmpfp:
                for line in tmpfp:
                    errfp.write(mangle(line))
            os.rename(tmpname, result_name(base + '.lxo'))
            errfp.seek(0)
            try:
                oldfp = open(oldname)
            except IOError as err:
                if err.errno != errno.ENOENT:
                    raise
                oldfp = None
            if oldfp is not None:
                with oldfp:
                    unchanged = matchfp(oldfp, errfp)

        if oldfp is None:
            # first run for this section: adopt the output as reference
            os.rename(errname, oldname)
            return False
        if unchanged:
            os.unlink(errname)
            return False

        print >> sys.stderr, '\nOutput of %s has changed!' % base
        if self.keep_change:
            os.rename(errname, oldname)
            return False
        os.system('diff -u %s %s 1>&2' % (oldname, errname))
        return True
class static_example(example):
    """An example whose file content is copied, XML-escaped, to the results."""

    def run(self):
        """Copy the example file into the results and record its entity.

        The file content (with trailing whitespace removed) is escaped
        for XML, written to the '.tmp' result file and handed to
        rename_output(), whose return value is not used.  Returns None.
        """
        self.status('running %s\n' % self.name)
        with open(self.name) as fp:
            s = fp.read().rstrip()
        # '&' must be replaced first so the entities added for '<' and
        # '>' are not escaped a second time.
        s = s.replace('&', '&amp;').replace('<', '&lt;').replace('>', '&gt;')
        ofp = wopen(result_name(self.name + '.tmp'))
        try:
            # NOTE(review): this writes an empty string; it looks like
            # opening markup was lost from the literal -- confirm.
            ofp.write('')
            ofp.write(s)
            ofp.write('\n')
        finally:
            ofp.close()
        self.rename_output(self.name)
        norm = self.name.replace(os.sep, '-')
        # NOTE(review): the declaration text was an empty string, which
        # made the '%' formatting raise TypeError.  It is reconstructed
        # here to point at the '.lxo' file that rename_output() creates;
        # confirm the exact wording against the consumers of
        # auto-snippets.xml.
        example.entities[
            '<!ENTITY %s SYSTEM "results/%s.lxo">' % (norm, norm)] = 1
class shell_example(example):
    """An example script that is fed, hunk by hunk, to an interactive bash.

    The file named by ``self.name`` is split into hunks (a line together
    with its backslash-continued lines).  A hunk of the form
    ``#$ name: X`` starts output section X (an empty X just ends the
    current section), ``#$ ignore: RE`` adds a pattern to be masked when
    outputs are compared, and every other non-blank hunk is sent to a
    bash child running on a pseudo-terminal.  While a section is open,
    each command and its output are recorded in that section's result
    file.
    """

    shell = '/usr/bin/env bash'
    # Prompt strings installed in the child shell; receive() recognises
    # the end of a command's output by these markers.
    ps1 = '__run_example_ps1__ '
    ps2 = '__run_example_ps2__ '
    pi_re = re.compile(r'#\$\s*(name|ignore):\s*(.*)$')
    # Seconds to wait for shell output.  This attribute used to be
    # assigned twice (10, then 5); 5 was the value actually in effect.
    timeout = 5

    def __init__(self, name, verbose, keep_change):
        example.__init__(self, name, verbose, keep_change)
        self.poll = select.poll()

    def parse(self):
        '''yield each hunk of input from the file.

        A hunk ends at the first line that does not end in a backslash
        (ignoring trailing whitespace).  A continuation that is still
        open at end of file is not yielded.
        '''
        with open(self.name) as fp:
            cfp = cStringIO.StringIO()
            for line in fp:
                cfp.write(line)
                if not line.rstrip().endswith('\\'):
                    yield cfp.getvalue()
                    cfp.seek(0)
                    cfp.truncate()

    def send(self, s):
        """Write all of *s* to the shell's terminal."""
        if self.verbose:
            print >> sys.stderr, '>', self.debugrepr(s)
        while s:
            # os.write may accept only part of the data
            count = os.write(self.cfd, s)
            s = s[count:]

    def debugrepr(self, s):
        """Return repr(s), abbreviated to about 60 characters."""
        rs = repr(s)
        limit = 60
        if len(rs) > limit:
            # rs[0] is the opening quote; reuse it to close the excerpt
            return ('%s%s ... [%d bytes]' % (rs[:limit], rs[0], len(s)))
        else:
            return rs

    def read(self, hint):
        """Read up to 1024 bytes of shell output.

        If nothing arrives within ``self.timeout`` seconds, a message
        prefixed with *hint* is printed, the child is sent SIGHUP and
        an empty string is returned.
        """
        events = self.poll.poll(self.timeout * 1000)
        if not events:
            print >> sys.stderr, ('[%stimed out after %d seconds]' %
                                  (hint, self.timeout))
            os.kill(self.pid, signal.SIGHUP)
            return ''
        return os.read(self.cfd, 1024)

    def receive(self, hint):
        """Collect shell output until a prompt appears.

        Returns ``(prompt, output)`` where *prompt* is ps1 or ps2 and
        *output* is everything before it with CRLF turned into LF.
        Returns ``('', '')`` when reading fails with EIO.
        """
        out = cStringIO.StringIO()
        while True:
            try:
                if self.verbose:
                    sys.stderr.write('< ')
                s = self.read(hint)
            except OSError as err:
                if err.errno == errno.EIO:
                    return '', ''
                raise
            if self.verbose:
                print >> sys.stderr, self.debugrepr(s)
            out.write(s)
            s = out.getvalue()
            if s.endswith(self.ps1):
                return self.ps1, s.replace('\r\n', '\n')[:-len(self.ps1)]
            if s.endswith(self.ps2):
                return self.ps2, s.replace('\r\n', '\n')[:-len(self.ps2)]

    def sendreceive(self, s, hint):
        """Send *s* to the shell and return ``(prompt, output)``.

        A copy of *s* at the start of the output is removed.
        """
        self.send(s)
        ps, r = self.receive(hint)
        if r.startswith(s):
            # drop the terminal's echo of what we just typed
            r = r[len(s):]
        return ps, r

    def _write_rcfiles(self, tmpdir):
        """Write the .hgrc and .bashrc for the child shell into *tmpdir*.

        Returns the path of the .bashrc, which run() asks the shell to
        source.
        """
        with wopen(os.path.join(tmpdir, '.hgrc')) as hgrc:
            print >> hgrc, '[ui]'
            # NOTE(review): the trailing space suggests that an e-mail
            # address was lost from this string -- confirm.
            print >> hgrc, "username = Bryan O'Sullivan "

        path = ['/usr/bin', '/bin']
        hg = find_path_to('hg')
        if hg and hg not in path:
            path.append(hg)

        rcfile = os.path.join(tmpdir, '.bashrc')
        with wopen(rcfile) as rcfp:
            print >> rcfp, 'PS1="%s"' % self.ps1
            print >> rcfp, 'PS2="%s"' % self.ps2
            print >> rcfp, 'unset HISTFILE'
            print >> rcfp, 'export PATH=' + ':'.join(path)
            # variables passed through from our own environment, if set
            for envar in ('PYTHONPATH',):
                v = os.getenv(envar)
                if v is not None:
                    print >> rcfp, 'export ' + envar + '=' + v
            print >> rcfp, 'export EXAMPLE_DIR="%s"' % os.getcwd()
            print >> rcfp, 'export HGMERGE=merge'
            print >> rcfp, 'export LANG=C'
            print >> rcfp, 'export LC_ALL=C'
            print >> rcfp, 'export TZ=GMT'
            print >> rcfp, 'export HGRC="%s/.hgrc"' % tmpdir
            print >> rcfp, 'export HGRCPATH=$HGRC'
            print >> rcfp, 'cd %s' % tmpdir
        return rcfile

    def _spawn_shell(self):
        """Fork a bash child on a new pty; set self.pid and self.cfd."""
        sys.stdout.flush()
        sys.stderr.flush()
        self.pid, self.cfd = pty.fork()
        if self.pid == 0:
            cmdline = ['/usr/bin/env', '-i', 'bash', '--noediting',
                       '--noprofile', '--norc']
            try:
                os.execv(cmdline[0], cmdline)
            except OSError as err:
                print >> sys.stderr, '%s: %s' % (cmdline[0], err.strerror)
                sys.stderr.flush()
            # Only reached when exec failed: never let the child fall
            # back into the parent's code, and do not report success.
            os._exit(127)
        self.poll.register(self.cfd, select.POLLIN | select.POLLERR |
                           select.POLLHUP)

    def _kill_shell(self):
        """Kill the child shell and reap it; return its wait status."""
        os.kill(self.pid, signal.SIGKILL)
        pid, rc = os.wait()
        return rc

    def run(self):
        """Run the example and record the output of its sections.

        Returns a true value when the example failed: an illegal
        section name was used, the output of a section changed (see
        rename_output), or the shell's wait status was non-zero.  On
        success an empty '.run' marker is written under 'results' and a
        false value is returned.  Exceptions raised while talking to
        the shell are re-raised after the child has been killed.
        """
        ofp = None
        ofp_basename = None
        basename = os.path.basename(self.name)
        self.status('running %s ' % basename)

        # remove the marker file that we tell make to use to see if
        # this run succeeded.  The marker is written with
        # result_name(), so remove it there too; otherwise a marker
        # left by an earlier successful run survives a failing one.
        maybe_unlink(self.name + '.run')
        maybe_unlink(result_name(self.name + '.run'))

        prompts = {
            '': '',
            self.ps1: '$',
            self.ps2: '>',
        }

        ignore = [
            r'\d+:[0-9a-f]{12}',       # changeset number:hash
            r'[0-9a-f]{40}',           # long changeset hash
            r'[0-9a-f]{12}',           # short changeset hash
            r'^(?:---|\+\+\+) .*',     # diff header with dates
            r'^date:.*',               # date
            #r'^diff -r.*',            # "diff -r" is followed by hash
            r'^# Date \d+ \d+',        # hg patch header
        ]

        err = False
        read_hint = ''

        tmpdir = tempfile.mkdtemp(prefix=basename)
        try:
            rcfile = self._write_rcfiles(tmpdir)
            self._spawn_shell()
            try:
                # eat first prompt string from shell
                self.read(read_hint)
                # setup env and prompt
                ps, output = self.sendreceive('source %s\n' % rcfile,
                                              read_hint)
                for hunk in self.parse():
                    # is this line a processing instruction?
                    m = self.pi_re.match(hunk)
                    if m:
                        pi, rest = m.groups()
                        if pi == 'name':
                            self.status('.')
                            out = rest
                            if out in ('err', 'lxo', 'out', 'run', 'tmp'):
                                print >> sys.stderr, ('%s: illegal section '
                                                      'name %r' %
                                                      (self.name, out))
                                # do not leave the shell, the pty or the
                                # open section file behind
                                if ofp is not None:
                                    ofp.close()
                                self._kill_shell()
                                os.close(self.cfd)
                                return 1
                            assert os.sep not in out
                            if ofp is not None:
                                ofp.write('\n')
                                ofp.close()
                                err |= self.rename_output(ofp_basename,
                                                          ignore)
                            if out:
                                ofp_basename = '%s.%s' % (self.name, out)
                                # flatten path separators exactly like
                                # result_name() does, so the entity
                                # refers to the file actually written
                                norm = os.path.normpath(ofp_basename)
                                norm = norm.replace(os.sep, '-')
                                # NOTE(review): the declaration text was
                                # an empty string, which made the '%'
                                # formatting raise TypeError.  It is
                                # reconstructed here; confirm the entity
                                # naming against the consumers of
                                # auto-snippets.xml.
                                example.entities[
                                    '<!ENTITY interaction.%s '
                                    'SYSTEM "results/%s.lxo">'
                                    % (norm, norm)] = 1
                                read_hint = ofp_basename + ' '
                                ofp = wopen(result_name(ofp_basename +
                                                        '.tmp'))
                                # NOTE(review): writes an empty string;
                                # looks like opening markup was lost
                                # from the literal -- confirm.
                                ofp.write('')
                            else:
                                ofp = None
                        elif pi == 'ignore':
                            ignore.append(rest)
                    elif hunk.strip():
                        # it's something we should execute; remember
                        # which prompt was showing when it was typed
                        typed_at = ps
                        ps, output = self.sendreceive(hunk, read_hint)
                        if not ofp:
                            continue
                        # first, print the command we ran
                        if not hunk.startswith('#'):
                            nl = hunk.endswith('\n')
                            hunk = ('%s %s' %
                                    (prompts[typed_at],
                                     xml_escape(hunk.rstrip('\n'))))
                            if nl:
                                hunk += '\n'
                        ofp.write(hunk)
                        # then its output
                        ofp.write(xml_escape(output))
                self.status('\n')
            except:
                # whatever went wrong, take the shell down before
                # passing the exception on
                print >> sys.stderr, '(killed)'
                self._kill_shell()
                raise
            else:
                try:
                    ps, output = self.sendreceive('exit\n', read_hint)
                    if ofp is not None:
                        ofp.write(output)
                        ofp.write('\n')
                        ofp.close()
                        err |= self.rename_output(ofp_basename, ignore)
                    os.close(self.cfd)
                except IOError:
                    # best effort, as before: the shell may already be
                    # gone at this point
                    pass
                os.kill(self.pid, signal.SIGTERM)
                pid, rc = os.wait()
                err = err or rc
                if err:
                    if os.WIFEXITED(rc):
                        print >> sys.stderr, '(exit %s)' % os.WEXITSTATUS(rc)
                    elif os.WIFSIGNALED(rc):
                        print >> sys.stderr, '(signal %s)' % os.WTERMSIG(rc)
                else:
                    # empty marker file: this run succeeded
                    wopen(result_name(self.name + '.run')).close()
                return err
        finally:
            shutil.rmtree(tmpdir)
def print_help(exit, msg=None):
    """Print the usage text to stderr and terminate the program.

    exit -- status passed to sys.exit()
    msg  -- optional error message printed before the usage text
    """
    lines = []
    if msg:
        lines.append('Error: %s' % (msg,))
    lines.extend([
        'Usage: run-example [options] [test...]',
        'Options:',
        ' -a --all run all examples in this directory',
        ' -h --help print this help message',
        # this entry used to be labelled --help a second time
        ' --keep keep new output as desired output',
        ' -v --verbose display extra debug output',
    ])
    sys.stderr.write('\n'.join(lines) + '\n')
    sys.exit(exit)
# Any of the user/group/other execute permission bits (octal 111).
_EXEC_BITS = stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH


def _run_regular_file(pathname, mode, verbose, keep_change):
    """Run the regular file *pathname* as an example; return 1 on failure.

    Executable files are run as shell examples, other files ending in
    '.lst' as static examples; anything else is skipped.  Only a
    failing shell example counts as a failure.
    """
    if mode & _EXEC_BITS:
        if shell_example(pathname, verbose, keep_change).run():
            return 1
    elif pathname.endswith('.lst'):
        static_example(pathname, verbose, keep_change).run()
    return 0


def main(path='.'):
    """Run the examples selected on the command line.

    Must be started from a directory named 'examples'.  Runs either
    the examples named as arguments or, with -a/--all, everything
    matched by '*', 'app*/*' and 'ch*/*'.  Afterwards the collected
    entity declarations are written to auto-snippets.xml.

    Returns the number of errors found.  Exits through sys.exit() when
    run from the wrong directory or when the options are unusable.
    """
    if os.path.realpath(path).split(os.sep)[-1] != 'examples':
        print >> sys.stderr, 'Not being run from the examples directory!'
        sys.exit(1)

    try:
        opts, args = getopt.getopt(sys.argv[1:], '?ahv',
                                   ['all', 'help', 'keep', 'verbose'])
    except getopt.GetoptError as err:
        # report a bad option like any other usage error instead of
        # dying with a traceback
        print_help(1, msg=str(err))

    verbose = False
    run_all = False
    keep_change = False

    for o, a in opts:
        if o in ('-h', '-?', '--help'):
            print_help(0)
        if o in ('-a', '--all'):
            run_all = True
        if o in ('--keep',):
            keep_change = True
        if o in ('-v', '--verbose'):
            verbose = True

    errs = 0
    if args:
        for a in args:
            try:
                st = os.lstat(a)
            except OSError as err:
                print >> sys.stderr, '%s: %s' % (a, err.strerror)
                errs += 1
                continue
            if stat.S_ISREG(st.st_mode):
                errs += _run_regular_file(a, st.st_mode, verbose,
                                          keep_change)
            else:
                print >> sys.stderr, '%s: not a file, or not executable' % a
                errs += 1
    elif run_all:
        names = glob.glob("*") + glob.glob("app*/*") + glob.glob("ch*/*")
        names.sort()
        for name in names:
            if name == 'run-example' or name.endswith('~'):
                continue
            pathname = os.path.join(path, name)
            try:
                st = os.lstat(pathname)
            except OSError as err:
                # could be an output file that was removed while we ran
                if err.errno != errno.ENOENT:
                    raise
                continue
            if stat.S_ISREG(st.st_mode):
                errs += _run_regular_file(pathname, st.st_mode, verbose,
                                          keep_change)
        # timestamp of the last complete --all run
        marker = wopen(os.path.join(path, '.run'))
        try:
            print >> marker, time.asctime()
        finally:
            marker.close()
    else:
        print_help(1, msg='no test names given, and --all not provided')

    fp = wopen('auto-snippets.xml')
    try:
        for key in sorted(example.entities):
            print >> fp, key
    finally:
        fp.close()
    return errs
# Script entry point: the exit status is the error count returned by
# main(), or 1 when the run is interrupted from the keyboard.
if __name__ == '__main__':
    try:
        exit_status = main()
    except KeyboardInterrupt:
        print >> sys.stderr, 'interrupted!'
        exit_status = 1
    sys.exit(exit_status)