Skip to content

Commit

Permalink
Fix Sqlite Transaction in Transaction Error (#7506)
Browse files Browse the repository at this point in the history
* Moved error logging after transaction rollback in Catalog and SysCatalog.  Added dashboard concurrency testing for replace_dashboard API.

* fix formatting issue

* Further formatting fixes

* Minor updates

* Encapsulate renameTables in a proper transaction

* Remove dead comment

* Merged dashboard concurrency with other catalog concurrency tests

* Minor cleanup

Signed-off-by: Misiu Godfrey <misiu.godfrey@kraken.mapd.com>
  • Loading branch information
misiugodfrey committed Sep 28, 2023
1 parent 0a91545 commit 314cd23
Show file tree
Hide file tree
Showing 7 changed files with 207 additions and 148 deletions.
7 changes: 4 additions & 3 deletions Calcite/Calcite.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class TFilterPushDownInfo;
class TPlanResult;
class TCompletionHint;

class Calcite final {
class Calcite {
public:
Calcite(const int db_port,
const int port,
Expand All @@ -71,6 +71,7 @@ class Calcite final {
Calcite(const SystemParameters& db_parameters,
const std::string& data_dir,
const std::string& udf_filename = "");
Calcite() {}
// sql_string may differ from what is in query_state due to legacy_syntax option.
TPlanResult process(query_state::QueryStateProxy,
std::string sql_string,
Expand All @@ -86,9 +87,9 @@ class Calcite final {
const int cursor);
std::string getExtensionFunctionWhitelist();
std::string getUserDefinedFunctionWhitelist();
void updateMetadata(std::string catalog, std::string table);
virtual void updateMetadata(std::string catalog, std::string table);
void close_calcite_server(bool log = true);
~Calcite();
virtual ~Calcite();
std::string getRuntimeExtensionFunctionWhitelist();
void setRuntimeExtensionFunctions(const std::vector<TUserDefinedFunction>& udfs,
const std::vector<TUserDefinedTableFunction>& udtfs,
Expand Down
156 changes: 72 additions & 84 deletions Catalog/Catalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -566,8 +566,8 @@ void Catalog::updateDefaultColumnValues() {
sqliteConnector_.query("ALTER TABLE mapd_columns ADD default_value TEXT");
}
} catch (std::exception& e) {
LOG(ERROR) << "Failed to make metadata update for default values` support";
sqliteConnector_.query("ROLLBACK TRANSACTION");
LOG(ERROR) << "Failed to make metadata update for default values` support";
throw;
}
sqliteConnector_.query("END TRANSACTION");
Expand Down Expand Up @@ -4441,30 +4441,24 @@ void Catalog::renameTable(const TableDescriptor* td, const string& newTableName)
}
}

void Catalog::renamePhysicalTable(std::vector<std::pair<std::string, std::string>>& names,
std::vector<int>& tableIds) {
void Catalog::renamePhysicalTables(
std::vector<std::pair<std::string, std::string>>& names,
std::vector<int>& tableIds) {
cat_write_lock write_lock(this);
cat_sqlite_lock sqlite_lock(getObjForLock());

// execute the SQL query
try {
for (size_t i = 0; i < names.size(); i++) {
int tableId = tableIds[i];
std::string& newTableName = names[i].second;

sqliteConnector_.query_with_text_params(
"UPDATE mapd_tables SET name = ? WHERE tableid = ?",
std::vector<std::string>{newTableName, std::to_string(tableId)});
}
} catch (std::exception& e) {
sqliteConnector_.query("ROLLBACK TRANSACTION");
throw;
for (size_t i = 0; i < names.size(); i++) {
int tableId = tableIds[i];
const std::string& newTableName = names[i].second;
sqliteConnector_.query_with_text_params(
"UPDATE mapd_tables SET name = ? WHERE tableid = ?",
std::vector<std::string>{newTableName, std::to_string(tableId)});
}

// reset the table descriptors, give Calcite a kick
for (size_t i = 0; i < names.size(); i++) {
std::string& curTableName = names[i].first;
std::string& newTableName = names[i].second;
const auto& [curTableName, newTableName] = names[i];

TableDescriptorMap::iterator tableDescIt =
tableDescriptorMap_.find(to_upper(curTableName));
Expand All @@ -4483,38 +4477,31 @@ void Catalog::renamePhysicalTable(std::vector<std::pair<std::string, std::string
// Collect an 'overlay' mapping of the tableNames->tableId
// to account for possible chained renames
// (for swap: a->b, b->c, c->d, d->a)

const TableDescriptor* lookupTableDescriptor(Catalog* cat,
std::map<std::string, int>& cachedTableMap,
std::string& curTableName) {
auto iter = cachedTableMap.find(curTableName);
if ((iter != cachedTableMap.end())) {
// get the cached tableId
// and use that to lookup the TableDescriptor
int tableId = (*iter).second;
if (tableId == -1) {
return NULL;
} else {
return cat->getMetadataForTable(tableId);
}
const TableDescriptor* Catalog::getCachedTableDescriptor(
const std::map<std::string, int>& cached_table_map,
const std::string& cur_table_name) {
if (auto it = cached_table_map.find(cur_table_name); it != cached_table_map.end()) {
auto table_id = it->second;
return (table_id == -1) ? NULL : getMetadataForTable(table_id);
}

// else ... lookup in standard location
return cat->getMetadataForTable(curTableName);
return getMetadataForTable(cur_table_name);
}

void replaceTableName(std::map<std::string, int>& cachedTableMap,
std::string& curTableName,
std::string& newTableName,
int tableId) {
namespace {
void replace_cached_table_name(std::map<std::string, int>& cachedTableMap,
const std::string& curTableName,
const std::string& newTableName,
int tableId) {
// mark old/cur name as deleted
cachedTableMap[curTableName] = -1;

// insert the 'new' name
cachedTableMap[newTableName] = tableId;
}
} // namespace

void Catalog::renameTable(std::vector<std::pair<std::string, std::string>>& names) {
void Catalog::renameTables(
const std::vector<std::pair<std::string, std::string>>& names) {
// tableId of all tables being renamed
// ... in matching order to 'names'
std::vector<int> tableIds;
Expand All @@ -4533,20 +4520,19 @@ void Catalog::renameTable(std::vector<std::pair<std::string, std::string>>& name

// gather tableIds pre-execute; build maps
for (size_t i = 0; i < names.size(); i++) {
std::string& curTableName = names[i].first;
std::string& newTableName = names[i].second;
const auto& [curTableName, newTableName] = names[i];

// make sure the table being renamed exists,
// or will exist when executed in 'name' order
auto td = lookupTableDescriptor(this, cachedTableMap, curTableName);
auto td = getCachedTableDescriptor(cachedTableMap, curTableName);
CHECK(td);

tableIds.push_back(td->tableId);
if (uniqueOrderedTableIds.find(td->tableId) == uniqueOrderedTableIds.end()) {
// don't overwrite as it should map to the first names index 'i'
uniqueOrderedTableIds[td->tableId] = i;
}
replaceTableName(cachedTableMap, curTableName, newTableName, td->tableId);
replace_cached_table_name(cachedTableMap, curTableName, newTableName, td->tableId);
}

CHECK_EQ(tableIds.size(), names.size());
Expand All @@ -4564,7 +4550,7 @@ void Catalog::renameTable(std::vector<std::pair<std::string, std::string>>& name
// acquire the locks for all tables being renamed
lockmgr::LockedTableDescriptors tableLocks;
for (auto& idPair : uniqueOrderedTableIds) {
std::string& tableName = names[idPair.second].first;
const std::string& tableName = names[idPair.second].first;
tableLocks.emplace_back(
std::make_unique<lockmgr::TableSchemaLockContainer<lockmgr::WriteLock>>(
lockmgr::TableSchemaLockContainer<lockmgr::WriteLock>::acquireTableDescriptor(
Expand All @@ -4575,18 +4561,13 @@ void Catalog::renameTable(std::vector<std::pair<std::string, std::string>>& name

{
cat_write_lock write_lock(this);
cat_sqlite_lock sqlite_lock(getObjForLock());

sqliteConnector_.query("BEGIN TRANSACTION");

// collect all (tables + physical tables) into a single list
std::vector<std::pair<std::string, std::string>> allNames;
std::vector<int> allTableIds;

for (size_t i = 0; i < names.size(); i++) {
int tableId = tableIds[i];
std::string& curTableName = names[i].first;
std::string& newTableName = names[i].second;
const auto& [curTableName, newTableName] = names[i];

// rename all corresponding physical tables if this is a logical table
const auto physicalTableIt = logicalToPhysicalTableMapById_.find(tableId);
Expand All @@ -4597,52 +4578,44 @@ void Catalog::renameTable(std::vector<std::pair<std::string, std::string>>& name
int32_t physical_tb_id = physicalTables[k];
const TableDescriptor* phys_td = getMetadataForTable(physical_tb_id);
CHECK(phys_td);
std::string newPhysTableName =
generatePhysicalTableName(newTableName, static_cast<int32_t>(k + 1));
std::string newPhysTableName = generatePhysicalTableName(newTableName, (k + 1));
allNames.emplace_back(phys_td->tableName, newPhysTableName);
allTableIds.push_back(phys_td->tableId);
}
}
allNames.emplace_back(curTableName, newTableName);
allTableIds.push_back(tableId);
}
// rename all tables in one shot
renamePhysicalTable(allNames, allTableIds);

sqliteConnector_.query("END TRANSACTION");
// cat write/sqlite locks are released when they go out scope
// rename all tables in one transaction
execInTransaction(&Catalog::renamePhysicalTables, allNames, allTableIds);
}
{
// now update the SysCatalog
for (size_t i = 0; i < names.size(); i++) {
int tableId = tableIds[i];
std::string& newTableName = names[i].second;
{
// update table name in direct and effective priv map
DBObjectKey key;
key.dbId = currentDB_.dbId;
key.objectId = tableId;
key.permissionType = static_cast<int>(DBObjectType::TableDBObjectType);

DBObject object(newTableName, TableDBObjectType);
object.setObjectKey(key);

auto objdescs = SysCatalog::instance().getMetadataForObject(
currentDB_.dbId, static_cast<int>(DBObjectType::TableDBObjectType), tableId);
for (auto obj : objdescs) {
Grantee* grnt = SysCatalog::instance().getGrantee(obj->roleName);
if (grnt) {
grnt->renameDbObject(object);
}

// now update the SysCatalog
for (size_t i = 0; i < names.size(); i++) {
int tableId = tableIds[i];
const std::string& newTableName = names[i].second;
{
// update table name in direct and effective priv map
DBObjectKey key;
key.dbId = currentDB_.dbId;
key.objectId = tableId;
key.permissionType = static_cast<int>(DBObjectType::TableDBObjectType);

DBObject object(newTableName, TableDBObjectType);
object.setObjectKey(key);

auto objdescs = SysCatalog::instance().getMetadataForObject(
currentDB_.dbId, static_cast<int>(DBObjectType::TableDBObjectType), tableId);
for (auto obj : objdescs) {
Grantee* grnt = SysCatalog::instance().getGrantee(obj->roleName);
if (grnt) {
grnt->renameDbObject(object);
}
SysCatalog::instance().renameObjectsInDescriptorMap(object, *this);
}
SysCatalog::instance().renameObjectsInDescriptorMap(object, *this);
}
}

// -------- Cleanup --------

// table locks are released when 'tableLocks' goes out of scope
}

void Catalog::renameColumn(const TableDescriptor* td,
Expand Down Expand Up @@ -5110,7 +5083,7 @@ void Catalog::eraseTablePhysicalData(const TableDescriptor* td) {
}

std::string Catalog::generatePhysicalTableName(const std::string& logicalTableName,
const int32_t& shardNumber) {
const size_t shardNumber) {
std::string physicalTableName =
logicalTableName + physicalTableNameTag_ + std::to_string(shardNumber);
return (physicalTableName);
Expand Down Expand Up @@ -7023,4 +6996,19 @@ void Catalog::removeFromColumnMap(ColumnDescriptor* cd) {
columnDescriptorMap_.erase(ColumnKey{cd->tableId, to_upper(cd->columnName)});
columnDescriptorMapById_.erase(ColumnIdKey{cd->tableId, cd->columnId});
}

// TODO(Misiu): Replace most sqlite transactions with this idiom.
/**
 * Runs a Catalog member function inside a single sqlite transaction.
 *
 * Holds a cat_write_lock and a cat_sqlite_lock for the duration of the call, issues
 * "BEGIN TRANSACTION", invokes `f` on this catalog with the forwarded `args`, and then
 * issues "END TRANSACTION". If the invocation throws, "ROLLBACK TRANSACTION" is issued
 * and the exception is rethrown unchanged to the caller.
 *
 * @param f    Pointer to the Catalog member function to invoke on `this`.
 * @param args Arguments perfectly forwarded to `f`.
 * @throws Whatever `f` throws (after the rollback), or whatever the sqlite connector
 *         throws while issuing the transaction statements.
 */
template <typename F, typename... Args>
void Catalog::execInTransaction(F&& f, Args&&... args) {
  cat_write_lock write_lock(this);
  // NOTE(review): renamePhysicalTables builds its cat_sqlite_lock from getObjForLock()
  // rather than `this` - confirm that `this` is the intended lock object here.
  cat_sqlite_lock sqlite_lock(this);
  sqliteConnector_.query("BEGIN TRANSACTION");
  try {
    (this->*f)(std::forward<Args>(args)...);
  } catch (...) {
    // Roll back for every exception type, not only those derived from std::exception;
    // otherwise the transaction would stay open and a later BEGIN TRANSACTION on this
    // connection would presumably fail as a nested transaction.
    sqliteConnector_.query("ROLLBACK TRANSACTION");
    throw;
  }
  sqliteConnector_.query("END TRANSACTION");
}
} // namespace Catalog_Namespace
20 changes: 16 additions & 4 deletions Catalog/Catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ class Catalog final {
void dropTable(const TableDescriptor* td);
void truncateTable(const TableDescriptor* td);
void renameTable(const TableDescriptor* td, const std::string& newTableName);
void renameTable(std::vector<std::pair<std::string, std::string>>& names);
void renameTables(const std::vector<std::pair<std::string, std::string>>& names);
void renameColumn(const TableDescriptor* td,
const ColumnDescriptor* cd,
const std::string& newColumnName);
Expand Down Expand Up @@ -265,6 +265,11 @@ class Catalog final {
const DBMetadata& getCurrentDB() const { return currentDB_; }
Data_Namespace::DataMgr& getDataMgr() const { return *dataMgr_; }
std::shared_ptr<Calcite> getCalciteMgr() const { return calciteMgr_; }
// Replaces the Calcite manager stored in calciteMgr_ with new_calcite_mgr; subsequent
// getCalciteMgr() calls return the new pointer.
void setCalciteMgr(const std::shared_ptr<Calcite>& new_calcite_mgr) {
// Used for testing.
calciteMgr_ = new_calcite_mgr;
}

const std::string& getCatalogBasePath() const { return basePath_; }

const DictDescriptor* getMetadataForDict(int dict_ref, bool loadDict = true) const;
Expand Down Expand Up @@ -675,8 +680,8 @@ class Catalog final {
void executeDropTableSqliteQueries(const TableDescriptor* td);
void doTruncateTable(const TableDescriptor* td);
void renamePhysicalTable(const TableDescriptor* td, const std::string& newTableName);
void renamePhysicalTable(std::vector<std::pair<std::string, std::string>>& names,
std::vector<int>& tableIds);
void renamePhysicalTables(std::vector<std::pair<std::string, std::string>>& names,
std::vector<int>& tableIds);
void instantiateFragmenter(TableDescriptor* td) const;
void getAllColumnMetadataForTableImpl(const TableDescriptor* td,
std::list<const ColumnDescriptor*>& colDescs,
Expand All @@ -685,7 +690,7 @@ class Catalog final {
const bool fetchPhysicalColumns) const;
std::string calculateSHA1(const std::string& data);
std::string generatePhysicalTableName(const std::string& logicalTableName,
const int32_t& shardNumber);
const size_t shardNumber);
std::vector<DBObject> parseDashboardObjects(const std::string& view_meta,
const int& user_id);
void createOrUpdateDashboardSystemRole(const std::string& view_meta,
Expand Down Expand Up @@ -877,6 +882,13 @@ class Catalog final {
: std::runtime_error{"No entry found for table: " + std::to_string(table_id)} {}
};

template <typename F, typename... Args>
void execInTransaction(F&& f, Args&&... args);

const TableDescriptor* getCachedTableDescriptor(
const std::map<std::string, int>& cached_table_map,
const std::string& cur_table_name);

static constexpr const char* CATALOG_SERVER_NAME{"system_catalog_server"};
static constexpr const char* MEMORY_STATS_SERVER_NAME{"system_memory_stats_server"};
static constexpr const char* STORAGE_STATS_SERVER_NAME{"system_storage_stats_server"};
Expand Down
Loading

0 comments on commit 314cd23

Please sign in to comment.