source: xmlrpcplugin/trunk/tracrpc/tests/__init__.py

Last change on this file was 18519, checked in by Jun Omae, 7 months ago

XmlRpcPlugin: suppress code style warnings (closes #14216)

File size: 11.3 KB
RevLine 
[6067]1# -*- coding: utf-8 -*-
2"""
3License: BSD
4
[13201]5(c) 2009-2013 ::: www.CodeResort.com - BV Network AS (simon-code@bvnetwork.no)
[6067]6"""
[18513]7__all__ = ()
[6067]8
[18513]9import base64
10import contextlib
[6064]11import os
[18513]12import socket
13import subprocess
14import sys
[6064]15import time
[18513]16import unittest
17
18if sys.version_info[0] == 2:
[18518]19    from urllib2 import BaseHandler, HTTPError, Request, build_opener, urlopen
[18513]20    from urllib import urlencode
[18229]21    from urlparse import urlparse, urlsplit
[18518]22    HTTPBasicAuthHandler = HTTPPasswordMgrWithPriorAuth = None
[18229]23else:
24    from urllib.error import HTTPError
[18513]25    from urllib.parse import urlencode, urlparse, urlsplit
[18229]26    from urllib.request import (HTTPBasicAuthHandler,
[18518]27                                HTTPPasswordMgrWithPriorAuth, Request,
[18513]28                                build_opener, urlopen)
[6064]29
[18513]30from trac.env import Environment
31from trac.util import create_file
32from trac.util.compat import close_fds
[13201]33
[18519]34from ..util import to_b
35
[18517]36try:
37    from trac.test import MockRequest
38except ImportError:
39    def MockRequest(env):
40        import io
41        from trac.test import MockPerm
42        from trac.util.datefmt import utc
43        from trac.web.api import Request as TracRequest
44        from trac.web.main import FakeSession
45        out = io.BytesIO()
46        environ = {'wsgi.url_scheme': 'http', 'wsgi.input': io.BytesIO(b''),
47                   'REQUEST_METHOD': 'GET', 'SERVER_NAME': 'example.org',
48                   'SERVER_PORT': 80, 'SCRIPT_NAME': '/trac',
49                   'trac.base_url': 'http://example.org/trac'}
50        start_response = lambda status, headers: out.write
51        req = TracRequest(environ, start_response)
52        req.callbacks.update({
53            'authname': lambda req: 'anonymous',
54            'perm': lambda req: MockPerm(),
55            'session': lambda req: FakeSession(),
56            'chrome': lambda req: {},
57            'tz': lambda req: utc,
58            'locale': lambda req: None,
59            'form_token': lambda req: 'A' * 20,
60        })
61        return req
[6064]62
[18517]63try:
64    from trac.test import rmtree
65except ImportError:
66    from shutil import rmtree
67
68
[18513]69def _get_topdir():
70    path = os.path.dirname(os.path.abspath(__file__))
71    suffix = '/tracrpc/tests'.replace('/', os.sep)
72    if not path.endswith(suffix):
73        raise RuntimeError("%r doesn't end with %r" % (path, suffix))
74    return path[:-len(suffix)]
[6064]75
76
[18513]77def _get_testdir():
78    dir_ = os.environ.get('TMP') or _get_topdir()
79    if not os.path.isabs(dir_):
80        raise RuntimeError('Non absolute directory: %s' % repr(dir_))
81    return os.path.join(dir_, 'rpctestenv')
[6064]82
83
[18518]84if HTTPPasswordMgrWithPriorAuth:
85    def _build_opener_auth(url, user, password):
86        manager = HTTPPasswordMgrWithPriorAuth()
87        manager.add_password(None, url, user, password, is_authenticated=True)
88        handler = HTTPBasicAuthHandler(manager)
89        return build_opener(handler)
90else:
91    class HTTPBasicAuthPriorHandler(BaseHandler):
92
93        def __init__(self, url, user, password):
94            self.url = url
95            self.user = user
96            self.password = password
97
98        def http_request(self, request):
99            if not request.has_header('Authorization') and \
100                    self.url == request.get_full_url():
101                cred = '%s:%s' % (self.user, self.password)
102                encoded = b64encode(to_b(cred))
103                request.add_header('Authorization', 'Basic ' + encoded)
104            return request
105
106    def _build_opener_auth(url, user, password):
107        handler = HTTPBasicAuthPriorHandler(url, user, password)
108        return build_opener(handler)
109
110
[18513]111class RpcTestEnvironment(object):
[6069]112
[18513]113    _testdir = _get_testdir()
114    _plugins_dir = os.path.join(_testdir, 'plugins')
115    _devnull = None
116    _log = None
117    _port = None
118    _envpath = None
119    _htpasswd = None
120    _env = None
121    _tracd = None
122    url = None
123    url_anon = None
124    url_auth = None
125    url_user = None
126    url_admin = None
[6064]127
[18513]128    def __init__(self):
129        if os.path.isdir(self._testdir):
130            rmtree(self._testdir)
131        os.mkdir(self._testdir)
132        os.mkdir(self._plugins_dir)
[13203]133
[18513]134    @property
135    def tracdir(self):
136        return self._envpath
[6064]137
[18513]138    def init(self):
139        self._devnull = os.open(os.devnull, os.O_RDWR)
140        self._log = os.open(os.path.join(self._testdir, 'tracd.log'),
141                            os.O_WRONLY | os.O_CREAT | os.O_APPEND)
142        self._port = get_ephemeral_port()
143        self.check_call([sys.executable, 'setup.py', 'develop', '-mxd',
144                         self._plugins_dir])
145        self._envpath = os.path.join(self._testdir, 'trac')
146        self.url = 'http://127.0.0.1:%d/%s' % \
147                   (self._port, os.path.basename(self._envpath))
148        self._htpasswd = os.path.join(self._testdir, 'htpasswd.txt')
149        create_file(self._htpasswd,
150                    'admin:$apr1$CJoMFGDO$W5ERyxnTl6qAUa9BbE0QV1\n'
151                    'user:$apr1$ZQuTwNFe$ReYgDiL/gduTvjO29qdYx0\n')
152        inherit = os.path.join(self._testdir, 'inherit.ini')
153        with open(inherit, 'w') as f:
154            f.write('[inherit]\n'
155                    'plugins_dir = %s\n'
156                    '[components]\n'
157                    'tracrpc.* = enabled\n'
158                    '[logging]\n'
159                    'log_type = file\n'
160                    'log_level = INFO\n'
161                    '[trac]\n'
162                    'base_url = %s\n' %
163                    (self._plugins_dir, self.url))
164        args = [sys.executable, '-m', 'trac.admin.console', self._envpath]
165        with self.popen(args, stdin=subprocess.PIPE) as proc:
166            proc.stdin.write(
167                b'initenv --inherit=%s project sqlite:db/trac.db\n'
168                b'permission add admin TRAC_ADMIN\n'
169                b'permission add anonymous XML_RPC\n'
[18517]170                % to_b(inherit))
[18513]171        self.url_anon = '%s/rpc' % self.url
172        self.url_auth = '%s/login/rpc' % self.url
173        self.url_user = '%s/login/xmlrpc' % \
174                        self.url.replace('://', '://user:user@')
175        self.url_admin = '%s/login/xmlrpc' % \
176                         self.url.replace('://', '://admin:admin@')
177        self.start()
[7916]178
[18513]179    def cleanup(self):
180        self.stop()
181        if self._env:
182            self._env.shutdown()
183            self._env = None
184        if self._devnull is not None:
185            os.close(self._devnull)
186            self._devnull = None
187        if self._log is not None:
188            os.close(self._log)
189            self._log = None
190
191    def start(self):
192        if self._tracd and self._tracd.returncode is None:
193            raise RuntimeError('tracd is running')
194        args = [
195            sys.executable, '-m', 'trac.web.standalone',
196            '--port=%d' % self._port,
197            '--basic-auth=*,%s,realm' % self._htpasswd, self._envpath,
198        ]
199        self._tracd = self.popen(args, stdout=self._log, stderr=self._log)
200        start = time.time()
201        while time.time() - start < 10:
202            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
[7916]203            try:
[18513]204                s.connect(('127.0.0.1', self._port))
205            except socket.error:
206                time.sleep(0.125)
207            else:
208                break
209            finally:
210                s.close()
211        else:
212            raise RuntimeError('Timed out waiting for tracd to start')
[7916]213
[18513]214    def stop(self):
215        if self._tracd:
216            try:
217                self._tracd.terminate()
218            except EnvironmentError:
219                pass
220            self._tracd.wait()
221            self._tracd = None
[7916]222
[18513]223    def restart(self):
224        self.stop()
225        self.start()
[7916]226
[18513]227    def popen(self, *args, **kwargs):
228        kwargs.setdefault('stdin', self._devnull)
229        kwargs.setdefault('stdout', self._devnull)
230        kwargs.setdefault('stderr', self._devnull)
231        kwargs.setdefault('close_fds', close_fds)
232        return Popen(*args, **kwargs)
233
234    def check_call(self, *args, **kwargs):
235        kwargs.setdefault('stdin', self._devnull)
236        kwargs.setdefault('stdout', subprocess.PIPE)
237        kwargs.setdefault('stderr', subprocess.PIPE)
238        with self.popen(*args, **kwargs) as proc:
239            stdout, stderr = proc.communicate()
240            if proc.returncode != 0:
241                raise RuntimeError('Exited with %d (stdout %r, stderr %r)' %
242                                   (proc.returncode, stdout, stderr))
243
244    def get_trac_environment(self):
245        if not self._env:
246            self._env = Environment(self._envpath)
247        return self._env
248
249    def _tracadmin(self, *args):
250        self.check_call((sys.executable, '-m', 'trac.admin.console',
251                         self._envpath) + args)
252
253
254if hasattr(subprocess.Popen, '__enter__'):
255    Popen = subprocess.Popen
256else:
257    class Popen(subprocess.Popen):
258
259        def __enter__(self):
260            return self
261
262        def __exit__(self, *args):
263            try:
264                if self.stdin:
265                    self.stdin.close()
266            finally:
267                self.wait()
268            for f in (self.stdout, self.stderr):
269                if f:
270                    f.close()
271
272
273def get_ephemeral_port():
274    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
275    try:
276        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
277        s.bind(('127.0.0.1', 0))
278        s.listen(1)
279        return s.getsockname()[1]
280    finally:
281        s.close()
282
283
284_rpc_testenv = None
285
286
287def _new_testenv():
288    global _rpc_testenv
289    if not _rpc_testenv:
290        _rpc_testenv = RpcTestEnvironment()
291        _rpc_testenv.init()
292
293
294def _del_testenv():
295    global _rpc_testenv
296    if _rpc_testenv:
297        _rpc_testenv.cleanup()
298        _rpc_testenv = None
299
300
301class TracRpcTestCase(unittest.TestCase):
302
303    @property
304    def _testenv(self):
305        return _rpc_testenv
306
[18518]307    def _opener_auth(self, url, user, password):
308        return _build_opener_auth(url, user, password)
309
[18513]310    @contextlib.contextmanager
311    def _plugin(self, source, filename):
312        filename = os.path.join(_rpc_testenv.tracdir, 'plugins', filename)
313        create_file(filename, source)
314        try:
315            _rpc_testenv.restart()
316            yield
317        finally:
318            os.unlink(filename)
319            _rpc_testenv.restart()
320
321    def _grant_perm(self, username, *actions):
322        _rpc_testenv._tracadmin('permission', 'add', username, *actions)
323        _rpc_testenv.restart()
324
325    def _revoke_perm(self, username, *actions):
326        _rpc_testenv._tracadmin('permission', 'remove', username, *actions)
327        _rpc_testenv.restart()
328
329
330class TracRpcTestSuite(unittest.TestSuite):
331
332    def run(self, result):
333        if _rpc_testenv:
334            created = False
335        else:
336            _new_testenv()
337            created = True
338        try:
339            return super(TracRpcTestSuite, self).run(result)
340        finally:
341            if created:
342                _del_testenv()
343
344
345def b64encode(s):
346    if not isinstance(s, bytes):
347        s = s.encode('utf-8')
348    rv = base64.b64encode(s)
349    if isinstance(rv, bytes):
350        rv = rv.decode('ascii')
351    return rv
352
353
354def form_urlencoded(data):
[18517]355    return to_b(urlencode(data))
[18513]356
357
358def makeSuite(testCaseClass, suiteClass=unittest.TestSuite):
359    loader = unittest.TestLoader()
360    loader.suiteClass = suiteClass
361    return loader.loadTestsFromTestCase(testCaseClass)
362
363
364def test_suite():
365    suite = TracRpcTestSuite()
366    from . import api, xml_rpc, json_rpc, ticket, wiki, web_ui, search
367    for mod in (api, xml_rpc, json_rpc, ticket, wiki, web_ui, search):
368        suite.addTest(mod.test_suite())
369    return suite
Note: See TracBrowser for help on using the repository browser.