Skip to content

Commit

Permalink
Merge pull request #1193 from Expensify/main
Browse files Browse the repository at this point in the history
Update expensify_prod branch
  • Loading branch information
bondydaa authored Feb 14, 2022
2 parents 6d0f1c7 + 01b6ea6 commit 8afc697
Show file tree
Hide file tree
Showing 22 changed files with 200 additions and 44 deletions.
7 changes: 6 additions & 1 deletion BedrockCommand.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,6 @@ void BedrockCommand::finalizeTimingInfo() {
{"peekTime", peekTotal},
{"processTime", processTotal},
{"totalTime", totalTime},
{"escalationTime", escalationTimeUS},
{"unaccountedTime", unaccountedTime},
};

Expand Down Expand Up @@ -232,6 +231,12 @@ void BedrockCommand::finalizeTimingInfo() {
response[p.first] = to_string(p.second);
}
}

// TODO: Remove when "escalate over HTTP" is enabled all the time, this is here to support only old-style
// escalations.
if (escalationTimeUS && !response.isSet("escalationTime")) {
response["escalationTime"] = to_string(escalationTimeUS);
}
}

void BedrockCommand::prePoll(fd_map& fdm)
Expand Down
67 changes: 53 additions & 14 deletions BedrockServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -194,10 +194,10 @@ void BedrockServer::sync()
args["-peerList"], args.calc("-priority"), firstTimeout,
_version, args.test("-parallelReplication")));

// This should be empty anyway, but let's make sure.
if (_completedCommands.size()) {
SWARN("_completedCommands not empty at startup of sync thread.");
}
// Control port and not command port so that followers can still escalate to leader when leader's command port is
// closed.
// TODO: Pass this in the constructor and make it const?
_syncNode->setCommandAddress(args["-controlPort"]);

// The node is now coming up, and should eventually end up in a `LEADING` or `FOLLOWING` state. We can start adding
// our worker threads now. We don't wait until the node is `LEADING` or `FOLLOWING`, as it's state can change while
Expand Down Expand Up @@ -1016,11 +1016,32 @@ void BedrockServer::worker(int threadId)
// Roll back the transaction, it'll get re-run in the sync thread.
core.rollback();

// We're not handling a writable command anymore.
SINFO("Sending non-parallel command " << command->request.methodLine
<< " to sync thread. Sync thread has " << _syncNodeQueuedCommands.size()
<< " queued commands.");
_syncNodeQueuedCommands.push(move(command));
// TODO: When escalation over HTTP is totally vetted, remove this "else" block and just always
// use the "if" case.
if (_escalateOverHTTP) {
if (state == SQLiteNode::LEADING) {
SINFO("Sending non-parallel command " << command->request.methodLine
<< " to sync thread. Sync thread has " << _syncNodeQueuedCommands.size() << " queued commands.");
_syncNodeQueuedCommands.push(move(command));
} else if (state == SQLiteNode::STANDINGDOWN) {
SINFO("Need to process command " << command->request.methodLine << " but STANDINGDOWN, moving to _standDownQueue.");
_standDownQueue.push(move(command));
} else if (_clusterMessenger.sendToLeader(*command)) {
SINFO("Escalating " << command->request.methodLine << " to leader.");
waitForHTTPS(move(command));
} else {
// TODO: Something less naive that considers how these failures happen rather than a simple
// endless loop of requeue and retry.
SWARN("Couldn't escalate command " << command->request.methodLine << " to leader. We are in state: " << STCPNode::stateName(state));
_commandQueue.push(move(command));
}
} else {
// We're not handling a writable command anymore.
SINFO("Sending non-parallel command " << command->request.methodLine
<< " to sync thread. Sync thread has " << _syncNodeQueuedCommands.size()
<< " queued commands.");
_syncNodeQueuedCommands.push(move(command));
}

// Done with this command, look for the next one.
break;
Expand Down Expand Up @@ -1230,19 +1251,24 @@ void BedrockServer::_resetServer() {
}
}

BedrockServer::BedrockServer(SQLiteNode::State state, const SData& args_) : SQLiteServer(), args(args_), _replicationState(SQLiteNode::LEADING)
BedrockServer::BedrockServer(SQLiteNode::State state, const SData& args_)
: SQLiteServer(), args(args_), _replicationState(SQLiteNode::LEADING),
_syncNode(nullptr), _clusterMessenger(_syncNode)
{}

