Skip to content

Commit

Permalink
Replace all small memory copies with a grouped memory copy. Fixed a bug…
Browse files Browse the repository at this point in the history
… in array access.

This version is much faster than the previous version.
  • Loading branch information
Hui Li committed Jul 13, 2012
1 parent aebdc83 commit 1286360
Show file tree
Hide file tree
Showing 2 changed files with 294 additions and 25 deletions.
60 changes: 43 additions & 17 deletions GPUMapReduce/PandaLib.cu
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ __device__ void EmitIntermediate2(void *key, void *val, int keySize, int valSize
//if(TID>96&&TID<120)
//for(int i=0;i<(kv_arr_p->arr_len);i++)

printf("EmitInterMediate2 TID[%d] map_task_id:%d, keyval_arr_len:%d\n",TID, map_task_idx, (kv_arr_p->arr_len));
//printf("EmitInterMediate2 TID[%d] map_task_id:%d, key:%s keyval_arr_len:%d\n",TID, map_task_idx, key, (kv_arr_p->arr_len));

//if(TID>300&&TID<320)
//for(int i=0;i<(kv_arr_p->arr_len);i++)
Expand Down Expand Up @@ -208,7 +208,7 @@ __global__ void Mapper2(
//if (TID>=d_g_state.h_num_input_record)return;
if(thread_end_idx>d_g_state.h_num_input_record)
thread_end_idx = d_g_state.h_num_input_record;
printf("Mapper TID:%d, thread_start_idx:%d thread_end_idx:%d totalThreads:%d\n",TID, thread_start_idx,thread_end_idx,gridDim.x*blockDim.x);
//printf("Mapper TID:%d, thread_start_idx:%d thread_end_idx:%d totalThreads:%d\n",TID, thread_start_idx,thread_end_idx,gridDim.x*blockDim.x);


for(int map_task_idx=thread_start_idx; map_task_idx < thread_end_idx; map_task_idx+=STRIDE){
Expand Down Expand Up @@ -417,7 +417,7 @@ void startGroup2(d_global_state*state){

d_global_state* d_g_state = state;
DoLog("===startGroup===");
sort_CPU2(d_g_state);
sort_CPU3(d_g_state);

}

Expand Down Expand Up @@ -448,7 +448,7 @@ __device__ void *GetKey(void *key, int4* interOffsetSizes, int keyIndex, int val
{
int4 offset = interOffsetSizes[valStartIndex];
return (void*)((char*)key + keyIndex * offset.y);
}
}//


//-------------------------------------------------------
Expand All @@ -458,23 +458,50 @@ __device__ void *GetKey(void *key, int4* interOffsetSizes, int keyIndex, int val

__global__ void Reducer2(d_global_state d_g_state)
{

int num_records_per_thread = (d_g_state.d_sorted_keyvals_arr_len+(gridDim.x*blockDim.x)-1)/(gridDim.x*blockDim.x);

int block_start_idx = num_records_per_thread*blockIdx.x*blockDim.x;
int thread_start_idx = block_start_idx
+ (threadIdx.x/STRIDE)*num_records_per_thread*STRIDE
+ (threadIdx.x%STRIDE);
int thread_end_idx = thread_start_idx+num_records_per_thread*STRIDE;

int thread_end_idx = thread_start_idx+num_records_per_thread*STRIDE;

//if (TID>=d_g_state.h_num_input_record)return;
if(thread_end_idx>d_g_state.d_sorted_keyvals_arr_len)
thread_end_idx = d_g_state.d_sorted_keyvals_arr_len;

printf("reducer2: TID:%d start_idx:%d end_idx:%d d_sorted_keyvals_arr_len:%d\n",TID,thread_start_idx,thread_end_idx,d_g_state.d_sorted_keyvals_arr_len);
//printf("reducer2: TID:%d start_idx:%d end_idx:%d d_sorted_keyvals_arr_len:%d\n",TID,thread_start_idx,thread_end_idx,d_g_state.d_sorted_keyvals_arr_len);

int start, end;
for(int reduce_task_idx=thread_start_idx; reduce_task_idx < thread_end_idx; reduce_task_idx+=STRIDE){
keyvals_t *p = &(d_g_state.d_sorted_keyvals_arr[reduce_task_idx]);
reduce2(p->key, p->vals, p->keySize, p->val_arr_len, d_g_state);
if (reduce_task_idx==0)
start = 0;
else
start = d_g_state.d_pos_arr_4_sorted_keyval_pos_arr[reduce_task_idx-1];
end = d_g_state.d_pos_arr_4_sorted_keyval_pos_arr[reduce_task_idx];

val_t *val_t_arr = (val_t*)malloc(sizeof(val_t)*(end-start));

int keySize = d_g_state.d_keyval_pos_arr[start].keySize;
int keyPos = d_g_state.d_keyval_pos_arr[start].keyPos;
void *key = (char*)d_g_state.d_intermediate_keys_shared_buff+keyPos;
//printf("reduce_task_idx:%d keyPos:%d, keySize:%d, key:%s start:%d end:%d\n",reduce_task_idx,keyPos,keySize,key,start,end);


for (int index = start;index<end;index++){
int valSize = d_g_state.d_keyval_pos_arr[index].valSize;
int valPos = d_g_state.d_keyval_pos_arr[index].valPos;
// printf("reduce_task_idx:%d valSize:%d valPos:%d\n",reduce_task_idx,valSize,valPos);
val_t_arr[index-start].valSize = valSize;
val_t_arr[index-start].val = (char*)d_g_state.d_intermediate_vals_shared_buff + valPos;
//printf("reduce_task_idx:%d key:%s val:%d\n",reduce_task_idx,key, *(int*)val_t_arr[index-start].val);
}

reduce2(key, val_t_arr, keySize, end-start, d_g_state);


}//for

//int map_task_id = TID;
Expand Down Expand Up @@ -513,16 +540,15 @@ __global__ void Reducer2(d_global_state d_g_state)
// Host-side driver for the reduce phase: sizes and allocates the reduced
// key/value output array, then launches the Reducer2 kernel.
//
// d_g_state: job state. Reads d_sorted_keyvals_arr_len; writes
//            d_reduced_keyval_arr_len and d_reduced_keyval_arr.
//
// NOTE(review): none of the CUDA runtime calls below have their return value
// checked, and there is no cudaGetLastError() after the kernel launch, so an
// allocation or launch failure would go unnoticed here.
// NOTE(review): cudaThreadSynchronize() is, as far as I know, deprecated in
// favour of cudaDeviceSynchronize() -- confirm against the toolkit in use.
void startReduce2(d_global_state *d_g_state)
{
// Wait for previously issued device work before starting the reduce phase.
cudaThreadSynchronize();
printf("startReducer2: keyval_arr_len %d\n",d_g_state->d_sorted_keyvals_arr_len);

// The reduced output array gets one slot per entry of the sorted keyval array.
d_g_state->d_reduced_keyval_arr_len = d_g_state->d_sorted_keyvals_arr_len;

// Allocate the output array: d_reduced_keyval_arr_len elements of keyval_t.
// NOTE(review): the length may be 0 here -- verify the caller guards that case.
cudaMalloc((void **)&(d_g_state->d_reduced_keyval_arr), sizeof(keyval_t)*d_g_state->d_reduced_keyval_arr_len);

//printData3<<<1,d_g_state->d_sorted_keyvals_arr_len>>>(*d_g_state);

printf("Start Reducer2: Keyval_arr_len %d\n",d_g_state->d_sorted_keyvals_arr_len);
cudaThreadSynchronize();

// Launch with a fixed NUM_BLOCKS x NUM_THREADS configuration; the state struct
// is dereferenced and passed to the kernel by value.
Reducer2<<<NUM_BLOCKS,NUM_THREADS>>>(*d_g_state);
// NOTE(review): this message is printed before the synchronize below, and
// kernel launches are asynchronous, so "DONE" may appear while Reducer2 is
// still running.
printf("Reducer2 DONE\n");
// Block until the reducer kernel has finished.
cudaThreadSynchronize();
}//void

Expand Down Expand Up @@ -569,12 +595,12 @@ void MapReduce2(d_global_state *state){
//4, start reduce
//-------------------------------------------
DoLog( "\t\t----------start reduce--------");
TimeVal_t reduceTimer;
startTimer(&reduceTimer);
//TimeVal_t reduceTimer;
//startTimer(&reduceTimer);

startReduce2(d_g_state);
cudaThreadSynchronize();
endTimer("Reduce", &reduceTimer);
//endTimer("Reduce", &reduceTimer);

EXIT_MR:

Expand Down Expand Up @@ -617,4 +643,4 @@ void FinishMapReduce2(d_global_state* state)
}//void


#endif //__PANDALIB_CU__
#endif //__PANDALIB_CU__
Loading

0 comments on commit 1286360

Please sign in to comment.