parallelize numbuf memcpy and plasma object hash construction #366

Merged

Conversation

@atumanov (Contributor)

No description provided.

@AmplabJenkins

Merged build finished. Test FAILed.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/279/

@@ -44,7 +50,11 @@ class FixedBufferStream : public arrow::io::OutputStream,
DCHECK(position_ + nbytes <= size_) << "position: " << position_
<< " nbytes: " << nbytes << "size: " << size_;
uint8_t* dst = data_ + position_;
memcpy(dst, data, nbytes);
if (nbytes >= (1<<20)) {
Contributor

Let's make this a constant (and for the code that computes the hash too)

Contributor Author

Will do; where's a good place to put it? It's worth defining things like #define MB (1<<20) and using that for better readability.
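For illustration, constants along these lines might look like the sketch below; the names are placeholders (BYTES_IN_MB matches a name that shows up later in this PR, PARALLEL_MEMCOPY_THRESHOLD is hypothetical):

// Sketch only: candidate constants for the copy/hash size cutoff.
#define BYTES_IN_MB (1 << 20)
#define PARALLEL_MEMCOPY_THRESHOLD BYTES_IN_MB  /* only parallelize copies of at least 1 MB */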

// Start the prefix thread.
threads.push_back(std::thread(
compute_block_hash, data, prefix, &threadhash[0]));
for (int i = 1; i <= numthreads; i++) {
@pcmoritz (Contributor) Mar 13, 2017

Let's try to make NUMTHREADS the real number of threads here and have i = 0; i = 1, ..., NUMTHREADS-2; i = NUMTHREADS-1.

Contributor Author

@pcmoritz, I could do that, but in cases where we get aligned, well-behaved input, we won't even have to start the prefix and suffix threads, so the expected number of actual threads falls in [numthreads, numthreads+2] in the general case. Alternatively, I could issue the prefix and suffix memcpy in the main thread, without spawning a thread for it. Would that be better? I felt that having a guaranteed tight margin on the expected number of threads is sufficient.
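For context, here is a minimal sketch of the splitting being discussed, assuming a prefix thread for the unaligned head, numthreads workers for the block-aligned middle, and a suffix thread for the unaligned tail; names and structure are illustrative, not the PR's exact code:

#include <cstdint>
#include <cstring>
#include <thread>
#include <vector>

// Illustrative sketch (assumes numthreads > 0): prefix/suffix threads are only
// spawned when those regions are non-empty, so the total thread count lies in
// [numthreads, numthreads + 2].
inline void parallel_memcpy_sketch(uint8_t* dst, const uint8_t* src, size_t nbytes,
                                   size_t block_size, size_t numthreads) {
  const uintptr_t start = reinterpret_cast<uintptr_t>(src);
  uintptr_t left = (start + block_size - 1) & ~(block_size - 1);  // first aligned byte
  uintptr_t right = (start + nbytes) & ~(block_size - 1);         // last aligned boundary
  if (right < left) left = right = start + nbytes;  // tiny input: no aligned middle

  std::vector<std::thread> threads;
  auto spawn_copy = [&](size_t offset, size_t length) {
    if (length > 0)
      threads.emplace_back([=] { std::memcpy(dst + offset, src + offset, length); });
  };

  spawn_copy(0, left - start);                       // unaligned prefix (if any)
  const size_t middle = right - left;
  const size_t chunk = middle / numthreads;
  for (size_t i = 0; i < numthreads; i++) {
    const size_t offset = (left - start) + i * chunk;
    const size_t length = (i + 1 == numthreads) ? middle - i * chunk : chunk;
    spawn_copy(offset, length);
  }
  spawn_copy(right - start, (start + nbytes) - right);  // unaligned suffix (if any)

  for (std::thread& t : threads) t.join();
}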

@@ -24,5 +24,6 @@ echo "building arrow"
cd $TP_DIR/arrow/cpp
mkdir -p $TP_DIR/arrow/cpp/build
cd $TP_DIR/arrow/cpp/build
cmake -DCMAKE_BUILD_TYPE=Release -DCMAKE_C_FLAGS="-g" -DCMAKE_CXX_FLAGS="-g" -DARROW_BUILD_TESTS=OFF ..
cmake -DCMAKE_BUILD_TYPE=Release -DCMAKE_C_FLAGS="-g -lpthread" -DCMAKE_CXX_FLAGS="-g -lpthread" -DARROW_BUILD_TESTS=OFF ..
make clean
Collaborator

we should consider removing this before merging, since we probably don't want to rebuild arrow normally

Contributor Author

Yes, I'll remove make clean before merging.

const uint64_t numthreads = NUMTHREADS;
uint64_t threadhash[numthreads+2];
//CHECK(numthreads > 0);
const uint64_t blocksz = 64; // cache block alignment (alternative: page size)
@pcmoritz (Contributor) Mar 13, 2017

Let's not use abbreviations here (block_size, data_begin maybe, data_end or names like this)

Contributor Author

It's going to be unbelievably verbose. The more verbose things are, the more difficult the code is to read. Using the "sz" suffix is standard in the Linux kernel. See here for example:
https://github.com/torvalds/linux/blob/5924bbecd0267d87c24110cbe2041b5075173a25/arch/microblaze/include/asm/mmu.h#L101

Contributor

Hm, we need to have a consistent style here. We are using extremely few abbreviations in the code right now (basically just abbreviating id for identifier and db for database). I think that will make it more understandable to people. I'm with you on trying to be concise with code, but let's not trade off readability. It will be good if our code is readable by many different people.

//CHECK(numthreads > 0);
const uint64_t blocksz = 64; // cache block alignment (alternative: page size)
// Calculate the first and last aligned positions in the data stream.
unsigned char *databp = (unsigned char *)(((uint64_t)data + blocksz-1) & ~(blocksz-1));
Contributor

hm, can you explain how this works? any way to simplify this?

Contributor Author

This is meant to work for blocksz equal to any power of 2. The only way to simplify it is to assume a specific alignment and substitute the constants. Assume 64-byte alignment. The first term on the right-hand side, (uint64_t)data + blocksz - 1, tips the pointer just past the nearest aligned memory position. The second term, ~(blocksz - 1), is the bitwise negation of (alignment - 1): it leaves all bits set except the ones that correspond to the bit width of the alignment (in this case the 6 least significant bits are zero). ANDing the first term with the second zeros out the 6 least significant bits, so the result is rounded up to the next 64-byte-aligned memory position. If the pointer is already aligned, it stays the same.
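A worked example of the rounding trick, with illustrative addresses:

// blocksz = 64 (a power of 2), so blocksz - 1 = 0x3F and ~(blocksz - 1) clears the low 6 bits.
// Example with data = 0x1003:
//   0x1003 + 0x3F   = 0x1042
//   0x1042 & ~0x3F  = 0x1040   // next 64-byte-aligned address
// Example with data already aligned, data = 0x1040:
//   (0x1040 + 0x3F) & ~0x3F = 0x1040   // unchanged
unsigned char *databp = (unsigned char *)(((uint64_t)data + blocksz - 1) & ~(blocksz - 1));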

Contributor Author

There are a few ways this could be simplified, like doing a shift right followed by a shift left by the alignment bit width, but it may get weird on different architectures if you are not careful, because the shift can be with or without the sign bit and with or without carry. It would also require us to add a variable that derives the alignment bit width of the specified block size (i.e., log base 2 of blocksz).
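For comparison, the shift-based variant mentioned here would look roughly like the following, assuming unsigned arithmetic (so the right shift is logical) and a precomputed log2 of the block size; this is just an illustration, not a proposed change:

const uint64_t block_shift = 6;  // log2(blocksz) for blocksz = 64; would have to be derived in general
unsigned char *databp =
    (unsigned char *)((((uint64_t)data + blocksz - 1) >> block_shift) << block_shift);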

Contributor

OK, cool, thanks, that makes sense. Let's keep it the way it is and add some comments that describe what you just said.

@pcmoritz (Contributor) left a comment

We should also have a C unit test that checks that the parallel memcpy is correct.
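A minimal sketch of such a test, using a plain assert rather than the project's test harness; parallel_memcpy and its signature are assumptions standing in for whatever entry point the PR exposes:

#include <algorithm>
#include <cassert>
#include <cstdint>
#include <cstring>
#include <vector>

// Assumed signature for the parallel copy under test.
void parallel_memcpy(uint8_t* dst, const uint8_t* src, size_t nbytes);

// Fill a buffer with a pattern, copy it at several sizes and offsets (so the
// prefix/suffix paths are exercised), and compare against the source bytes.
void test_parallel_memcpy_matches_source() {
  const size_t kMaxSize = 8 * (1 << 20);
  std::vector<uint8_t> src(kMaxSize), dst(kMaxSize);
  for (size_t i = 0; i < kMaxSize; i++) src[i] = static_cast<uint8_t>(i * 31 + 7);
  for (size_t offset : {0, 1, 63, 64, 65}) {
    for (size_t size : {size_t(1), size_t(4096), size_t(1) << 20, kMaxSize - offset}) {
      std::fill(dst.begin(), dst.end(), 0);
      parallel_memcpy(dst.data(), src.data() + offset, size);
      assert(std::memcmp(dst.data(), src.data() + offset, size) == 0);
    }
  }
}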

@AmplabJenkins

Merged build finished. Test FAILed.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/289/

@AmplabJenkins

Merged build finished. Test FAILed.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/290/

@pcmoritz force-pushed the parallel-objecthash-merge branch from 19c609b to a658e50 on March 20, 2017 10:37
@AmplabJenkins

Merged build finished. Test PASSed.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/353/

@AmplabJenkins

Merged build finished. Test PASSed.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/356/

class FixedBufferStream : public arrow::io::OutputStream,
public arrow::io::ReadableFileInterface {
public:
virtual ~FixedBufferStream() {}

explicit FixedBufferStream(uint8_t* data, int64_t nbytes)
: data_(data), position_(0), size_(nbytes) {}
: data_(data), position_(0), size_(nbytes), threadpool_(THREADPOOL_SIZE) {}

It'd be great if you could contribute this code to arrow/io. I opened JIRAs for both the FixedBufferStream and the multithreaded memcpy.

Contributor Author

Hey Wes,
We're refactoring this into a standalone class. Should be easier to contribute to arrow as well when we're done with it.
Thanks,
Alexey

@AmplabJenkins

Merged build finished. Test PASSed.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/364/

memcpy(dst, data, nbytes);
}
memcpy_helper.memcopy(dst, data, nbytes);
// if (nbytes >= BYTES_IN_MB) {
Contributor

Let's get rid of these comments

*hash = XXH64_digest(&hash_state);
}

inline bool compute_object_hash_parallel(XXH64_state_t *hash_state,
Contributor

Let's do the same naming convention here as in ParallelMemcpy

@AmplabJenkins

Build finished. Test PASSed.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/370/

@AmplabJenkins

Build finished. Test PASSed.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/372/

@AmplabJenkins

Build finished. Test PASSed.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/374/

const uint64_t numthreads = THREADPOOL_SIZE;
uint64_t threadhash[numthreads + 2];
const uint64_t block_size = BLOCK_SIZE;
// Calculate the first and last aligned positions in the data stream.
Collaborator

we should make the comment style consistent

@AmplabJenkins

Build finished. Test PASSed.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/378/

@AmplabJenkins

Build finished. Test PASSed.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/379/

@atumanov force-pushed the parallel-objecthash-merge branch from 1aa4c33 to 8cb2633 on March 21, 2017 05:28
@AmplabJenkins

Merged build finished. Test PASSed.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/382/

@@ -231,7 +232,8 @@ int create_object(Client *client_context,
return PlasmaError_OutOfMemory;
}
/* Allocate space for the new object */
uint8_t *pointer = (uint8_t *) dlmalloc(data_size + metadata_size);
uint8_t *pointer =
(uint8_t *) dlmemalign(BLOCK_SIZE, data_size + metadata_size);
Collaborator

we should document why we are doing this, specifically that 64-byte alignment is REQUIRED by compute_object_hash_parallel

const uint64_t block_size = BLOCK_SIZE;
/* Calculate the first and last aligned positions in the data stream. */
const uint64_t data_address = reinterpret_cast<uint64_t>(data);
uint64_t left_address = (data_address + block_size - 1) & ~(block_size - 1);
Collaborator

I think it should be possible to simplify a bunch of the code here, e.g., left_address = data_address. Is that right?
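To make the suggestion concrete: if data_address is already a multiple of block_size (which the dlmemalign change above is meant to guarantee), the round-up is a no-op, so under that assumption it collapses as follows:

/* Holds only when data_address % block_size == 0:
 *   (data_address + block_size - 1) & ~(block_size - 1) == data_address */
uint64_t left_address = data_address;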

static inline bool compute_object_hash_parallel(XXH64_state_t *hash_state,
const unsigned char *data,
int64_t nbytes) {
const uint64_t numthreads = THREADPOOL_SIZE;
Collaborator

We should check that data % 64 == 0 and document why we are requiring this (explain that we don't want it to straddle multiple cache blocks)

double elapsed =
((tv2.tv_sec - tv1.tv_sec) * 1000000 + (tv2.tv_usec - tv1.tv_usec)) / 1000000.0;
// TODO: replace this with ARROW_LOG(ARROW_INFO) or better equivalent.
printf("Copied %llu bytes in time = %8.4f MBps=%8.4f\n", nbytes, elapsed,
Collaborator

Is this printf still happening?

@AmplabJenkins

Merged build finished. Test PASSed.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/384/

@AmplabJenkins

Merged build finished. Test PASSed.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/385/

@AmplabJenkins

Merged build finished. Test PASSed.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/388/

@AmplabJenkins

Merged build finished. Test PASSed.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/389/

* be faster if the blocks that we divide the data into do not straddle extra
* cache blocks. The incoming addresses are 64-byte aligned because we
* allocate them with dlmemalign in create_object in plasma_store.cc. */
CHECK(data_address % 64 == 0);
Collaborator

The call to dlmemalign is in the plasma store. This check here is in the plasma client, so the check only makes sense if the alignment is preserved by memory mapping.

Contributor Author

This check is not necessary. The code above should correctly compute the hash regardless of alignment, because we always start the first chunk at the given data_address. The invariant is that, for any alignment, with fixed numthreads and blocksz, the chunks produced for each thread will be exactly the same. This is a correctness property for deterministically computing the object hash.
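A hypothetical illustration of that invariant (not the PR's code): if the per-thread chunk offsets are computed only from nbytes, numthreads, and block_size, relative to whatever data pointer is handed in, then two processes hashing the same bytes at different alignments still feed identical byte ranges to each thread.

#include <cstdint>
#include <vector>

// Sketch: relative chunk offsets are a pure function of (nbytes, numthreads, block_size),
// so the combined hash over chunks [offsets[i], offsets[i+1]) is the same no matter how
// the buffer happens to be aligned in a given process.
std::vector<int64_t> chunk_offsets(int64_t nbytes, int64_t numthreads, int64_t block_size) {
  const int64_t blocks_per_thread = (nbytes / block_size) / numthreads;
  std::vector<int64_t> offsets;
  for (int64_t i = 0; i < numthreads; i++) {
    offsets.push_back(i * blocks_per_thread * block_size);
  }
  offsets.push_back(nbytes);  // the last chunk absorbs any tail shorter than a full block
  return offsets;
}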

@AmplabJenkins

Merged build finished. Test PASSed.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/390/

* order to align the allocated region to a 64-byte boundary. This is not
* strictly necessary, but it is an optimization that speeds up the
* computation of a hash of the data (see compute_object_hash_parallel in
* plasma_client.cc). */
uint8_t *pointer =
(uint8_t *) dlmemalign(BLOCK_SIZE, data_size + metadata_size);
Contributor Author

If this helps, I can change the code back to dlmalloc for the purposes of this PR.

@AmplabJenkins

Merged build finished. Test PASSed.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/391/

@robertnishihara merged commit a3d5860 into ray-project:master on March 21, 2017
@robertnishihara deleted the parallel-objecthash-merge branch on March 21, 2017 23:17