BedrockServer::BedrockServer(const SData& args_)
: SQLiteServer(), shutdownWhileDetached(false), args(args_), _requestCount(0), _replicationState(SQLiteNode::SEARCHING),
_upgradeInProgress(false), _suppressCommandPort(false), _suppressCommandPortManualOverride(false),
_syncThreadComplete(false), _syncNode(nullptr), _shutdownState(RUNNING),
_syncThreadComplete(false), _syncNode(nullptr), _clusterMessenger(_syncNode), _shutdownState(RUNNING),
_multiWriteEnabled(args.test("-enableMultiWrite")), _shouldBackup(false), _detach(args.isSet("-bootstrap")),
_controlPort(nullptr), _commandPort(nullptr), _maxConflictRetries(3), _lastQuorumCommandTime(STimeNow()),
_pluginsDetached(false), _lastChance(0), _socketThreadNumber(0), _outstandingSocketThreads(0)
_pluginsDetached(false), _lastChance(0), _socketThreadNumber(0), _outstandingSocketThreads(0),
_escalateOverHTTP(args.test("-escalateOverHTTP"))
{
_version = VERSION;

SINFO("Escalate over HTTP: " << (_escalateOverHTTP ? "enabled" : "disabled"));

// Enable the requested plugins, and update our version string if required.
list<string> pluginNameList = SParseList(args["-plugins"]);
SINFO("Loading plugins: " << args["-plugins"]);
Expand Down Expand Up @@ -1797,7 +1823,8 @@ bool BedrockServer::_isControlCommand(const unique_ptr<BedrockCommand>& command)
SIEquals(command->request.methodLine, "Attach") ||
SIEquals(command->request.methodLine, "SetConflictParams") ||
SIEquals(command->request.methodLine, "SetCheckpointIntervals") ||
SIEquals(command->request.methodLine, "EnableSQLTracing")
SIEquals(command->request.methodLine, "EnableSQLTracing") ||
SIEquals(command->request.methodLine, "EnableEscalateOverHTTP")
) {
return true;
}
Expand Down Expand Up @@ -1864,6 +1891,17 @@ void BedrockServer::_control(unique_ptr<BedrockCommand>& command) {
SQLite::enableTrace.store(command->request.test("enable"));
response["newValue"] = SQLite::enableTrace ? "true" : "false";
}
} else if (SIEquals(command->request.methodLine, "EnableEscalateOverHTTP")) {
if (command->request.isSet("enable")) {
bool oldValue = _escalateOverHTTP;
_escalateOverHTTP = command->request.test("enable");
if (_escalateOverHTTP == oldValue) {
command->response.methodLine = "200 No Change";
} else {
SINFO("Escalate over HTTP: " << (_escalateOverHTTP ? "enabled" : "disabled"));
command->response.methodLine = "200 "s + (_escalateOverHTTP ? "Enabled" : "Disabled");
}
}
}
}

