Skip to content

Merge regions overhaul #201

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 25 commits into from
Dec 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ project(faabric)

option(FAABRIC_WASM_BUILD "Build Faabric wasm library" OFF)
option(FAABRIC_BUILD_TESTS "Build Faabric tests" ON)
option(FAABRIC_SELF_TRACING "Turn on system tracing using the logger" OFF)

# Enable colorized compiler output
if("${CMAKE_CXX_COMPILER_ID}" STREQUAL "GNU")
Expand All @@ -22,6 +23,11 @@ set(CMAKE_EXE_LINKER_FLAGS "-fuse-ld=lld")
# Compile commands for clang tools
set(CMAKE_EXPORT_COMPILE_COMMANDS ON)

if(${FAABRIC_SELF_TRACING})
message("-- Activated Faabric self-tracing")
add_definitions(-DTRACE_ALL=1)
endif()

# Set-up use of sanitisers
if (FAABRIC_USE_SANITISER STREQUAL "Address")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=address")
Expand Down
11 changes: 10 additions & 1 deletion include/faabric/snapshot/SnapshotClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,13 @@ class SnapshotClient final : public faabric::transport::MessageEndpointClient
void pushSnapshot(const std::string& key,
std::shared_ptr<faabric::util::SnapshotData> data);

void pushSnapshotUpdate(
std::string snapshotKey,
const std::shared_ptr<faabric::util::SnapshotData>& data,
const std::vector<faabric::util::SnapshotDiff>& diffs);

void pushSnapshotDiffs(
std::string snapshotKey,
bool force,
const std::vector<faabric::util::SnapshotDiff>& diffs);

void deleteSnapshot(const std::string& key);
Expand All @@ -49,5 +53,10 @@ class SnapshotClient final : public faabric::transport::MessageEndpointClient

private:
void sendHeader(faabric::snapshot::SnapshotCalls call);

void doPushSnapshotDiffs(
const std::string& snapshotKey,
const std::shared_ptr<faabric::util::SnapshotData>& data,
const std::vector<faabric::util::SnapshotDiff>& diffs);
};
}
10 changes: 0 additions & 10 deletions include/faabric/snapshot/SnapshotRegistry.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,9 @@ class SnapshotRegistry

bool snapshotExists(const std::string& key);

void mapSnapshot(const std::string& key, uint8_t* target);

void registerSnapshot(const std::string& key,
std::shared_ptr<faabric::util::SnapshotData> data);

void registerSnapshotIfNotExists(
const std::string& key,
std::shared_ptr<faabric::util::SnapshotData> data);

void deleteSnapshot(const std::string& key);

size_t getSnapshotCount();
Expand All @@ -44,10 +38,6 @@ class SnapshotRegistry

int writeSnapshotToFd(const std::string& key,
faabric::util::SnapshotData& data);

void doRegisterSnapshot(const std::string& key,
std::shared_ptr<faabric::util::SnapshotData> data,
bool overwrite);
};

SnapshotRegistry& getSnapshotRegistry();
Expand Down
2 changes: 2 additions & 0 deletions include/faabric/util/memory.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ std::vector<std::pair<uint32_t, uint32_t>> getDirtyRegions(const uint8_t* ptr,
// -------------------------
typedef std::unique_ptr<uint8_t[], std::function<void(uint8_t*)>> MemoryRegion;

MemoryRegion allocatePrivateMemory(size_t size);

MemoryRegion allocateSharedMemory(size_t size);

MemoryRegion allocateVirtualMemory(size_t size);
Expand Down
115 changes: 107 additions & 8 deletions include/faabric/util/snapshot.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@ namespace faabric::util {
enum SnapshotDataType
{
Raw,
Int
Bool,
Int,
Long,
Float,
Double
};

enum SnapshotMergeOperation
Expand All @@ -27,7 +31,8 @@ enum SnapshotMergeOperation
Product,
Subtract,
Max,
Min
Min,
Ignore
};

class SnapshotDiff
Expand Down Expand Up @@ -65,14 +70,106 @@ class SnapshotMergeRegion
SnapshotDataType dataType = SnapshotDataType::Raw;
SnapshotMergeOperation operation = SnapshotMergeOperation::Overwrite;

SnapshotMergeRegion() = default;

SnapshotMergeRegion(uint32_t offsetIn,
size_t lengthIn,
SnapshotDataType dataTypeIn,
SnapshotMergeOperation operationIn);

void addDiffs(std::vector<SnapshotDiff>& diffs,
const uint8_t* original,
uint32_t originalSize,
const uint8_t* updated,
uint32_t dirtyRegionStart,
uint32_t dirtyRegionEnd);
std::span<const uint8_t> originalData,
std::span<const uint8_t> updatedData,
std::pair<uint32_t, uint32_t> dirtyRange);

