#!/usr/bin/env python

# trac-revprop-hook
# ----------------------------------------------------------------------------
# Copyright (c) 2007-2008 Emmanuel Blot
# ----------------------------------------------------------------------------

# This Subversion unified pre/post-revprop-change hook script is meant to
# interface to the Trac (http://www.edgewall.com/products/trac/) issue
# tracking/wiki/etc system.
#
# It should be called from the 'pre-revprop-change' and 'post-revprop-change'
# scripts in Subversion, such as via:
#
# REPOS="$1"
# REV="$2"
# AUTHOR="$3"
# PROPNAME="$4"
# ACTION="$5"
# TRAC_ENV="/somewhere/trac/project/"
#
# cat | trac-revprop-hook -d "$REPOS" -p "$TRAC_ENV" -u "$AUTHOR" -r "$REV" \
#     -n "$PROPNAME" -a "$ACTION" pre
#
# Use 'post' instead of 'pre' as the last argument when calling the script
# from 'post-revprop-change'. The PYTHON_EGG_CACHE environment variable must
# point to an existing directory.
#

import os
import re
import sys
from ConfigParser import ConfigParser
from optparse import OptionParser

from trac.env import open_environment

from repproxy import RepositoryProxy

OK = 0
ERROR = 1

# Subversion access file; its [groups] section must define an 'admins' entry
configpath = '/local/var/svn/config/access.conf'

# Changeset command looked for at the very start of a log message, such as
# "delivers [12]:[34]" or "brings! [12]". The 'first', 'second' and 'force'
# groups are read by RevpropHook._verify_log_msg().
# NOTE(review): the name of the leading group ('action') is not referenced
# anywhere in this file - confirm it against the other hook scripts.
changeset_cmd_pattern = (r'^(?P<action>delivers|brings)(?P<force>\!)?\s+'
                         r'((\[(?P<first>\d+)\])(:\[(?P<second>\d+)\])?)?')


def _reject(message):
    '''Write message to stderr and terminate with the failure status.

    Subversion relays the hook's stderr to the client, which is why errors
    are never written to stdout.
    '''
    sys.stderr.write('%s\n' % message)
    sys.exit(-ERROR)


