Skip to content

Commit

Permalink
Merge pull request #1506 from Expensify/main
Browse files Browse the repository at this point in the history
Update expensify_prod branch
  • Loading branch information
bondydaa authored May 16, 2023
2 parents dee3372 + 3d99438 commit c3ed869
Show file tree
Hide file tree
Showing 13 changed files with 187 additions and 48 deletions.
13 changes: 13 additions & 0 deletions BedrockBlockingCommandQueue.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#include <BedrockBlockingCommandQueue.h>

void BedrockBlockingCommandQueue::startTiming(unique_ptr<BedrockCommand>& command) {
command->startTiming(BedrockCommand::QUEUE_BLOCKING);
}

void BedrockBlockingCommandQueue::stopTiming(unique_ptr<BedrockCommand>& command) {
command->stopTiming(BedrockCommand::QUEUE_BLOCKING);
}

BedrockBlockingCommandQueue::BedrockBlockingCommandQueue() :
BedrockCommandQueue(function<void(unique_ptr<BedrockCommand>&)>(startTiming), function<void(unique_ptr<BedrockCommand>&)>(stopTiming))
{ }
13 changes: 13 additions & 0 deletions BedrockBlockingCommandQueue.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#pragma once
#include "BedrockCommandQueue.h"

class BedrockCommand;