private:
void addOverwriteDiff(std::vector<SnapshotDiff>& diffs,
std::span<const uint8_t> original,
std::span<const uint8_t> updated,
std::pair<uint32_t, uint32_t> dirtyRange);
};

// Works out the value to send for a typed merge and writes it in place over
// the bytes at `updated`.
//
// Reads a T from `original` and a T from `updated`. Returns false, without
// writing anything, when the two values are equal. Otherwise the value at
// `updated` is rewritten according to `operation` and true is returned:
//   Sum      -> updated - original
//   Subtract -> original - updated
//   Product  -> updated / original
//   Max/Min  -> the updated value, written back unchanged
// Any other operation is logged and raises std::runtime_error.
//
// This differs from applyDiffValue, which combines a diff value with an
// original value and returns the result instead of producing the diff value.
template<typename T>
inline bool calculateDiffValue(const uint8_t* original,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not immediate to tell the difference between calculateDiffValue and applyDiffValue, could we add a comment explaining what each one does, or how they differ?

Also, just to double check, can the uint8_t pointers here can't be converted to std::spans?

uint8_t* updated,
SnapshotMergeOperation operation)
{
// Cast to value
T updatedValue = unalignedRead<T>(updated);
T originalValue = unalignedRead<T>(original);

// Skip if no change
if (originalValue == updatedValue) {
return false;
}

// Work out final result
switch (operation) {
case (SnapshotMergeOperation::Sum): {
// Sums must send the value to be _added_, and
// not the final result
updatedValue -= originalValue;
break;
}
case (SnapshotMergeOperation::Subtract): {
// Subtractions must send the value to be
// subtracted, not the result
updatedValue = originalValue - updatedValue;
break;
}
case (SnapshotMergeOperation::Product): {
// Products must send the value to be
// multiplied, not the result
// NOTE(review): there is no check that originalValue is non-zero before
// this division; for integral T a zero original would be a division by
// zero - confirm callers rule this out.
updatedValue /= originalValue;
break;
}
case (SnapshotMergeOperation::Max):
case (SnapshotMergeOperation::Min):
// Min and max don't need to change
break;
default: {
SPDLOG_ERROR("Can't calculate diff for operation: {}", operation);
throw std::runtime_error("Can't calculate diff");
}
}

// Overwrite the caller's updated bytes with the value computed above
unalignedWrite<T>(updatedValue, updated);

return true;
}

// Combines the T stored at `original` with the T stored at `diff` using the
// given merge operation and returns the result. Neither buffer is written.
//
//   Sum      -> diff + original
//   Subtract -> original - diff
//   Product  -> original * diff
//   Max/Min  -> the larger/smaller of the two values
//
// Any other operation is logged and raises std::runtime_error.
template<typename T>
inline T applyDiffValue(const uint8_t* original,
                        const uint8_t* diff,
                        SnapshotMergeOperation operation)
{
    const auto incoming = unalignedRead<T>(diff);
    const T current = unalignedRead<T>(original);

    if (operation == SnapshotMergeOperation::Sum) {
        return incoming + current;
    }

    if (operation == SnapshotMergeOperation::Subtract) {
        return current - incoming;
    }

    if (operation == SnapshotMergeOperation::Product) {
        return current * incoming;
    }

    if (operation == SnapshotMergeOperation::Max) {
        return std::max<T>(current, incoming);
    }

    if (operation == SnapshotMergeOperation::Min) {
        return std::min<T>(current, incoming);
    }

    // Everything else has no typed merge defined here
    SPDLOG_ERROR("Can't apply merge operation: {}", operation);
    throw std::runtime_error("Can't apply merge operation");
}

