From 314cd2366fc47fdda1b3f8dbb427091781f00a88 Mon Sep 17 00:00:00 2001 From: Misiu Godfrey <40667992+misiugodfrey@users.noreply.github.com> Date: Wed, 13 Sep 2023 12:45:30 -0700 Subject: [PATCH] Fix Sqlite Transaction in Transaction Error (#7506) * Moved error logging after transaction rollback in Catalog and SysCatalog. Added dashboard concurrency testing for replace_dashboard API. * fix formatting issue * Further formatting fixes * Minor updates * Encapulate renameTables in proper transaction * Remove dead comment * Merged dashboard concurrency with other catalog concurrency tests * Minor cleanup Signed-off-by: Misiu Godfrey --- Calcite/Calcite.h | 7 +- Catalog/Catalog.cpp | 156 ++++++++---------- Catalog/Catalog.h | 20 ++- Catalog/SysCatalog.cpp | 12 +- Parser/ParserNode.cpp | 2 +- Tests/CreateAndDropTableDdlTest.cpp | 27 +++ .../mapd/tests/CatalogConcurrencyTest.java | 131 +++++++++------ 7 files changed, 207 insertions(+), 148 deletions(-) diff --git a/Calcite/Calcite.h b/Calcite/Calcite.h index 27c27ac234..c7f9b7b169 100644 --- a/Calcite/Calcite.h +++ b/Calcite/Calcite.h @@ -59,7 +59,7 @@ class TFilterPushDownInfo; class TPlanResult; class TCompletionHint; -class Calcite final { +class Calcite { public: Calcite(const int db_port, const int port, @@ -71,6 +71,7 @@ class Calcite final { Calcite(const SystemParameters& db_parameters, const std::string& data_dir, const std::string& udf_filename = ""); + Calcite() {} // sql_string may differ from what is in query_state due to legacy_syntax option. TPlanResult process(query_state::QueryStateProxy, std::string sql_string, @@ -86,9 +87,9 @@ class Calcite final { const int cursor); std::string getExtensionFunctionWhitelist(); std::string getUserDefinedFunctionWhitelist(); - void updateMetadata(std::string catalog, std::string table); + virtual void updateMetadata(std::string catalog, std::string table); void close_calcite_server(bool log = true); - ~Calcite(); + virtual ~Calcite(); std::string getRuntimeExtensionFunctionWhitelist(); void setRuntimeExtensionFunctions(const std::vector& udfs, const std::vector& udtfs, diff --git a/Catalog/Catalog.cpp b/Catalog/Catalog.cpp index 2c27f17204..473dc4f082 100644 --- a/Catalog/Catalog.cpp +++ b/Catalog/Catalog.cpp @@ -566,8 +566,8 @@ void Catalog::updateDefaultColumnValues() { sqliteConnector_.query("ALTER TABLE mapd_columns ADD default_value TEXT"); } } catch (std::exception& e) { - LOG(ERROR) << "Failed to make metadata update for default values` support"; sqliteConnector_.query("ROLLBACK TRANSACTION"); + LOG(ERROR) << "Failed to make metadata update for default values` support"; throw; } sqliteConnector_.query("END TRANSACTION"); @@ -4441,30 +4441,24 @@ void Catalog::renameTable(const TableDescriptor* td, const string& newTableName) } } -void Catalog::renamePhysicalTable(std::vector>& names, - std::vector& tableIds) { +void Catalog::renamePhysicalTables( + std::vector>& names, + std::vector& tableIds) { cat_write_lock write_lock(this); cat_sqlite_lock sqlite_lock(getObjForLock()); // execute the SQL query - try { - for (size_t i = 0; i < names.size(); i++) { - int tableId = tableIds[i]; - std::string& newTableName = names[i].second; - - sqliteConnector_.query_with_text_params( - "UPDATE mapd_tables SET name = ? WHERE tableid = ?", - std::vector{newTableName, std::to_string(tableId)}); - } - } catch (std::exception& e) { - sqliteConnector_.query("ROLLBACK TRANSACTION"); - throw; + for (size_t i = 0; i < names.size(); i++) { + int tableId = tableIds[i]; + const std::string& newTableName = names[i].second; + sqliteConnector_.query_with_text_params( + "UPDATE mapd_tables SET name = ? WHERE tableid = ?", + std::vector{newTableName, std::to_string(tableId)}); } // reset the table descriptors, give Calcite a kick for (size_t i = 0; i < names.size(); i++) { - std::string& curTableName = names[i].first; - std::string& newTableName = names[i].second; + const auto& [curTableName, newTableName] = names[i]; TableDescriptorMap::iterator tableDescIt = tableDescriptorMap_.find(to_upper(curTableName)); @@ -4483,38 +4477,31 @@ void Catalog::renamePhysicalTable(std::vectortableId // to account for possible chained renames // (for swap: a->b, b->c, c->d, d->a) - -const TableDescriptor* lookupTableDescriptor(Catalog* cat, - std::map& cachedTableMap, - std::string& curTableName) { - auto iter = cachedTableMap.find(curTableName); - if ((iter != cachedTableMap.end())) { - // get the cached tableId - // and use that to lookup the TableDescriptor - int tableId = (*iter).second; - if (tableId == -1) { - return NULL; - } else { - return cat->getMetadataForTable(tableId); - } +const TableDescriptor* Catalog::getCachedTableDescriptor( + const std::map& cached_table_map, + const std::string& cur_table_name) { + if (auto it = cached_table_map.find(cur_table_name); it != cached_table_map.end()) { + auto table_id = it->second; + return (table_id == -1) ? NULL : getMetadataForTable(table_id); } - - // else ... lookup in standard location - return cat->getMetadataForTable(curTableName); + return getMetadataForTable(cur_table_name); } -void replaceTableName(std::map& cachedTableMap, - std::string& curTableName, - std::string& newTableName, - int tableId) { +namespace { +void replace_cached_table_name(std::map& cachedTableMap, + const std::string& curTableName, + const std::string& newTableName, + int tableId) { // mark old/cur name as deleted cachedTableMap[curTableName] = -1; // insert the 'new' name cachedTableMap[newTableName] = tableId; } +} // namespace -void Catalog::renameTable(std::vector>& names) { +void Catalog::renameTables( + const std::vector>& names) { // tableId of all tables being renamed // ... in matching order to 'names' std::vector tableIds; @@ -4533,12 +4520,11 @@ void Catalog::renameTable(std::vector>& name // gather tableIds pre-execute; build maps for (size_t i = 0; i < names.size(); i++) { - std::string& curTableName = names[i].first; - std::string& newTableName = names[i].second; + const auto& [curTableName, newTableName] = names[i]; // make sure the table being renamed exists, // or will exist when executed in 'name' order - auto td = lookupTableDescriptor(this, cachedTableMap, curTableName); + auto td = getCachedTableDescriptor(cachedTableMap, curTableName); CHECK(td); tableIds.push_back(td->tableId); @@ -4546,7 +4532,7 @@ void Catalog::renameTable(std::vector>& name // don't overwrite as it should map to the first names index 'i' uniqueOrderedTableIds[td->tableId] = i; } - replaceTableName(cachedTableMap, curTableName, newTableName, td->tableId); + replace_cached_table_name(cachedTableMap, curTableName, newTableName, td->tableId); } CHECK_EQ(tableIds.size(), names.size()); @@ -4564,7 +4550,7 @@ void Catalog::renameTable(std::vector>& name // acquire the locks for all tables being renamed lockmgr::LockedTableDescriptors tableLocks; for (auto& idPair : uniqueOrderedTableIds) { - std::string& tableName = names[idPair.second].first; + const std::string& tableName = names[idPair.second].first; tableLocks.emplace_back( std::make_unique>( lockmgr::TableSchemaLockContainer::acquireTableDescriptor( @@ -4575,18 +4561,13 @@ void Catalog::renameTable(std::vector>& name { cat_write_lock write_lock(this); - cat_sqlite_lock sqlite_lock(getObjForLock()); - - sqliteConnector_.query("BEGIN TRANSACTION"); - // collect all (tables + physical tables) into a single list std::vector> allNames; std::vector allTableIds; for (size_t i = 0; i < names.size(); i++) { int tableId = tableIds[i]; - std::string& curTableName = names[i].first; - std::string& newTableName = names[i].second; + const auto& [curTableName, newTableName] = names[i]; // rename all corresponding physical tables if this is a logical table const auto physicalTableIt = logicalToPhysicalTableMapById_.find(tableId); @@ -4597,8 +4578,7 @@ void Catalog::renameTable(std::vector>& name int32_t physical_tb_id = physicalTables[k]; const TableDescriptor* phys_td = getMetadataForTable(physical_tb_id); CHECK(phys_td); - std::string newPhysTableName = - generatePhysicalTableName(newTableName, static_cast(k + 1)); + std::string newPhysTableName = generatePhysicalTableName(newTableName, (k + 1)); allNames.emplace_back(phys_td->tableName, newPhysTableName); allTableIds.push_back(phys_td->tableId); } @@ -4606,43 +4586,36 @@ void Catalog::renameTable(std::vector>& name allNames.emplace_back(curTableName, newTableName); allTableIds.push_back(tableId); } - // rename all tables in one shot - renamePhysicalTable(allNames, allTableIds); - sqliteConnector_.query("END TRANSACTION"); - // cat write/sqlite locks are released when they go out scope + // rename all tables in one transaction + execInTransaction(&Catalog::renamePhysicalTables, allNames, allTableIds); } - { - // now update the SysCatalog - for (size_t i = 0; i < names.size(); i++) { - int tableId = tableIds[i]; - std::string& newTableName = names[i].second; - { - // update table name in direct and effective priv map - DBObjectKey key; - key.dbId = currentDB_.dbId; - key.objectId = tableId; - key.permissionType = static_cast(DBObjectType::TableDBObjectType); - - DBObject object(newTableName, TableDBObjectType); - object.setObjectKey(key); - - auto objdescs = SysCatalog::instance().getMetadataForObject( - currentDB_.dbId, static_cast(DBObjectType::TableDBObjectType), tableId); - for (auto obj : objdescs) { - Grantee* grnt = SysCatalog::instance().getGrantee(obj->roleName); - if (grnt) { - grnt->renameDbObject(object); - } + + // now update the SysCatalog + for (size_t i = 0; i < names.size(); i++) { + int tableId = tableIds[i]; + const std::string& newTableName = names[i].second; + { + // update table name in direct and effective priv map + DBObjectKey key; + key.dbId = currentDB_.dbId; + key.objectId = tableId; + key.permissionType = static_cast(DBObjectType::TableDBObjectType); + + DBObject object(newTableName, TableDBObjectType); + object.setObjectKey(key); + + auto objdescs = SysCatalog::instance().getMetadataForObject( + currentDB_.dbId, static_cast(DBObjectType::TableDBObjectType), tableId); + for (auto obj : objdescs) { + Grantee* grnt = SysCatalog::instance().getGrantee(obj->roleName); + if (grnt) { + grnt->renameDbObject(object); } - SysCatalog::instance().renameObjectsInDescriptorMap(object, *this); } + SysCatalog::instance().renameObjectsInDescriptorMap(object, *this); } } - - // -------- Cleanup -------- - - // table locks are released when 'tableLocks' goes out of scope } void Catalog::renameColumn(const TableDescriptor* td, @@ -5110,7 +5083,7 @@ void Catalog::eraseTablePhysicalData(const TableDescriptor* td) { } std::string Catalog::generatePhysicalTableName(const std::string& logicalTableName, - const int32_t& shardNumber) { + const size_t shardNumber) { std::string physicalTableName = logicalTableName + physicalTableNameTag_ + std::to_string(shardNumber); return (physicalTableName); @@ -7023,4 +6996,19 @@ void Catalog::removeFromColumnMap(ColumnDescriptor* cd) { columnDescriptorMap_.erase(ColumnKey{cd->tableId, to_upper(cd->columnName)}); columnDescriptorMapById_.erase(ColumnIdKey{cd->tableId, cd->columnId}); } + +// TODO(Misiu): Replace most sqlite transactions with this idiom. +template +void Catalog::execInTransaction(F&& f, Args&&... args) { + cat_write_lock write_lock(this); + cat_sqlite_lock sqlite_lock(this); + sqliteConnector_.query("BEGIN TRANSACTION"); + try { + (this->*f)(std::forward(args)...); + } catch (std::exception&) { + sqliteConnector_.query("ROLLBACK TRANSACTION"); + throw; + } + sqliteConnector_.query("END TRANSACTION"); +} } // namespace Catalog_Namespace diff --git a/Catalog/Catalog.h b/Catalog/Catalog.h index a276d38f58..e7b6ed0b9c 100644 --- a/Catalog/Catalog.h +++ b/Catalog/Catalog.h @@ -187,7 +187,7 @@ class Catalog final { void dropTable(const TableDescriptor* td); void truncateTable(const TableDescriptor* td); void renameTable(const TableDescriptor* td, const std::string& newTableName); - void renameTable(std::vector>& names); + void renameTables(const std::vector>& names); void renameColumn(const TableDescriptor* td, const ColumnDescriptor* cd, const std::string& newColumnName); @@ -265,6 +265,11 @@ class Catalog final { const DBMetadata& getCurrentDB() const { return currentDB_; } Data_Namespace::DataMgr& getDataMgr() const { return *dataMgr_; } std::shared_ptr getCalciteMgr() const { return calciteMgr_; } + void setCalciteMgr(const std::shared_ptr& new_calcite_mgr) { + // Used for testing. + calciteMgr_ = new_calcite_mgr; + } + const std::string& getCatalogBasePath() const { return basePath_; } const DictDescriptor* getMetadataForDict(int dict_ref, bool loadDict = true) const; @@ -675,8 +680,8 @@ class Catalog final { void executeDropTableSqliteQueries(const TableDescriptor* td); void doTruncateTable(const TableDescriptor* td); void renamePhysicalTable(const TableDescriptor* td, const std::string& newTableName); - void renamePhysicalTable(std::vector>& names, - std::vector& tableIds); + void renamePhysicalTables(std::vector>& names, + std::vector& tableIds); void instantiateFragmenter(TableDescriptor* td) const; void getAllColumnMetadataForTableImpl(const TableDescriptor* td, std::list& colDescs, @@ -685,7 +690,7 @@ class Catalog final { const bool fetchPhysicalColumns) const; std::string calculateSHA1(const std::string& data); std::string generatePhysicalTableName(const std::string& logicalTableName, - const int32_t& shardNumber); + const size_t shardNumber); std::vector parseDashboardObjects(const std::string& view_meta, const int& user_id); void createOrUpdateDashboardSystemRole(const std::string& view_meta, @@ -877,6 +882,13 @@ class Catalog final { : std::runtime_error{"No entry found for table: " + std::to_string(table_id)} {} }; + template + void execInTransaction(F&& f, Args&&... args); + + const TableDescriptor* getCachedTableDescriptor( + const std::map& cached_table_map, + const std::string& cur_table_name); + static constexpr const char* CATALOG_SERVER_NAME{"system_catalog_server"}; static constexpr const char* MEMORY_STATS_SERVER_NAME{"system_memory_stats_server"}; static constexpr const char* STORAGE_STATS_SERVER_NAME{"system_storage_stats_server"}; diff --git a/Catalog/SysCatalog.cpp b/Catalog/SysCatalog.cpp index ae00c4d9a0..affdf11680 100644 --- a/Catalog/SysCatalog.cpp +++ b/Catalog/SysCatalog.cpp @@ -398,8 +398,8 @@ void SysCatalog::importDataFromOldMapdDB() { std::string mapd_db_path = basePath_ + "/" + shared::kCatalogDirectoryName + "/mapd"; sqliteConnector_->query("ATTACH DATABASE `" + mapd_db_path + "` as old_cat"); sqliteConnector_->query("BEGIN TRANSACTION"); - LOG(INFO) << "Moving global metadata into a separate catalog"; try { + LOG(INFO) << "Moving global metadata into a separate catalog"; auto moveTableIfExists = [conn = sqliteConnector_.get()](const std::string& tableName, bool deleteOld = true) { conn->query("SELECT sql FROM old_cat.sqlite_master WHERE type='table' AND name='" + @@ -419,8 +419,8 @@ void SysCatalog::importDataFromOldMapdDB() { moveTableIfExists("mapd_privileges"); moveTableIfExists("mapd_version_history", false); } catch (const std::exception& e) { - LOG(ERROR) << "Failed to move global metadata into a separate catalog: " << e.what(); sqliteConnector_->query("ROLLBACK TRANSACTION"); + LOG(ERROR) << "Failed to move global metadata into a separate catalog: " << e.what(); try { sqliteConnector_->query("DETACH DATABASE old_cat"); } catch (const std::exception&) { @@ -719,8 +719,8 @@ void SysCatalog::updatePasswordsToHashes() { sqliteConnector_->query("DROP TABLE mapd_users"); sqliteConnector_->query("ALTER TABLE mapd_users_tmp RENAME TO mapd_users"); } catch (const std::exception& e) { - LOG(ERROR) << "Failed to hash passwords: " << e.what(); sqliteConnector_->query("ROLLBACK TRANSACTION"); + LOG(ERROR) << "Failed to hash passwords: " << e.what(); throw; } sqliteConnector_->query("END TRANSACTION"); @@ -768,8 +768,8 @@ void SysCatalog::updateBlankPasswordsToRandom() { std::vector{std::to_string(MAPD_VERSION), UPDATE_BLANK_PASSWORDS_TO_RANDOM}); } catch (const std::exception& e) { - LOG(ERROR) << "Failed to fix blank passwords: " << e.what(); sqliteConnector_->query("ROLLBACK TRANSACTION"); + LOG(ERROR) << "Failed to fix blank passwords: " << e.what(); throw; } sqliteConnector_->query("END TRANSACTION"); @@ -795,8 +795,8 @@ void SysCatalog::updateSupportUserDeactivation() { std::vector{std::to_string(MAPD_VERSION), UPDATE_SUPPORT_USER_DEACTIVATION}); } catch (const std::exception& e) { - LOG(ERROR) << "Failed to add support for user deactivation: " << e.what(); sqliteConnector_->query("ROLLBACK TRANSACTION"); + LOG(ERROR) << "Failed to add support for user deactivation: " << e.what(); throw; } sqliteConnector_->query("END TRANSACTION"); @@ -872,8 +872,8 @@ void SysCatalog::migrateDBAccessPrivileges() { } } } catch (const std::exception& e) { - LOG(ERROR) << "Failed to migrate db access privileges: " << e.what(); sqliteConnector_->query("ROLLBACK TRANSACTION"); + LOG(ERROR) << "Failed to migrate db access privileges: " << e.what(); throw; } sqliteConnector_->query("END TRANSACTION"); diff --git a/Parser/ParserNode.cpp b/Parser/ParserNode.cpp index 794275955c..0865ee492a 100644 --- a/Parser/ParserNode.cpp +++ b/Parser/ParserNode.cpp @@ -5303,7 +5303,7 @@ void RenameTableStmt::execute(const Catalog_Namespace::SessionInfo& session, } checkNameSubstition(tableSubtituteMap); - catalog.renameTable(names); + catalog.renameTables(names); // just to be explicit, clean out the list, the unique_ptr will delete while (!tablesToRename_.empty()) { diff --git a/Tests/CreateAndDropTableDdlTest.cpp b/Tests/CreateAndDropTableDdlTest.cpp index 7a81fa072d..43d48ac4e8 100644 --- a/Tests/CreateAndDropTableDdlTest.cpp +++ b/Tests/CreateAndDropTableDdlTest.cpp @@ -23,14 +23,26 @@ #include #include +#include "Calcite/Calcite.h" #include "Catalog/ForeignTable.h" #include "Catalog/TableDescriptor.h" #include "DBHandlerTestHelpers.h" #include "Fragmenter/FragmentDefaultValues.h" #include "Shared/SysDefinitions.h" +#include "Shared/scope.h" #include "TestHelpers.h" #include "Utils/DdlUtils.h" +class MockCalcite : public Calcite { + public: + struct UpdateMetadataError : public std::runtime_error { + UpdateMetadataError() : std::runtime_error("UpdateMetadataError") {} + }; + void updateMetadata(std::string catalog, std::string table) override { + throw UpdateMetadataError(); + } +}; + extern bool g_enable_fsi; extern bool g_enable_s3_fsi; @@ -1918,6 +1930,21 @@ TEST_F(RenameTableTest, SimpleRename) { sql("RENAME TABLE F TO A, G TO B, H TO C;"); } +TEST_F(RenameTableTest, RenameCalciteError) { + // Inject a calcite conenction issue so that updating calcite metadata will throw. + // TODO(Misiu): Replace this with proper Catalog unit tests. + auto& cat = getCatalog(); + auto old_calcite = cat.getCalciteMgr(); + cat.setCalciteMgr(std::make_shared()); + ScopeGuard scope = [&] { cat.setCalciteMgr(old_calcite); }; + + queryAndAssertException("RENAME TABLE A to F;", "UpdateMetadataError"); + sqlAndCompareResult("SELECT count(*) FROM A WHERE Name = 'A';", {{i(1)}}); + queryAndAssertException( + "SELECT count(*) from F;", + "SQL Error: From line 2, column 6 to line 2, column 8: Object 'F' not found"); +} + TEST_F(RenameTableTest, SwapRename) { // swap sql("RENAME TABLE D to E, E to D"); diff --git a/java/utility/src/main/java/com/mapd/tests/CatalogConcurrencyTest.java b/java/utility/src/main/java/com/mapd/tests/CatalogConcurrencyTest.java index 7df3f33bbd..7eed959b55 100644 --- a/java/utility/src/main/java/com/mapd/tests/CatalogConcurrencyTest.java +++ b/java/utility/src/main/java/com/mapd/tests/CatalogConcurrencyTest.java @@ -18,9 +18,14 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; +import java.util.*; + +import ai.heavy.thrift.server.*; public class CatalogConcurrencyTest { + final static String defPwd = "HyperInteractive", local = "localhost", defDb = "heavyai", + admin = "admin"; + final static int port = 6274; final static Logger logger = LoggerFactory.getLogger(CatalogConcurrencyTest.class); public static void main(String[] args) throws Exception { @@ -28,33 +33,39 @@ public static void main(String[] args) throws Exception { test.testCatalogConcurrency(); } - private void run_test( - HeavyDBTestClient dba, HeavyDBTestClient user, String prefix, int max) - throws Exception { + private void run_test(HeavyDBTestClient dba, + HeavyDBTestClient user, + String prefix, + int max, + List dashboardIds) throws Exception { final String sharedTableName = "table_shared"; for (int i = 0; i < max; i++) { - String tableName = "table_" + prefix + "_" + i; - String viewName = "view_" + prefix + "_" + i; - String dashName = "dash_" + prefix + "_" + i; - long tid = Thread.currentThread().getId(); + final long tid = Thread.currentThread().getId(); + final String threadPrefix = "[" + tid + "] ", + tableName = "table_" + prefix + "_" + i, + viewName = "view_" + prefix + "_" + i, + dashName = "dash_" + prefix + "_" + i; + + // Modify the fixed id dashboards in parallel. + for (int id : dashboardIds) { + TDashboard board = dba.get_dashboard(id); + logger.info("REPLACE DASHBOARD id (" + id + ") " + board.dashboard_name); + dba.replace_dashboard(board.dashboard_id, board.dashboard_name + "_", admin); + } - logger.info("[" + tid + "]" - + "CREATE " + tableName); + logger.info(threadPrefix + "CREATE TABLE " + tableName); user.runSql("CREATE TABLE " + tableName + " (id text);"); HeavyDBAsserts.assertEqual(true, null != dba.get_table_details(tableName)); - logger.info("[" + tid + "]" - + "INSERT INTO " + tableName); + logger.info(threadPrefix + "INSERT INTO " + tableName); user.runSql("INSERT INTO " + tableName + " VALUES(1);"); dba.runSql("GRANT SELECT ON TABLE " + tableName + " TO bob;"); - logger.info("[" + tid + "]" - + "CREATE " + viewName); + logger.info(threadPrefix + "CREATE VIEW " + viewName); user.runSql("CREATE VIEW " + viewName + " AS SELECT * FROM " + tableName + ";"); HeavyDBAsserts.assertEqual(true, null != dba.get_table_details(viewName)); dba.runSql("GRANT SELECT ON VIEW " + viewName + " TO bob;"); - logger.info("[" + tid + "]" - + "CREATE " + dashName); + logger.info(threadPrefix + "CREATE DASHBOARD " + dashName); int dash_id = user.create_dashboard(dashName); HeavyDBAsserts.assertEqual(true, null != dba.get_dashboard(dash_id)); dba.runSql("GRANT VIEW ON DASHBOARD " + dash_id + " TO bob;"); @@ -63,31 +74,28 @@ private void run_test( dba.runSql("REVOKE SELECT ON VIEW " + viewName + " FROM bob;"); dba.runSql("REVOKE SELECT ON TABLE " + tableName + " FROM bob;"); - logger.info("[" + tid + "]" - + "DROP " + dashName); + logger.info(threadPrefix + "DELETE DASHBOARD " + dashName); dba.delete_dashboard(dash_id); - logger.info("[" + tid + "]" - + "DROP " + viewName); + logger.info(threadPrefix + "DROP VIEW " + viewName); dba.runSql("DROP VIEW " + viewName + ";"); - logger.info("[" + tid + "]" - + "DROP " + tableName); + logger.info(threadPrefix + "DROP TABLE " + tableName); dba.runSql("DROP TABLE " + tableName + ";"); - logger.info("[" + tid + "]" - + "CREATE IF NOT EXISTS " + sharedTableName); + logger.info(threadPrefix + "CREATE IF NOT EXISTS " + sharedTableName); dba.runSql("CREATE TABLE IF NOT EXISTS " + sharedTableName + " (id INTEGER);"); - logger.info("[" + tid + "]" - + "DROP IF EXISTS " + sharedTableName); + logger.info(threadPrefix + "DROP IF EXISTS " + sharedTableName); dba.runSql("DROP TABLE IF EXISTS " + sharedTableName + ";"); } } - private void runTest( - String db, String dbaUser, String dbaPassword, String dbUser, String dbPassword) - throws Exception { - int num_threads = 5; - final int runs = 25; + private void runTest(String db, + String dbaUser, + String dbaPassword, + String dbUser, + String dbPassword, + List dashboardIds) throws Exception { + final int num_threads = 5, runs = 25; Exception exceptions[] = new Exception[num_threads]; ArrayList threads = new ArrayList<>(); @@ -99,11 +107,11 @@ private void runTest( @Override public void run() { try { - HeavyDBTestClient dba = HeavyDBTestClient.getClient( - "localhost", 6274, db, dbaUser, dbaPassword); - HeavyDBTestClient user = HeavyDBTestClient.getClient( - "localhost", 6274, db, dbUser, dbPassword); - run_test(dba, user, prefix, runs); + HeavyDBTestClient dba = + HeavyDBTestClient.getClient(local, port, db, dbaUser, dbaPassword); + HeavyDBTestClient user = + HeavyDBTestClient.getClient(local, port, db, dbUser, dbPassword); + run_test(dba, user, prefix, runs, dashboardIds); } catch (Exception e) { logger.error("[" + Thread.currentThread().getId() + "]" + "Caught Exception: " + e.getMessage(), @@ -131,8 +139,12 @@ public void run() { public void testCatalogConcurrency() throws Exception { logger.info("testCatalogConcurrency()"); - HeavyDBTestClient su = HeavyDBTestClient.getClient( - "localhost", 6274, "heavyai", "admin", "HyperInteractive"); + HeavyDBTestClient su = HeavyDBTestClient.getClient(local, port, defDb, admin, defPwd); + + su.runSql("DROP USER IF EXISTS bob;"); + su.runSql("DROP USER IF EXISTS dba;"); + su.runSql("DROP DATABASE IF EXISTS db1;"); + su.runSql("CREATE USER dba (password = 'password', is_super = 'true');"); su.runSql("CREATE USER bob (password = 'password', is_super = 'false');"); @@ -159,18 +171,37 @@ public void testCatalogConcurrency() throws Exception { su.runSql("GRANT ACCESS on database db1 TO dba;"); su.runSql("GRANT ACCESS on database db1 TO bob;"); - runTest("db1", "admin", "HyperInteractive", "admin", "HyperInteractive"); - runTest("db1", "admin", "HyperInteractive", "dba", "password"); - runTest("db1", "admin", "HyperInteractive", "bob", "password"); - runTest("db1", "dba", "password", "admin", "HyperInteractive"); - runTest("db1", "dba", "password", "bob", "password"); - - runTest("heavyai", "admin", "HyperInteractive", "admin", "HyperInteractive"); - runTest("heavyai", "admin", "HyperInteractive", "dba", "password"); - runTest("heavyai", "admin", "HyperInteractive", "bob", "password"); - runTest("heavyai", "dba", "password", "admin", "HyperInteractive"); - runTest("heavyai", "dba", "password", "bob", "password"); - + // We create a series of dashboards with fixed ids to be modified in parallel. + HeavyDBTestClient dba = + HeavyDBTestClient.getClient(local, port, "db1", admin, defPwd); + for (TDashboard board : dba.get_dashboards()) { + logger.info("DROP DASHBOARD " + board.dashboard_name); + dba.delete_dashboard(board.dashboard_id); + } + ArrayList dashboardIds = new ArrayList<>(); + for (int i = 0; i < 5; ++i) { + String dashName = "dash_" + i; + logger.info("CREATE DASHBOARD " + dashName); + dashboardIds.add(dba.create_dashboard(dashName)); + } + HeavyDBAsserts.assertEqual(5, dba.get_dashboards().size()); + + runTest("db1", admin, defPwd, admin, defPwd, dashboardIds); + runTest("db1", admin, defPwd, "dba", "password", dashboardIds); + runTest("db1", admin, defPwd, "bob", "password", dashboardIds); + runTest("db1", "dba", "password", admin, defPwd, dashboardIds); + runTest("db1", "dba", "password", "bob", "password", dashboardIds); + + runTest(defDb, admin, defPwd, admin, defPwd, dashboardIds); + runTest(defDb, admin, defPwd, "dba", "password", dashboardIds); + runTest(defDb, admin, defPwd, "bob", "password", dashboardIds); + runTest(defDb, "dba", "password", admin, defPwd, dashboardIds); + runTest(defDb, "dba", "password", "bob", "password", dashboardIds); + + for (TDashboard board : dba.get_dashboards()) { + logger.info("DROP DASHBOARD " + board.dashboard_name); + dba.delete_dashboard(board.dashboard_id); + } su.runSql("DROP DATABASE db1;"); su.runSql("DROP USER bob;"); su.runSql("DROP USER dba;");