source: fulltextsearchplugin/trunk/fulltextsearchplugin/fulltextsearch.py

Last change on this file was 16711, checked in by Ryan J Ollos, 6 years ago

FullTextSearchPlugin 0.1: Require TracTags >= 0.7

Refs #13228.

File size: 33.3 KB
Line 
1from Queue import Queue
2import os
3from datetime import datetime
4import operator
5import re
6import sunburnt
7from sunburnt.sunburnt import grouper
8import types
9
10from trac.env import IEnvironmentSetupParticipant
11from trac.core import Component, implements, Interface, TracError
12from trac.ticket.api import (ITicketChangeListener, IMilestoneChangeListener,
13                             TicketSystem)
14from trac.ticket.model import Ticket, Milestone
15from trac.ticket.web_ui import TicketModule
16from trac.ticket.roadmap import MilestoneModule
17from trac.wiki.api import IWikiChangeListener, WikiSystem
18from trac.wiki.model import WikiPage
19from trac.wiki.web_ui import WikiModule
20from trac.util.text import shorten_line
21from trac.attachment import IAttachmentChangeListener, Attachment
22from trac.attachment import AttachmentModule
23from trac.versioncontrol.api import IRepositoryChangeListener, Changeset
24from trac.versioncontrol.web_ui import ChangesetModule
25from trac.resource import (get_resource_shortname, get_resource_url,
26                           Resource, ResourceNotFound)
27from trac.search import ISearchSource, shorten_result
28from trac.util.translation import _
29from trac.config import IntOption
30from trac.config import ListOption
31from trac.config import Option
32from trac.util.compat import partial
33from trac.util.datefmt import to_datetime, to_utimestamp, utc
34from trac.web.chrome import add_warning
35
36from componentdependencies import IRequireComponents
37from tractags.db import TagSetup
38
39from fulltextsearchplugin.dates import normalise_datetime
40
41__all__ = ['IFullTextSearchSource',
42           'FullTextSearchObject', 'Backend', 'FullTextSearch',
43           ]
44
45def _do_nothing(*args, **kwargs):
46    pass
47
48def _sql_in(seq):
49    '''Return '(%s,%s,...%s)' suitable to use in a SQL in clause.
50    '''
51    return '(%s)' % ','.join('%s' for x in seq)
52
53def _res_id(resource):
54    if resource.parent:
55        return u"%s:%s:%s:%s" % (resource.realm, resource.parent.realm,
56                                 resource.parent.id, resource.id)
57    else:
58        return u"%s:%s"% (resource.realm, resource.id)
59
60class IFullTextSearchSource(Interface):
61    pass
62
63class FullTextSearchModule(Component):
64    pass
65
66class FullTextSearchObject(object):
67    '''Minimal behaviour class to store documents going to/comping from Solr.
68    '''
69    def __init__(self, project, realm, id=None,
70                 parent_realm=None, parent_id=None,
71                 title=None, author=None, changed=None, created=None,
72                 oneline=None, tags=None, involved=None,
73                 popularity=None, body=None, comments=None, action=None,
74                 **kwarg):
75        # we can't just filter on the first part of id, because
76        # wildcards are not supported by dismax in solr yet
77        self.project = project
78        if isinstance(realm, Resource):
79            self.resource = realm
80        else:
81            if isinstance(parent_realm, Resource):
82                parent = parent_realm
83            else:
84                parent = Resource(parent_realm, parent_id)
85            self.resource = Resource(realm, id, parent=parent)
86
87        self.title = title
88        self.author = author
89        self.changed = normalise_datetime(changed)
90        self.created = normalise_datetime(created)
91        self.oneline = oneline
92        self.tags = tags
93        self.involved = involved
94        self.popularity = popularity
95        self.body = body
96        self.comments = comments
97        self.action = action
98
99    def _get_realm(self):
100        return self.resource.realm
101    def _set_realm(self, val):
102        self.resource.realm = val
103    realm = property(_get_realm, _set_realm)
104
105    def _get_id(self):
106        return self.resource.id
107    def _set_id(self, val):
108        self.resource.id = val
109    id = property(_get_id, _set_id)
110
111    @property
112    def parent_realm(self):
113        if self.resource.parent:
114            return self.resource.parent.realm
115
116    @property
117    def parent_id(self):
118        if self.resource.parent:
119            return self.resource.parent.id
120
121    @property
122    def doc_id(self):
123        return u"%s:%s" % (self.project, _res_id(self.resource))
124
125    def __repr__(self):
126        from pprint import pformat
127        r = '<FullTextSearchObject %s>' % pformat(self.__dict__)
128        return r
129
130
131class Backend(Queue):
132    """
133    """
134
135    def __init__(self, solr_endpoint, log, si_class=sunburnt.SolrInterface):
136        Queue.__init__(self)
137        self.log = log
138        self.solr_endpoint = solr_endpoint
139        self.si_class = si_class
140
141    def create(self, item, quiet=False):
142        item.action = 'CREATE'
143        self.put(item)
144        self.commit(quiet=quiet)
145
146    def modify(self, item, quiet=False):
147        item.action = 'MODIFY'
148        self.put(item)
149        self.commit(quiet=quiet)
150
151    def delete(self, item, quiet=False):
152        item.action = 'DELETE'
153        self.put(item)
154        self.commit(quiet=quiet)
155
156    def add(self, item, quiet=False):
157        if isinstance(item, list):
158            for i in item:
159                self.put(i)
160        else:
161            self.put(item)
162        self.commit(quiet=quiet)
163
164    def remove(self, project_id, realms=None):
165        '''Delete docs from index where project=project_id AND realm in realms
166
167        If realms is not specified then delete all documents in project_id.
168        '''
169        s = self.si_class(self.solr_endpoint)
170        Q = s.query().Q
171        q = s.query(u'project:%s' % project_id)
172        if realms:
173            query = q.query(reduce(operator.or_,
174                                   [Q(u'realm:%s' % realm)
175                                    for realm in realms]))
176        # I would have like some more info back
177        s.delete(queries=[query])
178        s.commit()
179
180    def commit(self, quiet=False):
181        try:
182            s = self.si_class(self.solr_endpoint)
183        except Exception, e:
184            if quiet:
185                self.log.error("Could not commit to Solr due to: %s", e)
186                return
187            else:
188                raise
189        while not self.empty():
190            item = self.get()
191            if item.action in ('CREATE', 'MODIFY'):
192                if hasattr(item.body, 'read'):
193                    try:
194                        s.add(item, extract=True, filename=item.id)
195                    except sunburnt.SolrError, e:
196                        response, content = e.args
197                        self.log.error("Encountered a Solr error "
198                                       "indexing '%s'. "
199                                       "Solr returned: %s %s",
200                                       item, response, content)
201                else:
202                    s.add(item) #We can add multiple documents if we want
203            elif item.action == 'DELETE':
204                s.delete(item)
205            else:
206                if quiet:
207                    self.log.error("Unknown Solr action %s on %s",
208                                   item.action, item)
209                else:
210                    raise ValueError("Unknown Solr action %s on %s"
211                                     % (item.action, item))
212            try:
213                s.commit()
214            except Exception, e:
215                self.log.exception('%s %r', item, item)
216                if not quiet:
217                    raise
218
219    def optimize(self):
220        s = self.si_class(self.solr_endpoint)
221        try:
222            s.optimize()
223        except Exception:
224            self.log.exception("Error optimizing %s", self.solr_endpoint)
225            raise
226
227
228class FullTextSearch(Component):
229    """Search all ChangeListeners and prepare the output for a full text
230       backend."""
231    implements(ITicketChangeListener, IWikiChangeListener,
232               IAttachmentChangeListener, IMilestoneChangeListener,
233               IRepositoryChangeListener, ISearchSource,
234               IEnvironmentSetupParticipant, IRequireComponents)
235
236    solr_endpoint = Option("search", "solr_endpoint",
237                           default="http://localhost:8983/solr/",
238                           doc="URL to use for HTTP REST calls to Solr")
239
240    search_realms = ListOption("search", "fulltext_search_realms",
241        default=['ticket', 'wiki', 'milestone', 'changeset', 'source',
242                 'attachment'],
243        doc="""Realms for which full-text search should be enabled.
244
245        This option does not affect the realms available for indexing.
246        """)
247
248    max_size = IntOption("search", "max_size", 10*2**20, # 10 MB
249        doc="""Maximum document size (in bytes) to indexed.
250        """)
251
252    #Warning, sunburnt is case sensitive via lxml on xpath searches while solr is not
253    #in the default schema fieldType and fieldtype mismatch gives problem
254    def __init__(self):
255        self.backend = Backend(self.solr_endpoint, self.log)
256        self.project = os.path.split(self.env.path)[1]
257        self._realms = [
258            (u'ticket',     u'Tickets',     True,   self._reindex_ticket),
259            (u'wiki',       u'Wiki',        True,   self._reindex_wiki),
260            (u'milestone',  u'Milestones',  True,   self._reindex_milestone),
261            (u'changeset',  u'Changesets',  True,   self._reindex_changeset),
262            (u'source',     u'File archive', True,  None),
263            (u'attachment', u'Attachments', True,   self._reindex_attachment),
264            ]
265        self._indexers = dict((name, indexer) for name, label, enabled, indexer
266                                              in self._realms if indexer)
267        self._fallbacks = {
268            'TicketModule': TicketModule,
269            'WikiModule': WikiModule,
270            'MilestoneModule': MilestoneModule,
271            'ChangesetModule': ChangesetModule,
272            }
273
274    @property
275    def index_realms(self):
276        return [name for name, label, enabled, indexer in self._realms
277                     if indexer]
278
279    def _index(self, realm, resources, check_cb, index_cb,
280               feedback_cb, finish_cb):
281        """Iterate through `resources` to index `realm`, return index count
282
283        realm       Trac realm to which items in resources belong
284        resources   Iterable of Trac resources e.g. WikiPage, Attachment
285        check_cb    Callable that accepts a resource & status,
286                    returns True if it needs to be indexed
287        index_cb    Callable that accepts a resource, indexes it
288        feedback_cb Callable that accepts a realm & resource argument
289        finish_cb   Callable that accepts a realm & resource argument. The
290                    resource will be None if no resources are indexed
291        """
292        i = -1
293        resource = None
294        resources = (r for r in resources if check_cb(r, self._get_status(r)))
295        for i, resource in enumerate(resources):
296            index_cb(resource)
297            feedback_cb(realm, resource)
298        finish_cb(realm, resource)
299        return i + 1
300
301    def _reindex_changeset(self, realm, feedback, finish_fb):
302        """Iterate all changesets and call self.changeset_added on them"""
303        # TODO Multiple repository support
304        repo = self.env.get_repository()
305        def all_revs():
306            rev = repo.oldest_rev
307            yield rev
308            while 1:
309                rev = repo.next_rev(rev)
310                if rev is None:
311                    return
312                yield rev
313        def check(changeset, status):
314            return status is None or changeset.date > to_datetime(int(status))
315        resources = (repo.get_changeset(rev) for rev in all_revs())
316        index = partial(self.changeset_added, repo)
317        return self._index(realm, resources, check, index, feedback, finish_fb)
318
319    def _update_changeset(self, changeset):
320        self._set_status(changeset, to_utimestamp(changeset.date))
321
322    def _reindex_wiki(self, realm, feedback, finish_fb):
323        def check(page, status):
324            return status is None or page.time > to_datetime(int(status))
325        resources = (WikiPage(self.env, name)
326                     for name in WikiSystem(self.env).get_pages())
327        index = self.wiki_page_added
328        return self._index(realm, resources, check, index, feedback, finish_fb)
329
330    def _update_wiki(self, page):
331        self._set_status(page, to_utimestamp(page.time))
332
333    def _reindex_attachment(self, realm, feedback, finish_fb):
334        db = self.env.get_read_db()
335        cursor = db.cursor()
336        # This plugin was originally written for #define 4, a Trac derivative
337        # that includes versioned attachments. TO try and keep compatibility
338        # with both check support by checking for a version attribute on an
339        # Attachment. Instantiating Attachment doesn't perform any queries,
340        # so it doesn't matter if ticket:42 actually exists
341        # The versioned attachment code used by #define is published on github
342        # https://github.com/moreati/trac-gitsvn/tree/0.12-versionedattachments
343        canary = Attachment(self.env, 'ticket', 42)
344        if hasattr(canary, 'version'):
345            # Adapted from Attachment.select()
346            cursor.execute("""
347                SELECT type, id, filename, version, description, size, time,
348                       author, ipnr, status, deleted
349                FROM attachment
350                JOIN (SELECT type AS c_type, id AS c_id,
351                             filename AS c_filename, MAX(version) AS c_version
352                      FROM attachment
353                      WHERE deleted IS NULL
354                      GROUP BY c_type, c_id, c_filename) AS current
355                     ON type = c_type AND id = c_id
356                        AND filename = c_filename AND version = c_version
357                ORDER BY time""",
358                )
359        else:
360            cursor.execute(
361                "SELECT type,id,filename,description,size,time,author,ipnr "
362                "FROM attachment "
363                "ORDER by time",
364                )
365        def att(row):
366            parent_realm, parent_id = row[0], row[1]
367            attachment = Attachment(self.env, parent_realm, parent_id)
368            attachment._from_database(*row[2:])
369            return attachment
370        def check(attachment, status):
371            return (status is None
372                    or attachment.date > to_datetime(int(status)))
373        resources = (att(row) for row in cursor)
374        index = self.attachment_added
375        return self._index(realm, resources, check, index, feedback, finish_fb)
376
377    def _update_attachment(self, attachment):
378        self._set_status(attachment, to_utimestamp(attachment.date))
379
380    def _reindex_ticket(self, realm, feedback, finish_fb):
381        db = self.env.get_read_db()
382        cursor = db.cursor()
383        cursor.execute("SELECT id FROM ticket")
384        def check(ticket, status):
385            return (status is None
386                    or ticket.values['changetime'] > to_datetime(int(status)))
387        resources = (Ticket(self.env, tkt_id) for (tkt_id,) in cursor)
388        index = self.ticket_created
389        return self._index(realm, resources, check, index, feedback, finish_fb)
390
391    def _update_ticket(self, ticket):
392        self._set_status(ticket, to_utimestamp(ticket.values['changetime']))
393
394    def _reindex_milestone(self, realm, feedback, finish_fb):
395        resources = Milestone.select(self.env)
396        def check(milestone, check):
397            return True
398        index = self.milestone_created
399        return self._index(realm, resources, check, index, feedback, finish_fb)
400
401    def _check_realms(self, realms):
402        """Check specfied realms are supported by this component
403
404        Raise exception if unsupported realms are found.
405        """
406        if realms is None:
407            realms = self.index_realms
408        unsupported_realms = set(realms).difference(set(self.index_realms))
409        if unsupported_realms:
410            raise TracError(_("These realms are not supported by "
411                              "FullTextSearch: %(realms)s",
412                              realms=self._fmt_realms(unsupported_realms)))
413        return realms
414
415    def _fmt_realms(self, realms):
416        return ', '.join(realms)
417
418    def remove_index(self, realms=None):
419        realms = self._check_realms(realms)
420        self.log.info("Removing realms from index: %s",
421                      self._fmt_realms(realms))
422        @self.env.with_transaction()
423        def do_remove(db):
424            cursor = db.cursor()
425            self.backend.remove(self.project, realms)
426            cursor.executemany("DELETE FROM system WHERE name LIKE %s",
427                               [('fulltextsearch_%s:%%' % r,) for r in realms])
428
429    def index(self, realms=None, clean=False, feedback=None, finish_fb=None):
430        realms = self._check_realms(realms)
431        feedback = feedback or _do_nothing
432        finish_fb = finish_fb or _do_nothing
433
434        if clean:
435            self.remove_index(realms)
436        self.log.info("Started indexing realms: %s",
437                      self._fmt_realms(realms))
438        summary = {}
439        for realm in realms:
440            indexer = self._indexers[realm]
441            num_indexed = indexer(realm, feedback, finish_fb)
442            self.log.debug('Indexed %i resources in realm: "%s"',
443                           num_indexed, realm)
444            summary[realm] = num_indexed
445        self.log.info("Completed indexing realms: %s",
446                      ', '.join('%s (%i)' % (r, summary[r]) for r in realms))
447        return summary
448
449    def optimize(self):
450        self.log.info("Started optimizing index")
451        self.backend.optimize()
452        self.log.info("Completed optimizing")
453
454    # IRequireComponents methods
455    def requires(self):
456        return [TagSetup]
457
458    # IEnvironmentSetupParticipant methods
459    def environment_created(self):
460        pass
461
462    def environment_needs_upgrade(self, db):
463        pass
464
465    def upgrade_environment(self, db):
466        pass
467
468    # Index status helpers
469    def _get_status(self, resource):
470        '''Return index status of `resource`, or None if nothing is recorded.
471        '''
472        db = self.env.get_read_db()
473        cursor = db.cursor()
474        cursor.execute("SELECT value FROM system WHERE name = %s",
475                       (self._status_id(resource),))
476        row = cursor.fetchone()
477        if row:
478            return row[0]
479        else:
480            return None
481
482    def _set_status(self, resource, status):
483        '''Save the index status of a resource'''
484        @self.env.with_transaction()
485        def do_update(db):
486            cursor = db.cursor()
487            row = (str(status), self._status_id(resource))
488            # TODO use try/except, but take care with psycopg2 and rollbacks
489            cursor.execute("DELETE FROM system WHERE name = %s", row[1:])
490            cursor.execute("INSERT INTO system (value, name) VALUES (%s, %s)",
491                           row)
492
493    def _status_id(self, resource):
494        return 'fulltextsearch_%s' % _res_id(resource.resource)
495
496    # ITicketChangeListener methods
497    def ticket_created(self, ticket):
498        ticketsystem = TicketSystem(self.env)
499        resource_name = get_resource_shortname(self.env, ticket.resource)
500        resource_desc = ticketsystem.get_resource_description(ticket.resource,
501                                                              format='summary')
502        so = FullTextSearchObject(
503                self.project, ticket.resource,
504                title = u"%(title)s: %(message)s" % {'title': resource_name,
505                                                     'message': resource_desc},
506                author = ticket.values.get('reporter'),
507                changed = ticket.values.get('changetime'),
508                created = ticket.values.get('time'),
509                tags = ticket.values.get('keywords'),
510                involved = re.split(r'[;,\s]+', ticket.values.get('cc', ''))
511                           or ticket.values.get('reporter'),
512                popularity = 0, #FIXME
513                oneline = shorten_result(ticket.values.get('description', '')),
514                body = u'%r' % (ticket.values,),
515                comments = [t[4] for t in ticket.get_changelog()],
516                )
517        self.backend.create(so, quiet=True)
518        self._update_ticket(ticket)
519        self.log.debug("Ticket added for indexing: %s", ticket)
520
521    def ticket_changed(self, ticket, comment, author, old_values):
522        self.ticket_created(ticket)
523
524    def ticket_deleted(self, ticket):
525        so = FullTextSearchObject(self.project, ticket.resource)
526        self.backend.delete(so, quiet=True)
527        self._update_ticket(ticket)
528        self.log.debug("Ticket deleted; deleting from index: %s", ticket)
529
530    #IWikiChangeListener methods
531    def wiki_page_added(self, page):
532        history = list(page.get_history())
533        so = FullTextSearchObject(
534                self.project, page.resource,
535                title = u'%s: %s' % (page.name, shorten_line(page.text)),
536                author = page.author,
537                changed = page.time,
538                created = history[-1][1], # .time of oldest version
539                tags = self._page_tags(page.resource.realm, page.name),
540                involved = list(set(r[2] for r in history)),
541                popularity = 0, #FIXME
542                oneline = shorten_result(page.text),
543                body = page.text,
544                comments = [r[3] for r in history],
545                )
546        self.backend.create(so, quiet=True)
547        self._update_wiki(page)
548        self.log.debug("WikiPage created for indexing: %s", page.name)
549
550    def wiki_page_changed(self, page, version, t, comment, author, ipnr):
551        self.wiki_page_added(page)
552
553    def wiki_page_deleted(self, page):
554        so = FullTextSearchObject(self.project, page.resource)
555        self.backend.delete(so, quiet=True)
556        self._update_wiki(page)
557
558    def wiki_page_version_deleted(self, page):
559        #We don't care about old versions
560        pass
561
562    def wiki_page_renamed(self, page, old_name):
563        so = FullTextSearchObject(self.project, page.resource.realm, old_name)
564        self.backend.delete(so, quiet=True)
565        self.wiki_page_added(page)
566
567    def _page_tags(self, realm, page):
568        db = self.env.get_read_db()
569        cursor = db.cursor()
570        try:
571            cursor.execute('SELECT tag FROM tags '
572                           'WHERE tagspace=%s AND name=%s '
573                           'ORDER BY tag',
574                           (realm, page))
575        except Exception, e:
576            # Prior to Trac 0.13 errors from a wrapped cursor are returned as
577            # the native exceptions from the database library
578            # http://trac.edgewall.org/ticket/6348
579            # sqlite3 raises OperationalError instead of ProgrammingError if
580            # a queried table doesn't exist
581            # http://bugs.python.org/issue7394
582            # Following an error PostgresSQL requires that any transaction be
583            # rolled back before further commands/queries are executes
584            # psycopg2 raises InternalError to signal this
585            # http://initd.org/psycopg/docs/faq.html
586            if e.__class__.__name__ in ('ProgrammingError',
587                                        'OperationalError'):
588                db.rollback()
589                return iter([])
590            else:
591                raise e
592        return (tag for (tag,) in cursor)
593
594    #IAttachmentChangeListener methods
595    def attachment_added(self, attachment):
596        """Called when an attachment is added."""
597        if hasattr(attachment, 'version'):
598            history = list(attachment.get_history())
599            created = history[-1].date
600            involved = list(set(a.author for a in history))
601            comments = list(set(a.description for a in history
602                                if a.description))
603        else:
604            created = attachment.date
605            involved = attachment.author
606            comments = [attachment.description]
607        so = FullTextSearchObject(
608                self.project, attachment.resource,
609                title = attachment.title,
610                author = attachment.author,
611                changed = attachment.date,
612                created = created,
613                comments = comments,
614                involved = involved,
615                )
616        if attachment.size <= self.max_size:
617            try:
618                so.body = attachment.open()
619            except ResourceNotFound:
620                self.log.warning('Missing attachment file "%s" encountered '
621                                 'whilst indexing full text search',
622                                 attachment)
623        try:
624            self.backend.create(so, quiet=True)
625            self._update_attachment(attachment)
626        finally:
627            if hasattr(so.body, 'close'):
628                so.body.close()
629
630    def attachment_deleted(self, attachment):
631        """Called when an attachment is deleted."""
632        so = FullTextSearchObject(self.project, attachment.resource)
633        self.backend.delete(so, quiet=True)
634        self._update_attachment(attachment)
635
636    def attachment_reparented(self, attachment, old_parent_realm, old_parent_id):
637        """Called when an attachment is reparented."""
638        self.attachment_added(attachment)
639
640    #IMilestoneChangeListener methods
641    def milestone_created(self, milestone):
642        so = FullTextSearchObject(
643                self.project, milestone.resource,
644                title = u'%s: %s' % (milestone.name,
645                                     shorten_line(milestone.description)),
646                changed = milestone.completed or milestone.due
647                                              or datetime.now(utc),
648                involved = (),
649                popularity = 0, #FIXME
650                oneline = shorten_result(milestone.description),
651                body = milestone.description,
652                )
653        self.backend.create(so, quiet=True)
654        self.log.debug("Milestone created for indexing: %s", milestone)
655
656    def milestone_changed(self, milestone, old_values):
657        """
658        `old_values` is a dictionary containing the previous values of the
659        milestone properties that changed. Currently those properties can be
660        'name', 'due', 'completed', or 'description'.
661        """
662        self.milestone_created(milestone)
663
664    def milestone_deleted(self, milestone):
665        """Called when a milestone is deleted."""
666        so = FullTextSearchObject(self.project, milestone.resource)
667        self.backend.delete(so, quiet=True)
668
669    def _fill_so(self, changeset, node):
670        so = FullTextSearchObject(
671                self.project, node.resource,
672                title = node.path,
673                oneline = u'[%s]: %s' % (changeset.rev, shorten_result(changeset.message)),
674                comments = [changeset.message],
675                changed = node.get_last_modified(),
676                action = 'CREATE',
677                author = changeset.author,
678                created = changeset.date
679                )
680        if node.content_length <= self.max_size:
681            so.body = node.get_content()
682        return so
683
684    def _changes(self, repos, changeset):
685        for path, kind, change, base_path, base_rev in changeset.get_changes():
686            if change == Changeset.MOVE:
687                yield FullTextSearchObject(self.project, 'source', base_path,
688                                           repos.resource, action='DELETE')
689            elif change == Changeset.DELETE:
690                yield FullTextSearchObject(self.project, 'source', path,
691                                           repos.resource, action='DELETE')
692            if change in (Changeset.ADD, Changeset.EDIT, Changeset.COPY,
693                          Changeset.MOVE):
694                node = repos.get_node(path, changeset.rev)
695                yield self._fill_so(changeset, node)
696
697    #IRepositoryChangeListener methods
698    def changeset_added(self, repos, changeset):
699        """Called after a changeset has been added to a repository."""
700        #Index the commit message
701        so = FullTextSearchObject(
702                self.project, changeset.resource,
703                title=u'[%s]: %s' % (changeset.rev,
704                                       shorten_line(changeset.message)),
705                oneline=shorten_result(changeset.message),
706                body=changeset.message,
707                author=changeset.author,
708                created=changeset.date,
709                changed=changeset.date,
710                )
711        self.backend.create(so, quiet=True)
712        self._update_changeset(changeset)
713
714        # Index the file contents of this revision, a changeset can involve
715        # thousands of files - so submit in batches to avoid exceeding the
716        # available file handles
717        sos = (so for so in self._changes(repos, changeset))
718        for chunk in grouper(sos, 25):
719            try:
720                self.backend.add(chunk, quiet=True)
721                self.log.debug("Indexed %i repository changes at revision %i",
722                               len(chunk), changeset.rev)
723            finally:
724                for so in chunk:
725                    if hasattr(so.body, 'close'):
726                        so.body.close()
727
728    def changeset_modified(self, repos, changeset, old_changeset):
729        """Called after a changeset has been modified in a repository.
730
731        The `old_changeset` argument contains the metadata of the changeset
732        prior to the modification. It is `None` if the old metadata cannot
733        be retrieved.
734        """
735        #Hmm, I wonder if this is called instead of the above method or after
736        pass
737
738    # ISearchSource methods.
739
740    def get_search_filters(self, req):
741        return [(name, label, enabled)
742                for name, label, enabled, indexer in self._realms
743                if name in self.search_realms]
744
745    def get_search_results(self, req, terms, filters):
746        filters = self._check_filters(filters)
747        if not filters:
748            return []
749        try:
750            query, response = self._do_search(terms, filters)
751        except Exception, e:
752            self.log.error("Couldn't perform Full text search, falling back "
753                           "to built-in search sources: %s", e)
754            return self._do_fallback(req, terms, filters)
755        docs = (FullTextSearchObject(**doc) for doc in self._docs(query))
756        def _result(doc):
757            changed = doc.changed
758            href = get_resource_url(self.env, doc.resource, req.href)
759            title = doc.title or get_resource_shortname(self.env, doc.resource)
760            author = ", ".join(doc.author or [])
761            excerpt = doc.oneline or ''
762            return (href, title, changed, author, excerpt)
763        return [_result(doc) for doc in docs]
764
765    def _check_filters(self, filters):
766        """Return only the filters currently enabled for search.
767        """
768        return [f for f in filters if f in self.search_realms]
769
770    def _build_filter_query(self, si, filters):
771        """Return a SOLR filter query that matches any of the chosen filters
772        (realms).
773
774        The filter is of the form realm:realm1 OR realm:realm2 OR ...
775        """
776        Q = si.query().Q
777        my_filters = [f for f in filters if f in self.search_realms]
778        def rec(list1):
779            if len(list1) > 2:
780                return Q(realm=list1.pop()) | rec(list1)
781            elif len(list1) == 2:
782                return Q(realm=list1.pop()) | Q(realm=list1.pop())
783            elif len(list1) == 1:
784                return Q(realm=list1.pop())
785            else:
786                # NB A TypeError will be raised if this string is combined
787                #    with a LuceneQuery
788                return ""
789        return rec(my_filters[:])
790
791    def _do_search(self, terms, filters, facet='realm', sort_by=None,
792                                         field_limit=None):
793        try:
794            si = self.backend.si_class(self.solr_endpoint)
795        except:
796            raise
797
798        # Restrict search to chosen realms, if none of our filters were chosen
799        # then we won't have any results - return early, empty handed
800        # NB Also avoids TypeError if _build_filter_query() returns a string
801        filter_q = self._build_filter_query(si, filters)
802        if not filter_q:
803            return
804
805        # The index can store multiple projects, restrict results to this one
806        filter_q &= si.query().Q(project=self.project)
807
808        # Construct a query that searches for terms in docs that match chosen
809        # realms and current project
810        query = si.query(terms).filter(filter_q)
811
812        if facet:
813            query = query.facet_by(facet)
814        for field in sort_by or []:
815            query = query.sort_by(field)
816        if field_limit:
817            query = query.field_limit(field_limit)
818
819        # Submit the query to Solr, response contains the first 10 results
820        response = query.execute()
821        if facet:
822            self.log.debug("Facets: %s", response.facet_counts.facet_fields)
823
824        return query, response
825
826    def _docs(self, query, page_size=10):
827        """Return a generator of all the docs in query.
828        """
829        i = 0
830        while True:
831            response = query.paginate(start=i, rows=page_size).execute()
832            for doc in response:
833                yield doc
834            if len(response) < page_size:
835                break
836            i += page_size
837
838    def _do_fallback(self, req, terms, filters):
839        add_warning(req, _("Full text search is unavailable, some search "
840                           "results may be missing"))
841        # Based on SearchModule._do_search()
842        results = []
843        for name in self.env.config.getlist('search', 'disabled_sources'):
844            try:
845                source_class = self._fallbacks[name]
846            except KeyError:
847                continue
848            source = source_class(self.env)
849            results.extend(source.get_search_results(req, terms, filters)
850                           or [])
851        return results
Note: See TracBrowser for help on using the repository browser.