class SnapshotData
{
public:
Expand Down Expand Up @@ -100,14 +197,16 @@ class SnapshotData

std::vector<uint8_t> getDataCopy(uint32_t offset, size_t dataSize);

void mapToMemory(uint8_t* target);
void mapToMemory(std::span<uint8_t> target);

void addMergeRegion(uint32_t offset,
size_t length,
SnapshotDataType dataType,
SnapshotMergeOperation operation,
bool overwrite = false);

void fillGapsWithOverwriteRegions();

void clearMergeRegions();

std::map<uint32_t, SnapshotMergeRegion> getMergeRegions();
Expand Down
19 changes: 14 additions & 5 deletions src/flat/faabric.fbs
Original file line number Diff line number Diff line change
@@ -1,24 +1,33 @@
// Describes a single merge region carried in the merge_regions list of
// SnapshotPushRequest and SnapshotDiffPushRequest.
// NOTE(review): offset/length are presumably in bytes, and data_type/merge_op
// look like the integer values of the C++ SnapshotDataType and
// SnapshotMergeOperation enums - confirm against the client/server code.
table SnapshotMergeRegionRequest {
offset:int;
length:ulong;
data_type:int;
merge_op:int;
}

table SnapshotPushRequest {
key:string;
maxSize:ulong;
max_size:ulong;
contents:[ubyte];
merge_regions:[SnapshotMergeRegionRequest];
}

table SnapshotDeleteRequest {
key:string;
}

table SnapshotDiffChunk {
table SnapshotDiffRequest {
offset:int;
dataType:int;
mergeOp:int;
data_type:int;
merge_op:int;
data:[ubyte];
}

table SnapshotDiffPushRequest {
key:string;
force:bool;
chunks:[SnapshotDiffChunk];
merge_regions:[SnapshotMergeRegionRequest];
diffs:[SnapshotDiffRequest];
}

table ThreadResultRequest {
Expand Down
4 changes: 4 additions & 0 deletions src/runner/FaabricMain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ void FaabricMain::startBackground()
// Crash handler
faabric::util::setUpCrashHandler();

PROF_BEGIN
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same point made in the Faasm PR re. committing profiling macros. I would personally remove them, but feel free to ignore.


// Start basics
startRunner();

Expand All @@ -39,6 +41,8 @@ void FaabricMain::startBackground()

// Work sharing
startFunctionCallServer();

PROF_SUMMARY
}

void FaabricMain::startRunner()
Expand Down
15 changes: 10 additions & 5 deletions src/scheduler/Executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -298,11 +298,15 @@ void Executor::threadPoolThread(int threadPoolIdx)
SPDLOG_TRACE("Diffing memory with pre-execution snapshot for {}",
msg.snapshotkey());

// If we're on master, we write the diffs straight to the snapshot
// otherwise we push them to the master.
// Fill gaps with overwrites
snap->fillGapsWithOverwriteRegions();

// Work out the diffs
std::vector<faabric::util::SnapshotDiff> diffs =
funcMemory.diffWithSnapshot(snap);

// On master we queue the diffs locally directly, on a remote host
// we push them back to master
if (isMaster) {
SPDLOG_DEBUG("Queueing {} diffs for {} to snapshot {} on "
"master (group {})",
Expand All @@ -314,11 +318,12 @@ void Executor::threadPoolThread(int threadPoolIdx)
snap->queueDiffs(diffs);
} else {
sch.pushSnapshotDiffs(msg, diffs);

// Reset dirty page tracking on non-master
faabric::util::resetDirtyTracking();
}

// Reset dirty page tracking
faabric::util::resetDirtyTracking();

// Clear merge regions
SPDLOG_DEBUG("Clearing merge regions for {}", msg.snapshotkey());
snap->clearMergeRegions();
}
Expand Down
4 changes: 2 additions & 2 deletions src/scheduler/Scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,7 @@ faabric::util::SchedulingDecision Scheduler::doCallFunctions(
std::vector<faabric::util::SnapshotDiff> snapshotDiffs =
snapMemView.getDirtyRegions();

c.pushSnapshotDiffs(snapshotKey, true, snapshotDiffs);
c.pushSnapshotUpdate(snapshotKey, snap, snapshotDiffs);
} else {
c.pushSnapshot(snapshotKey, snap);
pushedSnapshotsMap[snapshotKey].insert(host);
Expand Down Expand Up @@ -916,7 +916,7 @@ void Scheduler::pushSnapshotDiffs(
}

SnapshotClient& c = getSnapshotClient(msg.masterhost());
c.pushSnapshotDiffs(snapKey, false, diffs);
c.pushSnapshotDiffs(snapKey, diffs);
}

void Scheduler::setThreadResultLocally(uint32_t msgId, int32_t returnValue)
Expand Down
Loading