In a parallel environment, an element can be modified while a snapshot is being collected. Hence, we need to check whether the acquired snapshot is clean or dirty.
Here is the basic idea of scan():
    oldCopy = collect()
    while (true) {
        newCopy = collect()
        if (oldCopy == newCopy) return oldCopy
        // TODO :: return WAIT-FREE snapshot, which means you have to escape this loop in bounded-time
        oldCopy = newCopy  // retry against the most recent collect
    }
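The double-collect loop above can be sketched in C++ roughly as follows. This is a minimal illustration, not the repository's code: `StampedTable`, the packing of a 32-bit stamp and a 32-bit value into one 64-bit atomic word, and the method names are all assumptions made for the example. The stamp is there so that two equal collects really mean that no write happened in between.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

// Hypothetical register table: one stamped slot per thread.
// The stamp lives in the high 32 bits and the value in the low 32 bits,
// so a single atomic load reads both together.
class StampedTable {
public:
    explicit StampedTable(std::size_t n) : slots_(n) {
        for (auto& s : slots_) s.store(0);
    }

    // Writes a new value and bumps the stamp.
    // Assumption: only thread `tid` ever writes slot `tid`.
    void update(std::size_t tid, std::uint32_t value) {
        assert(tid < slots_.size());
        std::uint64_t old = slots_[tid].load();
        std::uint64_t stamp = (old >> 32) + 1;
        slots_[tid].store((stamp << 32) | value);
    }

    // Reads every slot once, one after another (not atomic as a whole).
    std::vector<std::uint64_t> collect() const {
        std::vector<std::uint64_t> copy;
        copy.reserve(slots_.size());
        for (const auto& s : slots_) copy.push_back(s.load());
        return copy;
    }

    // Double collect: retry until two consecutive collects are identical.
    // This is not wait-free: it can loop forever under continuous updates.
    std::vector<std::uint32_t> scan() const {
        std::vector<std::uint64_t> oldCopy = collect();
        while (true) {
            std::vector<std::uint64_t> newCopy = collect();
            if (oldCopy == newCopy) {
                std::vector<std::uint32_t> values;
                values.reserve(newCopy.size());
                // Drop the stamp and keep only the low 32 bits (the value).
                for (std::uint64_t w : newCopy)
                    values.push_back(static_cast<std::uint32_t>(w));
                return values;
            }
            oldCopy = std::move(newCopy);
        }
    }

private:
    std::vector<std::atomic<std::uint64_t>> slots_;
};
```

With no concurrent writers, `scan()` returns after exactly two collects; with writers it retries, which is why a bounded variant is needed.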
This can be done within N repetitions by checking moved[i], which records which threads have been observed to move during scan().
To record that another thread has changed its value, update() increments that thread's stamp:

    // In update()
    a_table[threadId].stamp++
This makes the comparison on line 4 of scan(), oldCopy == newCopy, fail whenever another thread has updated between the two collects.
    moved[] = {false, }
    oldCopy = collect()
    while (true) {
        newCopy = collect()
        if (oldCopy == newCopy) return oldCopy
        for (j) {
            if (oldCopy[j] != newCopy[j]) {
                // thread j moved twice: the snap stored by its latest update is safe to return
                if (moved[j]) return newCopy[j].snap
                else {
                    moved[j] = true
                    oldCopy = newCopy
                    continue while
                }
            }
        }
    }
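We check whether moved[j] is already true and, if so, return the snapshot stored by thread j. This always returns a clean snapshot: a second observed change by the same thread means that thread completed an entire update(), including its own scan(), within the interval of our scan(), so the snapshot it stored is not interleaved with anything outside that interval.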
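The bounded scan described above could look roughly like this in C++. It is a sketch under several assumptions, not the repository's implementation: the names `Node` and `WaitFreeSnapshot` are hypothetical, each register holds a pointer to an immutable node so that stamp, value, and snap are read together, and old nodes are kept alive until the table is destroyed, so memory grows with every update.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical immutable register contents: stamp, value, and the snapshot
// the writer took just before writing.
struct Node {
    long stamp;
    int value;
    std::vector<int> snap;
};

class WaitFreeSnapshot {
public:
    explicit WaitFreeSnapshot(std::size_t n) : slots_(n), owned_(n) {
        for (std::size_t i = 0; i < n; ++i) {
            owned_[i].push_back(
                std::unique_ptr<Node>(new Node{0, 0, std::vector<int>(n, 0)}));
            slots_[i].store(owned_[i].back().get());
        }
    }

    // Assumption: only thread `tid` ever calls update(tid, ...).
    void update(std::size_t tid, int value) {
        assert(tid < slots_.size());
        std::vector<int> snap = scan();  // embed a clean snapshot in the write
        const Node* old = slots_[tid].load();
        owned_[tid].push_back(
            std::unique_ptr<Node>(new Node{old->stamp + 1, value, std::move(snap)}));
        slots_[tid].store(owned_[tid].back().get());
    }

    std::vector<int> scan() const {
        std::vector<bool> moved(slots_.size(), false);
        std::vector<const Node*> oldCopy = collect();
        while (true) {
            std::vector<const Node*> newCopy = collect();
            bool clean = true;
            for (std::size_t j = 0; j < slots_.size(); ++j) {
                if (oldCopy[j]->stamp == newCopy[j]->stamp) continue;
                // Second observed move of thread j: borrow its snapshot.
                if (moved[j]) return newCopy[j]->snap;
                moved[j] = true;
                clean = false;
                break;  // restart with newCopy as the baseline
            }
            if (clean) {
                std::vector<int> values;
                values.reserve(newCopy.size());
                for (const Node* node : newCopy) values.push_back(node->value);
                return values;
            }
            oldCopy = std::move(newCopy);
        }
    }

private:
    std::vector<const Node*> collect() const {
        std::vector<const Node*> copy;
        copy.reserve(slots_.size());
        for (const auto& s : slots_) copy.push_back(s.load());
        return copy;
    }

    std::vector<std::atomic<const Node*>> slots_;
    // Nodes written by each thread; only the owning thread appends.
    std::vector<std::vector<std::unique_ptr<Node>>> owned_;
};
```

Each unsuccessful iteration either returns or marks one more thread as moved, so the loop ends after a bounded number of collects. A production version would need a memory reclamation scheme instead of keeping every node.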
We use plain C pointers to store and process the arrays, taking care to avoid memory leaks.
    volatile int status;
    enum status_t {PENDING,RUNNING,END};
    std::mutex lk;  // this should be used only for evaluating performance
status is declared as volatile int so that the compiler does not optimize away the repeated reads of status in the loops below.
    void Manager::run(int tid) {
        int localUpdate = 0;
        while (status == PENDING);  // spinlock
        while (status == RUNNING) {
            snapshot.update(tid, localUpdate);
            ++localUpdate;
        }
        {
            std::unique_lock lock(lk);
            totalUpdate += localUpdate;
        }
    }
The status is changed here:
    for (std::size_t i=0; i < threadCount; i++)
        threads[i] = std::thread(&Manager::run, this, i);
    status = RUNNING;
    __sync_synchronize();
    std::this_thread::sleep_for(std::chrono::seconds(RUNNING_TIME));
    __sync_synchronize();
    status = END;
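As an alternative to volatile plus explicit barriers, the same start/stop protocol can be sketched with std::atomic, which gives the flag well-defined visibility between threads in standard C++. The example below is only an illustration under assumptions: `runFor` is a hypothetical stand-in for the Manager class, the call to snapshot.update() is replaced by a bare counter increment, and the total is accumulated in an atomic counter instead of under the mutex.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <cstddef>
#include <thread>
#include <vector>

enum class Status { PENDING, RUNNING, END };

// Hypothetical stand-in for Manager: starts `threadCount` workers, lets them
// run for `duration`, and returns the total number of iterations completed.
long runFor(std::size_t threadCount, std::chrono::milliseconds duration) {
    std::atomic<Status> status{Status::PENDING};
    std::atomic<long> totalUpdate{0};
    std::vector<std::thread> threads;

    for (std::size_t i = 0; i < threadCount; ++i) {
        threads.emplace_back([&status, &totalUpdate] {
            long localUpdate = 0;
            // Wait for the start signal.
            while (status.load() == Status::PENDING) std::this_thread::yield();
            // The real benchmark would call snapshot.update(tid, localUpdate) here.
            while (status.load() == Status::RUNNING) ++localUpdate;
            // Publish the per-thread count once, after the timed section.
            totalUpdate.fetch_add(localUpdate);
        });
    }

    status.store(Status::RUNNING);  // release all workers
    std::this_thread::sleep_for(duration);
    status.store(Status::END);      // stop all workers

    for (auto& t : threads) t.join();
    return totalUpdate.load();
}
```

For instance, `runFor(4, std::chrono::milliseconds(100))` would run four workers for about a tenth of a second. The default sequentially consistent loads and stores make the separate `__sync_synchronize()` calls unnecessary in this variant.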
CPU - Intel(R) Xeon(R) CPU E5-2697 v2 @ 2.70GHz (24 core)
RAM - 256GB
OS - Ubuntu 16.04.3 LTS