Skip to content

Commit d25c14b

Browse files
authored
Snapshots overhaul (#176)
* Start on changing snapshot sizes
* Resizing
* Up to failing test
* Tests for memory utils
* Fixing up tests
* Test for expanding and remapping snapshots
* Formatting
* Checking for uninitialised fds
* Test for pushing snapshots
* Test fixes, formatting
* Add target to dev.sanitise
* Half-way through refactor
* Small typos etc.
* Continued refactor
* Compiling
* Fixing up tests
* Fix tests
* Formatting
* Fixing up dist tests
* Fix failing asan test
* Small tidy-up
* Typos
* PR comments WIP
* Remove vector/span duplicates
* More tests
* Adding private and shared snapshot mappings
* Compilation fixes
* Sketching out memory view class
* Follow through with MemoryView refactor
* Fixing tests
* Fixing tests
* Tidying up
* Move merging logic into SnapshotData
* Master writing back snapshot changes locally
* Failing distributed reduction test
* Formatting and tests
* Adding repeats into dist test
* Fix up distributed test
* Tidy-up
* Override CPU count in dist tests
* Fix race condition in master snapshot
* Added locking log statement
* Fix scheduler test
* Fix straggler error in tests
* Immutable snapshot diffs with ownership
* Fix up broken tests
* Fix compile error in dist tests
* Test fix-up
* Formatting
* Fix dist tests
* Rename _data param
* PR comments
1 parent 77b5e89 commit d25c14b

34 files changed

+2180
-993
lines changed

dist-test/dev_server.sh

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ THIS_DIR=$(dirname $(readlink -f $0))
55
PROJ_ROOT=${THIS_DIR}/..
66
pushd ${PROJ_ROOT} > /dev/null
77

8+
export OVERRIDE_CPU_COUNT=4
9+
810
if [[ -z "$1" ]]; then
911
docker-compose up -d dist-test-server
1012
elif [[ "$1" == "restart" ]]; then

dist-test/run.sh

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ pushd ${PROJ_ROOT} >> /dev/null
66
export CONAN_CACHE_MOUNT_SOURCE=$HOME/.conan/
77
RETURN_VAL=0
88

9+
export OVERRIDE_CPU_COUNT=4
10+
911
# Run the test server in the background
1012
docker-compose \
1113
up \

docker-compose.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@ services:
2020
- LOG_LEVEL=debug
2121
- REDIS_STATE_HOST=redis
2222
- REDIS_QUEUE_HOST=redis
23+
- OVERRIDE_CPU_COUNT=${OVERRIDE_CPU_COUNT:-0}
24+
- ASAN_OPTIONS=verbosity=1:halt_on_error=1
25+
- TSAN_OPTIONS=halt_on_error=1:suppressions=/code/faabric/thread-sanitizer-ignorelist.txt:history_size=7:second_deadlock_stack=1
26+
- UBSAN_OPTIONS=print_stacktrace=1:halt_on_error=1
2327
depends_on:
2428
- redis
2529

@@ -34,6 +38,7 @@ services:
3438
- LOG_LEVEL=debug
3539
- REDIS_STATE_HOST=redis
3640
- REDIS_QUEUE_HOST=redis
41+
- OVERRIDE_CPU_COUNT=${OVERRIDE_CPU_COUNT:-0}
3742
command: ./bin/faabric_dist_test_server
3843
depends_on:
3944
- redis

include/faabric/scheduler/Scheduler.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,13 @@ class ExecutorTask
3232
ExecutorTask(int messageIndexIn,
3333
std::shared_ptr<faabric::BatchExecuteRequest> reqIn,
3434
std::shared_ptr<std::atomic<int>> batchCounterIn,
35-
bool needsSnapshotPushIn,
35+
bool needsSnapshotSyncIn,
3636
bool skipResetIn);
3737

3838
std::shared_ptr<faabric::BatchExecuteRequest> req;
3939
std::shared_ptr<std::atomic<int>> batchCounter;
4040
int messageIndex = 0;
41-
bool needsSnapshotPush = false;
41+
bool needsSnapshotSync = false;
4242
bool skipReset = false;
4343
};
4444

