Skip to content

Commit

Permalink
Implemented custom reconnect algorithm
Browse files Browse the repository at this point in the history
Significantly increased performance of prepared statements after reconnect
  • Loading branch information
FredyH committed Apr 21, 2022
1 parent a2b5f29 commit 1152181
Show file tree
Hide file tree
Showing 13 changed files with 176 additions and 196 deletions.
88 changes: 57 additions & 31 deletions src/mysql/Database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
#include <cstring>
#include <iostream>
#include <utility>
#include "mysqld_error.h"
#include "../lua/LuaObject.h"
#include "errmsg.h"

Database::Database(std::string host, std::string username, std::string pw, std::string database, unsigned int port,
std::string unixSocket) :
Expand All @@ -30,32 +32,38 @@ Database::~Database() {


//This makes sure that all stmts always get freed
void Database::cacheStatement(MYSQL_STMT *stmt) {
if (stmt == nullptr) return;
std::shared_ptr<StatementHandle> Database::cacheStatement(MYSQL_STMT *stmt) {
if (stmt == nullptr) return std::make_shared<StatementHandle>(nullptr, false);
std::unique_lock<std::mutex> lock(m_statementMutex);
cachedStatements.insert(stmt);
auto handle = std::make_shared<StatementHandle>(stmt, true);
cachedStatements.insert(handle);
return handle;
}

//This notifies the database thread to free this statement some time in the future
void Database::freeStatement(MYSQL_STMT *stmt) {
if (stmt == nullptr) return;
void Database::freeStatement(const std::shared_ptr<StatementHandle> &handle) {
if (handle == nullptr || !handle->isValid()) return;
std::unique_lock<std::mutex> lock(m_statementMutex);
if (cachedStatements.find(stmt) != cachedStatements.end()) {
if (cachedStatements.find(handle) != cachedStatements.end()) {
//Otherwise, the statement was already freed
cachedStatements.erase(stmt);
freedStatements.insert(stmt);
cachedStatements.erase(handle);
freedStatements.insert(handle->stmt);
}
handle->invalidate();
}

//Frees all statements that were allocated by the database
//This is called when the database shuts down
//This is called when the database shuts down or a reconnect happens
void Database::freeCachedStatements() {
std::unique_lock<std::mutex> lock(m_statementMutex);
for (auto &stmt: cachedStatements) {
mysql_stmt_close(stmt);
for (auto &handle: cachedStatements) {
if (handle == nullptr || !handle->isValid()) continue;
mysql_stmt_close(handle->stmt);
handle->invalidate();
}
cachedStatements.clear();
for (auto &stmt: freedStatements) {
if (stmt == nullptr) continue;
mysql_stmt_close(stmt);
}
freedStatements.clear();
Expand All @@ -66,6 +74,7 @@ void Database::freeCachedStatements() {
void Database::freeUnusedStatements() {
std::unique_lock<std::mutex> lock(m_statementMutex);
for (auto &stmt: freedStatements) {
//Even if this returns an error, the handle will be freed
mysql_stmt_close(stmt);
}
freedStatements.clear();
Expand Down Expand Up @@ -299,21 +308,6 @@ void Database::setCachePreparedStatements(bool shouldCache) {
cachePreparedStatements = shouldCache;
}

//Should only be called from the db thread
//While the mysql documentation says that mysql_options should only be called
//before the connection is done it appears to work after just fine (at least for reconnect)
void Database::setSQLAutoReconnect(bool shouldReconnect) {
auto myAutoReconnectBool = (my_bool) shouldReconnect;
mysql_optionsv(m_sql, MYSQL_OPT_RECONNECT, &myAutoReconnectBool);
}

//Should only be called from the db thread
bool Database::getSQLAutoReconnect() {
my_bool autoReconnect;
mysql_get_optionv(m_sql, MYSQL_OPT_RECONNECT, &autoReconnect);
return (bool) autoReconnect;
}

void Database::failWaitingQuery(const std::shared_ptr<IQuery> &query, const std::shared_ptr<IQueryData> &data,
std::string reason) {
data->setError(std::move(reason));
Expand Down Expand Up @@ -381,9 +375,6 @@ void Database::connectRun() {
return;
}
this->customSSLSettings.applySSLSettings(this->m_sql);
if (this->shouldAutoReconnect) {
setSQLAutoReconnect(true);
}
const char *socketStr = (this->socket.length() == 0) ? nullptr : this->socket.c_str();
unsigned long clientFlag = (this->useMultiStatements) ? CLIENT_MULTI_STATEMENTS : 0;
clientFlag |= CLIENT_MULTI_RESULTS;
Expand Down Expand Up @@ -415,6 +406,27 @@ void Database::connectRun() {
}
}

void Database::runQuery(const std::shared_ptr<IQuery>& query, const std::shared_ptr<IQueryData>& data, bool retry) {
try {
query->executeStatement(*this, this->m_sql, data);
data->setResultStatus(QUERY_SUCCESS);
} catch (const MySQLException &error) {
unsigned int errorCode = error.getErrorCode();
bool retryableError = errorCode == CR_SERVER_LOST || errorCode == CR_SERVER_GONE_ERROR ||
errorCode == ER_MAX_PREPARED_STMT_COUNT_REACHED || errorCode == ER_UNKNOWN_STMT_HANDLER ||
errorCode == CR_NO_PREPARE_STMT;
if (retry && retryableError && attemptReconnect()) {
//Need to free statements before retrying in case the connection was lost
//and prepared statement handles have become invalid
freeCachedStatements();
runQuery(query, data, false);
} else {
data->setResultStatus(QUERY_ERROR);
data->setError(error.what());
}
}
}

/* The run method of the thread of the database instance.
*/
void Database::run() {
Expand All @@ -432,7 +444,9 @@ void Database::run() {
{
//New scope so mutex will be released as soon as possible
std::unique_lock<std::mutex> queryMutex(m_queryMutex);
curQuery->executeStatement(*this, this->m_sql, data);
data->setStatus(QUERY_RUNNING);
runQuery(curQuery, data, this->shouldAutoReconnect);
data->setStatus(QUERY_COMPLETE);
}
finishedQueries.put(pair);
{
Expand All @@ -449,4 +463,16 @@ void Database::run() {
//So that statements get eventually freed even if the queue is constantly full
freeUnusedStatements();
}
}
}

bool Database::attemptReconnect() {
bool success;
my_bool reconnect = '1';
mysql_optionsv(this->m_sql, MYSQL_OPT_RECONNECT, &reconnect);
success = mariadb_reconnect(this->m_sql) == 0;
reconnect = '0';
mysql_optionsv(this->m_sql, MYSQL_OPT_RECONNECT, &reconnect);
return success;
}

StatementHandle::StatementHandle(MYSQL_STMT *stmt, bool valid) : stmt(stmt), valid(valid) {}
17 changes: 9 additions & 8 deletions src/mysql/Database.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <memory>
#include <mutex>
#include <sstream>
#include "StatementHandle.h"
#include <unordered_set>
#include <condition_variable>
#include "GarrysMod/Lua/Interface.h"
Expand Down Expand Up @@ -36,6 +37,7 @@ enum DatabaseStatus {
DATABASE_CONNECTION_FAILED = 3
};


class Database : public std::enable_shared_from_this<Database> {
friend class IQuery;

Expand All @@ -47,9 +49,9 @@ class Database : public std::enable_shared_from_this<Database> {

~Database();

void cacheStatement(MYSQL_STMT *stmt);
std::shared_ptr<StatementHandle> cacheStatement(MYSQL_STMT *stmt);

void freeStatement(MYSQL_STMT *stmt);
void freeStatement(const std::shared_ptr<StatementHandle> &handle);

void enqueueQuery(const std::shared_ptr<IQuery> &query, const std::shared_ptr<IQueryData> &data);

Expand Down Expand Up @@ -99,17 +101,14 @@ class Database : public std::enable_shared_from_this<Database> {

bool connectionSuccessful() { return m_success; }

bool attemptReconnect();

std::string connectionError() { return m_connection_err; }

std::deque<std::pair<std::shared_ptr<IQuery>, std::shared_ptr<IQueryData>>> takeFinishedQueries() {
return finishedQueries.clear();
}

void setSQLAutoReconnect(bool autoReconnect);

bool getSQLAutoReconnect();


private:
Database(std::string host, std::string username, std::string pw, std::string database, unsigned int port,
std::string unixSocket);
Expand All @@ -122,6 +121,8 @@ class Database : public std::enable_shared_from_this<Database> {

void run();

void runQuery(const std::shared_ptr<IQuery> &query, const std::shared_ptr<IQueryData> &data, bool retry);

void connectRun();

void abortWaitingQuery();
Expand All @@ -133,7 +134,7 @@ class Database : public std::enable_shared_from_this<Database> {

BlockingQueue<std::pair<std::shared_ptr<IQuery>, std::shared_ptr<IQueryData>>> finishedQueries{};
BlockingQueue<std::pair<std::shared_ptr<IQuery>, std::shared_ptr<IQueryData>>> queryQueue{};
std::unordered_set<MYSQL_STMT *> cachedStatements{};
std::unordered_set<std::shared_ptr<StatementHandle>> cachedStatements{};
std::unordered_set<MYSQL_STMT *> freedStatements{};
MYSQL *m_sql = nullptr;
std::thread m_thread;
Expand Down
9 changes: 0 additions & 9 deletions src/mysql/IQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,6 @@ QueryResultStatus IQuery::getResultStatus() const {
//Wrapper for c api calls
//Just throws an exception if anything goes wrong for ease of use

void IQuery::mysqlAutocommit(MYSQL *sql, bool auto_mode) {
my_bool result = mysql_autocommit(sql, (my_bool) auto_mode);
if (result) {
const char *errorMessage = mysql_error(sql);
unsigned int errorCode = mysql_errno(sql);
throw MySQLException(errorCode, errorMessage);
}
}

//Returns if the query has been queued with the database instance
bool IQuery::isRunning() {
return !runningQueryData.empty();
Expand Down
4 changes: 1 addition & 3 deletions src/mysql/IQuery.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,11 @@ class IQuery : public std::enable_shared_from_this<IQuery> {
std::shared_ptr<IQueryData> callbackQueryData;
protected:

virtual bool executeStatement(Database &database, MYSQL *m_sql, std::shared_ptr<IQueryData> data) = 0;
virtual void executeStatement(Database &database, MYSQL *m_sql, const std::shared_ptr<IQueryData>& data) = 0;

//Wrapper functions for c api that throw exceptions
static void mysqlQuery(MYSQL *sql, std::string &query);

static void mysqlAutocommit(MYSQL *sql, bool auto_mode);

static MYSQL_RES *mysqlStoreResults(MYSQL *sql);

static bool mysqlNextResult(MYSQL *sql);
Expand Down
7 changes: 2 additions & 5 deletions src/mysql/PingQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@ PingQuery::~PingQuery() = default;

/* Executes the ping query
*/
void PingQuery::executeQuery(Database &database, MYSQL *connection, const std::shared_ptr<IQueryData> &data) {
bool oldAutoReconnect = database.getSQLAutoReconnect();
database.setSQLAutoReconnect(true);
this->pingSuccess = mysql_ping(connection) == 0;
database.setSQLAutoReconnect(oldAutoReconnect);
void PingQuery::executeStatement(Database &database, MYSQL *connection, const std::shared_ptr<IQueryData> &data) {
this->pingSuccess = mysql_ping(connection) == 0 || database.attemptReconnect();
}
2 changes: 1 addition & 1 deletion src/mysql/PingQuery.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class PingQuery : public Query {
~PingQuery() override;
protected:
explicit PingQuery(const std::shared_ptr<Database>& dbase);
void executeQuery(Database &database, MYSQL* m_sql, const std::shared_ptr<IQueryData> &data) override;
void executeStatement(Database &database, MYSQL* m_sql, const std::shared_ptr<IQueryData> &data) override;
bool pingSuccess = false;
};
#endif
56 changes: 8 additions & 48 deletions src/mysql/PreparedQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,34 +173,29 @@ void PreparedQuery::generateMysqlBinds(MYSQL_BIND *binds,
}
}


/* Executes the prepared query
* This function can only ever return one result set
* Note: If an error occurs at the nth query all the actions done before
* that nth query won't be reverted even though this query results in an error
*/
void PreparedQuery::executeQuery(Database &database, MYSQL *connection, const std::shared_ptr<IQueryData> &ptr) {
void PreparedQuery::executeStatement(Database &database, MYSQL *connection, const std::shared_ptr<IQueryData>& ptr) {
std::shared_ptr<PreparedQueryData> data = std::dynamic_pointer_cast<PreparedQueryData>(ptr);
bool shouldReconnect = database.getSQLAutoReconnect();
//Autoreconnect has to be disabled for prepared statement since prepared statements
//get reset on the server if the connection fails and auto reconnects
try {
MYSQL_STMT *stmt = nullptr;
auto stmtClose = finally([&] {
if (!database.shouldCachePreparedStatements() && stmt != nullptr) {
mysql_stmt_close(stmt);
}
});
if (this->cachedStatement.load() != nullptr) {
stmt = this->cachedStatement;
if (this->cachedStatement != nullptr && this->cachedStatement->isValid()) {
stmt = this->cachedStatement->stmt;
} else {
stmt = mysqlStmtInit(connection);
my_bool attrMaxLength = 1;
mysql_stmt_attr_set(stmt, STMT_ATTR_UPDATE_MAX_LENGTH, &attrMaxLength);
mysqlStmtPrepare(stmt, this->m_query.c_str());
if (database.shouldCachePreparedStatements()) {
this->cachedStatement = stmt;
database.cacheStatement(stmt);
this->cachedStatement = database.cacheStatement(stmt);
}
}
unsigned int parameterCount = mysql_stmt_param_count(stmt);
Expand Down Expand Up @@ -235,51 +230,16 @@ void PreparedQuery::executeQuery(Database &database, MYSQL *connection, const st
}
} catch (const MySQLException &error) {
unsigned int errorCode = error.getErrorCode();
if (errorCode == ER_UNKNOWN_STMT_HANDLER || errorCode == CR_NO_PREPARE_STMT) {
//In this case, the statement is lost on the server (usually after a reconnect).
//Since the statement is unknown, nothing has been executed yet (i.e. no side effects),
//and we are perfectly fine to re-prepare the statement and try again, even if auto-reconnect
//is disabled.
database.freeStatement(this->cachedStatement);
this->cachedStatement = nullptr;
if (data->firstAttempt) {
data->firstAttempt = false;
executeQuery(database, connection, ptr);
return;
}
} else if (errorCode == CR_SERVER_LOST || errorCode == CR_SERVER_GONE_ERROR ||
errorCode == ER_MAX_PREPARED_STMT_COUNT_REACHED) {
if (errorCode == CR_SERVER_LOST || errorCode == CR_SERVER_GONE_ERROR ||
errorCode == ER_MAX_PREPARED_STMT_COUNT_REACHED || errorCode == ER_UNKNOWN_STMT_HANDLER ||
errorCode == CR_NO_PREPARE_STMT) {
//In these cases the statement will no longer be valid, free it.
database.freeStatement(this->cachedStatement);
this->cachedStatement = nullptr;
//Because autoreconnect is disabled we want to try and explicitly execute the prepared query once more
//if we can get the client to reconnect (reconnect is caused by mysql_ping)
//If this fails we just go ahead and error
if (shouldReconnect && data->firstAttempt) {
if (mysql_ping(connection) == 0) {
data->firstAttempt = false;
executeQuery(database, connection, ptr);
return;
}
}
}
//Rethrow error to be handled by executeStatement()
throw error;
}
}

bool PreparedQuery::executeStatement(Database &database, MYSQL *connection, std::shared_ptr<IQueryData> ptr) {
std::shared_ptr<PreparedQueryData> data = std::dynamic_pointer_cast<PreparedQueryData>(ptr);
data->setStatus(QUERY_RUNNING);
try {
this->executeQuery(database, connection, ptr);
data->setResultStatus(QUERY_SUCCESS);
} catch (const MySQLException &error) {
data->setResultStatus(QUERY_ERROR);
data->setError(error.what());
}
return true;
}

std::shared_ptr<QueryData> PreparedQuery::buildQueryData() {
std::shared_ptr<PreparedQueryData> data(new PreparedQueryData());
data->m_parameters = this->m_parameters;
Expand Down
Loading

0 comments on commit 1152181

Please sign in to comment.