Skip to content

Commit

Permalink
Merge pull request #1315 from Expensify/main
Browse files Browse the repository at this point in the history
Update expensify_prod branch
  • Loading branch information
nathanmetcalf authored Jun 24, 2022
2 parents 9a5893b + 4465b00 commit 1d272b8
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 0 deletions.
11 changes: 11 additions & 0 deletions sqlitecluster/SQLiteNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ void SQLiteNode::_replicate(SQLitePeer* peer, SData command, size_t sqlitePoolIn
SDEBUG("BEGIN for commit " << newCount);
bool uniqueContraintsError = false;
try {
auto start = chrono::steady_clock::now();
_handleBeginTransaction(db, peer, command, commitAttemptCount > 1);

// Now we need to wait for the DB to be up-to-date (if the transaction is QUORUM, we can
Expand All @@ -208,6 +209,7 @@ void SQLiteNode::_replicate(SQLitePeer* peer, SData command, size_t sqlitePoolIn
SQLiteSequentialNotifier::RESULT waitResult = _localCommitNotifier.waitFor(currentCount, true);
if (waitResult == SQLiteSequentialNotifier::RESULT::CANCELED) {
SINFO("Replication canceled mid-transaction, stopping.");
--_concurrentReplicateTransactions;
db.rollback();
break;
}
Expand All @@ -216,6 +218,9 @@ void SQLiteNode::_replicate(SQLitePeer* peer, SData command, size_t sqlitePoolIn
// Ok, almost ready.
// Note:: calls _sendToPeer() which is a write operation.
_handlePrepareTransaction(db, peer, command);
auto duration = chrono::steady_clock::now() - start;
SINFO("Wrote replicate transaction in " << chrono::duration_cast<chrono::microseconds>(duration).count() << "us. " << _concurrentReplicateTransactions.load()
<< " concurrent replicate transactions in " << _replicationThreadCount << " threads.");
} catch (const SQLite::constraint_error& e) {
// We could `continue` immediately upon catching this exception, but instead, we wait for the
// leader commit notifier to be ready. This prevents us from spinning in an endless loop on the
Expand All @@ -229,10 +234,12 @@ void SQLiteNode::_replicate(SQLitePeer* peer, SData command, size_t sqlitePoolIn
SQLiteSequentialNotifier::RESULT waitResult = _leaderCommitNotifier.waitFor(command.calcU64("NewCount"), true);
if (uniqueContraintsError) {
SINFO("Got unique constraints error in replication, restarting.");
--_concurrentReplicateTransactions;
db.rollback();
continue;
} else if (waitResult == SQLiteSequentialNotifier::RESULT::CANCELED) {
SINFO("Replication canceled mid-transaction, stopping.");
--_concurrentReplicateTransactions;
db.rollback();
break;
}
Expand All @@ -250,12 +257,14 @@ void SQLiteNode::_replicate(SQLitePeer* peer, SData command, size_t sqlitePoolIn
} catch (const SException& e) {
SALERT("Caught exception in replication thread. Assuming this means we want to stop following. Exception: " << e.what());
goSearchingOnExit = true;
--_concurrentReplicateTransactions;
db.rollback();
}
} else if (SIEquals(command.methodLine, "ROLLBACK_TRANSACTION")) {
// `decrementer` needs to be destroyed to decrement our thread count before we can change state out of
// FOLLOWING.
_handleRollbackTransaction(db, peer, command);
--_concurrentReplicateTransactions;
goSearchingOnExit = true;
} else if (SIEquals(command.methodLine, "COMMIT_TRANSACTION")) {
_leaderCommitNotifier.notifyThrough(command.calcU64("CommitCount"));
Expand Down Expand Up @@ -2366,6 +2375,7 @@ void SQLiteNode::_handleBeginTransaction(SQLite& db, SQLitePeer* peer, const SDa
// difference in replication, as transactions must commit in order, thus if we've failed one commit, nobody
// else can attempt to commit anyway, but this logs our time spent in the commit mutex in EXCLUSIVE rather
// than SHARED mode.
++_concurrentReplicateTransactions;
if (!db.beginTransaction(wasConflict ? SQLite::TRANSACTION_TYPE::EXCLUSIVE : SQLite::TRANSACTION_TYPE::SHARED)) {
STHROW("failed to begin transaction");
}
Expand Down Expand Up @@ -2454,6 +2464,7 @@ int SQLiteNode::_handleCommitTransaction(SQLite& db, SQLitePeer* peer, const uin

SDEBUG("Committing current transaction because COMMIT_TRANSACTION: " << db.getUncommittedQuery());
int result = db.commit(stateName(_state));
--_concurrentReplicateTransactions;
if (result == SQLITE_BUSY_SNAPSHOT) {
// conflict, bail out early.
return result;
Expand Down
4 changes: 4 additions & 0 deletions sqlitecluster/SQLiteNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -369,4 +369,8 @@ class SQLiteNode : public STCPManager {
// The peer that we'll synchronize from.
// Remove. See: https://github.com/Expensify/Expensify/issues/208439
SQLitePeer* _syncPeer;

// Debugging info. Log the current number of transactions we're actually performing in replicate threads.
// This can be removed once we've figured out why replication falls behind. See this issue: https://github.com/Expensify/Expensify/issues/210528
atomic<size_t> _concurrentReplicateTransactions = 0;
};

0 comments on commit 1d272b8

Please sign in to comment.