@@ -69,7 +69,7 @@ class Executor
6969

7070
void releaseClaim();
7171

72-
virtual faabric::util::SnapshotData snapshot();
72+
virtual faabric::util::MemoryView getMemoryView();
7373

7474
protected:
7575
virtual void restore(faabric::Message& msg);

include/faabric/snapshot/SnapshotClient.h

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@ namespace faabric::snapshot {
1212
// Mocking
1313
// -----------------------------------
1414

15-
std::vector<std::pair<std::string, faabric::util::SnapshotData>>
15+
std::vector<
16+
std::pair<std::string, std::shared_ptr<faabric::util::SnapshotData>>>
1617
getSnapshotPushes();
1718

1819
std::vector<std::pair<std::string, std::vector<faabric::util::SnapshotDiff>>>
@@ -35,12 +36,12 @@ class SnapshotClient final : public faabric::transport::MessageEndpointClient
3536
explicit SnapshotClient(const std::string& hostIn);
3637

3738
void pushSnapshot(const std::string& key,
38-
int32_t groupId,
39-
const faabric::util::SnapshotData& data);
39+
std::shared_ptr<faabric::util::SnapshotData> data);
4040

41-
void pushSnapshotDiffs(std::string snapshotKey,
42-
int32_t groupId,
43-
std::vector<faabric::util::SnapshotDiff> diffs);
41+
void pushSnapshotDiffs(
42+
std::string snapshotKey,
43+
bool force,
44+
const std::vector<faabric::util::SnapshotDiff>& diffs);
4445

4546
void deleteSnapshot(const std::string& key);
4647

include/faabric/snapshot/SnapshotRegistry.h

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,12 @@ class SnapshotRegistry
2222

2323
void mapSnapshot(const std::string& key, uint8_t* target);
2424

25-
void takeSnapshot(const std::string& key,
26-
faabric::util::SnapshotData data,
27-
bool locallyRestorable = true);
25+
void registerSnapshot(const std::string& key,
26+
std::shared_ptr<faabric::util::SnapshotData> data);
2827

29-
void takeSnapshotIfNotExists(const std::string& key,
30-
faabric::util::SnapshotData data,
31-
bool locallyRestorable = true);
28+
void registerSnapshotIfNotExists(
29+
const std::string& key,
30+
std::shared_ptr<faabric::util::SnapshotData> data);
3231

3332
void deleteSnapshot(const std::string& key);
3433

@@ -46,10 +45,9 @@ class SnapshotRegistry
4645
int writeSnapshotToFd(const std::string& key,
4746
faabric::util::SnapshotData& data);
4847

49-
void doTakeSnapshot(const std::string& key,
50-
faabric::util::SnapshotData data,
51-
bool locallyRestorable,
52-
bool overwrite);
48+
void doRegisterSnapshot(const std::string& key,
49+
std::shared_ptr<faabric::util::SnapshotData> data,
50+
bool overwrite);
5351
};
5452

5553
SnapshotRegistry& getSnapshotRegistry();

include/faabric/snapshot/SnapshotServer.h

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,6 @@ class SnapshotServer final : public faabric::transport::MessageEndpointServer
1414
public:
1515
SnapshotServer();
1616

17-
// Returns how many diffs have been applied since started, useful for
18-
// testing
19-
size_t diffsApplied() const;
20-
2117
protected:
2218
void doAsyncRecv(int header,
2319
const uint8_t* buffer,
@@ -40,6 +36,5 @@ class SnapshotServer final : public faabric::transport::MessageEndpointServer
4036

4137
private:
4238
faabric::transport::PointToPointBroker& broker;
43-
std::atomic_size_t diffsAppliedCounter = 0;
4439
};
4540
}

include/faabric/util/bytes.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,18 +26,18 @@ int safeCopyToBuffer(const uint8_t* dataIn,
2626
int bufferLen);
2727

