move printf to debug #3


Merged · 8 commits · May 10, 2016
45 changes: 33 additions & 12 deletions als/README.md
@@ -6,24 +6,37 @@ This folder contains:

(2) the JNI code to link to and accelerate the ALS.scala program in Spark MLlib.

## What is matrix factorization?

Matrix factorization (MF) factors a sparse rating matrix R (m by n, with N_z non-zero elements) into an m-by-f matrix and an f-by-n matrix, as shown below.

<img src=https://github.com/wei-tan/CUDA-MLlib/raw/master/als/images/mf.png width=444 height=223 />
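In symbols (a standard formulation; the names X and Θ are ours, not fixed by this README):

$$R \approx X\Theta, \qquad R \in \mathbb{R}^{m \times n},\; X \in \mathbb{R}^{m \times f},\; \Theta \in \mathbb{R}^{f \times n},$$

obtained by minimizing the regularized squared error over the $N_z$ observed entries:

$$\min_{X,\,\Theta} \sum_{(i,j)\,:\,r_{ij}\ \mathrm{observed}} \bigl(r_{ij} - x_i^\top \theta_j\bigr)^2 + \lambda\bigl(\lVert X \rVert_F^2 + \lVert \Theta \rVert_F^2\bigr),$$

where $x_i^\top$ is row $i$ of $X$ and $\theta_j$ is column $j$ of $\Theta$.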

Matrix factorization (MF) is at the core of many popular algorithms, e.g., [collaborative filtering](https://en.wikipedia.org/wiki/Collaborative_filtering), word embedding, and topic modeling. GPUs (graphics processing units), with massive numbers of cores and high intra-chip memory bandwidth, are promising for accelerating MF much further when their architectural characteristics are appropriately exploited.

## What is cuMF?

**CuMF** is a CUDA-based matrix factorization library that optimizes the alternating least squares (ALS) method to solve very large-scale MF problems. CuMF uses a set of techniques to maximize performance on single and multiple GPUs: smart access of sparse data that leverages the GPU memory hierarchy, data parallelism in conjunction with model parallelism, minimized communication overhead among GPUs, and a novel topology-aware parallel reduction scheme. The per-factor update that ALS repeatedly forms and solves is sketched below.
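As a reference point (the standard ALS update, in the notation above; this is not notation from the repository): with $\Theta$ fixed, each $x_i$ solves an f-by-f system,

$$\bigl(\Theta_{\Omega_i}\Theta_{\Omega_i}^\top + \lambda I_f\bigr)\, x_i = \Theta_{\Omega_i}\, r_{\Omega_i},$$

where $\Omega_i$ is the set of items rated by user $i$, $\Theta_{\Omega_i}$ holds the corresponding columns of $\Theta$, and $r_{\Omega_i}$ the corresponding ratings; the update of each $\theta_j$ with $X$ fixed is symmetric. Forming these small dense systems and batch-solving them on GPUs is the core workload cuMF optimizes.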

With only a single machine with four Nvidia GPU cards, cuMF can be 6-10 times as fast, and 33-100 times as cost-efficient, as state-of-the-art distributed CPU solutions. Moreover, cuMF can solve the largest matrix factorization problem yet reported in the literature.

CuMF achieves excellent scalability and performance by applying the following techniques on GPUs:

(1) On a single GPU, MF operates on sparse matrices, which makes it difficult to utilize the GPU's compute power. We optimize memory access in ALS in several ways: reducing non-contiguous memory access, retaining hotspot variables in faster memory, and aggressively using registers (see the sketch after this list). In this way cuMF approaches the roofline performance of a single GPU.

(2) On multiple GPUs, we add data parallelism to ALS's inherent model parallelism. Data parallelism requires a fast reduction operation among GPUs, which leads to (3).
(3) We also develop a topology-aware parallel reduction method to fully leverage the bandwidth between GPUs, so that multiple GPUs are utilized efficiently and simultaneously.
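To give a flavor of technique (1), below is a minimal, illustrative CUDA kernel — not cuMF's actual code — that computes the right-hand sides $\Theta_{\Omega_i} r_{\Omega_i}$ of the ALS systems above for all users at once. It shows the access pattern the text describes: the hotspot accumulator lives in a register, and the reads of `thetaT` and writes of `out` are coalesced across the f threads of each block.

```cuda
#include <cuda_runtime.h>

// Illustrative only -- NOT cuMF's kernel. One block per user (CSR row);
// thread k owns feature k, so each of the f accumulators stays in a register.
// Launch as: rhs_kernel<<<m, f, 0, stream>>>(...), assuming f <= 1024.
__global__ void rhs_kernel(const int*   __restrict__ csrRow,
                           const int*   __restrict__ csrCol,
                           const float* __restrict__ csrVal,
                           const float* __restrict__ thetaT, // n x f, item-major
                           float*       __restrict__ out,    // m x f
                           int f) {
    int user = blockIdx.x;
    int k = threadIdx.x;          // feature index
    float acc = 0.0f;             // hotspot variable kept in a register
    for (int idx = csrRow[user]; idx < csrRow[user + 1]; ++idx) {
        int item = csrCol[idx];
        // threads k = 0..f-1 touch f consecutive floats: a coalesced load
        acc += csrVal[idx] * thetaT[item * f + k];
    }
    out[user * f + k] = acc;      // coalesced store
}
```

cuMF applies the same principles much more aggressively (register and shared-memory tiling of the f-by-f terms as well), which is how it approaches the single-GPU roofline.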

## Use cuMF to accelerate Spark ALS

CuMF can be used standalone, or to accelerate the [ALS implementation in Spark MLlib](https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala).

We modified Spark's ml/recommendation/als.scala ([code](https://github.com/wei-tan/SparkGPU/blob/MLlib/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala)) to detect GPUs and offload the ALS forming and solving to them, while retaining the shuffling on Spark RDDs.

"Faster and Cheaper: Parallelizing Large-Scale Matrix Factorization on GPUs"
<http://arxiv.org/abs/1603.03820>
<img src=https://github.com/wei-tan/CUDA-MLlib/raw/master/als/images/spark-gpu.png width=380 height=240 />

This approach has several advantages. First, existing Spark applications relying on mllib/ALS need no change. Second, we leverage the best of Spark (scaling out to multiple nodes) and of GPUs (scaling up within one node).

## Build
There are scripts to build the program locally, run it in local mode, and run it in distributed mode.
@@ -52,4 +65,12 @@ We are trying to improve the usability, stability, and performance. Here are some known issues:

(1) Out-of-memory errors from GPUs when many CPU threads access a small number of GPUs on a node. We tested the Netflix data on one node, with 12 CPU cores used by the executor and 2 Nvidia K40 GPU cards. If you have more GPU cards, you may be able to accommodate more CPU cores/threads; otherwise you need to reduce the number of cores assigned to the Spark executor (see the sketch after this list).

(2) CPU-GPU hybrid execution. We want to push as much of the workload to GPUs as possible; when the GPUs cannot accommodate all CPU threads, we want to retain the remaining execution on CPUs.
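Regarding issue (1): each JNI call selects a device via get_gpu() (see CuMFJNIInterface.cpp below). A minimal sketch of one way such a selector can spread concurrent executor threads across devices — assuming a simple round-robin policy, shown as an illustration rather than this repository's actual implementation:

```cpp
#include <atomic>
#include <cuda_runtime.h>

// Hypothetical round-robin selector (illustrative; the repo's get_gpu()
// may use a different policy). Each concurrent JNI call takes the next
// device, spreading executor threads evenly over the available GPUs.
static std::atomic<unsigned> next_gpu{0};

int get_gpu_round_robin() {
    int device_count = 0;
    if (cudaGetDeviceCount(&device_count) != cudaSuccess || device_count <= 0)
        return 0;                                 // fall back to device 0
    return next_gpu.fetch_add(1) % device_count;  // round-robin assignment
}
```

Even with balanced assignment, each thread still pins its own host and device buffers, so the number of concurrent threads per GPU remains bounded by GPU memory.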

## References

More details can be found at:

1) CuMF: Large-Scale Matrix Factorization on Just One Machine with GPUs. Nvidia GTC 2016 talk. [ppt](http://www.slideshare.net/tanwei/s6211-cumf-largescale-matrix-factorization-on-just-one-machine-with-gpus), [video](http://on-demand.gputechconf.com/gtc/2016/video/S6211.html)

2) Faster and Cheaper: Parallelizing Large-Scale Matrix Factorization on GPUs. Wei Tan, Liangliang Cao, Liana Fong. [HPDC 2016](http://arxiv.org/abs/1603.03820), Kyoto, Japan
Binary file added als/images/mf.png
Binary file added als/images/spark-gpu.png
192 changes: 98 additions & 94 deletions als/src/CuMFJNIInterface.cpp
@@ -24,107 +24,111 @@

JNIEXPORT jobjectArray JNICALL Java_org_apache_spark_ml_recommendation_CuMFJNIInterface_doALSWithCSR
  (JNIEnv * env, jobject obj, jint m, jint n, jint f, jint nnz, jdouble lambda, jobjectArray sortedSrcFactors, jintArray csrRow, jintArray csrCol, jfloatArray csrVal){
    try{
        //use multiple GPUs
        //select a GPU for *this* specific dataset
        int whichGPU = get_gpu();
        checkCudaErrors(cudaSetDevice(whichGPU));
        cudaStream_t cuda_stream;
        cudaStreamCreate(&cuda_stream);
        /* check correctness
        int csrRowlen = env->GetArrayLength(csrRow);
        int csrCollen = env->GetArrayLength(csrCol);
        int csrVallen = env->GetArrayLength(csrVal);
        assert(csrRowlen == m + 1);
        assert(csrCollen == nnz);
        assert(csrVallen == nnz);
        */
        int* csrRowIndexHostPtr;
        int* csrColIndexHostPtr;
        float* csrValHostPtr;

        //calculate X from thetaT
        float* thetaTHost;
        cudacall(cudaMallocHost( (void** ) &thetaTHost, n * f * sizeof(thetaTHost[0])) );
        //to be returned
        float* XTHost;
        cudacall(cudaMallocHost( (void** ) &XTHost, m * f * sizeof(XTHost[0])) );

        // copy the factors, block by block, into the pinned host buffer
        int numSrcBlocks = env->GetArrayLength(sortedSrcFactors);
        //WARNING: ReleasePrimitiveArrayCritical and DeleteLocalRef are important;
        //otherwise the result is correct but performance is bad
        int index = 0;
        for(int i = 0; i < numSrcBlocks; i++){
            jobject factorsPerBlock = env->GetObjectArrayElement(sortedSrcFactors, i);
            int numFactors = env->GetArrayLength((jobjectArray)factorsPerBlock);
            for(int j = 0; j < numFactors; j++){
                jobject factor = env->GetObjectArrayElement((jobjectArray)factorsPerBlock, j);
                jfloat *factorfloat = (jfloat *) env->GetPrimitiveArrayCritical( (jfloatArray)factor, 0);
                memcpy(thetaTHost + index*f, factorfloat, sizeof(float)*f);
                index ++;
                env->ReleasePrimitiveArrayCritical((jfloatArray)factor, factorfloat, 0);
                env->DeleteLocalRef(factor);
            }
            env->DeleteLocalRef(factorsPerBlock);
        }
        // get a pointer to the raw input data, pinning it in memory
        csrRowIndexHostPtr = (jint*) env->GetPrimitiveArrayCritical(csrRow, 0);
        csrColIndexHostPtr = (jint*) env->GetPrimitiveArrayCritical(csrCol, 0);
        csrValHostPtr = (jfloat*) env->GetPrimitiveArrayCritical(csrVal, 0);

        // device-side copies of the CSR arrays (sizeof(float) == sizeof(int) here)
        int * d_csrRowIndex = 0;
        int * d_csrColIndex = 0;
        float * d_csrVal = 0;

        cudacall(mallocBest((void** ) &d_csrRowIndex, (m + 1) * sizeof(float)));
        cudacall(mallocBest((void** ) &d_csrColIndex, nnz * sizeof(float)));
        cudacall(mallocBest((void** ) &d_csrVal, nnz * sizeof(float)));
        cudacall(cudaMemcpyAsync(d_csrRowIndex, csrRowIndexHostPtr, (size_t) ((m + 1) * sizeof(float)), cudaMemcpyHostToDevice, cuda_stream));
        cudacall(cudaMemcpyAsync(d_csrColIndex, csrColIndexHostPtr, (size_t) (nnz * sizeof(float)), cudaMemcpyHostToDevice, cuda_stream));
        cudacall(cudaMemcpyAsync(d_csrVal, csrValHostPtr, (size_t) (nnz * sizeof(float)), cudaMemcpyHostToDevice, cuda_stream));
        cudaStreamSynchronize(cuda_stream);

        // un-pin the host arrays, as we're done with them
        env->ReleasePrimitiveArrayCritical(csrRow, csrRowIndexHostPtr, 0);
        env->ReleasePrimitiveArrayCritical(csrCol, csrColIndexHostPtr, 0);
        env->ReleasePrimitiveArrayCritical(csrVal, csrValHostPtr, 0);

        // per-call logging is now compiled in only for DEBUG builds
#ifdef DEBUG
        printf("\tdoALSWithCSR with m=%d,n=%d,f=%d,nnz=%d,lambda=%f\n", m, n, f, nnz, lambda);
#endif
        doALSWithCSR(cuda_stream, d_csrRowIndex, d_csrColIndex, d_csrVal, thetaTHost, XTHost, m, n, f, nnz, lambda, 1);

        // marshal XT back into a Java float[][]
        jclass floatArrayClass = env->FindClass("[F");
        jobjectArray output = env->NewObjectArray(m, floatArrayClass, 0);
        for (int i = 0; i < m; i++) {
            jfloatArray col = env->NewFloatArray(f);
            env->SetFloatArrayRegion(col, 0, f, XTHost + i*f);
            env->SetObjectArrayElement(output, i, col);
            env->DeleteLocalRef(col);
        }
        cudaFreeHost(thetaTHost);
        cudaFreeHost(XTHost);
        //TODO: stream create and destroy expensive?
        checkCudaErrors(cudaStreamSynchronize(cuda_stream));
        checkCudaErrors(cudaStreamDestroy(cuda_stream));
#ifdef DEBUG
        cudaCheckError();
#endif
        return output;
    }
    catch (thrust::system_error &e) {
        printf("CUDA error during doALSWithCSR: %s\n", e.what());
        return NULL; // avoid falling off the end of a non-void function on error
    }
}
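With these changes, the per-call printf and the cudaCheckError() sweep are compiled in only when the library is built with the DEBUG macro defined (e.g. by passing -DDEBUG to the compiler), keeping console I/O out of the JNI hot path in production builds.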

JNIEXPORT void JNICALL Java_org_apache_spark_ml_recommendation_CuMFJNIInterface_testjni