Skip to content

Commit

Permalink
Revert "Revert "Stop ands reschedule commands that block checkpoints.""
Browse files Browse the repository at this point in the history
This reverts commit f1c3758.
  • Loading branch information
coleaeason committed Apr 3, 2020
1 parent 0f8e351 commit 1eb41be
Show file tree
Hide file tree
Showing 7 changed files with 118 additions and 30 deletions.
45 changes: 27 additions & 18 deletions BedrockCore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,15 @@ bool BedrockCore::isTimedOut(unique_ptr<BedrockCommand>& command) {
return false;
}

bool BedrockCore::peekCommand(unique_ptr<BedrockCommand>& command) {
BedrockCore::RESULT BedrockCore::peekCommand(unique_ptr<BedrockCommand>& command) {
AutoTimer timer(command, BedrockCommand::PEEK);
// Convenience references to commonly used properties.
const SData& request = command->request;
SData& response = command->response;
STable& content = command->jsonContent;

// We catch any exception and handle in `_handleCommandException`.
RESULT returnValue = RESULT::COMPLETE;
try {
SDEBUG("Peeking at '" << request.methodLine << "' with priority: " << command->priority);
uint64_t timeout = _getRemainingTime(command);
Expand All @@ -92,13 +93,11 @@ bool BedrockCore::peekCommand(unique_ptr<BedrockCommand>& command) {
bool completed = command->peek(_db);
SDEBUG("Plugin '" << command->getName() << "' peeked command '" << request.methodLine << "'");

// Peeking is over now, allow writes
_db.read("PRAGMA query_only = false;");

if (!completed) {
SINFO("Command '" << request.methodLine << "' not finished in peek, re-queuing.");
_db.resetTiming();
return false;
_db.read("PRAGMA query_only = false;");
return RESULT::SHOULD_PROCESS;
}

} catch (const SQLite::timeout_error& e) {
Expand Down Expand Up @@ -131,36 +130,36 @@ bool BedrockCore::peekCommand(unique_ptr<BedrockCommand>& command) {
}
} catch (const SException& e) {
command->repeek = false;
_db.resetTiming();
_db.read("PRAGMA query_only = false;");
_handleCommandException(command, e);
} catch (const SHTTPSManager::NotLeading& e) {
command->repeek = false;
_db.rollback();
_db.read("PRAGMA query_only = false;");
_db.resetTiming();
returnValue = RESULT::SHOULD_PROCESS;
SINFO("Command '" << request.methodLine << "' wants to make HTTPS request, queuing for processing.");
return false;
} catch (const SQLite::checkpoint_required_error& e) {
command->repeek = false;
returnValue = RESULT::ABANDONED_FOR_CHECKPOINT;
SINFO("[checkpoint] Command " << command->request.methodLine << " abandoned (peek) for checkpoint");
} catch (...) {
command->repeek = false;
_db.resetTiming();
_db.read("PRAGMA query_only = false;");
SALERT("Unhandled exception typename: " << SGetCurrentExceptionName() << ", command: " << request.methodLine);
command->response.methodLine = "500 Unhandled Exception";
}

// If we get here, it means the command is fully completed.
command->complete = true;
// Unless an exception handler set this to something different, the command is complete.
command->complete = returnValue == RESULT::COMPLETE;

// Back out of the current transaction, it doesn't need to do anything.
_db.rollback();
_db.resetTiming();

// Reset, we can write now.
_db.read("PRAGMA query_only = false;");

// Done.
return true;
return returnValue;
}

bool BedrockCore::processCommand(unique_ptr<BedrockCommand>& command) {
BedrockCore::RESULT BedrockCore::processCommand(unique_ptr<BedrockCommand>& command) {
AutoTimer timer(command, BedrockCommand::PROCESS);

// Convenience references to commonly used properties.
Expand Down Expand Up @@ -200,6 +199,8 @@ bool BedrockCore::processCommand(unique_ptr<BedrockCommand>& command) {
SALERT("Command " << command->request.methodLine << " timed out after " << e.time()/1000 << "ms.");
}
STHROW("555 Timeout processing command");
} catch (const SQLite::checkpoint_required_error& e) {
SINFO("[checkpoint] Command " << command->request.methodLine << " abandoned (process) for checkpoint");
}
}

Expand Down Expand Up @@ -233,6 +234,13 @@ bool BedrockCore::processCommand(unique_ptr<BedrockCommand>& command) {
_handleCommandException(command, e);
_db.rollback();
needsCommit = false;
} catch (const SQLite::checkpoint_required_error& e) {
_db.rollback();
_db.setUpdateNoopMode(false);
_db.resetTiming();
command->complete = false;
SINFO("[checkpoint] Command " << command->request.methodLine << " abandoned (process) for checkpoint");
return RESULT::ABANDONED_FOR_CHECKPOINT;
} catch(...) {
SALERT("Unhandled exception typename: " << SGetCurrentExceptionName() << ", command: " << request.methodLine);
command->response.methodLine = "500 Unhandled Exception";
Expand All @@ -248,7 +256,8 @@ bool BedrockCore::processCommand(unique_ptr<BedrockCommand>& command) {

// Done, return whether or not we need the parent to commit our transaction.
command->complete = !needsCommit;
return needsCommit;

return needsCommit ? RESULT::NEEDS_COMMIT : RESULT::NO_COMMIT_REQUIRED;
}

void BedrockCore::_handleCommandException(unique_ptr<BedrockCommand>& command, const SException& e) {
Expand Down
14 changes: 12 additions & 2 deletions BedrockCore.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,16 @@ class BedrockCore : public SQLiteCore {
public:
BedrockCore(SQLite& db, const BedrockServer& server);

// Possible result values for `peekCommand` and `processCommand`
enum class RESULT {
INVALID = 0,
COMPLETE = 1,
SHOULD_PROCESS = 2,
NEEDS_COMMIT = 3,
NO_COMMIT_REQUIRED = 4,
ABANDONED_FOR_CHECKPOINT = 5
};

// Automatic timing class that records an entry corresponding to its lifespan.
class AutoTimer {
public:
Expand All @@ -33,7 +43,7 @@ class BedrockCore : public SQLiteCore {
// should be considered an error to modify the DB from inside `peek`.
// Returns a boolean value of `true` if the command is complete and its `response` field can be returned to the
// caller. Returns `false` if the command will need to be passed to `process` to complete handling the command.
bool peekCommand(unique_ptr<BedrockCommand>& command);
RESULT peekCommand(unique_ptr<BedrockCommand>& command);

// Process is the follow-up to `peek` if `peek` was insufficient to handle the command. It will only ever be called
// on the leader node, and should always be able to resolve the command completely. When a command is passed to
Expand All @@ -45,7 +55,7 @@ class BedrockCore : public SQLiteCore {
// replicate the transaction to follower nodes. Upon being returned `true`, the caller will attempt to perform a
// `COMMIT` and replicate the transaction to follower nodes. It's allowable for this `COMMIT` to fail, in which case
// this command *will be passed to process again in the future to retry*.
bool processCommand(unique_ptr<BedrockCommand>& command);
RESULT processCommand(unique_ptr<BedrockCommand>& command);

private:
// When called in the context of handling an exception, returns the demangled (if possible) name of the exception.
Expand Down
47 changes: 40 additions & 7 deletions BedrockServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,8 @@ void BedrockServer::sync(const SData& args,
// risk duplicating that request. If your command creates an HTTPS request, it needs to explicitly
// re-verify that any checks made in peek are still valid in process.
if (!command->httpsRequests.size()) {
if (core.peekCommand(command)) {
BedrockCore::RESULT result = core.peekCommand(command);
if (result == BedrockCore::RESULT::COMPLETE) {

// Finished with this.
server._syncThreadCommitMutex.unlock();
Expand All @@ -552,6 +553,15 @@ void BedrockServer::sync(const SData& args,
server._reply(command);
}
continue;
} else if (result == BedrockCore::RESULT::SHOULD_PROCESS) {
// This is sort of the "default" case after checking if this command was complete above. If so,
// we'll fall through to calling processCommand below.
} else if (result == BedrockCore::RESULT::ABANDONED_FOR_CHECKPOINT) {
SINFO("[checkpoint] Re-queuing abandoned command (from peek) in sync thread");
server._commandQueue.push(move(command));
continue;
} else {
SERROR("peekCommand (" << command->request.getVerb() << ") returned invalid result code: " << (int)result);
}

// If we just started a new HTTPS request, save it for later.
Expand All @@ -565,7 +575,8 @@ void BedrockServer::sync(const SData& args,
}
}

if (core.processCommand(command)) {
BedrockCore::RESULT result = core.processCommand(command);
if (result == BedrockCore::RESULT::NEEDS_COMMIT) {
// The processor says we need to commit this, so let's start that process.
committingCommand = true;
SINFO("[performance] Sync thread beginning committing command " << command->request.methodLine);
Expand All @@ -583,7 +594,7 @@ void BedrockServer::sync(const SData& args,

// Don't unlock _syncThreadCommitMutex here, we'll hold the lock till the commit completes.
continue;
} else {
} else if (result == BedrockCore::RESULT::NO_COMMIT_REQUIRED) {
// Otherwise, the command doesn't need a commit (maybe it was an error, or it didn't have any work
// to do). We'll just respond.
server._syncThreadCommitMutex.unlock();
Expand All @@ -592,6 +603,12 @@ void BedrockServer::sync(const SData& args,
} else {
server._reply(command);
}
} else if (result == BedrockCore::RESULT::ABANDONED_FOR_CHECKPOINT) {
SINFO("[checkpoint] Re-queuing abandoned command (from process) in sync thread");
server._commandQueue.push(move(command));
continue;
} else {
SERROR("processCommand (" << command->request.getVerb() << ") returned invalid result code: " << (int)result);
}
} else if (nodeState == SQLiteNode::FOLLOWING) {
// If we're following, we just escalate directly to leader without peeking. We can only get an incomplete
Expand Down Expand Up @@ -902,13 +919,19 @@ void BedrockServer::worker(const SData& args,
// command has specifically asked for that.
// If peek succeeds, then it's finished, and all we need to do is respond to the command at the bottom.
bool calledPeek = false;
bool peekResult = false;
BedrockCore::RESULT peekResult = BedrockCore::RESULT::INVALID;
if (command->repeek || !command->httpsRequests.size()) {
peekResult = core.peekCommand(command);
calledPeek = true;
}

if (!calledPeek || !peekResult) {
// This drops us back to the top of the loop.
if (peekResult == BedrockCore::RESULT::ABANDONED_FOR_CHECKPOINT) {
SINFO("[checkpoint] Re-trying abandoned command (from peek) in worker thread");
continue;
}

if (!calledPeek || peekResult == BedrockCore::RESULT::SHOULD_PROCESS) {
// We've just unsuccessfully peeked a command, which means we're in a state where we might want to
// write it. We'll flag that here, to keep the node from falling out of LEADING/STANDINGDOWN
// until we're finished with this command.
Expand Down Expand Up @@ -966,7 +989,8 @@ void BedrockServer::worker(const SData& args,
}

// In this case, there's nothing blocking us from processing this in a worker, so let's try it.
if (core.processCommand(command)) {
BedrockCore::RESULT result = core.processCommand(command);
if (result == BedrockCore::RESULT::NEEDS_COMMIT) {
// If processCommand returned true, then we need to do a commit. Otherwise, the command is
// done, and we just need to respond. Before we commit, we need to grab the sync thread
// lock. Because the sync thread grabs an exclusive lock on this wrapping any transactions
Expand Down Expand Up @@ -1021,11 +1045,20 @@ void BedrockServer::worker(const SData& args,
SINFO("Conflict or state change committing " << command->request.methodLine
<< " on worker thread with " << retry << " retries remaining.");
}
} else if (result == BedrockCore::RESULT::ABANDONED_FOR_CHECKPOINT) {
SINFO("[checkpoint] Re-trying abandoned command (from process) in worker thread");
// Add a retry so that we don't hit out conflict limit for checkpoints.
++retry;
} else if (result == BedrockCore::RESULT::NO_COMMIT_REQUIRED) {
// Nothing to do in this case, `command->complete` will be set and we'll finish as we fall out
// of this block.
} else {
SERROR("processCommand (" << command->request.getVerb() << ") returned invalid result code: " << (int)result);
}
}

// If the command was completed above, then we'll go ahead and respond. Otherwise there must have been
// a conflict, and we'll retry.
// a conflict or the command was abandoned for a checkpoint, and we'll retry.
if (command->complete) {
if (command->initiatingPeerID) {
// Escalated command. Send it back to the peer.
Expand Down
25 changes: 25 additions & 0 deletions sqlitecluster/SQLite.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ SQLite::SQLite(const string& filename, int cacheSize, bool enableFullCheckpoints
_enableRewrite(false),
_currentlyRunningRewritten(false),
_timeoutLimit(0),
_abandonForCheckpoint(false),
_autoRolledBack(false),
_noopUpdateMode(false),
_enableFullCheckpoints(enableFullCheckpoints),
Expand Down Expand Up @@ -222,6 +223,10 @@ int SQLite::_progressHandlerCallback(void* arg) {

// Return non-zero causes sqlite to interrupt the operation.
return 1;
} else if (sqlite->_sharedData->_checkpointThreadBusy.load()) {
SINFO("[checkpoint] Abandoning transaction to unblock checkpoint");
sqlite->_abandonForCheckpoint = true;
return 2;
}
return 0;
}
Expand Down Expand Up @@ -543,10 +548,29 @@ bool SQLite::read(const string& query, SQResult& result) {
_queryCache.emplace(make_pair(query, result));
}
_checkTiming("timeout in SQLite::read"s);
_checkAbandon();
_readElapsed += STimeNow() - before;
return queryResult;
}

void SQLite::_checkAbandon() {
if (_abandonForCheckpoint) {
// Timing out inside a write operation will automatically roll back the current transaction. We need to be
// aware as to whether or not this has happened.
// If autocommit is turned on, it means we're not inside an explicit `BEGIN` block, indicating that the
// transaction has been rolled back.
// see: http://www.sqlite.org/c3ref/get_autocommit.html
if (sqlite3_get_autocommit(_db)) {
SHMMM("Transaction automatically rolled back. Setting _autoRolledBack = true");
_autoRolledBack = true;
}

// Reset this and throw the appropriate exception.
_abandonForCheckpoint = false;
throw checkpoint_required_error();
}
}

void SQLite::_checkTiming(const string& error) {
if (_timeoutLimit) {
uint64_t now = STimeNow();
Expand Down Expand Up @@ -624,6 +648,7 @@ bool SQLite::_writeIdempotent(const string& query, bool alwaysKeepQueries) {
result = !SQuery(_db, "read/write transaction", query);
}
_checkTiming("timeout in SQLite::write"s);
_checkAbandon();
_writeElapsed += STimeNow() - before;
if (!result) {
return false;
Expand Down
11 changes: 11 additions & 0 deletions sqlitecluster/SQLite.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@ class SQLite {
uint64_t _time;
};

class checkpoint_required_error : public exception {
public :
checkpoint_required_error() {};
virtual ~checkpoint_required_error() {};
const char* what() const noexcept { return "checkpoint_required"; }
};

// This publicly exposes our core mutex, allowing other classes to perform extra operations around commits and
// such, when they determine that those operations must be made atomically with operations happening in SQLite.
// This can be locked with the SQLITE_COMMIT_AUTOLOCK macro, as well.
Expand Down Expand Up @@ -387,10 +394,14 @@ class SQLite {
uint64_t _timeoutLimit;
uint64_t _timeoutStart;
uint64_t _timeoutError;
bool _abandonForCheckpoint;

// Check the timing of the current query and throw if the limit's exceeded.
void _checkTiming(const string& error);

// Check if we need to abandon a query for a checkpoint.
void _checkAbandon();

// Called internally by _sqliteAuthorizerCallback to authorize columns for a query.
int _authorize(int actionCode, const char* detail1, const char* detail2, const char* detail3, const char* detail4);

Expand Down
2 changes: 1 addition & 1 deletion test/clustertest/testplugin/TestPlugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ void TestPluginCommand::process(SQLite& db) {
nextID++;
}
query += ";";
db.read(query, result);
db.write(query);
}
} else if (SStartsWith(request.methodLine, "exceptioninprocess")) {
throw 2;
Expand Down
4 changes: 2 additions & 2 deletions test/lib/BedrockTester.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ vector<SData> BedrockTester::executeWaitMultipleData(vector<SData> requests, int

// This continues until there are no more requests to process.
bool timedOut = false;
int timeoutAutoRetries = 1;
int timeoutAutoRetries = 0;
size_t myIndex = 0;
SData myRequest;
while (true) {
Expand Down Expand Up @@ -360,7 +360,7 @@ vector<SData> BedrockTester::executeWaitMultipleData(vector<SData> requests, int
}

// Reset this for the next request that might need it.
timeoutAutoRetries = 3;
timeoutAutoRetries = 0;
}
timedOut = false;

Expand Down

0 comments on commit 1eb41be

Please sign in to comment.