| 1 | # -*- coding: utf-8 -*- |
|---|
| 2 | # |
|---|
| 3 | # Copyright (C) 2011 Odd Simon Simonsen <oddsimons@gmail.com> |
|---|
| 4 | # Copyright (C) 2012 Steffen Hoffmann <hoff.st@web.de> |
|---|
| 5 | # |
|---|
| 6 | # This software is licensed as described in the file COPYING, which |
|---|
| 7 | # you should have received as part of this distribution. |
|---|
| 8 | # |
|---|
| 9 | |
|---|
| 10 | import shutil |
|---|
| 11 | import tempfile |
|---|
| 12 | import unittest |
|---|
| 13 | |
|---|
| 14 | from trac.db import Table, Column, Index |
|---|
| 15 | from trac.db.api import DatabaseManager |
|---|
| 16 | from trac.test import EnvironmentStub |
|---|
| 17 | |
|---|
| 18 | from tractags import db_default |
|---|
| 19 | from tractags.db import TagSetup |
|---|
| 20 | |
|---|
| 21 | |
|---|
| 22 | class TagSetupTestCase(unittest.TestCase): |
|---|
| 23 | |
|---|
| 24 | def setUp(self): |
|---|
| 25 | self.env = EnvironmentStub(enable=['trac.*']) |
|---|
| 26 | self.env.path = tempfile.mkdtemp() |
|---|
| 27 | self.db_mgr = DatabaseManager(self.env) |
|---|
| 28 | |
|---|
| 29 | def tearDown(self): |
|---|
| 30 | self.env.shutdown() |
|---|
| 31 | shutil.rmtree(self.env.path) |
|---|
| 32 | |
|---|
| 33 | # Helpers |
|---|
| 34 | |
|---|
| 35 | def _get_cursor_description(self, cursor): |
|---|
| 36 | return cursor.cursor.description |
|---|
| 37 | |
|---|
| 38 | def _revert_tractags_schema_init(self): |
|---|
| 39 | with self.env.db_transaction as db: |
|---|
| 40 | db("DROP TABLE IF EXISTS tags") |
|---|
| 41 | db("DROP TABLE IF EXISTS tags_change") |
|---|
| 42 | db("DELETE FROM system WHERE name='tags_version'") |
|---|
| 43 | db("DELETE FROM permission WHERE action %s" % db.like(), |
|---|
| 44 | ('TAGS_%',)) |
|---|
| 45 | |
|---|
| 46 | def get_db_version(self): |
|---|
| 47 | for version, in self.env.db_query(""" |
|---|
| 48 | SELECT value FROM system |
|---|
| 49 | WHERE name='tags_version' |
|---|
| 50 | """): |
|---|
| 51 | return int(version) |
|---|
| 52 | |
|---|
| 53 | # Tests |
|---|
| 54 | |
|---|
| 55 | def test_new_install(self): |
|---|
| 56 | setup = TagSetup(self.env) |
|---|
| 57 | # Current tractags schema is setup with enabled component anyway. |
|---|
| 58 | # Revert these changes for clean install testing. |
|---|
| 59 | self._revert_tractags_schema_init() |
|---|
| 60 | self.assertEquals(0, setup.get_schema_version()) |
|---|
| 61 | self.assertTrue(setup.environment_needs_upgrade()) |
|---|
| 62 | |
|---|
| 63 | setup.upgrade_environment() |
|---|
| 64 | self.assertFalse(setup.environment_needs_upgrade()) |
|---|
| 65 | with self.env.db_query as db: |
|---|
| 66 | cursor = db.cursor() |
|---|
| 67 | cursor.execute("SELECT * FROM tags") |
|---|
| 68 | cols = [col[0] for col in self._get_cursor_description(cursor)] |
|---|
| 69 | self.assertEquals([], cursor.fetchall()) |
|---|
| 70 | self.assertEquals(['tagspace', 'name', 'tag'], cols) |
|---|
| 71 | self.assertEquals(db_default.schema_version, self.get_db_version()) |
|---|
| 72 | |
|---|
| 73 | def test_upgrade_schema_v1(self): |
|---|
| 74 | # Ancient, unversioned schema - wiki only. |
|---|
| 75 | schema = [ |
|---|
| 76 | Table('wiki_namespace')[ |
|---|
| 77 | Column('name'), |
|---|
| 78 | Column('namespace'), |
|---|
| 79 | Index(['name', 'namespace']), |
|---|
| 80 | ] |
|---|
| 81 | ] |
|---|
| 82 | setup = TagSetup(self.env) |
|---|
| 83 | # Current tractags schema is setup with enabled component anyway. |
|---|
| 84 | # Revert these changes for clean install testing. |
|---|
| 85 | self._revert_tractags_schema_init() |
|---|
| 86 | |
|---|
| 87 | connector = self.db_mgr.get_connector()[0] |
|---|
| 88 | with self.env.db_transaction as db: |
|---|
| 89 | for table in schema: |
|---|
| 90 | for stmt in connector.to_sql(table): |
|---|
| 91 | db(stmt) |
|---|
| 92 | # Populate table with migration test data. |
|---|
| 93 | db("""INSERT INTO wiki_namespace (name, namespace) |
|---|
| 94 | VALUES ('WikiStart', 'tag')""") |
|---|
| 95 | |
|---|
| 96 | tags = self.env.db_query("SELECT * FROM wiki_namespace") |
|---|
| 97 | self.assertEquals([('WikiStart', 'tag')], tags) |
|---|
| 98 | self.assertEquals(1, setup.get_schema_version()) |
|---|
| 99 | self.assertTrue(setup.environment_needs_upgrade()) |
|---|
| 100 | |
|---|
| 101 | setup.upgrade_environment() |
|---|
| 102 | self.assertFalse(setup.environment_needs_upgrade()) |
|---|
| 103 | with self.env.db_query as db: |
|---|
| 104 | cursor = db.cursor() |
|---|
| 105 | cursor.execute("SELECT * FROM tags") |
|---|
| 106 | tags = cursor.fetchall() |
|---|
| 107 | cols = [col[0] for col in self._get_cursor_description(cursor)] |
|---|
| 108 | # Db content should be migrated. |
|---|
| 109 | self.assertEquals([('wiki', 'WikiStart', 'tag')], tags) |
|---|
| 110 | self.assertEquals(['tagspace', 'name', 'tag'], cols) |
|---|
| 111 | self.assertEquals(db_default.schema_version, self.get_db_version()) |
|---|
| 112 | |
|---|
| 113 | def test_upgrade_schema_v2(self): |
|---|
| 114 | # Just register a current, but unversioned schema. |
|---|
| 115 | schema = [ |
|---|
| 116 | Table('tags', key=('tagspace', 'name', 'tag'))[ |
|---|
| 117 | Column('tagspace'), |
|---|
| 118 | Column('name'), |
|---|
| 119 | Column('tag'), |
|---|
| 120 | Index(['tagspace', 'name']), |
|---|
| 121 | Index(['tagspace', 'tag']), |
|---|
| 122 | ] |
|---|
| 123 | ] |
|---|
| 124 | setup = TagSetup(self.env) |
|---|
| 125 | # Current tractags schema is setup with enabled component anyway. |
|---|
| 126 | # Revert these changes for clean install testing. |
|---|
| 127 | self._revert_tractags_schema_init() |
|---|
| 128 | |
|---|
| 129 | connector = self.db_mgr.get_connector()[0] |
|---|
| 130 | with self.env.db_transaction as db: |
|---|
| 131 | for table in schema: |
|---|
| 132 | for stmt in connector.to_sql(table): |
|---|
| 133 | db(stmt) |
|---|
| 134 | # Populate table with test data. |
|---|
| 135 | db("""INSERT INTO tags (tagspace, name, tag) |
|---|
| 136 | VALUES ('wiki', 'WikiStart', 'tag')""") |
|---|
| 137 | |
|---|
| 138 | tags = self.env.db_query("SELECT * FROM tags") |
|---|
| 139 | self.assertEquals([('wiki', 'WikiStart', 'tag')], tags) |
|---|
| 140 | self.assertEquals(2, setup.get_schema_version()) |
|---|
| 141 | self.assertTrue(setup.environment_needs_upgrade()) |
|---|
| 142 | |
|---|
| 143 | setup.upgrade_environment() |
|---|
| 144 | self.assertFalse(setup.environment_needs_upgrade()) |
|---|
| 145 | with self.env.db_query as db: |
|---|
| 146 | cursor = db.cursor() |
|---|
| 147 | cursor.execute("SELECT * FROM tags") |
|---|
| 148 | tags = cursor.fetchall() |
|---|
| 149 | cols = [col[0] for col in self._get_cursor_description(cursor)] |
|---|
| 150 | # Db should be unchanged. |
|---|
| 151 | self.assertEquals([('wiki', 'WikiStart', 'tag')], tags) |
|---|
| 152 | self.assertEquals(['tagspace', 'name', 'tag'], cols) |
|---|
| 153 | self.assertEquals(db_default.schema_version, self.get_db_version()) |
|---|
| 154 | |
|---|
| 155 | def test_upgrade_schema_v3(self): |
|---|
| 156 | # Add table for tag change records to the schema. |
|---|
| 157 | schema = [ |
|---|
| 158 | Table('tags', key=('tagspace', 'name', 'tag'))[ |
|---|
| 159 | Column('tagspace'), |
|---|
| 160 | Column('name'), |
|---|
| 161 | Column('tag'), |
|---|
| 162 | Index(['tagspace', 'name']), |
|---|
| 163 | Index(['tagspace', 'tag']), |
|---|
| 164 | ] |
|---|
| 165 | ] |
|---|
| 166 | setup = TagSetup(self.env) |
|---|
| 167 | # Current tractags schema is setup with enabled component anyway. |
|---|
| 168 | # Revert these changes for clean install testing. |
|---|
| 169 | self._revert_tractags_schema_init() |
|---|
| 170 | |
|---|
| 171 | connector = self.db_mgr.get_connector()[0] |
|---|
| 172 | with self.env.db_transaction as db: |
|---|
| 173 | for table in schema: |
|---|
| 174 | for stmt in connector.to_sql(table): |
|---|
| 175 | db(stmt) |
|---|
| 176 | # Preset system db table with old version. |
|---|
| 177 | db("""INSERT INTO system (name, value) |
|---|
| 178 | VALUES ('tags_version', '3')""") |
|---|
| 179 | |
|---|
| 180 | self.assertEquals(3, setup.get_schema_version()) |
|---|
| 181 | self.assertTrue(setup.environment_needs_upgrade()) |
|---|
| 182 | |
|---|
| 183 | setup.upgrade_environment() |
|---|
| 184 | self.assertFalse(setup.environment_needs_upgrade()) |
|---|
| 185 | with self.env.db_query as db: |
|---|
| 186 | cursor = db.cursor() |
|---|
| 187 | cursor.execute("SELECT * FROM tags_change") |
|---|
| 188 | cols = [col[0] for col in self._get_cursor_description(cursor)] |
|---|
| 189 | self.assertEquals(['tagspace', 'name', 'time', 'author', |
|---|
| 190 | 'oldtags', 'newtags'], cols) |
|---|
| 191 | self.assertEquals(db_default.schema_version, self.get_db_version()) |
|---|
| 192 | |
|---|
| 193 | |
|---|
| 194 | def test_suite(): |
|---|
| 195 | suite = unittest.TestSuite() |
|---|
| 196 | suite.addTest(unittest.makeSuite(TagSetupTestCase)) |
|---|
| 197 | return suite |
|---|
| 198 | |
|---|
| 199 | |
|---|
| 200 | if __name__ == '__main__': |
|---|
| 201 | unittest.main(defaultTest='test_suite') |
|---|