Expand Down Expand Up @@ -1907,7 +1945,8 @@ void BedrockServer::_postPollCommands(fd_map& fdm, uint64_t nextActivity) {
// If it finished all it's requests, put it back in the main queue.
if (command->areHttpsRequestsComplete()) {
SINFO("All HTTPS requests complete, returning to main queue.");
// Because set's contain only `const` data, they can't be moved-from without these weird `extract`

// Because sets contain only `const` data, they can't be moved-from without these weird `extract`
// semantics. This invalidates our iterator, so we save the one we want before we break it.
auto nextIt = next(it);
_commandQueue.push(move(_outstandingHTTPSCommands.extract(it).value()));
Expand Down
8 changes: 8 additions & 0 deletions BedrockServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include <libstuff/libstuff.h>
#include <sqlitecluster/SQLiteNode.h>
#include <sqlitecluster/SQLiteServer.h>
#include <sqlitecluster/SQLiteClusterMessenger.h>
#include "BedrockPlugin.h"
#include "BedrockCommandQueue.h"
#include "BedrockTimeoutCommandQueue.h"
Expand Down Expand Up @@ -330,6 +331,10 @@ class BedrockServer : public SQLiteServer {
// object.
shared_ptr<SQLiteNode> _syncNode;

// SStandaloneHTTPSManager for communication between SQLiteNodes for anything other than cluster state and
// synchronization.
SQLiteClusterMessenger _clusterMessenger;

// Functions for checking for and responding to status and control commands.
bool _isStatusCommand(const unique_ptr<BedrockCommand>& command);
void _status(unique_ptr<BedrockCommand>& command);
Expand Down Expand Up @@ -466,4 +471,7 @@ class BedrockServer : public SQLiteServer {
// syncNode while the sync thread exists, it's a shared pointer to allow for the last socket thread using it to
// destroy the pool at shutdown.
shared_ptr<SQLitePool> _dbPool;

// TODO: Remove once we've verified this all works.
atomic<bool> _escalateOverHTTP = false;
};
2 changes: 1 addition & 1 deletion libstuff/SHTTPSManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ void SStandaloneHTTPSManager::postPoll(fd_map& fdm, SStandaloneHTTPSManager::Tra
// content. Why this is the what constitutes a valid response is lost to time. Any well-formed response should
// be valid here, and this should get cleaned up. However, this requires testing anything that might rely on
// the existing behavior, which is an exercise for later.
if (SContains(transaction.fullResponse.methodLine, " 200 ") || transaction.fullResponse.content.size()) {
if (handleAllResponses() || SContains(transaction.fullResponse.methodLine, " 200 ") || transaction.fullResponse.content.size()) {
// Pass the transaction down to the subclass.
_onRecv(&transaction);
} else {
Expand Down
5 changes: 4 additions & 1 deletion libstuff/SHTTPSManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ class SStandaloneHTTPSManager : public STCPManager {
Transaction* _httpsSend(const string& url, const SData& request);
Transaction* _createErrorTransaction();
virtual bool _onRecv(Transaction* transaction);

// Historically we only call _onRecv for `200 OK` responses. This allows manangers to handle all responses.
virtual bool handleAllResponses() { return false; }
};

class SHTTPSManager : public SStandaloneHTTPSManager {
Expand All @@ -72,7 +75,7 @@ class SHTTPSManager : public SStandaloneHTTPSManager {
}
};

void validate();
virtual void validate() override;

protected:
// Reference to the plugin that owns this object.
Expand Down
3 changes: 3 additions & 0 deletions libstuff/STCPNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,9 @@ STable STCPNode::Peer::getData() const {
for (auto& p : params) {
result.emplace(p);
}

result["commandAddress"] = commandAddress;

return result;
}

Expand Down
3 changes: 3 additions & 0 deletions libstuff/STCPNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ struct STCPNode : public STCPManager {
atomic<Response> transactionResponse;
atomic<string> version;

// An address on which this peer can accept commands.
atomic<string> commandAddress;

// Constructor.
Peer(const string& name_, const string& host_, const STable& params_, uint64_t id_);
~Peer();
Expand Down
71 changes: 71 additions & 0 deletions sqlitecluster/SQLiteClusterMessenger.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
#include <BedrockCommand.h>
#include <sqlitecluster/SQLiteClusterMessenger.h>
#include <sqlitecluster/SQLiteNode.h>

SQLiteClusterMessenger::SQLiteClusterMessenger(shared_ptr<SQLiteNode>& node)
: _node(node)
{
}

bool SQLiteClusterMessenger::sendToLeader(BedrockCommand& command) {
string leaderAddress;
auto _nodeCopy = atomic_load(&_node);
if (_nodeCopy) {
for (SQLiteNode::Peer* peer : _nodeCopy->peerList) {
if (peer->state == STCPNode::LEADING && !peer->commandAddress.load().empty()) {
leaderAddress = peer->commandAddress;
break;
}
}
}

// SParseURI expects a typical http or https scheme.
string url = "http://" + leaderAddress;
string host, path;
if (!SParseURI(url, host, path) || !SHostIsValid(host)) {
return false;
}

// Create a new transaction. This can throw if `validate` fails. We explicitly do this *before* creating a socket.
Transaction* transaction = new Transaction(*this);

// I don't trust this not to ever leak currently, but for the moment, this is OK.
_transactionCommands[transaction] = make_pair(&command, STimeNow());

Socket* s = nullptr;
try {
s = new Socket(host, nullptr);
} catch (const SException& exception) {
_transactionCommands.erase(transaction);
delete transaction;
return false;
}

transaction->s = s;
transaction->fullRequest = command.request.serialize();

command.httpsRequests.push_back(transaction);

// Ship it.
transaction->s->send(command.request.serialize());

return true;
}

bool SQLiteClusterMessenger::_onRecv(Transaction* transaction)
{
transaction->response = getHTTPResponseCode(transaction->fullResponse.methodLine);
auto cmdIt = _transactionCommands.find(transaction);
if (cmdIt != _transactionCommands.end()) {
BedrockCommand* command = cmdIt->second.first;
command->response = transaction->fullResponse;
command->response["escalationTime"] = to_string(STimeNow() - cmdIt->second.second);
command->complete = true;
_transactionCommands.erase(cmdIt);
}
return false;
}

bool SQLiteClusterMessenger::handleAllResponses() {
return true;
}
20 changes: 20 additions & 0 deletions sqlitecluster/SQLiteClusterMessenger.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#include <libstuff/libstuff.h>
#include <libstuff/SHTTPSManager.h>

class SQLiteNode;
class BedrockCommand;

class SQLiteClusterMessenger : public SStandaloneHTTPSManager {
public:
SQLiteClusterMessenger(shared_ptr<SQLiteNode>& node);
bool sendToLeader(BedrockCommand& command);
virtual bool _onRecv(Transaction* transaction) override;

virtual bool handleAllResponses() override;

private:
shared_ptr<SQLiteNode>& _node;

// Map of transactions to their commands and escalation start times
map<Transaction*, pair<BedrockCommand*, uint64_t>> _transactionCommands;
};
10 changes: 10 additions & 0 deletions sqlitecluster/SQLiteNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1250,6 +1250,9 @@ void SQLiteNode::_onMESSAGE(Peer* peer, const SData& message) {
if (!message.isSet("Hash")) {
STHROW("missing Hash");
}
if (message.isSet("commandAddress")) {
peer->commandAddress = message["commandAddress"];
}

peer->setCommit(message.calcU64("CommitCount"), message["Hash"]);

Expand Down Expand Up @@ -1962,6 +1965,7 @@ void SQLiteNode::_sendToPeer(Peer* peer, const SData& message) {
SData messageCopy = message;
messageCopy["CommitCount"] = to_string(_db.getCommitCount());
messageCopy["Hash"] = _db.getCommittedHash();
messageCopy["commandAddress"] = _commandAddress;
peer->socket->send(messageCopy.serialize());
}

Expand All @@ -1974,6 +1978,7 @@ void SQLiteNode::_sendToAllPeers(const SData& message, bool subscribedOnly) {
if (!messageCopy.isSet("Hash")) {
messageCopy["Hash"] = _db.getCommittedHash();
}
messageCopy["commandAddress"] = _commandAddress;
const string& serializedMessage = messageCopy.serialize();

// Loop across all connected peers and send the message
Expand Down Expand Up @@ -2805,3 +2810,8 @@ bool SQLiteNode::hasQuorum() {
}
return (numFullFollowers * 2 >= numFullPeers);
}

void SQLiteNode::setCommandAddress(const string& commandAddress) {
_commandAddress = commandAddress;
}

6 changes: 6 additions & 0 deletions sqlitecluster/SQLiteNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ class SQLiteNode : public STCPNode {
// This will broadcast a message to all peers, or a specific peer.
void broadcast(const SData& message, Peer* peer = nullptr);

void setCommandAddress(const string& commandAddress);

private:
// STCPNode API: Peer handling framework functions
void _onConnect(Peer* peer);
Expand Down Expand Up @@ -275,4 +277,8 @@ class SQLiteNode : public STCPNode {
AutoTimer _legacyReplication;
AutoTimer _onMessageTimer;
AutoTimer _escalateTimer;

// A string representing an address (i.e., `127.0.0.1:80`) where this server accepts commands. I.e., "the command
// port".
atomic<string> _commandAddress;
};
4 changes: 2 additions & 2 deletions test/clustertest/testplugin/TestPlugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,11 @@ TestPluginCommand::~TestPluginCommand()
// check simply that we're not leading, because this should also fail if we end up in some weird state (we
// don't want the test to pass if our follower is actually `WAITING` or something strange).
if (serverState != SQLiteNode::stateName(SQLiteNode::LEADING)) {
SASSERT(escalated);
// SASSERT(escalated); TODO: Remove this flag.
string fileContents = fileLockAndLoad(request["tempFile"]);
SFileDelete(request["tempFile"]);

// Verifiy this all happened in the right order. We're running this on the follower, but it's feasible the
// Verify this all happened in the right order. We're running this on the follower, but it's feasible the
// destructor on the leader hasn't happened yet. We verify everything up to the first destruction.
if (!SStartsWith(fileContents, "Peeking testescalate (FOLLOWING)\n"
"Peeking testescalate (LEADING)\n"
Expand Down
2 changes: 1 addition & 1 deletion test/clustertest/tests/ControlCommandTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

struct ControlCommandTest : tpunit::TestFixture {
ControlCommandTest()
: tpunit::TestFixture("ControlCommandTest",
: tpunit::TestFixture("ControlCommand",
BEFORE_CLASS(ControlCommandTest::setup),
AFTER_CLASS(ControlCommandTest::teardown),
TEST(ControlCommandTest::testPreventAttach)) { }
Expand Down
2 changes: 1 addition & 1 deletion test/clustertest/tests/EscalateTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#include <test/clustertest/BedrockClusterTester.h>

struct EscalateTest : tpunit::TestFixture {
EscalateTest() : tpunit::TestFixture("EscalateTest", TEST(EscalateTest::test)) { }
EscalateTest() : tpunit::TestFixture("Escalate", TEST(EscalateTest::test)) { }

// NOTE: This test relies on two processes (the leader and follower Bedrock nodes) both writing to the same temp
// file at potentially the same time. It's not impossible that these two writes step on each other and this test
Expand Down
Loading

0 comments on commit 8afc697

Please sign in to comment.