Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Minor fixes #192

Merged
merged 13 commits into from
Dec 6, 2021
2 changes: 1 addition & 1 deletion include/faabric/scheduler/MpiWorld.h
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ class MpiWorld

// Track ranks that are local to this world, and local/remote leaders
// MPITOPTP - this information exists in the broker
int localLeader;
int localLeader = -1;
std::vector<int> localRanks;
std::vector<int> remoteLeaders;
void initLocalRemoteLeaders();
Expand Down
6 changes: 3 additions & 3 deletions include/faabric/scheduler/Scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ class ExecutorTask
bool needsSnapshotPushIn,
bool skipResetIn);

int messageIndex = 0;
std::shared_ptr<faabric::BatchExecuteRequest> req;
std::shared_ptr<std::atomic<int>> batchCounter;
int messageIndex = 0;
bool needsSnapshotPush = false;
bool skipReset = false;
};
Expand All @@ -49,7 +49,7 @@ class Executor

explicit Executor(faabric::Message& msg);

virtual ~Executor();
virtual ~Executor() = default;

void executeTasks(std::vector<int> msgIdxs,
std::shared_ptr<faabric::BatchExecuteRequest> req);
Expand Down Expand Up @@ -222,7 +222,7 @@ class Scheduler

// ---- Host resources and hosts ----
faabric::HostResources thisHostResources;
std::atomic<int32_t> thisHostUsedSlots;
std::atomic<int32_t> thisHostUsedSlots = 0;

void updateHostResources();

