source: ldapplugin/0.12/ldapplugin/api.py @ 16527

Last change on this file since 16527 was 16527, checked in by Ryan J Ollos, 7 years ago

Fix indentation

  • Property svn:eol-style set to native
File size: 27.4 KB
Line 
1# -*- coding: utf-8 -*-
2#
3# LDAP permission extensions for Trac
4#
5# Copyright (C) 2003-2006 Edgewall Software
6# Copyright (C) 2005-2006 Emmanuel Blot <emmanuel.blot@free.fr>
7# All rights reserved.
8#
9# This software is licensed as described in the file COPYING, which
10# you should have received as part of this distribution. The terms
11# are also available at http://trac.edgewall.com/license.html.
12#
13# This software consists of voluntary contributions made by many
14# individuals. For the exact contribution history, see the revision
15# history and logs, available at http://projects.edgewall.com/trac/.
16#
17# Warning: this plug in has not been extensively tested, and may have security
18# issues. Do not use this plugin on production servers where security is
19# a concern.
20# Requires Python-LDAP, available from http://python-ldap.sourceforge.net
21#
22
23import ldap
24import re
25import time
26
27from trac.core import *
28from trac.perm import IPermissionGroupProvider, IPermissionStore
29from trac.util.text import exception_to_unicode
30
31LDAP_MODULE_CONFIG = [ 'enable', 'permfilter',
32                       'global_perms', 'manage_groups'
33                       'cache_ttl', 'cache_size',
34                       'group_bind', 'store_bind',
35                       'user_rdn', 'group_rdn' ]
36
37LDAP_DIRECTORY_PARAMS = [ 'host', 'port', 'use_tls', 'basedn',
38                          'bind_user', 'bind_passwd',
39                          'groupname', 'groupmember', 'groupmemberisdn',
40                          'groupattr', 'uidattr', 'permattr']
41
42GROUP_PREFIX = '@'
43
44# regular expression to explode a DN into a (attr, rdn, basedn)
45DN_RE = re.compile(r'^(?P<attr>.+?)=(?P<rdn>.+?),(?P<base>.+)$')
46
47
48class LdapPermissionGroupProvider(Component):
49    """
50    Provides permission groups from a LDAP directory
51    """
52    implements(IPermissionGroupProvider)
53
54    def __init__(self, ldap=None):
55        # looks for groups only if LDAP support is enabled
56        self.enabled = self.config.getbool('ldap', 'enable')
57        if not self.enabled:
58            return
59        self.util = LdapUtil(self.config)
60        # LDAP connection
61        self._ldap = ldap
62        # LDAP connection config
63        self._ldapcfg = {}
64        for name,value in self.config.options('ldap'):
65            if name in LDAP_DIRECTORY_PARAMS:
66                self._ldapcfg[str(name)] = value
67        # user entry local cache
68        self._cache = {}
69        # max time to live for a cache entry
70        self._cache_ttl = int(self.config.get('ldap', 'cache_ttl', str(15*60)))
71        # max cache entries
72        self._cache_size = min(25, int(self.config.get('ldap', 'cache_size',
73                                                       '100')))
74
75    # IPermissionProvider interface
76
77    def get_permission_groups(self, username):
78        """Return a list of names of the groups that the user with the
79        specified name is a member of."""
80
81        # anonymous and authenticated groups are set with the default provider
82        groups = []
83        if not self.enabled:
84            return groups
85
86        # stores the current time for the request (used for the cache)
87        current_time = time.time()
88
89        # test for if username in the cache
90        if username in self._cache:
91            # cache hit
92            lut, groups = self._cache[username]
93
94            # ensures that the cache is not too old
95            if current_time < lut+self._cache_ttl:
96                # sources the cache
97                # cache lut is not updated to ensure
98                # it is refreshed on a regular basis
99                self.env.log.debug('cached (%s): %s' % \
100                                   (username, ','.join(groups)))
101                return groups
102
103        # cache miss (either not found or too old)
104        if not self._ldap:
105            # new LDAP connection
106            bind = self.config.getbool('ldap', 'group_bind')
107            self._ldap = LdapConnection(self.env.log, bind, **self._ldapcfg)
108
109        # retrieves the user groups from LDAP
110        ldapgroups = self._get_user_groups(username)
111        # if some group is found
112        if ldapgroups:
113            # tests for cache size
114            if len(self._cache) >= self._cache_size:
115                # the cache is becoming too large, discards
116                # the less recently uses entries
117                cache_keys = self._cache.keys()
118                cache_keys.sort(lambda x,y: cmp(self._cache[x][0],
119                                                self._cache[y][0]))
120                # discards the 5% oldest
121                old_keys = cache_keys[:(5*self._cache_size)/100]
122                for k in old_keys:
123                    del self._cache[k]
124        else:
125            # deletes the cache if there's no group for this user
126            # for debug, until a failed LDAP connection returns an error...
127            if username in self._cache:
128                del self._cache[username]
129
130        # updates the cache
131        self._cache[username] = [current_time, ldapgroups]
132
133        # returns the user groups
134        groups.extend(ldapgroups)
135        if groups:
136            self.env.log.debug('groups: ' + ','.join(groups))
137
138        return groups
139
140    def flush_cache(self, username=None):
141        """Invalidate the entire cache or a named entry"""
142        if username is None:
143            self._cache = {}
144        elif self._cache.has_key(username):
145            del self._cache[username]
146
147    # Private API
148
149    def _get_user_groups(self, username):
150        """Returns a list of all groups a user belongs to"""
151        ldap_groups = self._ldap.get_groups()
152        groups = []
153        for group in ldap_groups:
154            if self._ldap.is_in_group(self.util.user_attrdn(username), group):
155                m = DN_RE.search(group)
156                if m:
157                    groupname = GROUP_PREFIX + m.group('rdn')
158                    if groupname not in groups:
159                        groups.append(groupname)
160        return groups
161
162
163class LdapPermissionStore(Component):
164    """
165    Stores and manages permissions with a LDAP directory backend
166    """
167    implements(IPermissionStore)
168
169    group_providers = ExtensionPoint(IPermissionGroupProvider)
170
171    def __init__(self, ldap=None):
172        # looks for groups only if LDAP support is enabled
173        self.enabled = self.config.getbool('ldap', 'enable')
174        if not self.enabled:
175            return
176        self.util = LdapUtil(self.config)
177        # LDAP connection
178        self._ldap = ldap
179        # LDAP connection config
180        self._ldapcfg = {}
181        for name,value in self.config.options('ldap'):
182            if name in LDAP_DIRECTORY_PARAMS:
183                self._ldapcfg[str(name)] = value
184        # user entry local cache
185        self._cache = {}
186        # max time to live for a cache entry
187        self._cache_ttl = int(self.config.get('ldap', 'cache_ttl', str(15*60)))
188        # max cache entries
189        cache_size = self.config.get('ldap', 'cache_size', '100')
190        self._cache_size = min(25, int(cache_size))
191        # environment name
192        envpath = self.env.path.replace('\\','/')
193        self.env_name = envpath[1+envpath.rfind('/'):]
194        # use directory-wide permissions
195        self.global_perms = self.config.getbool('ldap', 'global_perms')
196        self.manage_groups = self.config.getbool('ldap', 'manage_groups')
197
198    # IPermissionStore interface
199
200    def get_user_permissions(self, username):
201        """Retrieves the user permissions from the LDAP directory"""
202        if not self.enabled:
203            raise TracError("LdapPermissionStore is not enabled")
204        actions = self._get_cache_actions(username)
205        if not actions:
206            users = [username]
207            for provider in self.group_providers:
208                users += list(provider.get_permission_groups(username))
209            for user in users:
210                uid = self.util.create_dn(user)
211                for action in self._get_permissions(uid):
212                    if action not in actions:
213                        actions.append(action)
214
215            self.env.log.debug('new: %s' % actions)
216            self._update_cache_actions(username, actions)
217        perms = {}
218        for action in actions:
219            perms[action] = True
220        return perms
221
222    def get_users_with_permissions(self, permissions):
223        """Retrieve a list of users that have any of the specified permissions.
224        """
225        users = set([u[0] for u in self.env.get_known_users()])
226        result = set()
227        for user in users:
228            userperms = self.get_user_permissions(user)
229            for group in permissions:
230                if group in userperms:
231                    result.add(user)
232        return list(result)
233
234    def get_all_permissions(self):
235        """Retrieve the permissions for all users from the LDAP directory"""
236        # do not use the cache as this method is only used for administration
237        # tasks, not for runtime
238        if not self.enabled:
239            raise TracError("LdapPermissionStore is not enabled")
240        perms = []
241        filterstr = self.config.get('ldap', 'permfilter', 'objectclass=*')
242        basedn = self.config.get('ldap','basedn','').encode('ascii')
243        self._openldap()
244        dns = self._ldap.get_dn(basedn, filterstr.encode('ascii'))
245        permusers = []
246        for dn in dns:
247            user = self.util.extract_user_from_dn(dn)
248            if not user or user in permusers: continue
249            permusers.append(user)
250            self.log.debug("permission for %s (%s)" % (user, dn))
251            actions = self._ldap.get_attribute(dn, self._ldap.permattr)
252            for action in actions:
253                xaction = self._extract_action(action)
254                if not xaction:
255                    continue
256                perms.append((user, xaction))
257            if self.manage_groups:
258                for provider in self.group_providers:
259                    if isinstance(provider, LdapPermissionGroupProvider):
260                        for group in provider.get_permission_groups(user):
261                            perms.append((user, group))
262        return perms
263
264    def grant_permission(self, username, action):
265        """Store the new permission for the user in the LDAP directory"""
266        if not self.enabled:
267            raise TracError("LdapPermissionStore is not enabled")
268        if self.manage_groups and self.util.is_group(action):
269            self._flush_group_cache(username)
270            self._add_user_to_group(username.encode('ascii'), action)
271            return
272        uid = self.util.create_dn(username.encode('ascii'))
273        try:
274            permlist = self._get_permissions(uid)
275            action = action.encode('ascii')
276            if action not in permlist:
277                xaction = self._build_action(action)
278                self._ldap.add_attribute(uid, self._ldap.permattr, xaction)
279            if self.util.is_group(username):
280                # flush the cache as group dependencies are not known
281                self.flush_cache()
282            else:
283                self.flush_cache(username)
284                self._add_cache_actions(username, [action])
285        except ldap.LDAPError, e:
286            raise TracError, "Unable to grant permission %s to %s: %s" \
287                             % (action, username, e[0]['desc'])
288
289    def revoke_permission(self, username, action):
290        """Remove the permission for the user from the LDAP directory"""
291        if not self.enabled:
292            raise TracError("LdapPermissionStore is not enabled")
293        if self.manage_groups and self.util.is_group(action):
294            self._flush_group_cache(username)
295            self._remove_user_from_group(username.encode('ascii'), action)
296            return
297        uid = self.util.create_dn(username.encode('ascii'))
298        try:
299            permlist = self._get_permissions(uid)
300            if action in permlist:
301                action = action.encode('ascii')
302                xaction = self._build_action(action)
303                self._ldap.delete_attribute(uid, self._ldap.permattr, xaction)
304                if self.util.is_group(username):
305                    # flush the cache as group dependencies are not known
306                    self.flush_cache()
307                else:
308                    self.flush_cache(username)
309                    self._del_cache_actions(username, [action])
310        except ldap.LDAPError, e:
311            kind = self.global_perms and 'global' or 'project'
312            raise TracError, "Unable to revoke %s permission %s from %s: %s" \
313                             % (kind, action, username, e[0]['desc'])
314
315    # Private implementation
316
317    def _openldap(self):
318        """Open a new connection to the LDAP directory"""
319        if self._ldap is None:
320            bind = self.config.getbool('ldap', 'store_bind')
321            self._ldap = LdapConnection(self.env.log, bind, **self._ldapcfg)
322
323    def _get_permissions(self, uid):
324        """Retrieves the permissions from the LDAP directory"""
325        self._openldap()
326        actions = self._ldap.get_attribute(uid, self._ldap.permattr)
327        perms = []
328        for action in actions:
329            if action not in perms:
330                xaction = self._extract_action(action)
331                if xaction:
332                    perms.append(xaction)
333        return perms
334
335    def _extract_action(self, action):
336        """Filters the actions (global or per-project action)"""
337        items = action.split(':')
338        if len(items) == 1:
339            # no environment, consider global
340            return action
341        (name, xaction) = items
342        if name == self.env_name:
343            # environment, check it
344            return xaction
345        return None
346
347    def _build_action(self, action):
348        """Creates a global or per-project LDAP action"""
349        if self.global_perms:
350            return action
351        return "%s:%s" % (self.env_name, action)
352
353    def _add_user_to_group(self, user, group):
354        groupdn = self.util.create_dn(group)
355        userdn = self.util.create_dn(user)
356        self._openldap()
357        try:
358            self._ldap.add_attribute(groupdn, self._ldap.groupmember, userdn)
359            self.log.info("user %s added to group %s" % (user, group))
360        except ldap.TYPE_OR_VALUE_EXISTS, e:
361            # already in group, can safely ignore
362            self.log.debug("user %s already member of %s" % (user, group))
363            return
364        except ldap.LDAPError, e:
365            raise TracError, e[0]['desc']
366
367    def _remove_user_from_group(self, user, group):
368        groupdn = self.util.create_dn(group)
369        userdn = self.util.create_dn(user)
370        self._openldap()
371        try:
372            self._ldap.delete_attribute(groupdn, self._ldap.groupmember,
373                                        userdn)
374            self.log.info("user %s removed from group %s" % (user, group))
375        except ldap.OBJECT_CLASS_VIOLATION, e:
376            # probable cause is an empty group
377            raise TracError, "Ldap error (group %s would be emptied?)" % group
378        except ldap.LDAPError, e:
379            raise TracError, e[0]['desc']
380
381    def _get_cache_actions(self, username):
382        """Retrieves the user permissions from the cache, if any"""
383        if username in self._cache:
384            lut, actions = self._cache[username]
385            if time.time() < lut+self._cache_ttl:
386                self.env.log.debug('cached (%s): %s' % \
387                                   (username, ','.join(actions)))
388                return actions
389        return []
390
391    def _add_cache_actions(self, username, newactions):
392        """Add new user actions into the cache"""
393        self._cleanup_cache()
394        if username in self._cache:
395            lut, actions = self._cache[username]
396            for action in newactions:
397                if action not in actions:
398                    actions.append(action)
399            self._cache[username] = [time.time(), actions]
400        else:
401            self._cache[username] = [time.time(), newactions]
402
403    def _del_cache_actions(self, username, delactions):
404        """Remove user actions from the cache"""
405        if not username in self._cache:
406            return
407        lut, actions = self._cache[username]
408        newactions = []
409        for action in actions:
410            if action not in delactions:
411                newactions.append(action)
412        if len(newactions) == 0:
413            del self._cache[username]
414        else:
415            self._cache[username] = [time.time(), newactions]
416
417    def _update_cache_actions(self, username, actions):
418        """Set the cache entry for the user with the new actions"""
419        # if not action, delete the cache entry
420        if len(actions) == 0:
421            if username in self._cache:
422                del self._cache[username]
423            return
424        self._cleanup_cache()
425        # overwrite the cache entry with the new actions
426        self._cache[username] = [time.time(), actions]
427
428    def _cleanup_cache(self):
429        """Make sure the cache is not full or discard oldest entries"""
430        # if cache is full, removes the LRU entries
431        if len(self._cache) >= self._cache_size:
432            cache_keys = self._cache.keys()
433            cache_keys.sort(lambda x,y: cmp(self._cache[x][0],
434                                            self._cache[y][0]))
435            old_keys = cache_keys[:(5*self._cache_size)/100]
436            self.log.info("flushing %d cache entries" % len(old_keys))
437            for k in old_keys:
438                del self._cache[k]
439
440    def flush_cache(self, username=None):
441        """Delete all entries in the cache"""
442        if username is None:
443            self._cache = {}
444        elif self._cache.has_key(username):
445            del self._cache[username]
446        # we also need to flush the LDAP permission group provider
447        self._flush_group_cache(username)
448
449    def _flush_group_cache(self, username=None):
450        """Flush the group cache (if in use)"""
451        if self.manage_groups:
452            for provider in self.group_providers:
453                if isinstance(provider, LdapPermissionGroupProvider):
454                    provider.flush_cache(username)
455
456
457class LdapUtil(object):
458    """Utilities for LDAP data management"""
459
460    def __init__(self, config):
461        for k, default in [('groupattr', 'cn'),
462                           ('uidattr', 'uid'),
463                           ('basedn', None),
464                           ('user_rdn', None),
465                           ('group_rdn', None)]:
466            v = config.get('ldap', k, default)
467            if v: v = v.encode('ascii').lower()
468            self.__setattr__(k, v)
469
470    def is_group(self, username):
471        return username.startswith(GROUP_PREFIX)
472
473    def create_dn(self, username):
474        """Create a user or group LDAP DN from his/its name"""
475        if username.startswith(GROUP_PREFIX):
476            return self.group_attrdn(username[len(GROUP_PREFIX):])
477        else:
478            return self.user_attrdn(username)
479
480    def group_attrdn(self, group):
481        """Build the dn for a group"""
482        if self.group_rdn:
483            return "%s=%s,%s,%s" % \
484                   (self.groupattr, group, self.group_rdn, self.basedn)
485        else:
486            return "%s=%s,%s" % (self.groupattr, group, self.basedn)
487
488    def user_attrdn(self, user):
489        """Build the dn for a user"""
490        if self.user_rdn:
491            return "%s=%s,%s,%s" % \
492                   (self.uidattr, user, self.user_rdn, self.basedn)
493        else:
494            return "%s=%s,%s" % (self.uidattr, user, self.basedn)
495
496    def extract_user_from_dn(self, dn):
497        m = DN_RE.search(dn)
498        if m:
499            sub = m.group('base').lower()
500            basednlen = len(self.basedn)
501            if sub[len(sub)-basednlen:].lower() != self.basedn:
502                return None
503            rdn = sub[:-basednlen-1]
504            if rdn == self.group_rdn:
505                if m.group('attr').lower() == self.groupattr:
506                    return GROUP_PREFIX + m.group('rdn')
507            elif rdn == self.user_rdn:
508                if m.group('attr').lower() == self.uidattr:
509                    return m.group('rdn')
510        return None
511
512
513class LdapConnection(object):
514    """
515    Wrapper class for the LDAP directory
516    Use only synchronous LDAP calls
517    """
518
519    _BOOL_VAL = ['groupmemberisdn', 'use_tls']
520    _INT_VAL  = ['port']
521
522    def __init__(self, log, bind=False, **ldap):
523        self.log = log
524        self.bind = bind
525        self.host = 'localhost'
526        self.port = None
527        self.groupname = 'groupofnames'
528        self.groupmember = 'member'
529        self.groupattr = 'cn'
530        self.uidattr = 'uid'
531        self.permattr = 'tracperm'
532        self.bind_user = None
533        self.bind_passwd = None
534        self.basedn = None
535        self.groupmemberisdn = True
536        self.use_tls = False
537        for k, v in ldap.items():
538            if k in LdapConnection._BOOL_VAL:
539                self.__setattr__(k, as_bool(v))
540            elif k in LdapConnection._INT_VAL:
541                self.__setattr__(k, int(v))
542            else:
543                if isinstance(v, unicode):
544                    v = v.encode('ascii')
545                self.__setattr__(k, v)
546        if self.basedn is None:
547            raise TracError, "No basedn is defined"
548        if self.port is None:
549            self.port = self.use_tls and 636 or 389
550
551    def close(self):
552        """Close the connection with the LDAP directory"""
553        self._ds.unbind_s()
554        self._ds = None
555
556    def get_groups(self):
557        """Return a list of available group dns"""
558        groups = self.get_dn(self.basedn, 'objectclass=' + self.groupname)
559        return groups
560
561    def is_in_group(self, userdn, groupdn):
562        """Tell whether the uid is member of the group"""
563        if self.groupmemberisdn:
564            udn = userdn
565        else:
566            m = re.match('[^=]+=([^,]+)', userdn)
567            if m is None:
568                self.log.warn('Malformed userdn: %s' % userdn)
569                return False
570            udn = m.group(1)
571        for attempt in range(2):
572            cr = self._compare(groupdn, self.groupmember, udn)
573            if self._ds:
574                return cr
575        return False
576
577    def get_dn(self, basedn, filterstr):
578        """Return a list of dns that satisfy the LDAP filter"""
579        dns = []
580        for attempt in range(2):
581            sr = self._search(basedn, filterstr, ['dn'], ldap.SCOPE_SUBTREE)
582            if sr:
583                for (dn, attrs) in sr:
584                    dns.append(dn)
585                break
586            if self._ds:
587                break
588        return dns
589
590    def get_attribute(self, dn, attr):
591        """Return the values of the attribute of the dn entry"""
592        attributes = [ attr ]
593        (filt, base) = dn.split(',', 1)
594        values = []
595        for attempt in range(2):
596            sr = self._search(base, filterstr=filt, attributes=attributes)
597            if sr:
598                for (dn, attrs) in sr:
599                    if attrs.has_key(attr):
600                        values = attrs[attr]
601                break
602            if self._ds:
603                break
604        return values
605
606    def add_attribute(self, dn, attr, value):
607        """Add a new value to the attribute of the dn entry"""
608        try:
609            if not self.__dict__.has_key('_ds') or not self.__dict__['_ds']:
610                self._open()
611            self._ds.modify_s(dn, [(ldap.MOD_ADD, attr, value)])
612        except ldap.LDAPError, e:
613            self.log.error("unable to add attribute '%s' to uid '%s': %s" %
614                           (attr, dn, e[0]['desc']))
615            self._ds = False
616            raise e
617
618    def delete_attribute(self, dn, attr, value):
619        """Remove all attributes that match the value from the dn entry"""
620        try:
621            if not self.__dict__.has_key('_ds') or not self.__dict__['_ds']:
622                self._open()
623            self._ds.modify_s(dn, [(ldap.MOD_DELETE, attr, value)])
624        except ldap.LDAPError, e:
625            self.log.error("unable to remove attribute '%s' from uid '%s': %s" %
626                           (attr, dn, e[0]['desc']))
627            self._ds = False
628            raise e
629
630    def _open(self):
631        """Open and optionnally bind a new connection to the LDAP directory"""
632        try:
633            if self.use_tls:
634                ldap.set_option(ldap.OPT_REFERRALS, 0)
635                ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, \
636                                ldap.OPT_X_TLS_NEVER)
637                protocol = 'ldaps'
638            else:
639                protocol = 'ldap'
640            self._ds = ldap.initialize('%s://%s:%d/' % \
641                                       (protocol, self.host, self.port))
642            self._ds.protocol_version = ldap.VERSION3
643            if self.bind:
644                if not self.bind_user:
645                    raise TracError("Bind enabled but credentials not defined")
646                head = self.bind_user[:self.bind_user.find(',')]
647                if ( head.find('=') == -1 ):
648                    self.bind_user = '%s=%s' % (self.uidattr, self.bind_user)
649                self._ds.simple_bind_s(self.bind_user, self.bind_passwd)
650            else:
651                self._ds.simple_bind_s()
652        except ldap.LDAPError, e:
653            self._ds = None
654            traceback = exception_to_unicode(e, traceback=True)
655            if self.bind_user:
656                self.log.warning("Unable to open LDAP with user %s%s",
657                                 self.bind_user, traceback)
658            else:
659                self.log.warning("Unable to open LDAP%s", traceback)
660            raise TracError("Unable to open LDAP cnx: %s"
661                            % exception_to_unicode(e))
662
663    def _search(self, basedn, filterstr='(objectclass=*)', attributes=None,
664                scope=ldap.SCOPE_ONELEVEL):
665        """Search the LDAP directory"""
666        try:
667            if not self.__dict__.has_key('_ds') or not self.__dict__['_ds']:
668                self._open()
669            sr = self._ds.search_s(basedn, scope, filterstr, attributes)
670            return sr
671        except ldap.NO_SUCH_OBJECT, e:
672            self.log.warn("LDAP error: %s (%s)", e[0]['desc'], basedn)
673            return False;
674        except ldap.LDAPError, e:
675            self.log.error("LDAP error: %s", e[0]['desc'])
676            self._ds = False
677            return False;
678
679    def _compare(self, dn, attribute, value):
680        """Compare the attribute value of a LDAP DN"""
681        try:
682            if not self.__dict__.has_key('_ds') or not self.__dict__['_ds']:
683                self._open()
684            cr = self._ds.compare_s(dn, attribute, value)
685            return cr
686        except ldap.NO_SUCH_OBJECT, e:
687            self.log.warn("LDAP error: %s (%s)", e[0]['desc'], dn)
688            return False;
689        except ldap.LDAPError, e:
690            self.log.error("LDAP error: %s", e[0]['desc'])
691            self._ds = False
692            return False
693
694
695# as_bool copied from Trac 1.2 for compatibility. Can be removed when
696# support for Trac < 0.12.2 is dropped and trac.util.as_bool used instead.
697
698def as_bool(value, default=False):
699    """Convert the given value to a `bool`.
700
701    If `value` is a string, return `True` for any of "yes", "true",
702    "enabled", "on" or non-zero numbers, ignoring case. For non-string
703    arguments, return the argument converted to a `bool`, or `default`
704    if the conversion fails.
705
706    :since 1.2: the `default` argument can be specified.
707    """
708    if isinstance(value, basestring):
709        try:
710            return bool(float(value))
711        except ValueError:
712            value = value.strip().lower()
713            if value in ('yes', 'true', 'enabled', 'on'):
714                return True
715            elif value in ('no', 'false', 'disabled', 'off'):
716                return False
717            else:
718                return default
719    try:
720        return bool(value)
721    except (TypeError, ValueError):
722        return default
Note: See TracBrowser for help on using the repository browser.