Skip to content

Commit

Permalink
group many small memory copies into one memory copy for a large share…
Browse files Browse the repository at this point in the history
…d buffer

The job time for 100KB of data decreases significantly.
  • Loading branch information
Hui Li committed Jul 10, 2012
1 parent 5fa1beb commit aebdc83
Showing 1 changed file with 47 additions and 20 deletions.
67 changes: 47 additions & 20 deletions GPUMapReduce/PandaLib.cu
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
CGL MapReduce Framework on GPUs and CPUs
Code Name: Panda 0.1
File: PandaLib.cu
Time: 2012-07-01 V0.1
2012-07-08 V0.11
Versions: 2012-07-01 V0.1
2012-07-09 V0.12
Developer: Hui Li (lihui@indiana.edu)
Expand Down Expand Up @@ -134,11 +134,27 @@ __device__ void EmitIntermediate2(void *key, void *val, int keySize, int valSize
}*///if
//__syncthreads();


/*
char *p = (char *)kv_arr_p->arr;
kv_arr_p->arr = (keyval_t*)malloc(sizeof(keyval_t)*(kv_arr_p->arr_len+1));
for (int i=0; i<sizeof(keyval_t)*(kv_arr_p->arr_len);i++)
((char *)kv_arr_p->arr)[i] = p[i];
*/


keyval_t *p = (keyval_t *)kv_arr_p->arr;
kv_arr_p->arr = (keyval_t*)malloc(sizeof(keyval_t)*(kv_arr_p->arr_len+1));
for (int i=0; i<(kv_arr_p->arr_len);i++){
kv_arr_p->arr[i].key = p[i].key;
kv_arr_p->arr[i].keySize = p[i].keySize;
kv_arr_p->arr[i].val = p[i].val;
kv_arr_p->arr[i].valSize = p[i].valSize;
//((char *)kv_arr_p->arr)[i] = p[i];
}//for
free(p);
//TODO

