Skip to content

Commit

Permalink
Minor fixes (#192)
Browse files Browse the repository at this point in the history
* Fix flaky point-to-point test

* Remove unused piece of code

* Re-order fields to remove struct padding

* Fix missing initializers for primitive members

* fix possible fd leak

* Pass by const reference where it's more efficient

* More unused variables

* copy->move

* find ":" -> ':'

* Use defaulted instead of empty constructors

* Executor con/destructor fixup

* PTP broker raciness fixes

* DummyExecutor short sleep to make it much less likely to fail under TSan
  • Loading branch information
eigenraven authored Dec 6, 2021
1 parent 4eef572 commit 08e4876
Show file tree
Hide file tree
Showing 25 changed files with 53 additions and 57 deletions.
2 changes: 1 addition & 1 deletion include/faabric/scheduler/MpiWorld.h
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ class MpiWorld

// Track ranks that are local to this world, and local/remote leaders
// MPITOPTP - this information exists in the broker
int localLeader;
int localLeader = -1;
std::vector<int> localRanks;
std::vector<int> remoteLeaders;
void initLocalRemoteLeaders();
Expand Down
6 changes: 3 additions & 3 deletions include/faabric/scheduler/Scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ class ExecutorTask
bool needsSnapshotPushIn,
bool skipResetIn);

int messageIndex = 0;
std::shared_ptr<faabric::BatchExecuteRequest> req;
std::shared_ptr<std::atomic<int>> batchCounter;
int messageIndex = 0;
bool needsSnapshotPush = false;
bool skipReset = false;
};
Expand All @@ -49,7 +49,7 @@ class Executor

explicit Executor(faabric::Message& msg);

virtual ~Executor();
virtual ~Executor() = default;

void executeTasks(std::vector<int> msgIdxs,
std::shared_ptr<faabric::BatchExecuteRequest> req);
Expand Down Expand Up @@ -222,7 +222,7 @@ class Scheduler

// ---- Host resources and hosts ----
faabric::HostResources thisHostResources;
std::atomic<int32_t> thisHostUsedSlots;
std::atomic<int32_t> thisHostUsedSlots = 0;

void updateHostResources();

