source: announcerplugin/trunk/announcer/distributors/xmppd.py

Last change on this file was 16900, checked in by Ryan J Ollos, 7 years ago

TracAnnouncer 1.2.0dev: Use IEmailAddressResolver from Trac 1.2

Refs #12120.

File size: 9.8 KB
Line 
1# -*- coding: utf-8 -*-
2#
3# Copyright (c) 2010, Robert Corsaro
4# Copyright (c) 2012, Steffen Hoffmann
5#
6# This software is licensed as described in the file COPYING, which
7# you should have received as part of this distribution.
8#
9
10import Queue
11import time
12import threading
13
14from trac.config import Option, BoolOption, IntOption, OrderedExtensionsOption
15from trac.core import Component, ExtensionPoint, implements
16from trac.notification.api import IEmailAddressResolver
17from xmpp import Client
18from xmpp.protocol import Message, JID
19
20from announcer.api import (
21    _, IAnnouncementDistributor, IAnnouncementFormatter,
22    IAnnouncementPreferenceProvider, IAnnouncementProducer)
23from announcer.resolvers import SpecifiedXmppResolver
24from announcer.util.settings import SubscriptionSetting
25
26
class XmppDistributor(Component):
    """Distribute announcements to XMPP clients.

    For every recipient the preferred message format is looked up, an XMPP
    address is resolved via `SpecifiedXmppResolver`, and the formatted
    event is sent either synchronously or through a background
    `DeliveryThread`, depending on `use_threaded_delivery`.
    """

    implements(IAnnouncementDistributor)

    formatters = ExtensionPoint(IAnnouncementFormatter)

    resolvers = OrderedExtensionsOption('announcer', 'xmpp_resolvers',
        IEmailAddressResolver, 'SpecifiedXmppResolver',
        """Comma seperated list of xmpp resolver components in the order
        they will be called.  If an xmpp address is resolved, the remaining
        resolvers will no be called.
        """)

    default_format = Option('announcer', 'default_xmpp_format',
        'text/plain', """Default format for xmpp messages.""")

    server = Option('xmpp', 'server', None,
        """XMPP server hostname to use for jabber notifications.""")

    port = IntOption('xmpp', 'port', 5222,
        """XMPP server port to use for jabber notification.""")

    user = Option('xmpp', 'user', 'trac@localhost',
        """Sender address to use in xmpp message.""")

    resource = Option('xmpp', 'resource', 'TracAnnouncerPlugin',
        """Sender resource to use in xmpp message.""")

    password = Option('xmpp', 'password', None,
        """Password for XMPP server.""")

    use_threaded_delivery = BoolOption('announcer', 'use_threaded_delivery',
        False,
        """If true, the actual delivery of the message will occur
        in a separate thread.  Enabling this will improve responsiveness
        for requests that end up with an announcement being sent over
        email. It requires building Python with threading support
        enabled-- which is usually the case. To test, start Python and
        type 'import threading' to see if it raises an error.
        """)

    def __init__(self):
        self.connections = {}
        # Created lazily by get_delivery_queue() on first threaded send.
        self.delivery_queue = None
        self.xmpp_format_setting = \
            SubscriptionSetting(self.env, 'xmpp_format', self.default_format)

    def get_delivery_queue(self):
        """Return the delivery queue, creating it and its worker on demand.

        The first call creates the queue and starts a `DeliveryThread`
        that passes each queued package to `send`.
        """
        if self.delivery_queue is None:
            self.delivery_queue = Queue.Queue()
            thread = DeliveryThread(self.delivery_queue, self.send)
            thread.start()
        return self.delivery_queue

    # IAnnouncementDistributor methods

    def transports(self):
        """Yield the only transport handled by this distributor."""
        yield 'xmpp'

    def distribute(self, transport, recipients, event):
        """Send `event` to every recipient an XMPP address is found for.

        :param transport: transport name; anything but 'xmpp' is ignored.
        :param recipients: iterable of `(name, authed, address)` tuples.
            The incoming address is not used; it is re-resolved from
            `name` and `authed`.
        :param event: the announcement event; its `realm` selects the
            formatters.
        """
        self.log.info("XmppDistributor called")
        if transport != 'xmpp':
            return
        fmtdict = self._formats(transport, event.realm)
        if not fmtdict:
            self.log.error("XmppDistributor No formats found for %s %s",
                           transport, event.realm)
            return

        # TODO: The configured `resolvers` option is not consulted yet
        # (this won't work with multiple distributors), so the
        # SpecifiedXmppResolver is used directly for every recipient.
        resolver = SpecifiedXmppResolver(self.env)

        # Maps a format name to the set of recipients receiving it.
        msgdict = {}
        for name, authed, address in recipients:
            fmt = name and self._get_preferred_format(name, event.realm)
            old_fmt = fmt
            if fmt not in fmtdict:
                self.log.debug("XmppDistributor format %s not available "
                               "for %s %s, looking for an alternative",
                               fmt, transport, event.realm)
                # If the fmt is not available for this realm, then try to
                # find an alternative.
                fmt = None
                for f in fmtdict.values():
                    fmt = f.alternative_style_for(
                        transport, event.realm, old_fmt)
                    if fmt:
                        break
            if not fmt:
                self.log.error("XmppDistributor was unable to find a "
                               "formatter for format %s", old_fmt)
                continue

            auth_label = 'authenticated' if authed else 'not authenticated'
            address = resolver.get_address_for_session(name, authed)
            if address:
                self.log.debug("XmppDistributor found the address '%s' for "
                               "'%s (%s)' via: %s", address, name,
                               auth_label, resolver.__class__.__name__)
                # ok, we found an addr, add the message
                msgdict.setdefault(fmt, set()).add((name, authed, address))
            else:
                self.log.debug("XmppDistributor was unable to find an "
                               "address for: %s (%s)", name, auth_label)

        for k, v in msgdict.items():
            fmt = fmtdict.get(k)
            if not v or not fmt:
                continue
            self.log.debug("XmppDistributor is sending event as '%s' to: %s",
                           fmt, ', '.join(x[2] for x in v))
            self._do_send(transport, event, k, v, fmt)

    def _formats(self, transport, realm):
        """Find valid formats for transport and realm.

        :return: dict mapping each style name to the formatter that
            reported it; empty if no formatter supports the combination.
        """
        formats = {}
        for f in self.formatters:
            for style in f.styles(transport, realm):
                formats[style] = f
        self.log.debug("XmppDistributor has found the following formats "
                       "capable of handling '%s' of '%s': %s", transport,
                       realm, ', '.join(formats.keys()))
        if not formats:
            self.log.error("XmppDistributor is unable to continue without "
                           "supporting formatters.")
        return formats

    def _get_preferred_format(self, sid, realm=None):
        """Return the format stored for session `sid`.

        With a `realm`, the realm-specific setting 'xmpp_format_<realm>'
        is read (the name `XmppPreferencePanel` stores per-realm choices
        under); otherwise the global 'xmpp_format' setting is used.
        """
        if realm:
            name = 'xmpp_format_%s' % realm
        else:
            name = 'xmpp_format'
        # Read from the setting built for `name`; previously it was
        # constructed and discarded, so per-realm choices were ignored.
        setting = SubscriptionSetting(self.env, name,
                                      self.xmpp_format_setting.default)
        return setting.get_user_setting(sid)[0]

    def _do_send(self, transport, event, format, recipients, formatter):
        """Format `event` and deliver it, inline or via the delivery queue.

        :param format: name of the style to render the event in.
        :param recipients: set of `(name, authed, address)` tuples.
        :param formatter: formatter component used to render the message.
        """
        message = formatter.format(transport, event.realm, format, event)

        package = (recipients, message)

        start = time.time()
        if self.use_threaded_delivery:
            self.get_delivery_queue().put(package)
        else:
            self.send(*package)
        stop = time.time()
        self.log.debug("XmppDistributor took %s seconds to send.",
                       round(stop - start, 2))

    def send(self, recipients, message):
        """Send message to recipients via xmpp.

        :param recipients: iterable of tuples whose third item is the
            destination address.
        :param message: the message body to send.
        :raises IOError: if connecting or authenticating to the server
            fails.
        """
        jid = JID(self.user)
        # Fall back to the domain of the sender JID if no server is set.
        if self.server:
            server = self.server
        else:
            server = jid.getDomain()
        cl = Client(server, port=self.port, debug=[])
        if not cl.connect():
            raise IOError("Couldn't connect to xmpp server %s" % server)
        if not cl.auth(jid.getNode(), self.password, resource=self.resource):
            cl.Connection.disconnect()
            raise IOError("Xmpp auth error using %s to %s" % (jid, server))
        for recip in recipients:
            cl.send(Message(recip[2], message))