2828
template<class T>
29-
T unalignedRead(const std::byte* bytes)
29+
T unalignedRead(const uint8_t* bytes)
3030
{
3131
T value;
32-
std::copy_n(bytes, sizeof(T), reinterpret_cast<std::byte*>(&value));
32+
std::copy_n(bytes, sizeof(T), reinterpret_cast<uint8_t*>(&value));
3333
return value;
3434
}
3535

3636
template<class T>
37-
void unalignedWrite(const T& value, std::byte* destination)
37+
void unalignedWrite(const T& value, uint8_t* destination)
3838
{
3939
std::copy_n(
40-
reinterpret_cast<const std::byte*>(&value), sizeof(T), destination);
40+
reinterpret_cast<const uint8_t*>(&value), sizeof(T), destination);
4141
}
4242

4343
template<class T>

include/faabric/util/memory.h

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
#pragma once
22

33
#include <cstdint>
4+
#include <functional>
5+
#include <memory>
6+
#include <span>
7+
#include <string>
48
#include <unistd.h>
59
#include <vector>
610

@@ -22,7 +26,7 @@ struct AlignedChunk
2226

2327
static const long HOST_PAGE_SIZE = sysconf(_SC_PAGESIZE);
2428

25-
bool isPageAligned(void* ptr);
29+
bool isPageAligned(const void* ptr);
2630

2731
size_t getRequiredHostPages(size_t nBytes);
2832

@@ -41,4 +45,27 @@ std::vector<int> getDirtyPageNumbers(const uint8_t* ptr, int nPages);
4145

4246
std::vector<std::pair<uint32_t, uint32_t>> getDirtyRegions(const uint8_t* ptr,
4347
int nPages);
48+
49+
// -------------------------
50+
// Allocation
51+
// -------------------------
52+
typedef std::unique_ptr<uint8_t[], std::function<void(uint8_t*)>> MemoryRegion;
53+
54+
MemoryRegion allocateSharedMemory(size_t size);
55+
56+
MemoryRegion allocateVirtualMemory(size_t size);
57+
58+
void claimVirtualMemory(std::span<uint8_t> region);
59+
60+
void mapMemoryPrivate(std::span<uint8_t> target, int fd);
61+
62+
void mapMemoryShared(std::span<uint8_t> target, int fd);
63+
64+
void resizeFd(int fd, size_t size);
65+
66+
void writeToFd(int fd, off_t offset, std::span<const uint8_t> data);
67+
68+
int createFd(size_t size, const std::string& fdLabel);
69+
70+
void appendDataToFd(int fd, std::span<uint8_t> data);
4471
}

include/faabric/util/snapshot.h

Lines changed: 93 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,14 @@
33
#include <map>
44
#include <memory>
55
#include <mutex>
6+
#include <shared_mutex>
7+
#include <span>
68
#include <string>
79
#include <vector>
810

911
#include <faabric/util/logging.h>
1012
#include <faabric/util/macros.h>
13+
#include <faabric/util/memory.h>
1114

1215
namespace faabric::util {
1316

@@ -30,28 +33,28 @@ enum SnapshotMergeOperation
3033
class SnapshotDiff
3134
{
3235
public:
33-
const uint8_t* data = nullptr;
34-
size_t size = 0;
35-
SnapshotDataType dataType = SnapshotDataType::Raw;
36-
SnapshotMergeOperation operation = SnapshotMergeOperation::Overwrite;
37-
uint32_t offset = 0;
38-
39-
bool noChange = false;
40-
4136
SnapshotDiff() = default;
4237

4338
SnapshotDiff(SnapshotDataType dataTypeIn,
4439
SnapshotMergeOperation operationIn,
4540
uint32_t offsetIn,
46-
const uint8_t* dataIn,
47-
size_t sizeIn)
48-
{
49-
dataType = dataTypeIn;
50-
operation = operationIn;
51-
offset = offsetIn;
52-
data = dataIn;
53-
size = sizeIn;
54-
}
41+
std::span<const uint8_t> dataIn);
42+
43+
SnapshotDataType getDataType() const { return dataType; }
44+
45+
SnapshotMergeOperation getOperation() const { return operation; }
46+
47+
uint32_t getOffset() const { return offset; }
48+
49+
std::span<const uint8_t> getData() const { return data; }
50+
51+
std::vector<uint8_t> getDataCopy() const;
52+
53+
private:
54+
SnapshotDataType dataType = SnapshotDataType::Raw;
55+
SnapshotMergeOperation operation = SnapshotMergeOperation::Overwrite;
56+
uint32_t offset = 0;
57+
std::vector<uint8_t> data;
5558
};
5659

