""" Implementation of a generic tagging API for Trac. The API lets plugins register use of a set of namespaces (tagspaces) then access and manipulate the tags in that tagspace. For integration of external programs, the API also allows other tag systems to be accessed transparently (see the ITaggingSystemProvider interface and the corresponding TaggingSystem class). Taggable names are contained in a tagspace and can be associated with any number of tags. eg. ('wiki', 'WikiStart', 'start') represents a 'start' tag to the 'WikiStart' page in the 'wiki' tagspace. For a component to register a new tagspace for use it must implement the ITagSpaceUser interface. To access tags for a tagspace use the following mechanism (using the 'wiki' tagspace in this example): {{{ #!python from tractags.api import TagEngine tags = TagEngine(env).tagspace.wiki # Display all names and the tags associated with each name for name in tags.get_tagged_names(): print name, list(tags.get_name_tags(name)) # Display all tags and the names associated with each tag for tag in tags.get_tags(): print tag, list(tags.get_tagged_names(tag)) # Add a start tag to WikiStart tags.add_tags(None, 'WikiStart', ['start']) }}} """ from trac.core import * from trac.env import IEnvironmentSetupParticipant from trac.db import Table, Column, Index import sys import re try: set = set except: from sets import Set as set try: sorted = sorted except NameError: def sorted(iterable): lst = list(iterable) lst.sort() return lst class ITagSpaceUser(Interface): """ Register that this component uses a set of tagspaces. If a tagspace is not registered, it can not be used. """ def tagspaces_used(): """ Return an iterator of tagspaces used by this plugin. """ class ITaggingSystemProvider(Interface): """ An implementation of a tag system. This allows other non-Trac-native tag systems to be accessed through one API. """ def get_tagspaces_provided(): """ Iterable of tagspaces provided by this tag system. """ def get_tagging_system(tagspace): """ Return the TaggingSystem responsible for tagspace. """ class TaggingSystem(object): """ An implementation of a tagging system. """ def __init__(self, env, tagspace): self.env = env self.tagspace = tagspace def walk_tagged_names(self, names, tags, predicate): """ Generator returning a tuple of (name, tags) for each tagged name in this tagspace that meets the predicate and is in the set of names and has any of the given tags. predicate is called with (name, tags) (The names and tags arguments are purely an optimisation opportunity for the underlying TaggingSystem) """ def get_name_tags(self, name): """ Get tags for a name. """ raise NotImplementedError def add_tags(self, req, name, tags): """ Tag name in tagspace with tags. """ raise NotImplementedError def replace_tags(self, req, name, tags): """ Replace existing tags on name with tags. """ self.remove_all_tags(req, name) self.add_tags(req, name, tags) def remove_tags(self, req, name, tags): """ Remove tags from a name in a tagspace. """ raise NotImplementedError def remove_all_tags(self, req, name): """ Remove all tags from a name in a tagspace. """ self.remove_tags(req, name, self.get_name_tags(name)) def name_details(self, name): """ Return a tuple of (href, htmllink, title). eg. ("/ticket/1", "#1", "Broken links") """ raise NotImplementedError class DefaultTaggingSystem(TaggingSystem): """ Default tagging system. Handles any number of namespaces registered via ITagSpaceUser. """ def walk_tagged_names(self, names, tags, predicate): db = self.env.get_db_cnx() cursor = db.cursor() args = [self.tagspace] sql = 'SELECT DISTINCT name, tag FROM tags WHERE tagspace=%s' if names: sql += ' AND name IN (' + ', '.join(['%s' for n in names]) + ')' args += names if tags: sql += ' AND name in (SELECT name FROM tags WHERE tag in (' + ', '.join(['%s' for t in tags]) + '))' args += tags sql += " ORDER BY name" cursor.execute(sql, args) tags = set(tags) current_name = None name_tags = set() for name, tag in cursor: if current_name != name: if current_name is not None: if predicate(current_name, name_tags): yield (current_name, name_tags) name_tags = set([tag]) current_name = name else: name_tags.add(tag) if current_name is not None and predicate(current_name, name_tags): yield (current_name, name_tags) def get_name_tags(self, name): db = self.env.get_db_cnx() cursor = db.cursor() cursor.execute('SELECT tag FROM tags WHERE tagspace=%s AND name=%s', (self.tagspace, name)) return set([row[0] for row in cursor]) def add_tags(self, req, name, tags): db = self.env.get_db_cnx() cursor = db.cursor() for tag in tags: cursor.execute('INSERT INTO tags (tagspace, name, tag) VALUES (%s, %s, %s)', (self.tagspace, name, tag)) db.commit() def remove_tags(self, req, name, tags): db = self.env.get_db_cnx() cursor = db.cursor() sql = "DELETE FROM tags WHERE tagspace = %s AND name = %s AND tag " \ "IN (" + ', '.join(["%s" for t in tags]) + ")" cursor.execute(sql, (self.tagspace, name) + tuple(tags)) db.commit() def remove_all_tags(self, req, name): db = self.env.get_db_cnx() cursor = db.cursor() cursor.execute('DELETE FROM tags WHERE tagspace=%s AND name=%s', (self.tagspace, name)) db.commit() def name_details(self, name): from trac.wiki.formatter import wiki_to_oneliner return (getattr(self.env.href, self.tagspace), wiki_to_oneliner('[%s:"%s" %s]' % (self.tagspace, name, name), self.env), '') class TagspaceProxy: """ A convenience for performing operations on a specific tagspace, including get_tags() and get_tagged_names(). Both of these functions will only search that tagspace, and will return values stripped of tagspace information. """ def __init__(self, engine, tagspace): self.engine = engine self.tagspace = tagspace self.tagsystem = engine._get_tagsystem(tagspace) def get_tags(self, *args, **kwargs): result = self.engine.get_tags(tagspaces=[self.tagspace], *args, **kwargs) if isinstance(result, set): return result else: out = {} for tag, names in result.iteritems(): out[tag] = set([name for _, name in names]) return out def get_tagged_names(self, *args, **kwargs): return self.engine.get_tagged_names(tagspaces=[self.tagspace], *args, **kwargs)[self.tagspace] def __getattr__(self, attr): return getattr(self.tagsystem, attr) class TagspaceDirector(object): """ A convenience similar to env.href, proxying to the correct TagSystem by attribute. """ def __init__(self, engine): self.engine = engine def __getattr__(self, tagspace): return self.tagspace(tagspace) def tagspace(self, tagspace): return TagspaceProxy(self.engine, tagspace) class TagEngine(Component): """ The core of the Trac tag API. This interface can be used to register tagspaces (ITagSpaceUser or register_tagspace()), add other tagging systems (ITaggingSystemProvider), and to control tags in a tagspace. """ _tagspace_re = re.compile(r'''^[a-zA-Z_][a-zA-Z0-9_]*$''') implements(ITaggingSystemProvider, IEnvironmentSetupParticipant) tag_users = ExtensionPoint(ITagSpaceUser) tagging_systems = ExtensionPoint(ITaggingSystemProvider) SCHEMA = [ Table('tags', key = ('tagspace', 'name', 'tag'))[ Column('tagspace'), Column('name'), Column('tag'), Index(['tagspace', 'name']), Index(['tagspace', 'tag']),] ] def __init__(self): self.tagspace = TagspaceDirector(self) self._tagsystem_cache = {} self._tag_link_cache = {} def _get_tagspaces(self): """ Get iterable of available tagspaces. """ out = [] for tagsystem in self.tagging_systems: for tagspace in tagsystem.get_tagspaces_provided(): out.append(tagspace) return out tagspaces = property(_get_tagspaces) def _get_tagsystem(self, tagspace): """ Returns a TaggingSystem proxy object with tagspace as the default tagspace. """ try: return self._tagsystem_cache[tagspace] except KeyError: for tagsystem in self.tagging_systems: if tagspace in tagsystem.get_tagspaces_provided(): self._tagsystem_cache[tagspace] = tagsystem.get_tagging_system(tagspace) return self._tagsystem_cache[tagspace] raise TracError("No such tagspace '%s'" % tagspace) # Public methods def flush_link_cache(self, tag=None): """ Flush the link cache entirely, or for a single tag. """ if not tag: self._tag_link_cache = {} elif tag in self._tag_link_cache: del self._tag_link_cache[tag] def walk_tagged_names(self, names=[], tags=[], tagspaces=[], predicate=lambda tagspace, name, tags: True): """ Generator returning (tagspace, name, tags) for all names in the given tagspaces. Objects must have at least one of tags, be in names and must meet the predicate. """ tagspaces = tagspaces or self.tagspaces for tagspace in tagspaces: tagsystem = self._get_tagsystem(tagspace) for name, name_tags in tagsystem.walk_tagged_names(names=names, tags=tags, predicate=lambda n, t: predicate(tagspace, n, t)): yield (tagspace, name, name_tags) def get_tags(self, names=[], tagspaces=[], operation='union', detailed=False): """ Get tags with the given names from the given tagspaces. 'operation' is the union or intersection of all tags on names. If detailed, return a set of {tag:set([(tagspace, name), ...])}, otherwise return a set of tags. """ assert type(names) in (list, tuple, set) tagspaces = tagspaces or self.tagspaces seed_set = True all_tags = set() tagged_names = {} for tagspace, name, tags in self.walk_tagged_names(names=names, tagspaces=tagspaces): for tag in tags: tagged_names.setdefault(tag, set()).add((tagspace, name)) if operation == 'intersection': if seed_set: seed_set = False all_tags.update(tags) else: all_tags.intersection_update(tags) if not all_tags: return detailed and {} or set() else: all_tags.update(tags) if detailed: out_tags = {} for tag in all_tags: out_tags[tag] = tagged_names[tag] return out_tags else: return all_tags def get_tagged_names(self, tags=[], tagspaces=[], operation='intersection', detailed=False): """ Get names with the given tags from tagspaces. 'operation' is the set operatin to perform on the sets of names tagged with each of the search tags, and can be either 'intersection' or 'union'. If detailed=True return a dictionary of {tagspace:{name:set([tag, ...])}} otherwise return a dictionary of {tagspace:set([name, ...])}. """ assert type(tags) in (list, tuple, set) tagspaces = tagspaces or self.tagspaces tags = set(tags) if detailed: output = dict([(ts, {}) for ts in tagspaces]) else: output = dict([(ts, set()) for ts in tagspaces]) for tagspace, name, name_tags in self.walk_tagged_names(tags=tags, tagspaces=tagspaces): if operation == 'intersection' and tags.intersection(name_tags) != tags: continue if detailed: output[tagspace][name] = name_tags else: output[tagspace].add(name) return output def get_tag_link(self, tag, is_expression=False): """ Return (href, title) to information about tag. This first checks for a Wiki page named , then uses /tags/. If is_expression=True, treat the tag as an expression rather than a simple tag. """ if tag in self._tag_link_cache: return self._tag_link_cache[tag] from tractags.wiki import WikiTaggingSystem page, title = WikiTaggingSystem(self.env).page_info(tag) if page.exists: result = (self.env.href.wiki(tag), title) else: tag_link = is_expression and tag or "'%s'" % tag result = (self.env.href.tags(tag_link), "Objects tagged ''%s''" % tag) self._tag_link_cache[tag] = result return result def name_details(self, tagspace, name): """ Return a tuple of (href, htmllink, title). eg. ("/ticket/1", "#1", "Broken links") """ return self._get_tagsystem(tagspace).name_details(name) # ITaggingSystemProvider methods def get_tagspaces_provided(self): for user in self.tag_users: for tagspace in user.tagspaces_used(): yield tagspace def get_tagging_system(self, tagspace): for taguser in self.tag_users: if tagspace in taguser.tagspaces_used(): return DefaultTaggingSystem(self.env, tagspace) raise TracError("No such tagspace '%s'" % tagspace) # IEnvironmentSetupParticipant methods def environment_created(self): self._upgrade_db(self.env.get_db_cnx()) def environment_needs_upgrade(self, db): cursor = db.cursor() if self._need_migration(db): return True try: cursor.execute("select count(*) from tags") cursor.fetchone() return False except: db.rollback() return True def upgrade_environment(self, db): self._upgrade_db(db) def _need_migration(self, db): cursor = db.cursor() try: cursor.execute("select count(*) from wiki_namespace") cursor.fetchone() self.env.log.debug("tractags needs to migrate old data") return True except: db.rollback() return False def _upgrade_db(self, db): try: try: from trac.db import DatabaseManager db_backend, _ = DatabaseManager(self.env)._get_connector() except ImportError: db_backend = self.env.get_db_cnx() cursor = db.cursor() for table in self.SCHEMA: for stmt in db_backend.to_sql(table): self.env.log.debug(stmt) cursor.execute(stmt) db.commit() # Migrate old data if self._need_migration(db): cursor = db.cursor() cursor.execute("INSERT INTO tags (tagspace, name, tag) SELECT 'wiki', name, namespace FROM wiki_namespace") cursor.execute("DROP TABLE wiki_namespace") db.commit() except Exception, e: db.rollback() self.env.log.error(e, exc_info=1) raise TracError(str(e))