class BedrockBlockingCommandQueue : public BedrockCommandQueue {
public:
BedrockBlockingCommandQueue();

// Functions to start and stop timing on the commands when they're inserted/removed from the queue.
static void startTiming(unique_ptr<BedrockCommand>& command);
static void stopTiming(unique_ptr<BedrockCommand>& command);
};
36 changes: 21 additions & 15 deletions BedrockCommand.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ void BedrockCommand::finalizeTimingInfo() {
uint64_t commitSyncTotal = 0;
uint64_t queueWorkerTotal = 0;
uint64_t queueSyncTotal = 0;
uint64_t queueBlockingTotal = 0;
for (const auto& entry: timingInfo) {
if (get<0>(entry) == PEEK) {
peekTotal += get<2>(entry) - get<1>(entry);
Expand All @@ -137,6 +138,8 @@ void BedrockCommand::finalizeTimingInfo() {
commitSyncTotal += get<2>(entry) - get<1>(entry);
} else if (get<0>(entry) == QUEUE_WORKER) {
queueWorkerTotal += get<2>(entry) - get<1>(entry);
} else if (get<0>(entry) == QUEUE_BLOCKING) {
queueBlockingTotal += get<2>(entry) - get<1>(entry);
} else if (get<0>(entry) == QUEUE_SYNC) {
queueSyncTotal += get<2>(entry) - get<1>(entry);
}
Expand All @@ -147,7 +150,7 @@ void BedrockCommand::finalizeTimingInfo() {

// Time that wasn't accounted for in all the other metrics.
uint64_t unaccountedTime = totalTime - (peekTotal + processTotal + commitWorkerTotal + commitSyncTotal +
escalationTimeUS + queueWorkerTotal + queueSyncTotal);
escalationTimeUS + queueWorkerTotal + queueBlockingTotal + queueSyncTotal);

// Build a map of the values we care about.
map<string, uint64_t> valuePairs = {
Expand Down Expand Up @@ -202,21 +205,24 @@ void BedrockCommand::finalizeTimingInfo() {
}
}

// Log all this info.
SINFO("command '" << methodName << "' timing info (ms): "
<< peekTotal/1000 << " (" << peekCount << "), "
<< processTotal/1000 << " (" << processCount << "), "
<< commitWorkerTotal/1000 << ", "
<< commitSyncTotal/1000 << ", "
<< queueWorkerTotal/1000 << ", "
<< queueSyncTotal/1000 << ", "
<< totalTime/1000 << ", "
<< unaccountedTime/1000 << ", "
<< escalationTimeUS/1000 << ". Upstream: "
<< upstreamPeekTime/1000 << ", "
<< upstreamProcessTime/1000 << ", "
<< upstreamTotalTime/1000 << ", "
<< upstreamUnaccountedTime/1000 << "."
"peek:" << peekTotal/1000 << " (count:" << peekCount << "), "
"process:" << processTotal/1000 << " (count:" << processCount << "), "
"total:" << totalTime/1000 << ", "
"unaccounted:" << unaccountedTime/1000 <<
". Commit: "
"worker:" << commitWorkerTotal/1000 << ", "
"sync:"<< commitSyncTotal/1000 <<
". Queue: "
"worker:" << queueWorkerTotal/1000 << ", "
"sync:" << queueSyncTotal/1000 << ", "
"blocking:" << queueBlockingTotal/1000 << ", "
"escalation:" << escalationTimeUS/1000 <<
". Upstream: "
"peek:" << upstreamPeekTime/1000 << ", "
"process:"<< upstreamProcessTime/1000 << ", "
"total:" << upstreamTotalTime/1000 << ", "
"unaccounted:" << upstreamUnaccountedTime/1000 << "."
);

// And here's where we set our own values.
Expand Down
3 changes: 3 additions & 0 deletions BedrockCommand.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ class BedrockCommand : public SQLiteCommand {
COMMIT_SYNC,
QUEUE_WORKER,
QUEUE_SYNC,
QUEUE_BLOCKING,
};

enum class STAGE {
Expand Down Expand Up @@ -192,6 +193,8 @@ class BedrockCommand : public SQLiteCommand {
// This is a timestamp in *microseconds* for when this command should timeout.
uint64_t _timeout;

set<string> _tablesUsed;

static atomic<size_t> _commandCount;

static const string defaultPluginName;
Expand Down
6 changes: 6 additions & 0 deletions BedrockCommandQueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ BedrockCommandQueue::BedrockCommandQueue() :
SScheduledPriorityQueue<unique_ptr<BedrockCommand>>(function<void(unique_ptr<BedrockCommand>&)>(startTiming), function<void(unique_ptr<BedrockCommand>&)>(stopTiming))
{ }

BedrockCommandQueue::BedrockCommandQueue(
function<void(unique_ptr<BedrockCommand>& item)> startFunction,
function<void(unique_ptr<BedrockCommand>& item)> endFunction
) : SScheduledPriorityQueue<unique_ptr<BedrockCommand>>(startFunction, endFunction)
{ }

list<string> BedrockCommandQueue::getRequestMethodLines() {
list<string> returnVal;
SAUTOLOCK(_queueMutex);
Expand Down
4 changes: 4 additions & 0 deletions BedrockCommandQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@
class BedrockCommandQueue : public SScheduledPriorityQueue<unique_ptr<BedrockCommand>> {
public:
BedrockCommandQueue();
BedrockCommandQueue(
function<void(unique_ptr<BedrockCommand>& item)> startFunction,
function<void(unique_ptr<BedrockCommand>& item)> endFunction
);

// Functions to start and stop timing on the commands when they're inserted/removed from the queue.
static void startTiming(unique_ptr<BedrockCommand>& command);
Expand Down
66 changes: 66 additions & 0 deletions BedrockConflictManager.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
#include "BedrockConflictManager.h"
#include <libstuff/libstuff.h>

BedrockConflictManager::BedrockConflictManager() {
}

void BedrockConflictManager::recordTables(const string& commandName, const set<string>& tables) {
{
lock_guard<mutex> lock(m);
auto commandInfoIt = _commandInfo.find(commandName);
if (commandInfoIt == _commandInfo.end()) {
commandInfoIt = _commandInfo.emplace(make_pair(commandName, BedrockConflictManagerCommandInfo())).first;
}
BedrockConflictManagerCommandInfo& commandInfo = commandInfoIt->second;

// Increase the count of the command in general.
commandInfo.count++;

// And for each table (that's not a journal).
for (auto& table : tables) {
// Skip journals, they change on every instance of a command running and thus aren't useful for profiling which commands access which tables most frequently.
if (SStartsWith(table, "journal")) {
continue;
}

if (table == "json_each") {
continue;
}

// Does this command already have this table?
auto tableInfoIt = commandInfo.tableUseCounts.find(table);
if (tableInfoIt == commandInfo.tableUseCounts.end()) {
tableInfoIt = commandInfo.tableUseCounts.emplace(make_pair(table, 1)).first;
} else {
// tableInfoIt is an iterator into a map<string, size_t> (tableUseCounts), where the key is the table name and the value is the count of uses for this command.
// Incrementing `second` increases the count.
tableInfoIt->second++;
}
}
}

// And increase the count for each used table.
SINFO("Command " << commandName << " used tables: " << SComposeList(tables));
}

string BedrockConflictManager::generateReport() {
stringstream out;
{
lock_guard<mutex> lock(m);
for (auto& p : _commandInfo) {
const string& commandName = p.first;
const BedrockConflictManagerCommandInfo& commandInfo = p.second;

out << "Command: " << commandName << endl;
out << "Total Count: " << commandInfo.count << endl;
out << "Table usage" << endl;
for (const auto& table : commandInfo.tableUseCounts) {
const string& tableName = table.first;
const size_t& count = table.second;
out << " " << tableName << ": " << count << endl;
}
out << endl;
}
}
return out.str();
}
24 changes: 24 additions & 0 deletions BedrockConflictManager.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#pragma once
#include <map>
#include <mutex>
#include <set>
#include <string>

using namespace std;

class BedrockConflictManagerCommandInfo {
public:
size_t count = 0;
map<string, size_t> tableUseCounts;
};

class BedrockConflictManager {
public:
BedrockConflictManager();
void recordTables(const string& commandName, const set<string>& tables);
string generateReport();

private:
mutex m;
map<string, BedrockConflictManagerCommandInfo> _commandInfo;
};
5 changes: 5 additions & 0 deletions BedrockServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,7 @@ void BedrockServer::sync()
if (_syncNode->commitSucceeded()) {
if (command) {
SINFO("[performance] Sync thread finished committing command " << command->request.methodLine);
_conflictManager.recordTables(command->request.methodLine, db.getTablesUsed());

// Otherwise, save the commit count, mark this command as complete, and reply.
command->response["commitCount"] = to_string(db.getCommitCount());
Expand Down Expand Up @@ -968,6 +969,7 @@ void BedrockServer::runCommand(unique_ptr<BedrockCommand>&& _command, bool isBlo
_syncNode->notifyCommit();
SINFO("Committed leader transaction #" << transactionID << "(" << transactionHash << "). Command: '" << command->request.methodLine << "', blocking: "
<< (isBlocking ? "true" : "false"));
_conflictManager.recordTables(command->request.methodLine, db.getTablesUsed());
// So we must still be leading, and at this point our commit has succeeded, let's
// mark it as complete. We add the currentCommit count here as well.
command->response["commitCount"] = to_string(db.getCommitCount());
Expand Down Expand Up @@ -1688,6 +1690,7 @@ bool BedrockServer::_isControlCommand(const unique_ptr<BedrockCommand>& command)
SIEquals(command->request.methodLine, "SuppressCommandPort") ||
SIEquals(command->request.methodLine, "ClearCommandPort") ||
SIEquals(command->request.methodLine, "ClearCrashCommands") ||
SIEquals(command->request.methodLine, "ConflictReport") ||
SIEquals(command->request.methodLine, "Detach") ||
SIEquals(command->request.methodLine, "Attach") ||
SIEquals(command->request.methodLine, "SetConflictParams") ||
Expand Down Expand Up @@ -1723,6 +1726,8 @@ void BedrockServer::_control(unique_ptr<BedrockCommand>& command) {
} else if (SIEquals(command->request.methodLine, "ClearCrashCommands")) {
unique_lock<decltype(_crashCommandMutex)> lock(_crashCommandMutex);
_crashCommands.clear();
} else if (SIEquals(command->request.methodLine, "ConflictReport")) {
response.content = _conflictManager.generateReport();
} else if (SIEquals(command->request.methodLine, "Detach")) {
response.methodLine = "203 DETACHING";
_beginShutdown("Detach", true);
Expand Down
6 changes: 5 additions & 1 deletion BedrockServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
#include <sqlitecluster/SQLiteClusterMessenger.h>
#include "BedrockPlugin.h"
#include "BedrockCommandQueue.h"
#include "BedrockConflictManager.h"
#include "BedrockBlockingCommandQueue.h"
#include "BedrockTimeoutCommandQueue.h"

class SQLitePeer;
Expand Down Expand Up @@ -259,8 +261,10 @@ class BedrockServer : public SQLiteServer {
// Commands that aren't currently being processed are kept here.
BedrockCommandQueue _commandQueue;

BedrockConflictManager _conflictManager;

// These are commands that will be processed in a blacking fashion.
BedrockCommandQueue _blockingCommandQueue;
BedrockBlockingCommandQueue _blockingCommandQueue;

// Each time we read a new request from a client, we give it a unique ID.
atomic<uint64_t> _requestCount;
Expand Down
10 changes: 10 additions & 0 deletions sqlitecluster/SQLite.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ string SQLite::initializeFilename(const string& filename) {
}
}

const set<string>& SQLite::getTablesUsed() const {
return _tablesUsed;
}

SQLite::SharedData& SQLite::initializeSharedData(sqlite3* db, const string& filename, const vector<string>& journalNames, bool hctree) {
static struct SharedDataLookupMapType {
map<string, SharedData*> m;
Expand Down Expand Up @@ -350,6 +354,7 @@ bool SQLite::beginTransaction(TRANSACTION_TYPE type) {
// the above `BEGIN CONCURRENT` and the `getCommitCount` call in a lock, which is worse.
_dbCountAtStart = getCommitCount();
_queryCache.clear();
_tablesUsed.clear();
_queryCount = 0;
_cacheHits = 0;
_beginElapsed = STimeNow() - before;
Expand Down Expand Up @@ -862,6 +867,11 @@ int SQLite::_authorize(int actionCode, const char* detail1, const char* detail2,
return SQLITE_DENY;
}

// Record all tables touched.
if (set<int>{SQLITE_INSERT, SQLITE_DELETE, SQLITE_READ, SQLITE_UPDATE}.count(actionCode)) {
_tablesUsed.insert(detail1);
}

// Here's where we can check for non-deterministic functions for the cache.
if (actionCode == SQLITE_FUNCTION && detail2) {
if (!strcmp(detail2, "random") ||
Expand Down
5 changes: 5 additions & 0 deletions sqlitecluster/SQLite.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ class SQLite {
void setUpdateNoopMode(bool enabled);
bool getUpdateNoopMode() const;

const set<string>& getTablesUsed() const;

// Prepare to commit or rollback the transaction. This also inserts the current uncommitted query into the
// journal; no additional writes are allowed until the next transaction has begun.
// The transactionID and transactionHash, if passed, will be updated with the values prepared for this transaction.
Expand Down Expand Up @@ -452,6 +454,9 @@ class SQLite {
// write, rollback, or commit.
map<string, SQResult> _queryCache;

// List of table names used during this transaction.
set<string> _tablesUsed;

// Number of queries that have been attempted in this transaction (for metrics only).
int64_t _queryCount = 0;

Expand Down
44 changes: 12 additions & 32 deletions test/clustertest/tests/StatusHandlingCommandsTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,15 @@

struct StatusHandlingCommandsTest : tpunit::TestFixture {
StatusHandlingCommandsTest()
: tpunit::TestFixture("StatusHandlingCommands",
BEFORE_CLASS(StatusHandlingCommandsTest::setup),
AFTER_CLASS(StatusHandlingCommandsTest::teardown),
TEST(StatusHandlingCommandsTest::test)) { }

BedrockClusterTester* tester;

void setup () {
tester = new BedrockClusterTester();
}

void teardown() {
delete tester;
}
: tpunit::TestFixture("StatusHandlingCommands", TEST(StatusHandlingCommandsTest::test)) { }

void test() {
vector<string> results(3);
BedrockTester& leader = tester->getTester(0);
BedrockTester& follower = tester->getTester(1);
BedrockClusterTester tester;
BedrockTester& leader = tester.getTester(0);
BedrockTester& follower = tester.getTester(1);
vector<string> results(2);

leader.stopServer();

thread healthCheckThread([this, &results, &follower](){
SData cmd("GET /status/handlingCommands HTTP/1.1");
Expand All @@ -39,30 +29,20 @@ struct StatusHandlingCommandsTest : tpunit::TestFixture {
} else if (result == "HTTP/1.1 200 FOLLOWING") {
results[1] = result;
foundFollower = true;
} else if (result == "HTTP/1.1 200 STANDINGDOWN") {
results[2] = result;
foundStandingdown = true;

// If we get here, it's not going back to leading/standingdown.
break;
}
}
});

leader.stopServer();

// Execute a slow query while the follower is leading so when the
// leader is brought back up, it will be STANDINGDOWN until it finishes
thread slowQueryThread([this, &follower](){
SData slow("slowquery");
slow["timeout"] = "5000"; // 5s
follower.executeWaitVerifyContent(slow, "555 Timeout peeking command");
});

sleep(1);
leader.startServer(false);
slowQueryThread.join();
healthCheckThread.join();

ASSERT_EQUAL(results[0], "HTTP/1.1 200 LEADING")
ASSERT_EQUAL(results[1], "HTTP/1.1 200 FOLLOWING")
ASSERT_EQUAL(results[2], "HTTP/1.1 200 STANDINGDOWN")
// We don't test STANDINGDOWN because it's unreliable to get it to show up in the status, we can move straight through it too quickly.
}

} __StatusHandlingCommandsTest;

0 comments on commit c3ed869

Please sign in to comment.