parallelize numbuf memcpy and plasma object hash construction #366
Merged build finished. Test FAILed.
@@ -44,7 +50,11 @@ class FixedBufferStream : public arrow::io::OutputStream,
DCHECK(position_ + nbytes <= size_) << "position: " << position_
<< " nbytes: " << nbytes << "size: " << size_;
uint8_t* dst = data_ + position_;
memcpy(dst, data, nbytes);
if (nbytes >= (1<<20)) {
Let's make this a constant (and for the code that computes the hash too)
Will do. Where's a good place to put it? It's worth defining things like `#define MB (1<<20)` and using that for better readability.
src/plasma/plasma_client.cc
Outdated
// Start the prefix thread.
threads.push_back(std::thread(
compute_block_hash, data, prefix, &threadhash[0]));
for (int i = 1; i <= numthreads; i++) { |
Let's try to make NUMTHREADS the real number of threads here: i = 0 for the prefix, i = 1, ..., NUMTHREADS-2 for the middle, and i = NUMTHREADS-1 for the suffix.
@pcmoritz, I could do that, but when we get aligned, well-behaved input we won't even have to start the prefix and suffix threads, so the actual number of threads lies between numthreads and numthreads+2 in the general case. Alternatively, I could issue the prefix and suffix memcpy in the main thread without spawning threads for them. Would that be better? I felt that having a guaranteed tight bound on the number of threads is sufficient.
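To make the alternative in this comment concrete, here is a minimal sketch of a copy that hands the block-aligned middle of the buffer to worker threads and copies the unaligned prefix and suffix on the calling thread. It is not the code from this PR: `parallel_memcpy` and its parameters are hypothetical names, and spawning fresh `std::thread` objects per call (rather than using a thread pool) is an assumption made for brevity.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <thread>
#include <vector>

// Hypothetical sketch: copy nbytes from src to dst using at most num_threads
// worker threads. Alignment is computed on the source address.
inline void parallel_memcpy(std::uint8_t* dst, const std::uint8_t* src,
                            std::size_t nbytes, std::size_t num_threads,
                            std::size_t block_size = 64) {
  assert(block_size != 0 && (block_size & (block_size - 1)) == 0);
  if (nbytes == 0) return;
  const std::uintptr_t mask = ~static_cast<std::uintptr_t>(block_size - 1);
  const std::uintptr_t begin = reinterpret_cast<std::uintptr_t>(src);
  const std::uintptr_t end = begin + nbytes;
  // First and last block-aligned addresses inside [begin, end].
  const std::uintptr_t left = (begin + block_size - 1) & mask;
  const std::uintptr_t right = end & mask;
  if (num_threads == 0 || left >= right) {
    // No whole aligned block to hand out: copy everything in this thread.
    std::memcpy(dst, src, nbytes);
    return;
  }
  const std::size_t prefix = left - begin;
  const std::size_t aligned_bytes = right - left;
  const std::size_t suffix = end - right;
  // Each worker gets a whole number of blocks, rounded up so that the number
  // of chunks never exceeds num_threads.
  const std::size_t num_blocks = aligned_bytes / block_size;
  const std::size_t chunk =
      ((num_blocks + num_threads - 1) / num_threads) * block_size;

  std::vector<std::thread> workers;
  for (std::size_t offset = prefix; offset < prefix + aligned_bytes;
       offset += chunk) {
    const std::size_t length = std::min(chunk, prefix + aligned_bytes - offset);
    workers.emplace_back(
        [=] { std::memcpy(dst + offset, src + offset, length); });
  }
  // Prefix and suffix are shorter than one block, so the calling thread
  // copies them while the workers run.
  std::memcpy(dst, src, prefix);
  std::memcpy(dst + prefix + aligned_bytes, src + prefix + aligned_bytes,
              suffix);
  for (std::thread& worker : workers) worker.join();
}
```

With this layout the number of spawned threads is bounded by `num_threads` whether or not the input is aligned. A caller would presumably still fall back to a plain `memcpy` below a size threshold such as the 1 MB cutoff in the diff above.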
@@ -24,5 +24,6 @@ echo "building arrow"
cd $TP_DIR/arrow/cpp
mkdir -p $TP_DIR/arrow/cpp/build
cd $TP_DIR/arrow/cpp/build
cmake -DCMAKE_BUILD_TYPE=Release -DCMAKE_C_FLAGS="-g" -DCMAKE_CXX_FLAGS="-g" -DARROW_BUILD_TESTS=OFF ..
cmake -DCMAKE_BUILD_TYPE=Release -DCMAKE_C_FLAGS="-g -lpthread" -DCMAKE_CXX_FLAGS="-g -lpthread" -DARROW_BUILD_TESTS=OFF ..
make clean |
we should consider removing this before merging, since we probably don't want to rebuild arrow normally
Yes, I'll remove `make clean` before merging.
src/plasma/plasma_client.cc
Outdated
const uint64_t numthreads = NUMTHREADS;
uint64_t threadhash[numthreads+2];
//CHECK(numthreads > 0);
const uint64_t blocksz = 64; // cache block alignment (alternative: page size) |
Let's not use abbreviations here (block_size, data_begin maybe, data_end or names like this)
It's going to be unbelievably verbose. The more verbose things are, the more difficult the code is to read. Using the "sz" suffix is standard in the Linux kernel. See here for example:
https://github.com/torvalds/linux/blob/5924bbecd0267d87c24110cbe2041b5075173a25/arch/microblaze/include/asm/mmu.h#L101
Hm, we need to have a consistent style here. We are using extremely few abbreviations in the code right now (basically just abbreviating id for identifier and db for database). I think that will make it more understandable to people. I'm with you on trying to be concise with code, but let's not trade off readability. It will be good if our code is readable by many different people.
src/plasma/plasma_client.cc
Outdated
//CHECK(numthreads > 0);
const uint64_t blocksz = 64; // cache block alignment (alternative: page size)
// Calculate the first and last aligned positions in the data stream.
unsigned char *databp = (unsigned char *)(((uint64_t)data + blocksz-1) & ~(blocksz-1)); |
hm, can you explain how this works? any way to simplify this?
This is meant to work for blocksz equal to any power of 2. The only way to simplify it is to assume a specific alignment and substitute the constants, so let's assume 64-byte alignment. The first term on the right-hand side, data + blocksz - 1, tips the pointer past the next aligned memory position unless it is already aligned. The second term, ~(blocksz - 1), is the bitwise negation of the alignment minus 1, so it has every bit set except the low bits that correspond to the bit width of the alignment (here the 6 least significant bits are zero). ANDing the two terms zeros out those 6 least significant bits, which rounds the pointer up to the next 64-byte-aligned memory position. If the pointer is already aligned, it stays the same.
There are a few other ways to write this, such as shifting right and then left by the alignment bit width, but that can get weird on different architectures if you are not careful, because the shift can be with or without the sign bit and with or without carry. It would also require adding a variable that holds the alignment bit width of the specified block size (i.e. log base 2 of blocksz).
Ok cool thanks, that makes sense, let's keep it the way it is and add some comments that describe what you just said
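As a small worked illustration of the rounding trick discussed in this thread, the sketch below wraps the expression in a helper and adds the matching round-down form. The names `align_up` and `align_down` are hypothetical and do not appear in the PR; the round-down helper is an assumption about how the last aligned position might be obtained.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical helper: round `address` up to the next multiple of
// `alignment`, which must be a power of two.
inline std::uint64_t align_up(std::uint64_t address, std::uint64_t alignment) {
  assert(alignment != 0 && (alignment & (alignment - 1)) == 0);
  // Adding (alignment - 1) pushes an unaligned address past the next
  // boundary, while an aligned address stays below the following one.
  // ~(alignment - 1) has the low log2(alignment) bits cleared, so the AND
  // truncates the sum to a multiple of `alignment`.
  return (address + alignment - 1) & ~(alignment - 1);
}

// Hypothetical companion: round `address` down to a multiple of `alignment`.
inline std::uint64_t align_down(std::uint64_t address,
                                std::uint64_t alignment) {
  assert(alignment != 0 && (alignment & (alignment - 1)) == 0);
  return address & ~(alignment - 1);
}
```

For a 64-byte block size, `align_up(65, 64)` is 128 and `align_up(64, 64)` stays 64, which matches the "already aligned, it stays the same" case described above.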
We should also have a C unit test that checks that the parallel memcpy is correct.
Merged build finished. Test FAILed.
Merged build finished. Test FAILed.
19c609b to a658e50
Merged build finished. Test PASSed.
Merged build finished. Test PASSed.
class FixedBufferStream : public arrow::io::OutputStream,
public arrow::io::ReadableFileInterface {
public:
virtual ~FixedBufferStream() {}

explicit FixedBufferStream(uint8_t* data, int64_t nbytes)
: data_(data), position_(0), size_(nbytes) {}
: data_(data), position_(0), size_(nbytes), threadpool_(THREADPOOL_SIZE) {} |
It'd be great if you could contribute this code to arrow/io. I opened JIRAs for both the FixedBufferStream and multithreaded memcpy
Hey Wes,
We're refactoring this into a standalone class. Should be easier to contribute to arrow as well when we're done with it.
Thanks,
Alexey
Merged build finished. Test PASSed.
memcpy(dst, data, nbytes);
}
memcpy_helper.memcopy(dst, data, nbytes);
// if (nbytes >= BYTES_IN_MB) { |
Let's get rid of these comments
src/plasma/plasma_client.cc
Outdated
*hash = XXH64_digest(&hash_state);
}

inline bool compute_object_hash_parallel(XXH64_state_t *hash_state, |
Let's do the same naming convention here as in ParallelMemcpy
Build finished. Test PASSed.
Build finished. Test PASSed.
Build finished. Test PASSed.
src/plasma/plasma_client.cc
Outdated
const uint64_t numthreads = THREADPOOL_SIZE;
uint64_t threadhash[numthreads + 2];
const uint64_t block_size = BLOCK_SIZE;
// Calculate the first and last aligned positions in the data stream. |
we should make the comment style consistent
Build finished. Test PASSed.
Build finished. Test PASSed.
1aa4c33 to 8cb2633
Merged build finished. Test PASSed.
src/plasma/plasma_store.cc
Outdated
@@ -231,7 +232,8 @@ int create_object(Client *client_context,
return PlasmaError_OutOfMemory;
}
/* Allocate space for the new object */
uint8_t *pointer = (uint8_t *) dlmalloc(data_size + metadata_size);
uint8_t *pointer =
(uint8_t *) dlmemalign(BLOCK_SIZE, data_size + metadata_size); |
we should document why we are doing this, specifically that 64-byte alignment is REQUIRED by compute_object_hash_parallel
src/plasma/plasma_client.cc
Outdated
const uint64_t block_size = BLOCK_SIZE;
/* Calculate the first and last aligned positions in the data stream. */
const uint64_t data_address = reinterpret_cast<uint64_t>(data);
uint64_t left_address = (data_address + block_size - 1) & ~(block_size - 1); |
I think it should be possible to simplify a bunch of the code here, e.g., `left_address = data_address`. Is that right?
src/plasma/plasma_client.cc
Outdated
static inline bool compute_object_hash_parallel(XXH64_state_t *hash_state,
const unsigned char *data,
int64_t nbytes) {
const uint64_t numthreads = THREADPOOL_SIZE; |
We should check that `data % 64 == 0` and document why we require this (explain that we don't want the blocks we divide the data into to straddle multiple cache blocks).
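To illustrate what the requested check guards against, the sketch below tests a pointer for 64-byte alignment and counts how many cache blocks a byte range touches. Everything here is hypothetical: `kBlockSize`, `is_block_aligned`, `check_block_aligned` (a stand-in for the `CHECK` macro), and `blocks_touched` are illustrative names, and 64 bytes is assumed as the cache block size.

```cpp
#include <cassert>
#include <cstdint>

// Assumed cache block size in bytes.
constexpr std::uint64_t kBlockSize = 64;

// True if the pointer sits on a cache block boundary.
inline bool is_block_aligned(const void* pointer) {
  return reinterpret_cast<std::uintptr_t>(pointer) % kBlockSize == 0;
}

// Stand-in for CHECK(data_address % 64 == 0): aborts in debug builds.
inline void check_block_aligned(const void* pointer) {
  assert(is_block_aligned(pointer));
}

// Number of cache blocks overlapped by the byte range
// [address, address + nbytes).
inline std::uint64_t blocks_touched(std::uint64_t address,
                                    std::uint64_t nbytes) {
  if (nbytes == 0) return 0;
  const std::uint64_t first = address / kBlockSize;
  const std::uint64_t last = (address + nbytes - 1) / kBlockSize;
  return last - first + 1;
}
```

A 64-byte range that starts on a boundary touches one cache block, while the same range starting 32 bytes later touches two. That extra block is the "straddling" cost the comment refers to.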
double elapsed =
((tv2.tv_sec - tv1.tv_sec) * 1000000 + (tv2.tv_usec - tv1.tv_usec)) / 1000000.0;
// TODO: replace this with ARROW_LOG(ARROW_INFO) or better equivalent.
printf("Copied %llu bytes in time = %8.4f MBps=%8.4f\n", nbytes, elapsed, |
Is this `printf` still happening?
Merged build finished. Test PASSed.
Merged build finished. Test PASSed.
Merged build finished. Test PASSed.
Merged build finished. Test PASSed.
src/plasma/plasma_client.cc
Outdated
* be faster if the blocks that we divide the data into do not straddle extra
* cache blocks. The incoming addresses are 64-byte aligned because we
* allocate them with dlmemalign in create_object in plasma_store.cc. */
CHECK(data_address % 64 == 0); |
The call to `dlmemalign` is in the plasma store. This check is in the plasma client, so it only makes sense if the alignment is preserved by memory mapping.
This check is not necessary. The code above should compute the hash correctly regardless of alignment, because we always start the first chunk at the given data_address. The invariant is that, for any alignment, with fixed numthreads and blocksz, the chunks produced for each thread are exactly the same. This is a correctness property for computing the object hash deterministically.
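The invariant described in this comment can be sketched as follows: if the chunk boundaries are offsets from the start of the data and depend only on the length, the chunk count, and the block size, then the combined hash cannot depend on where the buffer sits in memory. This is an illustration under stated assumptions, not the PR's implementation: 64-bit FNV-1a stands in for XXH64, `chunked_hash` is a hypothetical name, the chunks are hashed sequentially rather than on threads, and combining by hashing the per-chunk hashes in order is an assumed scheme.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Stand-in for XXH64: plain 64-bit FNV-1a over a byte range.
inline std::uint64_t fnv1a(const unsigned char* data, std::size_t nbytes) {
  std::uint64_t hash = 14695981039346656037ULL;  // FNV offset basis
  for (std::size_t i = 0; i < nbytes; i++) {
    hash ^= data[i];
    hash *= 1099511628211ULL;  // FNV prime
  }
  return hash;
}

// Hypothetical sketch: hash `data` in chunks whose boundaries are computed
// from nbytes, num_chunks, and block_size only, never from the address.
inline std::uint64_t chunked_hash(const unsigned char* data,
                                  std::size_t nbytes, std::size_t num_chunks,
                                  std::size_t block_size = 64) {
  assert(block_size > 0);
  num_chunks = std::max<std::size_t>(num_chunks, 1);
  const std::size_t num_blocks = (nbytes + block_size - 1) / block_size;
  const std::size_t blocks_per_chunk =
      std::max<std::size_t>(1, (num_blocks + num_chunks - 1) / num_chunks);
  const std::size_t chunk_size = blocks_per_chunk * block_size;

  std::vector<std::uint64_t> chunk_hashes;
  for (std::size_t offset = 0; offset < nbytes; offset += chunk_size) {
    // Each iteration is independent, so it could run on its own thread.
    const std::size_t length = std::min(chunk_size, nbytes - offset);
    chunk_hashes.push_back(fnv1a(data + offset, length));
  }
  // Combine by hashing the per-chunk hashes in chunk order.
  return fnv1a(reinterpret_cast<const unsigned char*>(chunk_hashes.data()),
               chunk_hashes.size() * sizeof(std::uint64_t));
}
```

Hashing the same payload from an aligned buffer and from a buffer shifted by a few bytes yields the same value here, because every chunk covers the same payload bytes in both cases. Alignment would then affect only speed, not the result.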
Merged build finished. Test PASSed.
src/plasma/plasma_store.cc
Outdated
* order to align the allocated region to a 64-byte boundary. This is not
* strictly necessary, but it is an optimization that speeds up the
* computation of a hash of the data (see compute_object_hash_parallel in
* plasma_client.cc). */
uint8_t *pointer =
(uint8_t *) dlmemalign(BLOCK_SIZE, data_size + metadata_size); |
if this helps, I can change the code back to dlmalloc for the purposes of this PR.
Merged build finished. Test PASSed.