class RevpropHook(object):
    '''Hook script for revision property changes'''

    def __init__(self, post, project, rev, name, value, action=None,
                 author=None, repos=None):
        '''Run the hook. The constructor always ends with sys.exit() or an
        exception; it never returns to the caller.

        post    -- true when run as 'post-revprop-change', false for 'pre'
        project -- path to the Trac environment
        rev     -- revision whose property is changed (converted with int())
        name    -- property name, expected as '<namespace>:<property>'
        value   -- property value read from the hook's standard input
        action  -- Subversion action code; 'D' (deletion) is always accepted
                   for a known 'rth' property
        author  -- user requesting the change
        repos   -- repository path given to the hook; when set it must equal
                   the 'repository_dir' of the Trac environment

        Raises AssertionError when PYTHON_EGG_CACHE is unset or is not a
        directory.
        '''
        # .get() so that an unset variable produces the intended error
        # rather than a bare KeyError
        egg_cache = os.environ.get('PYTHON_EGG_CACHE')
        if not egg_cache or not os.path.isdir(egg_cache):
            raise AssertionError("Invalid egg cache directory: %s" %
                                 egg_cache)
        self.env = open_environment(project)
        self.repospath = self.env.config.get('trac', 'repository_dir')
        self.rev = int(rev)
        self.name = name
        self.value = value
        self.author = author
        self.action = action

        # sanity check: hook script repository match Trac repository
        if repos and self.repospath != repos:
            _reject('Invalid/incoherent repository %s %s' %
                    (self.repospath, repos))

        # a name that is not exactly '<namespace>:<property>' matches no
        # branch below and is therefore rejected like any unknown property
        parts = name.split(':')
        if len(parts) == 2:
            namespace, prop = parts
        else:
            namespace, prop = None, None

        if post:
            if namespace == 'svn':
                # on post-revprop-change, update Trac w/ SVN properties
                # custom properties are not cached anyway ;-(
                self._update_trac()
            # the change is already committed: nothing left to validate
            sys.exit(OK)

        if namespace == 'svn':
            if prop == 'log':
                # exits with an error by itself if the new message is invalid
                self._verify_log_msg()
                sys.exit(OK)
        elif namespace == 'rth':
            # only properties that have a dedicated validator are accepted
            verifier = getattr(self, '_verify_rth_%s' % prop, None)
            if verifier is not None:
                # deleting a known property is always allowed
                if self.action == 'D' or verifier():
                    sys.exit(OK)
                _reject('Invalid value for property %s: %s' %
                        (self.name, self.value))

        _reject('This property (%s) cannot be modified' % name)

    def _verify_log_msg(self):
        '''Check the new log message against the current one.

        When the current message starts with a changeset command, the new
        one must start with a command too, and the revision parameters may
        only change if the author is an administrator and the command
        carries the '!' force marker. Exits with the failure status when the
        new message is refused; returns None otherwise.
        '''
        regex = re.compile(changeset_cmd_pattern, re.IGNORECASE)
        self.proxy = RepositoryProxy(self.repospath)
        oldlog = self.proxy.get_revision_log_message(self.rev)
        oldmo = regex.search(oldlog)
        if not oldmo:
            # no command in the current message: nothing to protect
            return
        newmo = regex.search(self.value)
        if not newmo:
            _reject('Missing message:\n was: "%s"' % oldlog.split('\n')[0])
        same_params = \
            oldmo.group('first') == newmo.group('first') and \
            oldmo.group('second') == newmo.group('second')
        if same_params:
            return
        if not self._is_admin(self.author) or not newmo.group('force'):
            _reject('Original parameters should be kept unaltered:\n'
                    ' "%s"' % oldlog.split('\n')[0])

    def _is_ordered_revision_list(self):
        '''Tell whether self.value is a comma-separated list of integers,
        each one >= 1 and none smaller than the one before it.

        Deliberately not named '_verify_rth_*', so that it cannot be reached
        through the property name dispatch done in __init__.
        '''
        previous = 1
        try:
            for item in self.value.split(','):
                current = int(item)
                if current < previous:
                    return False
                previous = current
        except ValueError:
            return False
        return True

    def _verify_rth_deliver(self):
        '''Validate 'rth:deliver': source revisions are ordered integers'''
        return self._is_ordered_revision_list()

    def _verify_rth_bring(self):
        '''Validate 'rth:bring': source revisions are ordered integers'''
        return self._is_ordered_revision_list()

    def _update_trac(self):
        '''Update the Trac cache with the updated value'''
        repos = self.env.get_repository()
        repos.sync_changeset(self.rev)

    def _is_admin(self, author):
        '''Verify whether the author has administrator privileges.

        The author is compared, ignoring case, with the comma-separated
        'admins' entry of the [groups] section of the file at configpath.
        A missing author is never an administrator.

        Raises AssertionError when the file does not exist or when the
        'admins' entry is empty.
        '''
        if not os.path.isfile(configpath):
            raise AssertionError('Unable to find Subversion ACL for admins')
        config = ConfigParser()
        config.read(configpath)
        admins = config.get('groups', 'admins')
        if not admins:
            raise AssertionError('Unable to retrieve Subversion ACL for '
                                 'admins')
        if not author:
            return False
        members = [s.strip() for s in admins.lower().split(',')]
        return author.lower() in members


def main(argv=None):
    '''Parse the command line and run the hook.

    argv -- arguments without the program name; defaults to sys.argv[1:]
    '''
    if argv is None:
        argv = sys.argv[1:]

    usage = "usage: %prog [options] <pre|post>"
    parser = OptionParser(usage)
    parser.add_option('-p', '--project', dest='project',
                      help='path to the Trac project')
    parser.add_option('-r', '--revision', dest='rev',
                      help='repository revision number')
    parser.add_option('-u', '--user', dest='author',
                      help='author of the change')
    parser.add_option('-d', '--repos', dest='rep',
                      help='subversion repository path')
    parser.add_option('-n', '--name', dest='prop',
                      help='property name')
    parser.add_option('-a', '--action', dest='action',
                      help='subversion action code (A|D|M)')
    (options, args) = parser.parse_args(argv)

    if options.project is None:
        _reject("Unspecified project")
    if options.rev is None:
        _reject("Unspecified revision")
    if options.prop is None:
        _reject("Unspecified name")
    if len(args) < 1:
        _reject("Missing argument")

    # grab the first line from stdin (we don't care about other lines for now)
    value = sys.stdin.readline()

    # consider that the default hook is 'pre' so that changes are always
    # validated if an invalid command line is provided: only an explicit
    # 'post' argument skips the validation
    post = args[0] == 'post'
    RevpropHook(post, options.project, options.rev, options.prop, value,
                options.action, options.author, options.rep)


if __name__ == "__main__":
    main()