194
195
class XmppPreferencePanel(Component):
    """Preference box letting a user pick an XMPP format per realm."""

    implements(IAnnouncementPreferenceProvider)

    formatters = ExtensionPoint(IAnnouncementFormatter)
    producers = ExtensionPoint(IAnnouncementProducer)
    distributors = ExtensionPoint(IAnnouncementDistributor)

    def get_announcement_preference_boxes(self, req):
        """Yield the single `(name, label)` pair for the XMPP box."""
        yield 'xmpp', _("XMPP Formats")

    def render_announcement_preference_box(self, req, panel):
        """Store posted format choices and return template plus data.

        On POST the submitted 'xmpp_format_<realm>' values are written to
        the session and the session is saved once after all of them.

        :return: `(template_name, data)`, where `data` holds the styles
            available per realm and the current preference per realm.
        """
        # Gather every style any formatter reports, grouped by realm.
        styles_by_realm = {}
        for producer in self.producers:
            for realm in producer.realms():
                for distributor in self.distributors:
                    for transport in distributor.transports():
                        for formatter in self.formatters:
                            for style in formatter.styles(transport, realm):
                                styles_by_realm.setdefault(
                                    realm, set()).add(style)

        # One setting object per realm, defaulting to the distributor's
        # default.
        realm_settings = {}
        for realm in styles_by_realm:
            default = XmppDistributor(self.env).xmpp_format_setting.default
            realm_settings[realm] = SubscriptionSetting(
                self.env, 'xmpp_format_%s' % realm, default)

        if req.method == 'POST':
            for realm, setting in realm_settings.items():
                posted = req.args.get('xmpp_format_%s' % realm)
                setting.set_user_setting(req.session, posted, save=False)
            req.session.save()

        current = {}
        for realm, setting in realm_settings.items():
            current[realm] = setting.get_user_setting(req.session.sid)[0]

        data = {'realms': styles_by_realm, 'preferences': current}
        return 'prefs_announcer_xmpp.html', data
238
239
class DeliveryThread(threading.Thread):
    """Daemon thread that passes queued packages to a sender callable.

    Each item taken from the queue must be a tuple; its elements are
    passed to the sender as positional arguments.
    """

    def __init__(self, queue, sender):
        """Remember the queue to read from and the callable to invoke.

        :param queue: object whose `get()` returns the next package.
        :param sender: callable invoked as `sender(*package)`.
        """
        threading.Thread.__init__(self)
        self._sender = sender
        self._queue = queue
        self.daemon = True

    def run(self):
        """Forward packages from the queue to the sender, indefinitely."""
        while True:
            package = self._queue.get()
            # Forward whatever was queued instead of unpacking a fixed
            # three items: XmppDistributor queues (recipients, message),
            # which the old three-name unpacking rejected with ValueError.
            self._sender(*package)
Note: See TracBrowser for help on using the repository browser.