#!/usr/bin/env python # trac-commit-hook # ---------------------------------------------------------------------------- # Copyright (c) 2004 Stephen Hansen # Copyright (c) 2005-2008 Emmanuel Blot, Jerome Souquieres # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to # deal in the Software without restriction, including without limitation the # rights to use, copy, modify, merge, publish, distribute, sublicense, and/or # sell copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in # all copies or substantial portions of the Software. # ---------------------------------------------------------------------------- import re import os import sys import string from datetime import datetime from optparse import OptionParser from trac.env import open_environment from trac.ticket.notification import TicketNotifyEmail from trac.ticket import Ticket, Milestone from trac.util.datefmt import utc from ConfigParser import ConfigParser from repproxy import RepositoryProxy OK = 0 ERROR = 1 # # Options parsing # parser = OptionParser() parser.add_option('-p', '--project', dest='project', help='Path to the Trac project.') parser.add_option('-r', '--revision', dest='rev', help='Repository revision number (post-commit).') parser.add_option('-t', '--transaction', dest='txn', help='Transaction number (pre-commit).') parser.add_option('-d', '--repos', dest='rep', help='The path to the Subversion repository.') (options, args) = parser.parse_args(sys.argv[1:]) # # Patterns to match message logs # ticket_cmd_pattern = re.compile(r'^(?Prefs|closes|fixes).?#(?P[0-9]+)', re.IGNORECASE) changeset_cmd_pattern = re.compile(r'^(?Pdelivers|brings)(?P\!)?\s+((\[(?P\d+)\])(\s*:\s*\[(?P\d+)\])?)?', re.IGNORECASE) create_pattern = re.compile(r'^(?Pcreates)', re.IGNORECASE) terminate_pattern = re.compile(r'^(?Pterminates)(?P\!)?\s', re.IGNORECASE) admin_pattern = re.compile(r'^(?Padmins)', re.IGNORECASE) # # SVN properties # deliver_prop_name = 'rth:deliver' bring_prop_name = 'rth:bring' export_prop_name = 'rth:export' # # SVN directories # dev_branch_dirs = ['/sandboxes', '/branches'] trunk_directory = '/trunk' configpath = '/local/var/svn/config/access.conf' # # Milestones # EXCLUDED_MILESTONES = [ u'Unclassified' ] TBD_MILESTONE = u'Next' class CommitHook(object): ''' The base class for pre and post -commit hooks Contains the base mechanism for parsing log messages looking for action keywords and some commun functions. ''' _ticket_cmds = {'closes': '_cmd_closes', 'fixes': '_cmd_closes', 'refs': '_cmd_refs'} _changeset_cmds = {'delivers': '_cmd_delivers', 'brings': '_cmd_brings'} def __init__(self, project=options.project, rev=options.rev, txn=options.txn, rep=options.rep): # Initialization self._init_proxy(rep, rev, txn) if rev: self.rev = int(rev) self.txn = txn self.rep = rep self.now = datetime.now(utc) self.project = project self.author = self._get_author() self.log = self._get_log() if not os.path.isdir(os.environ['PYTHON_EGG_CACHE']): raise AssertionError("Invalid egg cache directory: %s" % \ os.environ['PYTHON_EGG_CACHE']); self.env = open_environment(project) self.env.get_repository() bre = self.env.config.get('revtree', 'branch_re', r'^(?Ptrunk|(?:branches|tags|vendor)/[^/]+)' r'(?:/(?P.*))?$') self.bcre = re.compile(bre) # Nearly empty log message if not self.log: print>>sys.stderr, 'Log message is invalid' self.finalize(ERROR) # Administration commit administration = admin_pattern.search(self.log) if administration: self._cmd_admins() self.finalize(OK) # Branch deletion terminate = terminate_pattern.search(self.log) if terminate: rc = self._cmd_terminates(terminate.group('force') and True) self.finalize(OK) # Branch creation creation = create_pattern.search(self.log) if creation: rc = self._cmd_creates() self.finalize(rc) # Changeset-related commands chgset_cmd = changeset_cmd_pattern.search(self.log) if chgset_cmd: cmd = chgset_cmd.group('action').lower() if CommitHook._changeset_cmds.has_key(cmd): func = getattr(self, CommitHook._changeset_cmds[cmd]) rc = func(chgset_cmd.group('first'), chgset_cmd.group('second'), chgset_cmd.group('force') and True) self.finalize(rc) else: print >> sys.stderr, 'Invalid changeset action' self.finalize(ERROR) # Ticket-related commands ticket_cmd = ticket_cmd_pattern.search(self.log) if ticket_cmd: cmd = ticket_cmd.group('action').lower() if CommitHook._ticket_cmds.has_key(cmd): func = getattr(self, CommitHook._ticket_cmds[cmd]) rc = func(int(ticket_cmd.group('ticket'))) self.finalize(rc) else: print>>sys.stderr, 'No supported action in log message !' self.finalize(ERROR) # Unrecognized log message print>>sys.stderr, 'No known action in log message !' self.finalize(ERROR) def _next_milestone(self): ''' Returns the next milestone (i.e. the first non-completed milestone by chronological order) ''' db = self.env.get_db_cnx() xms = EXCLUDED_MILESTONES + [TBD_MILESTONE] ms = [m.name for m in Milestone.select(self.env, False, db) \ if m.name not in xms] return ms and ms[0] or None def _collect_branch_revs(self, rev1, rev2): ''' Collect all revisions sitting on the branch between rev1 and rev2 @return the revision list ''' if not rev1: print >> sys.stderr, 'Source revision not specified' self.finalize(ERROR) rev1 = int(rev1) if not rev2: rev2 = rev1 else: rev2 = int(rev2) if rev1 > rev2: print >> sys.stderr, 'Revision range is invalid %d:%d' \ % (rev1, rev2) self.finalize(ERROR) branch1 = self.proxy.find_revision_branch(rev1, self.bcre) print >> sys.stderr, "Branch1 %s" % (branch1) if not branch1: print >> sys.stderr, 'Revision %d does not exist' % rev1 self.finalize(ERROR) branch2 = self.proxy.find_revision_branch(rev2, self.bcre) print >> sys.stderr, "Branch2 %s" % (branch2) if not branch2: print >> sys.stderr, 'Revision %d does not exist' % rev2 self.finalize(ERROR) if branch1 != branch2: print >> sys.stderr, 'Revisions (%d,%d) not coherent: %s,%s' \ % (rev1, rev2, branch1, branch2) self.finalize(ERROR) revisions = [] for rev in range(rev1,rev2+1): revbranch = self.proxy.find_revision_branch(rev, self.bcre) if not revbranch: continue if revbranch != branch1: continue revisions.append('%d' % rev) return revisions def _collect_tickets(self, revisions): ''' Build a dictionary of all tickets referenced by the revision list, following bring links @param revisions a list of revisions @return a dictionary of tickets: the key is the ticket number, the value is the list of revisions related to this ticket. Each revision is itself a list [author, log] ''' ticket_dict = {} for rev in revisions: bring_prop = self.proxy.get_revision_property(int(rev), \ bring_prop_name) if bring_prop and len(bring_prop) > 0: bring_revs = bring_prop.split(',') subticket_dict = self._collect_tickets(bring_revs) ticket_dict.update(subticket_dict) else: rev_log = self.proxy.get_revision_log_message(int(rev)) rev_author = self.proxy.get_revision_author(int(rev)) mo = ticket_cmd_pattern.match(rev_log) if mo: tkid = int(mo.group('ticket')) if ticket_dict.has_key(tkid): ticket_dict[tkid].append([rev_author, rev_log]) else: ticket_dict[tkid] = [[rev_author, rev_log]] return ticket_dict def _is_ticket_open(self, ticket_id): ''' Check if a ticket is open ''' ticket = Ticket(self.env, ticket_id) is_open = ticket['status'] != 'closed' return is_open def _is_admin(self, author): ''' Verify whether the author has administrator priviledges ''' config = ConfigParser() if not os.path.isfile(configpath): raise AssertionError('Unable to find Subversion ACL for admins') config.read(configpath) admins = config.get('groups','admins') if not admins: raise AssertionError('Unable to retrieve Subversion ACL for admins') if not author.lower() in [s.strip() for s in admins.lower().split(',')]: return False return True def _is_dev_branch(self, dir_path): ''' Tell whether a directory is located inside a development branch or not ''' for dev_br in dev_branch_dirs: if dir_path[:len(dev_br)] == dev_br: return True return False class PreCommitHook(CommitHook): ''' Handles pre-commit-hook ''' def _init_proxy(self, rep, rev, txn): ''' Initialize the proxy with the specified transaction ''' self.proxy = RepositoryProxy(rep, txn) self.youngest = self.proxy.get_youngest_revision() if self.youngest == 0: sys.exit(OK) return OK def _get_author(self): ''' Get the transaction author ''' author = self.proxy.get_txn_author() return author def _get_log(self): ''' Get the transaction log message ''' log = self.proxy.get_txn_log_message() if len(log) < 2: return None # Be sure the first letter is uppercased log = log[0].upper() + log[1:] return log def _update_log(self, log): ''' Update the transaction log message ''' self.proxy.set_txn_log_message(log) def _is_txn_branch_directory(self): ''' Check if the directory of the transaction is a branch directory (located under the branches directory) ''' dst_branch = self.proxy.find_txn_branch(self.bcre) return self._is_dev_branch(dst_branch) def finalize(self, result): if OK == result: self._update_log(self.log) sys.exit(result) def _cmd_admins(self): ''' Administrative commit ''' if not self._is_admin(self.author): print >>sys.stderr, 'Only administrator can execute admin commits' self.finalize(ERROR) return OK def _cmd_creates(self): ''' Branch creation Check operation source and destination Copy import/symweek/symver properties from new branch ''' src = self.proxy.get_txn_copy_source() if not src: print >> sys.stderr, 'Cannot locate source revision ' \ '(not a copy ?)' self.finalize(ERROR) dstbranch = self.proxy.find_txn_branch(self.bcre) if not self._is_dev_branch(dstbranch): print >> sys.stderr, 'Cannot create a new branch outside %s' \ % dev_branch_dirs self.finalize(ERROR) return OK def _cmd_terminates(self, force): ''' Branch deletion Check that this is a valid and owned branch ''' change_gen = self.proxy.get_txn_changed_paths() try: item = change_gen.next() except StopIteration: print >> sys.stderr, 'No deleted path in the submitted revision' self.finalize(ERROR) try: change_gen.next() except StopIteration: pass else: print >> sys.stderr, 'Termination of more than one branch is not ' \ 'allowed' self.finalize(ERROR) (path, change) = item if change is not RepositoryProxy.PATH_DELETE: print >> sys.stderr, "The branch %s is not being deleted" % path self.finalize(ERROR) dstbranch = self.proxy.find_txn_branch(self.bcre) if not self._is_dev_branch(dstbranch): print >> sys.stderr, 'Cannot terminates a non-branch dir (%s)' \ % dev_branch_dirs self.finalize(ERROR) if not force: youngest = self.proxy.get_youngest_path_revision(path) # now checks that the deleter is the creator of the branch revs = [h[0] for h in self.proxy.get_history(youngest, path, 0)] if not revs: print >> sys.stderr, 'Malformed branch, cannot find ancestor ' \ 'from %s (%d)' % (path, youngest) first_rev = revs[-1] init_author = self.proxy.get_revision_author(first_rev) if init_author != self.author: print >> sys.stderr, 'Cannot delete a non self-owned branch ' \ '%s, owned by %s' \ % (path, init_author) self.finalize(ERROR) return OK def _cmd_closes(self, ticket_id): ''' Ticket close Check that the ticket is open Check that the operation occurs in a branch ''' if not self._is_ticket_open(ticket_id): print >> sys.stderr, 'The ticket %d mentionned in the log ' \ 'message must be open.' % ticket_id self.finalize(ERROR) if not self._is_txn_branch_directory(): print >> sys.stderr, 'Cannot apply changes to a non-branch dir' \ ' (%s)' % dev_branch_dirs self.finalize(ERROR) return OK def _cmd_refs(self, ticket_id): ''' Ticket reference Same pre-conditions as closes ''' return self._cmd_closes(ticket_id) def _cmd_brings(self, rev1, rev2, force): ''' Branch import Check revision range validity Check operation source and destination Collect all tickets related to this revision ''' # Get all revisions to bring revisions = self._collect_branch_revs(rev1, rev2) if not revisions: print >> sys.stderr, "Revisions %s %s %s" % (rev1, rev2, revisions) self.finalize(ERROR) # On error, the transaction is destroyed, so it is safe to apply the # property even if the hook fails. self.proxy.set_txn_property(bring_prop_name, ','.join(revisions)) dstbranch = self.proxy.find_txn_branch(self.bcre) if not dstbranch: print >> sys.stderr, 'Unable to locate bring destination' self.finalize(ERROR) branch1 = self.proxy.find_revision_branch(int(rev1), self.bcre) if dstbranch == branch1: print >> sys.stderr, 'Cannot bring to self (%s -> %s)' % \ (branch1, dstbranch) self.finalize(ERROR) if dstbranch == trunk_directory: print >> sys.stderr, 'Cannot bring to trunk' self.finalize(ERROR) # Try to collect all tickets. This will be used in the post-commit tickets = self._collect_tickets(revisions) return OK def _cmd_delivers(self, rev1, rev2, force): ''' Branch delivery Check revision range validity Check operation source and destination Check trunk availability Build consolidated log message ''' # Get all revisions to deliver revisions = self._collect_branch_revs(rev1, rev2) if not revisions: print >> sys.stderr, "Revisions %s %s %s" % (rev1, rev2, revisions) self.finalize(ERROR) # On error, the transaction is destroyed, so it is safe to apply the # property even if the hook fails. self.proxy.set_txn_property(deliver_prop_name, ','.join(revisions)) # Check that the destination branch is ok dstbranch = self.proxy.find_txn_branch(self.bcre) if not dstbranch: print >> sys.stderr, 'Unable to locate delivery destination' self.finalize(ERROR) # Ensure the branch is not delivered to self branch1 = self.proxy.find_revision_branch(int(rev1), self.bcre) if dstbranch == branch1: print >> sys.stderr, 'Cannot deliver to self (%s -> %s)' % \ (branch1, dstbranch) self.finalize(ERROR) # Ensure that the 'branch creation' revision is not selected as a source print >> sys.stderr, "BRANCH1: (%s) [%s]" % (rev1, branch1) brevs = [h[0] for h in self.proxy.get_history(int(rev1), branch1, None)] if rev1 == brevs[0]: print >> sys.stderr, \ 'Cannot deliver the initial branch revision (%d)' % rev1 self.finalize(ERROR) # if dstbranch == trunk_directory: # Checks whether the trunk has been exported to ClearCase, # tagged (validated) and imported back in Subversion. # If this cycle has started but not completed (export not # followed by import), no other delivery should be made to # the trunk #for rev, path in self.proxy.get_history(self.youngest, trunk_directory, 0): #importval = self.proxy.get_revision_property(rev, import_prop_name) #if importval: # import property encountered (CC->SVN), # trunk can be modified # break #exportval = self.proxy.get_revision_property(rev, export_prop_name) #if exportval: # # export property encountered (SVN->CC) w/o import # # trunk should not be modified # print >> sys.stderr, 'Cannot deliver to the trunk ' \ # 'as the latest CC export (rev %d on %s) has not been ' \ # 'tagged and imported back into SVN' % (rev, exportval) # if not force: # # Aborts if no force mode is specified # self.finalize(ERROR) # Build the new log including all tickets # @todo there's probably a better way to format this tickets = self._collect_tickets(revisions) full_log = self.log.decode('utf8') full_log += u'\n' for tkt_id in tickets: ticket = Ticket(self.env, tkt_id) full_log += u'\n* #%s: %s\n' % (tkt_id, ticket['summary']) full_log += u'\n'.join([u' %s' % (revi[1].decode('utf8')) \ for revi in tickets[tkt_id]]) full_log += u'\n' full_log = full_log[0].upper() + full_log[1:] self.log = full_log.encode('utf8') # Check if there is a next milestone defined in Trac # This is a bit too conservative, as a next milestone is only needed if # tickets are to be closed. Oh well... ms = self._next_milestone() if not ms: print >> sys.stderr, 'No defined next milestone, ' \ 'please fix up roadmap' self.finalize(ERROR) return OK class PostCommitHook(CommitHook): ''' Handles post-commit-hook ''' def _init_proxy(self, rep, rev, txn): ''' Initialize the proxy for the specified revision ''' # Initial repository creation if rev < 2: self.finalize(OK) self.proxy = RepositoryProxy(rep) return OK def _get_log(self): ''' Get the revision log message ''' log = self.proxy.get_revision_log_message(self.rev) return log def _get_author(self): ''' Get the revision author ''' author = self.proxy.get_revision_author(self.rev) return author def _cmd_imports(self, label, week, version): # Nothing to do, the properties have been set during the pre-commit return OK def _cmd_admins(self): # Nothing to do return OK def _cmd_terminates(self, force): # Nothing to do return OK def _cmd_creates(self): # Nothing to do, the tags have been set during the pre-commit return OK def _cmd_closes(self, ticketId): ''' Ticket closes Add backlink to the revision in the ticket Close the ticket ''' ticket_msg = "(In [%d]) %s" % (self.rev, self.log) # FIXME: replace self.now with the actual svn:date commit time # fix this in other script locations as well... commit_date = self.now try: ticket = Ticket(self.env, ticketId) ticket['status'] = 'closed' ticket['resolution'] = 'fixed' ticket.save_changes(self.author, ticket_msg, commit_date) except Exception, e: from trac.util import get_last_traceback print>>sys.stderr, 'Unexpected error while processing ticket ' \ 'ID %s: %s' % (ticketId, e) print >>sys.stderr, 'Traceback:\n', get_last_traceback() return ERROR try: # we do not want a notification failure to prevent from # backing up the revision tn = TicketNotifyEmail(self.env) tn.notify(ticket, newticket=0, modtime=commit_date) except Exception, e: from trac.util import get_last_traceback print>>sys.stderr, 'Unexpected error while processing ticket ' \ 'ID %s: %s' % (ticketId, e) print >>sys.stderr, 'Traceback:\n', get_last_traceback() return OK def _cmd_refs(self, ticketId): ''' Ticket reference Add backlink to the revision in the ticket ''' ticket_msg = "(In [%d]) %s" % (self.rev, self.log) try: ticket = Ticket(self.env, ticketId) ticket.save_changes(self.author, ticket_msg, self.now) tn = TicketNotifyEmail(self.env) tn.notify(ticket, newticket=0, modtime=self.now) return OK except Exception, e: from trac.util import get_last_traceback print>>sys.stderr, 'Unexpected error while processing ticket ' \ 'ID %s: %s' % (ticketId, e) print >>sys.stderr, 'Traceback:\n', get_last_traceback() return ERROR def _cmd_brings(self, rev1, rev2, force): ''' Branch import Add backlink to the revision in all related tickets ''' # Get all revisions to deliver revisions = self._collect_branch_revs(rev1, rev2) # Get all tickets related to these revisions tickets = self._collect_tickets(revisions) for tktid in tickets: ticket = Ticket(self.env, int(tktid)) ticket.save_changes(self.author, 'Brought in [%s]' % self.rev, self.now) tn = TicketNotifyEmail(self.env) tn.notify(ticket, newticket=0, modtime=self.now) return OK def _cmd_delivers(self, rev1, rev2, force): ''' Branch delivery Add backlink to the revision in all related tickets Update all closed tickets milestone ''' # Get all revisions to deliver revisions = self._collect_branch_revs(rev1, rev2) next_ms = self._next_milestone() # Get all tickets related to these revisions tickets = self._collect_tickets(revisions) for tktid in tickets: ticket = Ticket(self.env, int(tktid)) if ticket['status'] == 'closed': if ticket['milestone'] == TBD_MILESTONE: ticket['milestone'] = next_ms ticket.save_changes(self.author, 'Delivered in [%s]' % self.rev, self.now) tn = TicketNotifyEmail(self.env) tn.notify(ticket, newticket=0, modtime=self.now) return OK def finalize(self, result): if result == OK: eventfile = "%s/events/%d.tag" % (self.project, self.rev) fp = open(eventfile, "w") fp.write('please backup this revision\n') fp.close sys.exit(result) if __name__ == "__main__": if options.project is None: print "Unspecified project" sys.exit(ERROR) if options.rep is None: print "Unspecified repository" sys.exit(ERROR) if options.rev is None and not options.txn is None: PreCommitHook() if not options.rev is None and options.txn is None: PostCommitHook() else: print "A transaction OR a revision must be specified (but not both)" sys.exit(ERROR)