Skip to content

Commit

Permalink
Merge pull request #861 from Expensify/master
Browse files Browse the repository at this point in the history
Update expensify_prod branch
  • Loading branch information
justinpersaud authored Aug 26, 2020
2 parents 507b811 + d3bc88c commit 6fbdda1
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 34 deletions.
4 changes: 2 additions & 2 deletions BedrockServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -789,8 +789,8 @@ void BedrockServer::worker(SQLitePool& dbPool,
SInitialize(threadId ? "worker" + to_string(threadId) : "blockingCommit");

// Get a DB handle to work on. This will automatically be returned when dbScope goes out of scope.
SQLite& db = dbPool.get();
SQLiteScopedHandle dbScope(dbPool, db);
SQLiteScopedHandle dbScope(dbPool, dbPool.getIndex());
SQLite& db = dbScope.db();
BedrockCore core(db, server);

// Command to work on. This default command is replaced when we find work to do.
Expand Down
2 changes: 1 addition & 1 deletion libstuff/STCPNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ void STCPNode::postPoll(fd_map& fdm, uint64_t& nextActivity) {
// What is it?
socket->recvBuffer.consumeFront(messageSize);
if (SIEquals(message.methodLine, "NODE_LOGIN")) {
// Got it -- can we asssociate with a peer?
// Got it -- can we associate with a peer?
bool foundIt = false;
for (Peer* peer : peerList) {
// Just match any unconnected peer
Expand Down
7 changes: 4 additions & 3 deletions sqlitecluster/SQLiteNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,13 @@ SQLiteNode::~SQLiteNode() {
_dbPool.getBase().removeCheckpointListener(_leaderCommitNotifier);
}

void SQLiteNode::replicate(SQLiteNode& node, Peer* peer, SData command, SQLite& db) {
void SQLiteNode::replicate(SQLiteNode& node, Peer* peer, SData command, size_t sqlitePoolIndex) {
// Initialize each new thread with a new number.
SInitialize("replicate" + to_string(node._currentCommandThreadID.fetch_add(1)));

// Allow the DB handle to be returned regardless of how this function exits.
SQLiteScopedHandle dbScope(node._dbPool, db);
SQLiteScopedHandle dbScope(node._dbPool, sqlitePoolIndex);
SQLite& db = dbScope.db();

bool goSearchingOnExit = false;
{
Expand Down Expand Up @@ -1592,7 +1593,7 @@ void SQLiteNode::_onMESSAGE(Peer* peer, const SData& message) {
auto threadID = _replicationThreadCount.fetch_add(1);
SINFO("Spawning concurrent replicate thread (blocks until DB handle available): " << threadID);
AutoTimerTime time(_multiReplicationThreadSpawn);
thread(replicate, ref(*this), peer, message, ref(_dbPool.get())).detach();
thread(replicate, ref(*this), peer, message, _dbPool.getIndex(false)).detach();
SINFO("Done spawning concurrent replicate thread: " << threadID);
}
} else {
Expand Down
2 changes: 1 addition & 1 deletion sqlitecluster/SQLiteNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ class SQLiteNode : public STCPNode {
//
// This thread exits on completion of handling the command or when node._replicationThreadsShouldExit is set,
// which happens when a node stops FOLLOWING.
static void replicate(SQLiteNode& node, Peer* peer, SData command, SQLite& db);
static void replicate(SQLiteNode& node, Peer* peer, SData command, size_t sqlitePoolIndex);

// Counter of the total number of currently active replication threads. This is used to let us know when all
// threads have finished.
Expand Down
59 changes: 40 additions & 19 deletions sqlitecluster/SQLitePool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,43 +11,53 @@ SQLitePool::SQLitePool(size_t maxDBs,
int64_t mmapSizeGB,
bool pageLoggingEnabled)
: _maxDBs(max(maxDBs, 1ul)),
_baseDB(filename, cacheSize, enableFullCheckpoints, maxJournalSize, minJournalTables, synchronous, mmapSizeGB, pageLoggingEnabled)
{ }
_baseDB(filename, cacheSize, enableFullCheckpoints, maxJournalSize, minJournalTables, synchronous, mmapSizeGB, pageLoggingEnabled),
_objects(_maxDBs, nullptr)
{
}

SQLitePool::~SQLitePool() {
lock_guard<mutex> lock(_sync);
if (_inUseHandles.size()) {
SWARN("Destroying SQLitePool with DBs in use.");
}
for (auto dbHandle : _availableHandles) {
delete dbHandle;
delete _objects[dbHandle];
_objects[dbHandle] = nullptr;
}
for (auto dbHandle : _inUseHandles) {
delete dbHandle;
delete _objects[dbHandle];
_objects[dbHandle] = nullptr;
}
}

SQLite& SQLitePool::getBase() {
return _baseDB;
}

SQLite& SQLitePool::get() {
size_t SQLitePool::getIndex(bool createHandle) {
while (true) {
unique_lock<mutex> lock(_sync);
if (_availableHandles.size()) {
// Return an existing handle.
auto frontIt = _availableHandles.begin();
SQLite* db = *frontIt;
_inUseHandles.insert(db);
size_t index = *frontIt;
_inUseHandles.insert(index);
_availableHandles.erase(frontIt);
SINFO("Returning existing DB handle");
return *db;
return index;
} else if (_availableHandles.size() + _inUseHandles.size() < (_maxDBs - 1)) {
// Create a new handle.
SQLite* db = new SQLite(_baseDB);
_inUseHandles.insert(db);
SINFO("Returning new DB handle: " << (_availableHandles.size() + _inUseHandles.size()));
return *db;
size_t index = _availableHandles.size() + _inUseHandles.size();
_inUseHandles.insert(index);

// Create a new handle unless we're not supposed to. We unlock here as we're no longer in a position to
// change which indices are in use.
lock.unlock();
if (createHandle) {
initializeIndex(index);
}
SINFO("Returning new DB handle: " << index);
return index;
} else {
// Wait for a handle.
SINFO("Waiting for DB handle");
Expand All @@ -56,23 +66,34 @@ SQLite& SQLitePool::get() {
}
}

void SQLitePool::returnToPool(SQLite& object) {
SQLite& SQLitePool::initializeIndex(size_t index) {
// We don't lock here on purpose. Because these indexes are handed out individually, no two threads should have the
// same one, and thus they should independently be able to update the addresses in this vector, as neither will
// change the allocation of the vector itself.
// It's an error to run `initializeIndex` in two threads on the same index at the same time.
if (_objects[index] == nullptr) {
_objects[index] = new SQLite(_baseDB);
}
return *_objects[index];
}

void SQLitePool::returnToPool(size_t index) {
{
lock_guard<mutex> lock(_sync);
_availableHandles.insert(&object);
_inUseHandles.erase(&object);
_availableHandles.insert(index);
_inUseHandles.erase(index);
SINFO("DB handle returned to pool.");
}
_wait.notify_one();
}

SQLiteScopedHandle::SQLiteScopedHandle(SQLitePool& pool, SQLite& db) : _pool(pool), _db(db)
SQLiteScopedHandle::SQLiteScopedHandle(SQLitePool& pool, size_t index) : _pool(pool), _index(index)
{}

SQLiteScopedHandle::~SQLiteScopedHandle() {
_pool.returnToPool(_db);
_pool.returnToPool(_index);
}

SQLite& SQLiteScopedHandle::db() {
return _db;
return _pool.initializeIndex(_index);
}
27 changes: 19 additions & 8 deletions sqlitecluster/SQLitePool.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,19 @@ class SQLitePool {
// threads, both threads may hold the same DB handle.
SQLite& getBase();

// Get any object except the base. Will wait for an available handle if there are already maxDBs.
SQLite& get();
// Gets an index into the internal data structure for a handle that is marked as "inUse". If there are too many
// "inUse" handles (maxDBs), this will wait until one is available.
// If `createHandle` is true, and all existent handles are in use, but there is space for more handles, this will
// create a new one and return it's index.
// However, if `creteHandle` is false, this will *not* create the handle, but just reserve the index, and allow the
// handle to be created later with `initializeIndex` on this slot.
size_t getIndex(bool createHandle = true);

// Takes an allocated index and creates the appropriate DB handle if required.
SQLite& initializeIndex(size_t index);

// Return an object to the pool.
void returnToPool(SQLite& object);
void returnToPool(size_t index);

private:
// Synchronization variables.
Expand All @@ -31,18 +39,21 @@ class SQLitePool {
// Our base object that all others are based upon.
SQLite _baseDB;

// Pointers to every other object we create.
set<SQLite*> _availableHandles;
set<SQLite*> _inUseHandles;
// These are indexes into `_objects`.
set<size_t> _availableHandles;
set<size_t> _inUseHandles;

// This is a vector of pointers to all possibly allocated objects.
vector<SQLite*> _objects;
};

class SQLiteScopedHandle {
public:
SQLiteScopedHandle(SQLitePool& pool, SQLite& db);
SQLiteScopedHandle(SQLitePool& pool, size_t index);
~SQLiteScopedHandle();
SQLite& db();

private:
SQLitePool& _pool;
SQLite& _db;
size_t _index;
};

0 comments on commit 6fbdda1

Please sign in to comment.