Skip to content

Commit

Permalink
Merge pull request #1259 from Expensify/main
Browse files Browse the repository at this point in the history
Update expensify_prod branch
  • Loading branch information
fukawi2 authored May 6, 2022
2 parents 515d3e5 + 620c773 commit 2efde31
Show file tree
Hide file tree
Showing 11 changed files with 67 additions and 337 deletions.
3 changes: 1 addition & 2 deletions BedrockCore.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ class BedrockCore : public SQLiteCore {
SHOULD_PROCESS = 2,
NEEDS_COMMIT = 3,
NO_COMMIT_REQUIRED = 4,
ABANDONED_FOR_CHECKPOINT = 5,
SERVER_NOT_LEADING = 6
SERVER_NOT_LEADING = 5
};

// Automatic timing class that records an entry corresponding to its lifespan.
Expand Down
198 changes: 27 additions & 171 deletions BedrockServer.cpp

Large diffs are not rendered by default.

7 changes: 0 additions & 7 deletions BedrockServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -469,10 +469,6 @@ class BedrockServer : public SQLiteServer {
// Timestamp for the last time we promoted a command to QUORUM.
atomic<uint64_t> _lastQuorumCommandTime;

// We keep a queue of completed commands that workers will insert into when they've successfully finished a command
// that just needs to be returned to a peer.
BedrockTimeoutCommandQueue _completedCommands;

// Whether or not all plugins are detached
bool _pluginsDetached;

Expand Down Expand Up @@ -509,7 +505,4 @@ class BedrockServer : public SQLiteServer {
// syncNode while the sync thread exists, it's a shared pointer to allow for the last socket thread using it to
// destroy the pool at shutdown.
shared_ptr<SQLitePool> _dbPool;

// TODO: Remove once we've verified this all works.
atomic<bool> _escalateOverHTTP = false;
};
4 changes: 0 additions & 4 deletions main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,10 +167,6 @@ int main(int argc, char* argv[]) {
sqlite3_initialize();
SASSERT(sqlite3_threadsafe());

// Disabled by default, but lets really beat it in. This way checkpointing does not need to wait on locks
// created in this thread.
SASSERT(sqlite3_enable_shared_cache(0) == SQLITE_OK);

// Fork if requested
if (args.isSet("-fork")) {
// Do the fork
Expand Down
18 changes: 1 addition & 17 deletions sqlitecluster/SQLite.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@
atomic<int64_t> SQLite::_transactionAttemptCount(0);
mutex SQLite::_pageLogMutex;

atomic<int> SQLite::passiveCheckpointPageMin(2500); // Approx 10mb
atomic<int> SQLite::fullCheckpointPageMin(25000); // Approx 100mb (pages are assumed to be 4kb)

// Tracing can only be enabled or disabled globally, not per object.
atomic<bool> SQLite::enableTrace(false);

Expand Down Expand Up @@ -185,9 +182,6 @@ void SQLite::commonConstructorInitialization() {
SASSERT(!SQuery(_db, "enabling memory-mapped I/O", "PRAGMA mmap_size=" + to_string(_mmapSizeGB * 1024 * 1024 * 1024) + ";"));
}

// Do our own checkpointing.
sqlite3_wal_hook(_db, _sqliteWALCallback, this);

// Enable tracing for performance analysis.
sqlite3_trace_v2(_db, SQLITE_TRACE_STMT, _sqliteTraceCallback, this);

Expand Down Expand Up @@ -268,10 +262,6 @@ int SQLite::_sqliteTraceCallback(unsigned int traceCode, void* c, void* p, void*
return 0;
}

int SQLite::_sqliteWALCallback(void* data, sqlite3* db, const char* dbName, int pageCount) {
return SQLITE_OK;
}

string SQLite::_getJournalQuery(const list<string>& queryParts, bool append) {
return _getJournalQuery(_journalNames, queryParts, append);
}
Expand Down Expand Up @@ -671,13 +661,7 @@ int SQLite::commit(const string& description) {
_mutexLocked = false;
_queryCache.clear();

// See if we can checkpoint without holding the commit lock.
int walSizeFrames = 0;
int framesCheckpointed = 0;
uint64_t start = STimeNow();
int result = sqlite3_wal_checkpoint_v2(_db, 0, SQLITE_CHECKPOINT_PASSIVE, &walSizeFrames, &framesCheckpointed);
SDEBUG("[checkpoint] Checkpoint complete. Result: " << result << ". Total frames checkpointed: "
<< framesCheckpointed << " of " << walSizeFrames << " in " << ((STimeNow() - start) / 1000) << "ms.");
sqlite3_wal_checkpoint_v2(_db, 0, SQLITE_CHECKPOINT_PASSIVE, NULL, NULL);
SINFO(description << " COMMIT complete in " << time << ". Wrote " << (endPages - startPages)
<< " pages. WAL file size is " << sz << " bytes. " << _queryCount << " queries attempted, " << _cacheHits
<< " served from cache.");
Expand Down
10 changes: 1 addition & 9 deletions sqlitecluster/SQLite.h
Original file line number Diff line number Diff line change
Expand Up @@ -215,11 +215,6 @@ class SQLite {
// in the case that a specific table/column are not being directly requested.
map<string, set<string>>* whitelist = nullptr;

// These are the minimum thresholds for the WAL file, in pages, that will cause us to trigger either a full or
// passive checkpoint. They're public, non-const, and atomic so that they can be configured on the fly.
static atomic<int> passiveCheckpointPageMin;
static atomic<int> fullCheckpointPageMin;

// Enable/disable SQL statement tracing.
static atomic<bool> enableTrace;

Expand Down Expand Up @@ -300,7 +295,7 @@ class SQLite {
map<uint64_t, tuple<string, string, uint64_t>> _committedTransactions;

// This mutex is locked when we need to change the state of the _shareData object. It is shared between a
// variety of operations (i.e., inserting checkpoint listeners, updating _committedTransactions, etc.
// variety of operations (i.e., updating _committedTransactions, etc).
recursive_mutex _internalStateMutex;
};

Expand Down Expand Up @@ -408,9 +403,6 @@ class SQLite {
// Callback to trace internal sqlite state (used for logging normalized queries).
static int _sqliteTraceCallback(unsigned int traceCode, void* c, void* p, void* x);

// Handles running checkpointing operations.
static int _sqliteWALCallback(void* data, sqlite3* db, const char* dbName, int pageCount);

// Callback function for progress tracking.
static int _progressHandlerCallback(void* arg);
uint64_t _timeoutLimit = 0;
Expand Down
3 changes: 0 additions & 3 deletions sqlitecluster/SQLiteCommand.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ SData SQLiteCommand::preprocessRequest(SData&& request) {
SQLiteCommand::SQLiteCommand(SData&& _request) :
privateRequest(move(preprocessRequest(move(_request)))),
request(privateRequest),
initiatingPeerID(0),
initiatingClientID(0),
writeConsistency(SQLiteNode::ASYNC),
complete(false),
Expand Down Expand Up @@ -54,7 +53,6 @@ SQLiteCommand::SQLiteCommand(SData&& _request) :
SQLiteCommand::SQLiteCommand(SQLiteCommand&& from) :
privateRequest(move(from.privateRequest)),
request(privateRequest),
initiatingPeerID(from.initiatingPeerID),
initiatingClientID(from.initiatingClientID),
id(move(from.id)),
jsonContent(move(from.jsonContent)),
Expand All @@ -70,7 +68,6 @@ SQLiteCommand::SQLiteCommand(SQLiteCommand&& from) :
SQLiteCommand::SQLiteCommand() :
privateRequest(),
request(privateRequest),
initiatingPeerID(0),
initiatingClientID(0),
writeConsistency(SQLiteNode::ASYNC),
complete(false),
Expand Down
140 changes: 36 additions & 104 deletions sqlitecluster/SQLiteNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -211,13 +211,6 @@ void SQLiteNode::_replicate(SQLiteNode& node, SQLitePeer* peer, SData command, s
// skip this, we did it above) to enforce that commits are in the same order on followers as on
// leader.
if (!quorum) {
// If we get here, we're *in* a transaction (begin ran) so the checkpoint thread is blocked
// waiting for us to finish. But the thread that needs to commit to unblock us can be blocked
// on the checkpoint if these are started out of order.
//
// Let's see if we can verify that happened.
// Yes, we get this line logged 4 times from four threads as their last activity and then:
// (SQLite.cpp:403) operator() [checkpoint] [info] [checkpoint] Waiting on 4 remaining transactions.
SDEBUG("Waiting at commit " << db.getCommitCount() << " for commit " << currentCount);
SQLiteSequentialNotifier::RESULT waitResult = node._localCommitNotifier.waitFor(currentCount, true);
if (waitResult == SQLiteSequentialNotifier::RESULT::CANCELED) {
Expand Down Expand Up @@ -959,10 +952,6 @@ bool SQLiteNode::update() {
}
}

// Did we get a majority? This is important whether or not our consistency level needs it, as it will
// reset the checkpoint limit either way.
bool majorityApproved = (numFullApproved * 2 >= numFullPeers);

// Figure out if we have enough consistency
bool consistentEnough = false;
switch (_commitConsistency) {
Expand All @@ -976,7 +965,7 @@ bool SQLiteNode::update() {
break;
case QUORUM:
// This one requires a majority
consistentEnough = majorityApproved;
consistentEnough = (numFullApproved * 2 >= numFullPeers);
break;
default:
SERROR("Invalid write consistency.");
Expand All @@ -988,17 +977,6 @@ bool SQLiteNode::update() {
// should be followers that are disconnected.
bool everybodyResponded = numFullResponded >= numFullFollowers;

// Record these for posterity
SDEBUG( "numFullPeers=" << numFullPeers
<< ", numFullFollowers=" << numFullFollowers
<< ", numFullResponded=" << numFullResponded
<< ", numFullApproved=" << numFullApproved
<< ", majorityApproved=" << majorityApproved
<< ", writeConsistency=" << CONSISTENCY_LEVEL_NAMES[_commitConsistency]
<< ", consistencyRequired=" << CONSISTENCY_LEVEL_NAMES[_commitConsistency]
<< ", consistentEnough=" << consistentEnough
<< ", everybodyResponded=" << everybodyResponded);

// If anyone denied this transaction, roll this back. Alternatively, roll it back if everyone we're
// currently connected to has responded, but that didn't generate enough consistency. This could happen, in
// theory, if we were disconnected from enough of the cluster that we could no longer reach QUORUM, but
Expand Down Expand Up @@ -2213,45 +2191,32 @@ void SQLiteNode::_recvSynchronize(SQLitePeer* peer, const SData& message) {
// **FIXME: This could be optimized to commit in one huge transaction
content += messageSize;
remaining -= messageSize;
if (!SIEquals(commit.methodLine, "COMMIT"))
if (!SIEquals(commit.methodLine, "COMMIT")) {
STHROW("expecting COMMIT");
if (!commit.isSet("CommitIndex"))
}
if (!commit.isSet("CommitIndex")) {
STHROW("missing CommitIndex");
if (commit.calc64("CommitIndex") < 0)
}
if (commit.calc64("CommitIndex") < 0) {
STHROW("invalid CommitIndex");
if (!commit.isSet("Hash"))
}
if (!commit.isSet("Hash")) {
STHROW("missing Hash");
if (commit.content.empty())
}
if (commit.content.empty()) {
SALERT("Synchronized blank query");
if (commit.calcU64("CommitIndex") != _db.getCommitCount() + 1)
}
if (commit.calcU64("CommitIndex") != _db.getCommitCount() + 1) {
STHROW("commit index mismatch");

// This block repeats until we successfully commit, or throw out of it.
// This allows us to retry in the event we're interrupted for a checkpoint. This should only happen once,
// because the second try will be blocked on the checkpoint.
while (true) {
try {
if (!_db.beginTransaction()) {
STHROW("failed to begin transaction");
}

// Inside a transaction; get ready to back out if an error
if (!_db.writeUnmodified(commit.content)) {
STHROW("failed to write transaction");
}
if (!_db.prepare()) {
STHROW("failed to prepare transaction");
}

// Done, break out of `while (true)`.
break;
} catch (const SException& e) {
// Transaction failed, clean up
SERROR("Can't synchronize (" << e.what() << "); shutting down.");
// **FIXME: Remove the above line once we can automatically handle?
_db.rollback();
throw e;
}
}
if (!_db.beginTransaction()) {
STHROW("failed to begin transaction");
}
if (!_db.writeUnmodified(commit.content)) {
STHROW("failed to write transaction");
}
if (!_db.prepare()) {
STHROW("failed to prepare transaction");
}

// Transaction succeeded, commit and go to the next
Expand Down Expand Up @@ -2429,34 +2394,17 @@ void SQLiteNode::_handleBeginTransaction(SQLite& db, SQLitePeer* peer, const SDa
STHROW("already in a transaction");
}

// This block repeats until we successfully commit, or error out of it.
// This allows us to retry in the event we're interrupted for a checkpoint. This should only happen once,
// because the second try will be blocked on the checkpoint.
while (true) {
try {
// If we are running this after a conflict, we'll grab an exclusive lock here. This makes no practical
// difference in replication, as transactions must commit in order, thus if we've failed one commit, nobody
// else can attempt to commit anyway, but this logs our time spent in the commit mutex in EXCLUSIVE rather
// than SHARED mode.
if (!db.beginTransaction(wasConflict ? SQLite::TRANSACTION_TYPE::EXCLUSIVE : SQLite::TRANSACTION_TYPE::SHARED)) {
STHROW("failed to begin transaction");
}

// Inside transaction; get ready to back out on error
if (!db.writeUnmodified(message.content)) {
STHROW("failed to write transaction");
}

// Done, break out of `while (true)`.
break;
} catch (const SException& e) {
// Something caused a write failure.
SALERT(e.what());
db.rollback();
// If we are running this after a conflict, we'll grab an exclusive lock here. This makes no practical
// difference in replication, as transactions must commit in order, thus if we've failed one commit, nobody
// else can attempt to commit anyway, but this logs our time spent in the commit mutex in EXCLUSIVE rather
// than SHARED mode.
if (!db.beginTransaction(wasConflict ? SQLite::TRANSACTION_TYPE::EXCLUSIVE : SQLite::TRANSACTION_TYPE::SHARED)) {
STHROW("failed to begin transaction");
}

// This is a fatal error case.
break;
}
// Inside transaction; get ready to back out on error
if (!db.writeUnmodified(message.content)) {
STHROW("failed to write transaction");
}
}

Expand All @@ -2466,7 +2414,6 @@ void SQLiteNode::_handlePrepareTransaction(SQLite& db, SQLitePeer* peer, const S
// the transaction for any reason, it is broken somehow -- disconnect from the leader.
// **FIXME**: What happens if LEADER steps down before sending BEGIN?
// **FIXME**: What happens if LEADER steps down or disconnects after BEGIN?
bool success = true;
uint64_t leaderSentTimestamp = message.calcU64("leaderSendTime");
uint64_t followerDequeueTimestamp = STimeNow();
if (!message.isSet("ID")) {
Expand All @@ -2482,26 +2429,11 @@ void SQLiteNode::_handlePrepareTransaction(SQLite& db, SQLitePeer* peer, const S
STHROW("not following");
}

// This block repeats until we successfully commit, or error out of it.
// This allows us to retry in the event we're interrupted for a checkpoint. This should only happen once,
// because the second try will be blocked on the checkpoint.
while (true) {
try {
// This will grab the commit lock and hold it until we commit or rollback.
if (!db.prepare()) {
STHROW("failed to prepare transaction");
}

// Done, break out of `while (true)`.
break;
} catch (const SException& e) {
// Something caused a write failure.
success = false;
db.rollback();

// This is a fatal error case.
break;
}
bool success = true;
if (!db.prepare()) {
SALERT("failed to prepare transaction");
success = false;
db.rollback();
}

// Are we participating in quorum?
Expand Down
3 changes: 1 addition & 2 deletions sqlitecluster/SQLiteSequentialNotifier.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@ class SQLiteSequentialNotifier {
multimap<uint64_t, shared_ptr<WaitState>> _valueToPendingThreadMapNoCurrentTransaction;
uint64_t _value;

// If there is a global result for all pending operations (i.e., they've been canceled or a checkpoint needs to
// happen), that is stored here.
// If there is a global result for all pending operations (i.e., they've been canceled), that is stored here.
RESULT _globalResult;

// For saving the value after which new or existing waiters will be returned a CANCELED result.
Expand Down
16 changes: 0 additions & 16 deletions test/clustertest/tests/ConflictSpamTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,7 @@ struct ConflictSpamTest : tpunit::TestFixture {

void setup() {
cmdID.store(0);

// Turn the settings for checkpointing way down so we can observe that both passive and full checkpoints
// happen as expected.
tester = new BedrockClusterTester();
for (int i = 0; i < 3; i++) {
BedrockTester& node = tester->getTester(i);
SData controlCommand("SetCheckpointIntervals");
controlCommand["passiveCheckpointPageMin"] = to_string(3);
controlCommand["fullCheckpointPageMin"] = to_string(50);
vector<SData> results = node.executeWaitMultipleData({controlCommand}, 1, true);

// Verify we got a reasonable result.
ASSERT_EQUAL(results.size(), 1);
ASSERT_EQUAL(results[0].methodLine, "200 OK");
ASSERT_EQUAL(results[0]["fullCheckpointPageMin"], to_string(25000));
ASSERT_EQUAL(results[0]["passiveCheckpointPageMin"], to_string(2500));
}
}

void teardown() {
Expand Down
2 changes: 0 additions & 2 deletions test/lib/BedrockTester.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,7 @@ BedrockTester::BedrockTester(const map<string, string>& args,
{"-mmapSizeGB", "1"},
{"-maxJournalSize", "25000"},
{"-v", ""},
{"-quorumCheckpoint", "50"},
{"-enableMultiWrite", "true"},
{"-escalateOverHTTP", "true"},
{"-cacheSize", "1000"},
{"-parallelReplication", "true"},
// Currently breaks only in Travis and needs debugging, which has been removed, maybe?
Expand Down

0 comments on commit 2efde31

Please sign in to comment.