int current_map_output_index = (kv_arr_p->arr_len);
keyval_t *kv_p = &(kv_arr_p->arr[current_map_output_index]);
kv_p->key = (char *)malloc(sizeof(keySize));
Expand All @@ -156,7 +172,7 @@ __device__ void EmitIntermediate2(void *key, void *val, int keySize, int valSize
//if(TID>96&&TID<120)
//for(int i=0;i<(kv_arr_p->arr_len);i++)

//printf("EmitInterMediate2 [%d] map_task_id:%d, key:%s arr_len:%d\n",TID, map_task_id, kv_arr_p->arr[i].key,(kv_arr_p->arr_len));
printf("EmitInterMediate2 TID[%d] map_task_id:%d, keyval_arr_len:%d\n",TID, map_task_idx, (kv_arr_p->arr_len));

//if(TID>300&&TID<320)
//for(int i=0;i<(kv_arr_p->arr_len);i++)
Expand All @@ -181,7 +197,8 @@ __global__ void Mapper2(
int bid = BLOCK_ID;
int tid = THREAD_ID;*/

int num_records_per_thread = d_g_state.h_num_input_record/(gridDim.x*blockDim.x);

int num_records_per_thread = (d_g_state.h_num_input_record+(gridDim.x*blockDim.x)-1)/(gridDim.x*blockDim.x);
int block_start_idx = num_records_per_thread*blockIdx.x*blockDim.x;
int thread_start_idx = block_start_idx
+ (threadIdx.x/STRIDE)*num_records_per_thread*STRIDE
Expand All @@ -191,10 +208,13 @@ __global__ void Mapper2(
//if (TID>=d_g_state.h_num_input_record)return;
if(thread_end_idx>d_g_state.h_num_input_record)
thread_end_idx = d_g_state.h_num_input_record;
printf("Mapper TID:%d, thread_start_idx:%d thread_end_idx:%d totalThreads:%d\n",TID, thread_start_idx,thread_end_idx,gridDim.x*blockDim.x);


for(int map_task_idx=thread_start_idx; map_task_idx < thread_end_idx; map_task_idx+=STRIDE){

//d_g_state.d_intermediate_keyval_total_count[map_task_idx]=0;



void *val = d_g_state.d_input_keyval_arr[map_task_idx].val;
int valSize = d_g_state.d_input_keyval_arr[map_task_idx].valSize;

Expand All @@ -206,7 +226,7 @@ __global__ void Mapper2(
/////////////////////////////////////////////


keyval_arr_t *kv_arr_p = (keyval_arr_t *)&(d_g_state.d_intermediate_keyval_arr_arr[map_task_idx]);
//keyval_arr_t *kv_arr_p = (keyval_arr_t *)&(d_g_state.d_intermediate_keyval_arr_arr[map_task_idx]);

//printf("\tmap_task_idx:%d reduce_arrr_len:%d",map_task_idx,kv_arr_p->arr_len);
}//for
Expand Down Expand Up @@ -288,7 +308,6 @@ __global__ void Mapper(char* inputKeys,

int startMap2(d_global_state *d_g_state)
{

//cudaHostAlloc((void**)&int_array_host, 10 * sizeof(int), cudaHostAllocMapped);
//int_array_host = (int*)malloc(10*sizeof(int));
//int_array_host = (int *)cudaHostAlloc(10*sizeof(int));
Expand All @@ -299,6 +318,7 @@ int startMap2(d_global_state *d_g_state)
//-------------------------------------------------------
//0, Check status of d_g_state;
//-------------------------------------------------------

DoLog("startMap2 Check status of d_g_state");
if (d_g_state->h_input_keyval_arr == NULL) { DoLog("Error: no any input keys"); exit(0);}

Expand All @@ -314,9 +334,10 @@ int startMap2(d_global_state *d_g_state)
(cudaMalloc((void**)&(d_keyval_arr_arr),d_g_state->h_num_input_record*sizeof(keyval_arr_t)));

for (int i=0; i<d_g_state->h_num_input_record;i++){
keyval_t *d_keyval;
(cudaMalloc((void**)&(d_keyval),sizeof(keyval_t)));
h_keyval_arr_arr[i].arr = d_keyval;
//keyval_t *d_keyval;
//(cudaMalloc((void**)&(d_keyval),sizeof(keyval_t)));
//h_keyval_arr_arr[i].arr = d_keyval;
h_keyval_arr_arr[i].arr = NULL;
//(cudaMalloc((void**)&(d_arr_len),sizeof(int)));
//(cudaMemcpy(d_arr_len, &h_arr_len, sizeof(int), cudaMemcpyHostToDevice));
h_keyval_arr_arr[i].arr_len = 0;
Expand All @@ -326,7 +347,7 @@ int startMap2(d_global_state *d_g_state)
//(*d_g_state).intermediate_keyval_arr_arr_len = spec->inputRecordCount;
d_g_state->d_intermediate_keyval_arr_arr = d_keyval_arr_arr;

int *count;
int *count = NULL;
checkCudaErrors(cudaMalloc((void**)&(count),d_g_state->h_num_input_record*sizeof(int)));
d_g_state->d_intermediate_keyval_total_count = count;
cudaMemset(d_g_state->d_intermediate_keyval_total_count,0,d_g_state->h_num_input_record*sizeof(int));
Expand All @@ -347,9 +368,11 @@ int startMap2(d_global_state *d_g_state)
dim3 h_dimGrid(4,1,1);
dim3 h_dimThread(1,1,1);
int sizeSmem = 128;*/



Mapper2<<<NUM_BLOCKS,NUM_THREADS>>>(*d_g_state);

cudaThreadSynchronize();
return 0;
}//int

Expand Down Expand Up @@ -394,7 +417,7 @@ void startGroup2(d_global_state*state){

d_global_state* d_g_state = state;
DoLog("===startGroup===");
sort_CPU(d_g_state);
sort_CPU2(d_g_state);

}

Expand Down Expand Up @@ -435,28 +458,31 @@ __device__ void *GetKey(void *key, int4* interOffsetSizes, int keyIndex, int val

/*
 * Reducer2 -- GPU kernel that invokes reduce2() once per entry of
 * d_g_state.d_sorted_keyvals_arr.
 *
 * Parameters:
 *   d_g_state  job state, passed by value. This kernel reads
 *              d_sorted_keyvals_arr and d_sorted_keyvals_arr_len from it and
 *              forwards the whole struct to reduce2().
 *
 * Work distribution:
 *   The entries are divided over all gridDim.x*blockDim.x threads, rounding
 *   up so that an entry count that is not a multiple of the thread count is
 *   still fully covered. Threads of a block are taken in groups of STRIDE:
 *   group (threadIdx.x/STRIDE) starts num_records_per_thread*STRIDE entries
 *   after the previous group, and each thread visits every STRIDE-th entry
 *   of its group's range.
 *   NOTE(review): this looks like it expects blockDim.x to be a multiple of
 *   STRIDE so that ranges do not overlap -- confirm against the launch
 *   configuration.
 */
__global__ void Reducer2(d_global_state d_g_state)
{
    // Kept unsigned so the ceil-division below uses the same arithmetic as
    // the expression written directly on gridDim.x*blockDim.x.
    const unsigned int total_threads = gridDim.x*blockDim.x;

    // Ceil-division: a plain division would drop the trailing
    // (len % total_threads) entries.
    int num_records_per_thread = (d_g_state.d_sorted_keyvals_arr_len+total_threads-1)/total_threads;
    int block_start_idx = num_records_per_thread*blockIdx.x*blockDim.x;
    int thread_start_idx = block_start_idx
        + (threadIdx.x/STRIDE)*num_records_per_thread*STRIDE
        + (threadIdx.x%STRIDE);
    int thread_end_idx = thread_start_idx+num_records_per_thread*STRIDE;

    // Clamp against the number of reduce tasks. The bound must be
    // d_sorted_keyvals_arr_len (the array indexed below), not
    // h_num_input_record, otherwise reduce tasks can be skipped or the
    // array can be read out of range.
    if(thread_end_idx>d_g_state.d_sorted_keyvals_arr_len)
        thread_end_idx = d_g_state.d_sorted_keyvals_arr_len;

    // Debug trace of the range assigned to this thread.
    printf("reducer2: TID:%d start_idx:%d end_idx:%d d_sorted_keyvals_arr_len:%d\n",TID,thread_start_idx,thread_end_idx,d_g_state.d_sorted_keyvals_arr_len);

    // Run the assigned reduce tasks: one call to the user-implemented
    // reduce function per entry. Threads whose start index is already past
    // the end simply execute zero iterations.
    for(int reduce_task_idx=thread_start_idx; reduce_task_idx < thread_end_idx; reduce_task_idx+=STRIDE){
        keyvals_t *p = &(d_g_state.d_sorted_keyvals_arr[reduce_task_idx]);
        reduce2(p->key, p->vals, p->keySize, p->val_arr_len, d_g_state);
    }//for
}


Expand Down Expand Up @@ -497,11 +523,12 @@ void startReduce2(d_global_state *d_g_state)
cudaThreadSynchronize();

Reducer2<<<NUM_BLOCKS,NUM_THREADS>>>(*d_g_state);
//cudaThreadSynchronize();
cudaThreadSynchronize();
}//void

void startReduce(Spec_t* spec)
{

Spec_t* g_spec = spec;

if (g_spec->interKeys == NULL) {DoLog( "Error: no any intermediate keys"); exit(0);}
Expand Down

0 comments on commit aebdc83

Please sign in to comment.