Expand Down
2 changes: 1 addition & 1 deletion include/faabric/snapshot/SnapshotRegistry.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace faabric::snapshot {
class SnapshotRegistry
{
public:
SnapshotRegistry();
SnapshotRegistry() = default;

faabric::util::SnapshotData& getSnapshot(const std::string& key);

Expand Down
2 changes: 1 addition & 1 deletion include/faabric/snapshot/SnapshotServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,6 @@ class SnapshotServer final : public faabric::transport::MessageEndpointServer

private:
faabric::transport::PointToPointBroker& broker;
std::atomic_size_t diffsAppliedCounter;
std::atomic_size_t diffsAppliedCounter = 0;
};
}
2 changes: 1 addition & 1 deletion include/faabric/state/InMemoryStateRegistry.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace faabric::state {
class InMemoryStateRegistry
{
public:
InMemoryStateRegistry();
InMemoryStateRegistry() = default;

std::string getMasterIP(const std::string& user,
const std::string& key,
Expand Down
9 changes: 5 additions & 4 deletions include/faabric/transport/Message.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@ namespace faabric::transport {
class Message
{
public:
Message(const zmq::message_t& msgIn);
explicit Message(const zmq::message_t& msgIn);

Message(int sizeIn);
explicit Message(int sizeIn);

Message();
// Empty message signals shutdown
Message() = default;

char* data();

Expand All @@ -31,6 +32,6 @@ class Message
private:
std::vector<uint8_t> bytes;

bool _more;
bool _more = false;
};
}
5 changes: 3 additions & 2 deletions include/faabric/transport/PointToPointBroker.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <faabric/util/locks.h>
#include <faabric/util/scheduling.h>

#include <atomic>
#include <condition_variable>
#include <queue>
#include <set>
Expand Down Expand Up @@ -69,7 +70,7 @@ class PointToPointGroup
int groupId = 0;
int groupSize = 0;

std::mutex mx;
std::shared_mutex mx;

// Transport
faabric::transport::PointToPointBroker& ptpBroker;
Expand All @@ -80,7 +81,7 @@ class PointToPointGroup

// Distributed lock
std::stack<int> recursiveLockOwners;
int lockOwnerIdx = -1;
std::atomic<int> lockOwnerIdx = -1;
std::queue<int> lockWaiters;

void notifyLocked(int groupIdx);
Expand Down
2 changes: 1 addition & 1 deletion include/faabric/util/func.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ namespace faabric::util {
std::string funcToString(const faabric::Message& msg, bool includeId);

std::string funcToString(
const std::shared_ptr<faabric::BatchExecuteRequest> req);
const std::shared_ptr<faabric::BatchExecuteRequest>& req);

unsigned int setMessageId(faabric::Message& msg);

Expand Down
4 changes: 2 additions & 2 deletions include/faabric/util/snapshot.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ enum SnapshotMergeOperation
class SnapshotDiff
{
public:
const uint8_t* data = nullptr;
size_t size = 0;
SnapshotDataType dataType = SnapshotDataType::Raw;
SnapshotMergeOperation operation = SnapshotMergeOperation::Overwrite;
uint32_t offset = 0;
size_t size = 0;
const uint8_t* data = nullptr;

bool noChange = false;

Expand Down
8 changes: 3 additions & 5 deletions src/scheduler/Executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ ExecutorTask::ExecutorTask(int messageIndexIn,
std::shared_ptr<std::atomic<int>> batchCounterIn,
bool needsSnapshotPushIn,
bool skipResetIn)
: messageIndex(messageIndexIn)
, req(reqIn)
, batchCounter(batchCounterIn)
: req(std::move(reqIn))
, batchCounter(std::move(batchCounterIn))
, messageIndex(messageIndexIn)
, needsSnapshotPush(needsSnapshotPushIn)
, skipReset(skipResetIn)
{}
Expand All @@ -53,8 +53,6 @@ Executor::Executor(faabric::Message& msg)
}
}

Executor::~Executor() {}

void Executor::finish()
{
SPDLOG_DEBUG("Executor {} shutting down", id);
Expand Down
2 changes: 1 addition & 1 deletion src/scheduler/MpiMessageBuffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ int MpiMessageBuffer::size()

void MpiMessageBuffer::addMessage(PendingAsyncMpiMessage msg)
{
pendingMsgs.push_back(msg);
pendingMsgs.emplace_back(std::move(msg));
}

void MpiMessageBuffer::deleteMessage(const MpiMessageIterator& msgIt)
Expand Down
2 changes: 0 additions & 2 deletions src/snapshot/SnapshotRegistry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@
#include <sys/mman.h>

namespace faabric::snapshot {
SnapshotRegistry::SnapshotRegistry() {}

faabric::util::SnapshotData& SnapshotRegistry::getSnapshot(
const std::string& key)
{
Expand Down
2 changes: 0 additions & 2 deletions src/state/InMemoryStateRegistry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ InMemoryStateRegistry& getInMemoryStateRegistry()
return reg;
}

InMemoryStateRegistry::InMemoryStateRegistry() {}

static std::string getMasterKey(const std::string& user, const std::string& key)
{
std::string masterKey = MASTER_KEY_PREFIX + user + "_" + key;
Expand Down
3 changes: 0 additions & 3 deletions src/transport/Message.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@ Message::Message(int sizeIn)
, _more(false)
{}

// Empty message signals shutdown
Message::Message() {}

char* Message::data()
{
return reinterpret_cast<char*>(bytes.data());
Expand Down
23 changes: 14 additions & 9 deletions src/transport/PointToPointBroker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,16 +150,18 @@ void PointToPointGroup::lock(int groupIdx, bool recursive)
if (masterIsLocal) {
bool acquiredLock = false;
{
faabric::util::UniqueLock lock(mx);
faabric::util::FullLock lock(mx);

if (recursive && (recursiveLockOwners.empty() ||
recursiveLockOwners.top() == groupIdx)) {
// Recursive and either free, or already locked by this idx
recursiveLockOwners.push(groupIdx);
acquiredLock = true;
} else if (!recursive && (lockOwnerIdx == NO_LOCK_OWNER_IDX)) {
} else if (!recursive &&
(lockOwnerIdx.load(std::memory_order_acquire) ==
NO_LOCK_OWNER_IDX)) {
// Non-recursive and free
lockOwnerIdx = groupIdx;
lockOwnerIdx.store(groupIdx, std::memory_order_release);
acquiredLock = true;
}
}
Expand All @@ -183,7 +185,7 @@ void PointToPointGroup::lock(int groupIdx, bool recursive)
notifyLocked(groupIdx);
} else {
{
faabric::util::UniqueLock lock(mx);
faabric::util::FullLock lock(mx);
// Need to wait to get the lock
lockWaiters.push(groupIdx);
}
Expand Down Expand Up @@ -249,7 +251,7 @@ void PointToPointGroup::unlock(int groupIdx, bool recursive)
ptpBroker.getHostForReceiver(groupId, POINT_TO_POINT_MASTER_IDX);

if (host == conf.endpointHost) {
faabric::util::UniqueLock lock(mx);
faabric::util::FullLock lock(mx);

SPDLOG_TRACE("Group idx {} unlocking {} ({} waiters, recursive {})",
groupIdx,
Expand All @@ -270,12 +272,14 @@ void PointToPointGroup::unlock(int groupIdx, bool recursive)
lockWaiters.pop();
}
} else {
lockOwnerIdx = NO_LOCK_OWNER_IDX;

if (!lockWaiters.empty()) {
lockOwnerIdx = lockWaiters.front();
lockOwnerIdx.store(lockWaiters.front(),
std::memory_order_release);
notifyLocked(lockWaiters.front());
lockWaiters.pop();
} else {
lockOwnerIdx.store(NO_LOCK_OWNER_IDX,
std::memory_order_release);
}
}
} else {
Expand Down Expand Up @@ -362,14 +366,15 @@ void PointToPointGroup::notify(int groupIdx)
int PointToPointGroup::getLockOwner(bool recursive)
{
if (recursive) {
faabric::util::SharedLock lock(mx);
if (!recursiveLockOwners.empty()) {
return recursiveLockOwners.top();
}

return NO_LOCK_OWNER_IDX;
}

return lockOwnerIdx;
return lockOwnerIdx.load(std::memory_order_acquire);
}

PointToPointBroker::PointToPointBroker()
Expand Down
2 changes: 1 addition & 1 deletion src/util/func.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ std::string funcToString(const faabric::Message& msg, bool includeId)
}

std::string funcToString(
const std::shared_ptr<faabric::BatchExecuteRequest> req)
const std::shared_ptr<faabric::BatchExecuteRequest>& req)
{
return funcToString(req->messages(0), false);
}
Expand Down
4 changes: 2 additions & 2 deletions src/util/json.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ std::map<std::string, std::string> getStringStringMapFromJson(
std::string(valuePtr, valuePtr + it->value.GetStringLength()));
std::string keyVal;
while (std::getline(ss, keyVal, ',')) {
auto pos = keyVal.find(":");
auto pos = keyVal.find(':');
std::string key = keyVal.substr(0, pos);
map[key] = keyVal.erase(0, pos + sizeof(char));
}
Expand All @@ -350,7 +350,7 @@ std::map<std::string, int> getStringIntMapFromJson(Document& doc,
std::string(valuePtr, valuePtr + it->value.GetStringLength()));
std::string keyVal;
while (std::getline(ss, keyVal, ',')) {
auto pos = keyVal.find(":");
auto pos = keyVal.find(':');
std::string key = keyVal.substr(0, pos);
int val = std::stoi(keyVal.erase(0, pos + sizeof(char)));
map[key] = val;
Expand Down
1 change: 1 addition & 0 deletions src/util/memory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ void resetDirtyTracking()
size_t nWritten = fwrite(value, sizeof(char), 1, fd);
if (nWritten != 1) {
SPDLOG_ERROR("Failed to write to clear_refs ({})", nWritten);
fclose(fd);
throw std::runtime_error("Failed to write to clear_refs");
}

Expand Down
2 changes: 1 addition & 1 deletion tests/dist/fixtures.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class DistTestsFixture
faabric::scheduler::setExecutorFactory(fac);
}

~DistTestsFixture() {}
~DistTestsFixture() = default;

std::string getWorkerIP()
{
Expand Down
4 changes: 1 addition & 3 deletions tests/test/endpoint/test_endpoint_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@ class EndpointApiTestExecutor final : public Executor
{
faabric::Message& msg = reqOrig->mutable_messages()->at(msgIdx);

std::string funcStr = faabric::util::funcToString(msg, true);

int returnVal = 0;
if (msg.function() == "valid") {
msg.set_outputdata(
Expand Down Expand Up @@ -76,7 +74,7 @@ class EndpointApiTestFixture : public SchedulerTestFixture
setExecutorFactory(executorFactory);
}

~EndpointApiTestFixture() {}
~EndpointApiTestFixture() = default;

protected:
std::shared_ptr<EndpointApiTestExecutorFactory> executorFactory;
Expand Down
3 changes: 1 addition & 2 deletions tests/test/redis/test_redis.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ TEST_CASE("Basic Redis operations", "[redis]")
SECTION("Test enqueue/ dequeue bytes")
{
// Enqueue some values
std::vector<uint8_t> bytesA = faabric::util::stringToBytes(VALUE_A);
redisQueue.enqueueBytes(QUEUE_NAME, BYTES_A);
redisQueue.enqueueBytes(QUEUE_NAME, BYTES_B);
redisQueue.enqueueBytes(QUEUE_NAME, BYTES_C);
Expand Down Expand Up @@ -662,7 +661,7 @@ TEST_CASE("Test range set pipeline")

void checkDequeueBytes(Redis& redis,
const std::string& queueName,
const std::vector<uint8_t> expected)
const std::vector<uint8_t>& expected)
{
unsigned long bufferLen = expected.size();
auto buffer = std::vector<uint8_t>(bufferLen, 0);
Expand Down
6 changes: 0 additions & 6 deletions tests/test/state/test_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,6 @@ class StateServerTestFixture
std::vector<uint8_t> actual(values.size(), 0);
setDummyData(values);

// Get, with optional pull
int nMessages = 1;
if (doPull) {
nMessages = 2;
}

// Initial pull
const std::shared_ptr<state::StateKeyValue>& localKv = getLocalKv();
localKv->pull();
Expand Down
2 changes: 1 addition & 1 deletion tests/test/transport/test_point_to_point.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -281,9 +281,9 @@ TEST_CASE_METHOD(PointToPointClientServerFixture,
// shared integer
std::thread t([this, &decision, &sharedInt] {
SLEEP_MS(1000);
broker.setUpLocalMappingsFromSchedulingDecision(decision);

sharedInt.fetch_add(100);
broker.setUpLocalMappingsFromSchedulingDecision(decision);
});

broker.waitForMappingsOnThisHost(groupId);
Expand Down
5 changes: 2 additions & 3 deletions tests/test/util/test_snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,11 @@ namespace tests {
class SnapshotMergeTestFixture : public SnapshotTestFixture
{
public:
SnapshotMergeTestFixture() {}
~SnapshotMergeTestFixture() {}
SnapshotMergeTestFixture() = default;
~SnapshotMergeTestFixture() = default;

protected:
std::string snapKey;
int snapPages;
faabric::util::SnapshotData snap;

uint8_t* setUpSnapshot(int snapPages, int sharedMemPages)
Expand Down
Loading