5760
class SnapshotMergeRegion
@@ -73,27 +76,93 @@ class SnapshotMergeRegion
7376
class SnapshotData
7477
{
7578
public:
76-
size_t size = 0;
77-
uint8_t* data = nullptr;
78-
int fd = 0;
79-
8079
SnapshotData() = default;
8180

82-
std::vector<SnapshotDiff> getDirtyPages();
81+
explicit SnapshotData(size_t sizeIn);
82+
83+
SnapshotData(size_t sizeIn, size_t maxSizeIn);
84+
85+
explicit SnapshotData(std::span<const uint8_t> dataIn);
86+
87+
SnapshotData(std::span<const uint8_t> dataIn, size_t maxSizeIn);
88+
89+
SnapshotData(const SnapshotData&) = delete;
90+
91+
SnapshotData& operator=(const SnapshotData&) = delete;
92+
93+
~SnapshotData();
94+
95+
void copyInData(std::span<const uint8_t> buffer, uint32_t offset = 0);
96+
97+
const uint8_t* getDataPtr(uint32_t offset = 0);
98+
99+
std::vector<uint8_t> getDataCopy();
83100

84-
std::vector<SnapshotDiff> getChangeDiffs(const uint8_t* updated,
85-
size_t updatedSize);
101+
std::vector<uint8_t> getDataCopy(uint32_t offset, size_t dataSize);
102+
103+
void mapToMemory(uint8_t* target);
86104

87105
void addMergeRegion(uint32_t offset,
88106
size_t length,
89107
SnapshotDataType dataType,
90108
SnapshotMergeOperation operation,
91109
bool overwrite = false);
92110

111+
void clearMergeRegions();
112+
113+
std::map<uint32_t, SnapshotMergeRegion> getMergeRegions();
114+
115+
size_t getQueuedDiffsCount();
116+
117+
void queueDiffs(std::span<SnapshotDiff> diffs);
118+
119+
void writeQueuedDiffs();
120+
121+
size_t getSize() const { return size; }
122+
123+
size_t getMaxSize() const { return maxSize; }
124+
93125
private:
126+
size_t size = 0;
127+
size_t maxSize = 0;
128+
129+
int fd = -1;
130+
131+
std::shared_mutex snapMx;
132+
133+
MemoryRegion data = nullptr;
134+
135+
std::vector<SnapshotDiff> queuedDiffs;
136+
94137
// Note - we care about the order of this map, as we iterate through it
95138
// in order of offsets
96139
std::map<uint32_t, SnapshotMergeRegion> mergeRegions;
140+
141+
uint8_t* validatedOffsetPtr(uint32_t offset);
142+
143+
void mapToMemory(uint8_t* target, bool shared);
144+
145+
void writeData(std::span<const uint8_t> buffer, uint32_t offset = 0);
146+
};
147+
148+
class MemoryView
149+
{
150+
public:
151+
// Note - this object is just a view of a section of memory, and does not
152+
// own the underlying data
153+
MemoryView() = default;
154+
155+
explicit MemoryView(std::span<const uint8_t> dataIn);
156+
157+
std::vector<SnapshotDiff> getDirtyRegions();
158+
159+
std::vector<SnapshotDiff> diffWithSnapshot(
160+
std::shared_ptr<SnapshotData> snap);
161+
162+
std::span<const uint8_t> getData() { return data; }
163+
164+
private:
165+
std::span<const uint8_t> data;
97166
};
98167

99168
std::string snapshotDataTypeStr(SnapshotDataType dt);

0 commit comments

Comments
 (0)