1 | # -*- coding: utf-8 -*- |
---|
2 | # |
---|
3 | # Copyright (c) 2010, Robert Corsaro |
---|
4 | # Copyright (c) 2012, Steffen Hoffmann |
---|
5 | # |
---|
6 | # This software is licensed as described in the file COPYING, which |
---|
7 | # you should have received as part of this distribution. |
---|
8 | # |
---|
9 | |
---|
10 | import Queue |
---|
11 | import time |
---|
12 | import threading |
---|
13 | |
---|
14 | from trac.config import Option, BoolOption, IntOption, OrderedExtensionsOption |
---|
15 | from trac.core import Component, ExtensionPoint, implements |
---|
16 | from trac.notification.api import IEmailAddressResolver |
---|
17 | from xmpp import Client |
---|
18 | from xmpp.protocol import Message, JID |
---|
19 | |
---|
20 | from announcer.api import ( |
---|
21 | _, IAnnouncementDistributor, IAnnouncementFormatter, |
---|
22 | IAnnouncementPreferenceProvider, IAnnouncementProducer) |
---|
23 | from announcer.resolvers import SpecifiedXmppResolver |
---|
24 | from announcer.util.settings import SubscriptionSetting |
---|
25 | |
---|
26 | |
---|
27 | class XmppDistributor(Component): |
---|
28 | """Distribute announcements to XMPP clients.""" |
---|
29 | |
---|
30 | implements(IAnnouncementDistributor) |
---|
31 | |
---|
32 | formatters = ExtensionPoint(IAnnouncementFormatter) |
---|
33 | |
---|
34 | resolvers = OrderedExtensionsOption('announcer', 'xmpp_resolvers', |
---|
35 | IEmailAddressResolver, 'SpecifiedXmppResolver', |
---|
36 | """Comma seperated list of xmpp resolver components in the order |
---|
37 | they will be called. If an xmpp address is resolved, the remaining |
---|
38 | resolvers will no be called. |
---|
39 | """) |
---|
40 | |
---|
41 | default_format = Option('announcer', 'default_xmpp_format', |
---|
42 | 'text/plain', """Default format for xmpp messages.""") |
---|
43 | |
---|
44 | server = Option('xmpp', 'server', None, |
---|
45 | """XMPP server hostname to use for jabber notifications.""") |
---|
46 | |
---|
47 | port = IntOption('xmpp', 'port', 5222, |
---|
48 | """XMPP server port to use for jabber notification.""") |
---|
49 | |
---|
50 | user = Option('xmpp', 'user', 'trac@localhost', |
---|
51 | """Sender address to use in xmpp message.""") |
---|
52 | |
---|
53 | resource = Option('xmpp', 'resource', 'TracAnnouncerPlugin', |
---|
54 | """Sender resource to use in xmpp message.""") |
---|
55 | |
---|
56 | password = Option('xmpp', 'password', None, |
---|
57 | """Password for XMPP server.""") |
---|
58 | |
---|
59 | use_threaded_delivery = BoolOption('announcer', 'use_threaded_delivery', |
---|
60 | False, |
---|
61 | """If true, the actual delivery of the message will occur |
---|
62 | in a separate thread. Enabling this will improve responsiveness |
---|
63 | for requests that end up with an announcement being sent over |
---|
64 | email. It requires building Python with threading support |
---|
65 | enabled-- which is usually the case. To test, start Python and |
---|
66 | type 'import threading' to see if it raises an error. |
---|
67 | """) |
---|
68 | |
---|
69 | def __init__(self): |
---|
70 | self.connections = {} |
---|
71 | self.delivery_queue = None |
---|
72 | self.xmpp_format_setting = \ |
---|
73 | SubscriptionSetting(self.env, 'xmpp_format', self.default_format) |
---|
74 | |
---|
75 | def get_delivery_queue(self): |
---|
76 | if not self.delivery_queue: |
---|
77 | self.delivery_queue = Queue.Queue() |
---|
78 | thread = DeliveryThread(self.delivery_queue, self.send) |
---|
79 | thread.start() |
---|
80 | return self.delivery_queue |
---|
81 | |
---|
82 | # IAnnouncementDistributor methods |
---|
83 | |
---|
84 | def transports(self): |
---|
85 | yield 'xmpp' |
---|
86 | |
---|
87 | def distribute(self, transport, recipients, event): |
---|
88 | self.log.info("XmppDistributor called") |
---|
89 | if transport != 'xmpp': |
---|
90 | return |
---|
91 | fmtdict = self._formats(transport, event.realm) |
---|
92 | if not fmtdict: |
---|
93 | self.log.error("XmppDistributor No formats found for %s %s", |
---|
94 | transport, event.realm) |
---|
95 | return |
---|
96 | msgdict = {} |
---|
97 | for name, authed, address in recipients: |
---|
98 | fmt = name and self._get_preferred_format(name, event.realm) |
---|
99 | old_fmt = fmt |
---|
100 | if fmt not in fmtdict: |
---|
101 | self.log.debug("XmppDistributor format %s not available " |
---|
102 | "for %s %s, looking for an alternative", |
---|
103 | fmt, transport, event.realm) |
---|
104 | # If the fmt is not available for this realm, then try to find |
---|
105 | # an alternative |
---|
106 | fmt = None |
---|
107 | for f in fmtdict.values(): |
---|
108 | fmt = f.alternative_style_for( |
---|
109 | transport, event.realm, old_fmt) |
---|
110 | if fmt: |
---|
111 | break |
---|
112 | if not fmt: |
---|
113 | self.log.error("XmppDistributor was unable to find a " |
---|
114 | "formatter for format %s", old_fmt) |
---|
115 | continue |
---|
116 | # TODO: This won't work with multiple distributors |
---|
117 | # rslvr = None |
---|
118 | # figure out what the addr should be if it's not defined |
---|
119 | # for rslvr in self.resolvers: |
---|
120 | # addr = rslvr.get_address_for_name(name, authed) |
---|
121 | # if addr: break |
---|
122 | resolver = SpecifiedXmppResolver(self.env) |
---|
123 | address = resolver.get_address_for_session(name, authed) |
---|
124 | if address: |
---|
125 | self.log.debug("XmppDistributor found the address '%s' for " |
---|
126 | "'%s (%s)' via: %s", address, name, authed and |
---|
127 | 'authenticated' or 'not authenticated', |
---|
128 | resolver.__class__.__name__) |
---|
129 | # ok, we found an addr, add the message |
---|
130 | msgdict.setdefault(fmt, set()).add((name, authed, address)) |
---|
131 | else: |
---|
132 | self.log.debug("XmppDistributor was unable to find an " |
---|
133 | "address for: %s (%s)", name, authed and |
---|
134 | 'authenticated' or 'not authenticated') |
---|
135 | for k, v in msgdict.items(): |
---|
136 | fmt = fmtdict.get(k) |
---|
137 | if not v or not fmt: |
---|
138 | continue |
---|
139 | self.log.debug("XmppDistributor is sending event as '%s' to: %s", |
---|
140 | fmt, ', '.join(x[2] for x in v)) |
---|
141 | self._do_send(transport, event, k, v, fmt) |
---|
142 | |
---|
143 | def _formats(self, transport, realm): |
---|
144 | """Find valid formats for transport and realm.""" |
---|
145 | formats = {} |
---|
146 | for f in self.formatters: |
---|
147 | for style in f.styles(transport, realm): |
---|
148 | formats[style] = f |
---|
149 | self.log.debug("XmppDistributor has found the following formats " |
---|
150 | "capable of handling '%s' of '%s': %s", transport, |
---|
151 | realm, ', '.join(formats.keys())) |
---|
152 | if not formats: |
---|
153 | self.log.error("XmppDistributor is unable to continue without " |
---|
154 | "supporting formatters.") |
---|
155 | return formats |
---|
156 | |
---|
157 | def _get_preferred_format(self, sid, realm=None): |
---|
158 | if realm: |
---|
159 | name = 'xmpp_format_%s' % realm |
---|
160 | else: |
---|
161 | name = 'xmpp_format' |
---|
162 | SubscriptionSetting(self.env, name, self.xmpp_format_setting.default) |
---|
163 | return self.xmpp_format_setting.get_user_setting(sid)[0] |
---|
164 | |
---|
165 | def _do_send(self, transport, event, format, recipients, formatter): |
---|
166 | message = formatter.format(transport, event.realm, format, event) |
---|
167 | |
---|
168 | package = (recipients, message) |
---|
169 | |
---|
170 | start = time.time() |
---|
171 | if self.use_threaded_delivery: |
---|
172 | self.get_delivery_queue().put(package) |
---|
173 | else: |
---|
174 | self.send(*package) |
---|
175 | stop = time.time() |
---|
176 | self.log.debug("XmppDistributor took %s seconds to send.", |
---|
177 | round(stop - start, 2)) |
---|
178 | |
---|
179 | def send(self, recipients, message): |
---|
180 | """Send message to recipients via xmpp.""" |
---|
181 | jid = JID(self.user) |
---|
182 | if self.server: |
---|
183 | server = self.server |
---|
184 | else: |
---|
185 | server = jid.getDomain() |
---|
186 | cl = Client(server, port=self.port, debug=[]) |
---|
187 | if not cl.connect(): |
---|
188 | raise IOError("Couldn't connect to xmpp server %s" % server) |
---|
189 | if not cl.auth(jid.getNode(), self.password, resource=self.resource): |
---|
190 | cl.Connection.disconnect() |
---|
191 | raise IOError("Xmpp auth erro using %s to %s", jid, server) |
---|
192 | for recip in recipients: |
---|
193 | cl.send(Message(recip[2], message)) |
---|
194 | |
---|
195 | |
---|
196 | class XmppPreferencePanel(Component): |
---|
197 | |
---|
198 | implements(IAnnouncementPreferenceProvider) |
---|
199 | |
---|
200 | formatters = ExtensionPoint(IAnnouncementFormatter) |
---|
201 | producers = ExtensionPoint(IAnnouncementProducer) |
---|
202 | distributors = ExtensionPoint(IAnnouncementDistributor) |
---|
203 | |
---|
204 | def get_announcement_preference_boxes(self, req): |
---|
205 | yield 'xmpp', _("XMPP Formats") |
---|
206 | |
---|
207 | def render_announcement_preference_box(self, req, panel): |
---|
208 | supported_realms = {} |
---|
209 | for producer in self.producers: |
---|
210 | for realm in producer.realms(): |
---|
211 | for distributor in self.distributors: |
---|
212 | for transport in distributor.transports(): |
---|
213 | for fmtr in self.formatters: |
---|
214 | for style in fmtr.styles(transport, realm): |
---|
215 | if realm not in supported_realms: |
---|
216 | supported_realms[realm] = set() |
---|
217 | supported_realms[realm].add(style) |
---|
218 | |
---|
219 | settings = {} |
---|
220 | for realm in supported_realms: |
---|
221 | name = 'xmpp_format_%s' % realm |
---|
222 | dist = XmppDistributor(self.env).xmpp_format_setting.default |
---|
223 | settings[realm] = SubscriptionSetting(self.env, name, dist) |
---|
224 | if req.method == 'POST': |
---|
225 | for realm, setting in settings.items(): |
---|
226 | name = 'xmpp_format_%s' % realm |
---|
227 | setting.set_user_setting(req.session, req.args.get(name), |
---|
228 | save=False) |
---|
229 | req.session.save() |
---|
230 | prefs = {} |
---|
231 | for realm, setting in settings.items(): |
---|
232 | prefs[realm] = setting.get_user_setting(req.session.sid)[0] |
---|
233 | data = dict( |
---|
234 | realms=supported_realms, |
---|
235 | preferences=prefs, |
---|
236 | ) |
---|
237 | return 'prefs_announcer_xmpp.html', data |
---|
238 | |
---|
239 | |
---|
240 | class DeliveryThread(threading.Thread): |
---|
241 | def __init__(self, queue, sender): |
---|
242 | threading.Thread.__init__(self) |
---|
243 | self._sender = sender |
---|
244 | self._queue = queue |
---|
245 | self.setDaemon(True) |
---|
246 | |
---|
247 | def run(self): |
---|
248 | while 1: |
---|
249 | send_from, recipients, message = self._queue.get() |
---|
250 | self._sender(send_from, recipients, message) |
---|