1*9c5db199SXin Li#!/usr/bin/python3 2*9c5db199SXin Li 3*9c5db199SXin Liimport unittest 4*9c5db199SXin Liimport common 5*9c5db199SXin Lifrom autotest_lib.client.common_lib.test_utils import mock 6*9c5db199SXin Lifrom autotest_lib.database import migrate, db_utils 7*9c5db199SXin Li 8*9c5db199SXin Liclass UtilsTest(unittest.TestCase): 9*9c5db199SXin Li 10*9c5db199SXin Li EXISTS_QUERY_BASE = ('SELECT table_name FROM information_schema.%s ' 11*9c5db199SXin Li 'WHERE table_schema = %%s') 12*9c5db199SXin Li DB_NAME = 'test_db' 13*9c5db199SXin Li 14*9c5db199SXin Li 15*9c5db199SXin Li def setUp(self): 16*9c5db199SXin Li self.god = mock.mock_god() 17*9c5db199SXin Li self.manager = self.god.create_mock_class(migrate.MigrationManager, 18*9c5db199SXin Li 'manager') 19*9c5db199SXin Li 20*9c5db199SXin Li self.god.stub_function(self.manager, 'execute') 21*9c5db199SXin Li self.god.stub_function(self.manager, 'get_db_name') 22*9c5db199SXin Li 23*9c5db199SXin Li 24*9c5db199SXin Li def tearDown(self): 25*9c5db199SXin Li self.god.unstub_all() 26*9c5db199SXin Li 27*9c5db199SXin Li 28*9c5db199SXin Li def test_check_exists(self): 29*9c5db199SXin Li views = ('view1', 'view2') 30*9c5db199SXin Li def _call_check_exists(): 31*9c5db199SXin Li db_utils.check_exists(self.manager, views, db_utils.VIEW_TYPE) 32*9c5db199SXin Li 33*9c5db199SXin Li self._setup_exists_expects(views, 'VIEWS') 34*9c5db199SXin Li _call_check_exists() 35*9c5db199SXin Li self.god.check_playback() 36*9c5db199SXin Li 37*9c5db199SXin Li self._setup_exists_expects(('view1',), 'VIEWS') 38*9c5db199SXin Li self.assertRaises(Exception, _call_check_exists) 39*9c5db199SXin Li self.god.check_playback() 40*9c5db199SXin Li 41*9c5db199SXin Li 42*9c5db199SXin Li def test_drop_views(self): 43*9c5db199SXin Li views = ('view1', 'view2') 44*9c5db199SXin Li self._setup_exists_expects(views, 'VIEWS') 45*9c5db199SXin Li 46*9c5db199SXin Li for view in views: 47*9c5db199SXin Li self.manager.execute.expect_call('DROP VIEW `%s`' % view) 48*9c5db199SXin Li 49*9c5db199SXin Li db_utils.drop_views(self.manager, views) 50*9c5db199SXin Li self.god.check_playback() 51*9c5db199SXin Li 52*9c5db199SXin Li 53*9c5db199SXin Li def test_rename(self): 54*9c5db199SXin Li mapping = { 55*9c5db199SXin Li 'table1' : 'new_table1', 56*9c5db199SXin Li 'table2' : 'new_table2', 57*9c5db199SXin Li } 58*9c5db199SXin Li self._setup_exists_expects((name for name, _ in mapping.iteritems()), 59*9c5db199SXin Li 'TABLES') 60*9c5db199SXin Li 61*9c5db199SXin Li for name, new_name in mapping.iteritems(): 62*9c5db199SXin Li self.manager.execute.expect_call( 63*9c5db199SXin Li 'RENAME TABLE `%s` TO `%s`' % (name, new_name)) 64*9c5db199SXin Li 65*9c5db199SXin Li db_utils.rename(self.manager, mapping) 66*9c5db199SXin Li self.god.check_playback() 67*9c5db199SXin Li 68*9c5db199SXin Li 69*9c5db199SXin Li def _setup_exists_expects(self, names, table): 70*9c5db199SXin Li self.manager.get_db_name.expect_call().and_return(self.DB_NAME) 71*9c5db199SXin Li self.manager.execute.expect_call( 72*9c5db199SXin Li self.EXISTS_QUERY_BASE % table, self.DB_NAME).and_return( 73*9c5db199SXin Li self._create_exists_query_result(names)) 74*9c5db199SXin Li 75*9c5db199SXin Li 76*9c5db199SXin Li def _create_exists_query_result(self, names): 77*9c5db199SXin Li return ((name, None) for name in names) 78*9c5db199SXin Li 79*9c5db199SXin Li 80*9c5db199SXin Liif __name__ == '__main__': 81*9c5db199SXin Li unittest.main() 82