Expand Down
2 changes: 1 addition & 1 deletion include/faabric/snapshot/SnapshotRegistry.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace faabric::snapshot {
class SnapshotRegistry
{
public:
SnapshotRegistry();
SnapshotRegistry() = default;

faabric::util::SnapshotData& getSnapshot(const std::string& key);

Expand Down
2 changes: 1 addition & 1 deletion include/faabric/snapshot/SnapshotServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,6 @@ class SnapshotServer final : public faabric::transport::MessageEndpointServer

private:
faabric::transport::PointToPointBroker& broker;
std::atomic_size_t diffsAppliedCounter;
std::atomic_size_t diffsAppliedCounter = 0;
};
}
2 changes: 1 addition & 1 deletion include/faabric/state/InMemoryStateRegistry.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace faabric::state {
class InMemoryStateRegistry
{
public:
InMemoryStateRegistry();
InMemoryStateRegistry() = default;

std::string getMasterIP(const std::string& user,
const std::string& key,
Expand Down
9 changes: 5 additions & 4 deletions include/faabric/transport/Message.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@ namespace faabric::transport {
class Message
{
public:
Message(const zmq::message_t& msgIn);
explicit Message(const zmq::message_t& msgIn);

Message(int sizeIn);
explicit Message(int sizeIn);

Message();
// Empty message signals shutdown
Message() = default;

char* data();

Expand All @@ -31,6 +32,6 @@ class Message
private:
std::vector<uint8_t> bytes;

bool _more;
bool _more = false;
};
}
5 changes: 3 additions & 2 deletions include/faabric/transport/PointToPointBroker.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <faabric/util/locks.h>
#include <faabric/util/scheduling.h>

#include <atomic>
#include <condition_variable>
#include <queue>
#include <set>
Expand Down Expand Up @@ -69,7 +70,7 @@ class PointToPointGroup
int groupId = 0;
int groupSize = 0;

std::mutex mx;
std::shared_mutex mx;

// Transport
faabric::transport::PointToPointBroker& ptpBroker;
Expand All @@ -80,7 +81,7 @@ class PointToPointGroup

// Distributed lock
std::stack<int> recursiveLockOwners;
int lockOwnerIdx = -1;
std::atomic<int> lockOwnerIdx = -1;
std::queue<int> lockWaiters;

void notifyLocked(int groupIdx);
Expand Down
2 changes: 1 addition & 1 deletion include/faabric/util/func.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ namespace faabric::util {
std::string funcToString(const faabric::Message& msg, bool includeId);

std::string funcToString(
const std::shared_ptr<faabric::BatchExecuteRequest> req);
const std::shared_ptr<faabric::BatchExecuteRequest>& req);

unsigned int setMessageId(faabric::Message& msg);

Expand Down
4 changes: 2 additions & 2 deletions include/faabric/util/snapshot.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ enum SnapshotMergeOperation
class SnapshotDiff
{
public:
const uint8_t* data = nullptr;
size_t size = 0;
SnapshotDataType dataType = SnapshotDataType::Raw;
SnapshotMergeOperation operation = SnapshotMergeOperation::Overwrite;
uint32_t offset = 0;
size_t size = 0;
const uint8_t* data = nullptr;

bool noChange = false;

Expand Down
8 changes: 3 additions & 5 deletions src/scheduler/Executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ ExecutorTask::ExecutorTask(int messageIndexIn,
std::shared_ptr<std::atomic<int>> batchCounterIn,
bool needsSnapshotPushIn,
bool skipResetIn)
: messageIndex(messageIndexIn)
, req(reqIn)
, batchCounter(batchCounterIn)
: req(std::move(reqIn))
, batchCounter(std::move(batchCounterIn))
, messageIndex(messageIndexIn)
, needsSnapshotPush(needsSnapshotPushIn)
, skipReset(skipResetIn)
{}
Expand All @@ -53,8 +53,6 @@ Executor::Executor(faabric::Message& msg)
}
}

Executor::~Executor() {}

void Executor::finish()
{
SPDLOG_DEBUG("Executor {} shutting down", id);
Expand Down
2 changes: 1 addition & 1 deletion src/scheduler/MpiMessageBuffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ int MpiMessageBuffer::size()

void MpiMessageBuffer::addMessage(PendingAsyncMpiMessage msg)
{
pendingMsgs.push_back(msg);
pendingMsgs.emplace_back(std::move(msg));
}

void MpiMessageBuffer::deleteMessage(const MpiMessageIterator& msgIt)
Expand Down
2 changes: 0 additions & 2 deletions src/snapshot/SnapshotRegistry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@
#include <sys/mman.h>

namespace faabric::snapshot {
SnapshotRegistry::SnapshotRegistry() {}

faabric::util::SnapshotData& SnapshotRegistry::getSnapshot(
const std::string& key)
{
Expand Down
2 changes: 0 additions & 2 deletions src/state/InMemoryStateRegistry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ InMemoryStateRegistry& getInMemoryStateRegistry()
return reg;
}

InMemoryStateRegistry::InMemoryStateRegistry() {}

static std::string getMasterKey(const std::string& user, const std::string& key)
{
std::string masterKey = MASTER_KEY_PREFIX + user + "_" + key;
Expand Down
3 changes: 0 additions & 3 deletions src/transport/Message.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@ Message::Message(int sizeIn)
, _more(false)
{}

// Empty message signals shutdown
Message::Message() {}

char* Message::data()
{
return reinterpret_cast<char*>(bytes.data());
Expand Down
23 changes: 14 additions & 9 deletions src/transport/PointToPointBroker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,16 +150,18 @@ void PointToPointGroup::lock(int groupIdx, bool recursive)
if (masterIsLocal) {
bool acquiredLock = false;
{
faabric::util::UniqueLock lock(mx);
faabric::util::FullLock lock(mx);

if (recursive && (recursiveLockOwners.empty() ||
recursiveLockOwners.top() == groupIdx)) {
// Recursive and either free, or already locked by this idx
recursiveLockOwners.push(groupIdx);
acquiredLock = true;
} else if (!recursive && (lockOwnerIdx == NO_LOCK_OWNER_IDX)) {
} else if (!recursive &&
(lockOwnerIdx.load(std::memory_order_acquire) ==
NO_LOCK_OWNER_IDX)) {
// Non-recursive and free
lockOwnerIdx = groupIdx;
lockOwnerIdx.store(groupIdx, std::memory_order_release);
acquiredLock = true;
}
}
Expand All @@ -183,7 +185,7 @@ void PointToPointGroup::lock(int groupIdx, bool recursive)
notifyLocked(groupIdx);
} else {
{
faabric::util::UniqueLock lock(mx);
faabric::util::FullLock lock(mx);
// Need to wait to get the lock
lockWaiters.push(groupIdx);
}
Expand Down Expand Up @@ -249,7 +251,7 @@ void PointToPointGroup::unlock(int groupIdx, bool recursive)
ptpBroker.getHostForReceiver(groupId, POINT_TO_POINT_MASTER_IDX);

if (host == conf.endpointHost) {
faabric::util::UniqueLock lock(mx);
faabric::util::FullLock lock(mx);

SPDLOG_TRACE("Group idx {} unlocking {} ({} waiters, recursive {})",
groupIdx,
Expand All @@ -270,12 +272,14 @@ void PointToPointGroup::unlock(int groupIdx, bool recursive)
lockWaiters.pop();
}
} else {
lockOwnerIdx = NO_LOCK_OWNER_IDX;

if (!lockWaiters.empty()) {
lockOwnerIdx = lockWaiters.front();
lockOwnerIdx.store(lockWaiters.front(),
std::memory_order_release);
notifyLocked(lockWaiters.front());
lockWaiters.pop();
} else {
lockOwnerIdx.store(NO_LOCK_OWNER_IDX,
std::memory_order_release);
}
}
} else {
Expand Down Expand Up @@ -362,14 +366,15 @@ void PointToPointGroup::notify(int groupIdx)
int PointToPointGroup::getLockOwner(bool recursive)
{
if (recursive) {
faabric::util::SharedLock lock(mx);
if (!recursiveLockOwners.empty()) {
return recursiveLockOwners.top();
}

return NO_LOCK_OWNER_IDX;
}

return lockOwnerIdx;
return lockOwnerIdx.load(std::memory_order_acquire);
}

PointToPointBroker::PointToPointBroker()
Expand Down
2 changes: 1 addition & 1 deletion src/util/func.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ std::string funcToString(const faabric::Message& msg, bool includeId)
}

std::string funcToString(
const std::shared_ptr<faabric::BatchExecuteRequest> req)
const std::shared_ptr<faabric::BatchExecuteRequest>& req)
{
return funcToString(req->messages(0), false);
}
Expand Down
4 changes: 2 additions & 2 deletions src/util/json.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ std::map<std::string, std::string> getStringStringMapFromJson(
std::string(valuePtr, valuePtr + it->value.GetStringLength()));
std::string keyVal;
while (std::getline(ss, keyVal, ',')) {
auto pos = keyVal.find(":");
auto pos = keyVal.find(':');
std::string key = keyVal.substr(0, pos);
map[key] = keyVal.erase(0, pos + sizeof(char));
}
Expand All @@ -350,7 +350,7 @@ std::map<std::string, int> getStringIntMapFromJson(Document& doc,
std::string(valuePtr, valuePtr + it->value.GetStringLength()));
std::string keyVal;
while (std::getline(ss, keyVal, ',')) {
auto pos = keyVal.find(":");
auto pos = keyVal.find(':');
std::string key = keyVal.substr(0, pos);
int val = std::stoi(keyVal.erase(0, pos + sizeof(char)));
map[key] = val;
Expand Down
1 change: 1 addition & 0 deletions src/util/memory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ void resetDirtyTracking()
size_t nWritten = fwrite(value, sizeof(char), 1, fd);
if (nWritten != 1) {
SPDLOG_ERROR("Failed to write to clear_refs ({})", nWritten);
fclose(fd);
throw std::runtime_error("Failed to write to clear_refs");
}

Expand Down
2 changes: 1 addition & 1 deletion tests/dist/fixtures.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class DistTestsFixture
faabric::scheduler::setExecutorFactory(fac);
}

~DistTestsFixture() {}
~DistTestsFixture() = default;

std::string getWorkerIP()
{
Expand Down
4 changes: 1 addition & 3 deletions tests/test/endpoint/test_endpoint_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@ class EndpointApiTestExecutor final : public Executor
{
faabric::Message& msg = reqOrig->mutable_messages()->at(msgIdx);

std::string funcStr = faabric::util::funcToString(msg, true);

int returnVal = 0;
if (msg.function() == "valid") {
msg.set_outputdata(
Expand Down Expand Up @@ -76,7 +74,7 @@ class EndpointApiTestFixture : public SchedulerTestFixture
setExecutorFactory(executorFactory);
}

~EndpointApiTestFixture() {}
~EndpointApiTestFixture() = default;

protected:
std::shared_ptr<EndpointApiTestExecutorFactory> executorFactory;
Expand Down
3 changes: 1 addition & 2 deletions tests/test/redis/test_redis.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ TEST_CASE("Basic Redis operations", "[redis]")
SECTION("Test enqueue/ dequeue bytes")
{
// Enqueue some values
std::vector<uint8_t> bytesA = faabric::util::stringToBytes(VALUE_A);
redisQueue.enqueueBytes(QUEUE_NAME, BYTES_A);
redisQueue.enqueueBytes(QUEUE_NAME, BYTES_B);
redisQueue.enqueueBytes(QUEUE_NAME, BYTES_C);
Expand Down Expand Up @@ -662,7 +661,7 @@ TEST_CASE("Test range set pipeline")

void checkDequeueBytes(Redis& redis,
const std::string& queueName,
const std::vector<uint8_t> expected)
const std::vector<uint8_t>& expected)
{
unsigned long bufferLen = expected.size();
auto buffer = std::vector<uint8_t>(bufferLen, 0);
Expand Down
6 changes: 0 additions & 6 deletions tests/test/state/test_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,6 @@ class StateServerTestFixture
std::vector<uint8_t> actual(values.size(), 0);
setDummyData(values);

// Get, with optional pull
int nMessages = 1;
if (doPull) {
nMessages = 2;
}

// Initial pull
const std::shared_ptr<state::StateKeyValue>& localKv = getLocalKv();
localKv->pull();
Expand Down
2 changes: 1 addition & 1 deletion tests/test/transport/test_point_to_point.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -281,9 +281,9 @@ TEST_CASE_METHOD(PointToPointClientServerFixture,
// shared integer
std::thread t([this, &decision, &sharedInt] {
SLEEP_MS(1000);
broker.setUpLocalMappingsFromSchedulingDecision(decision);

sharedInt.fetch_add(100);
broker.setUpLocalMappingsFromSchedulingDecision(decision);
});

broker.waitForMappingsOnThisHost(groupId);
Expand Down
5 changes: 2 additions & 3 deletions tests/test/util/test_snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,11 @@ namespace tests {
class SnapshotMergeTestFixture : public SnapshotTestFixture
{
public:
SnapshotMergeTestFixture() {}
~SnapshotMergeTestFixture() {}
SnapshotMergeTestFixture() = default;
~SnapshotMergeTestFixture() = default;

protected:
std::string snapKey;
int snapPages;
faabric::util::SnapshotData snap;

uint8_t* setUpSnapshot(int snapPages, int sharedMemPages)
Expand Down
Loading

0 comments on commit 08e4876

Please sign in to comment.