cleaned up the code
Hui committed Jul 13, 2012
1 parent 289f115 commit f34c5db
Showing 1 changed file with 18 additions and 84 deletions.
102 changes: 18 additions & 84 deletions GPUMapReduce/PandaLib.cu
@@ -3,13 +3,13 @@
CGL MapReduce Framework on GPUs and CPUs
Code Name: Panda 0.1
File: PandaLib.cu
Versions: 2012-07-01 V0.1
2012-07-09 V0.12
First Version: 2012-07-01 V0.1
Github: https://github.com/cyberaide/biostatistics/tree/master/GPUMapReduce
Developer: Hui Li (lihui@indiana.edu)
This is the source code for Panda, a MapReduce runtime on GPUs and CPUs.
*/

#ifndef __PANDALIB_CU__
@@ -40,8 +40,9 @@ d_global_state *GetDGlobalState(){

void InitMapReduce2(d_global_state* d_g_state)
{

//init d_g_state


//load input records from host memory to device memory.
cudaMalloc((void **)&d_g_state->d_input_keyval_arr,sizeof(keyval_t)*d_g_state->h_num_input_record);
keyval_t* h_buff = (keyval_t*)malloc(sizeof(keyval_t)*(d_g_state->h_num_input_record));
@@ -55,9 +56,8 @@ void InitMapReduce2(d_global_state* d_g_state)
cudaMemcpy(h_buff[i].val,d_g_state->h_input_keyval_arr[i].val,h_buff[i].valSize,cudaMemcpyHostToDevice);
}//for
cudaMemcpy(d_g_state->d_input_keyval_arr,h_buff,sizeof(keyval_t)*d_g_state->h_num_input_record,cudaMemcpyHostToDevice);
//printData2<<<1,d_g_state->h_num_input_record>>>(*d_g_state);
cudaThreadSynchronize();
}
}//void
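
Because keyval_t holds pointers, a single flat cudaMemcpy of the struct array would ship host addresses to the GPU; the loop above therefore allocates a device buffer per key and per value, patches the pointers in a host-side staging array, and only then copies the struct array itself. A standalone sketch of the same deep-copy pattern, with illustrative names (record_t, upload_records) that are not part of PandaLib:

#include <cuda_runtime.h>
#include <stdlib.h>

// Illustrative stand-in for keyval_t: key/val are pointers, so the
// struct cannot be shipped to the device with one flat cudaMemcpy.
typedef struct { void *key; void *val; int keySize; int valSize; } record_t;

// Deep-copy n records to the device: allocate device buffers for each
// key/val, patch the pointers in a host-side staging array, then copy
// the patched struct array itself.
record_t *upload_records(const record_t *h_records, int n) {
	record_t *d_records;
	cudaMalloc((void **)&d_records, sizeof(record_t) * n);
	record_t *staging = (record_t *)malloc(sizeof(record_t) * n);
	for (int i = 0; i < n; i++) {
		staging[i] = h_records[i];              // copies the sizes
		cudaMalloc(&staging[i].key, h_records[i].keySize);
		cudaMalloc(&staging[i].val, h_records[i].valSize);
		cudaMemcpy(staging[i].key, h_records[i].key, h_records[i].keySize,
		           cudaMemcpyHostToDevice);
		cudaMemcpy(staging[i].val, h_records[i].val, h_records[i].valSize,
		           cudaMemcpyHostToDevice);
	}
	// every pointer in staging[] is now a device address; ship the array
	cudaMemcpy(d_records, staging, sizeof(record_t) * n, cudaMemcpyHostToDevice);
	free(staging);
	return d_records;
}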

/*
void InitMapReduce(Spec_t* spec)
@@ -116,33 +116,8 @@ __device__ void Emit2 (void* key,

__device__ void EmitIntermediate2(void *key, void *val, int keySize, int valSize, d_global_state *d_g_state, int map_task_idx){



keyval_arr_t *kv_arr_p = (keyval_arr_t *)&(d_g_state->d_intermediate_keyval_arr_arr[map_task_idx]);

//printf("\tEmitInterMediate map task id[%d] key:%s kv_arr_p->arr_len:%d\n",map_task_id,(char *)key, *(int *)(kv_arr_p->arr_len));
//if there is not enough space to store intermediate key value pairs
/*if ((kv_arr_p->arr_len)== *(kv_arr_p->arr_alloc_len)){
*(kv_arr_p->arr_alloc_len) *= 2;
//printf("\tincrease buffer for map task[%d] arr_len:%d\n", map_task_id, *(kv_arr_p->arr_alloc_len));
char *p = (char *)kv_arr_p->arr;
//kv_arr_p->arr = (keyval_t *)malloc(sizeof(keyval_t)*(*kv_arr_p->arr_alloc_len));
for (int i=0;i<sizeof(keyval_t)*(*kv_arr_p->arr_alloc_len)/2;i++)
//((char *)kv_arr_p->arr)[i] = p[i];
memcpy(kv_arr_p->arr,p,(*kv_arr_p->arr_alloc_len)/2);
//free(p);
}*///if
//__syncthreads();


/*
char *p = (char *)kv_arr_p->arr;
kv_arr_p->arr = (keyval_t*)malloc(sizeof(keyval_t)*(kv_arr_p->arr_len+1));
for (int i=0; i<sizeof(keyval_t)*(kv_arr_p->arr_len);i++)
((char *)kv_arr_p->arr)[i] = p[i];
*/


keyval_t *p = (keyval_t *)kv_arr_p->arr;
kv_arr_p->arr = (keyval_t*)malloc(sizeof(keyval_t)*(kv_arr_p->arr_len+1));
for (int i=0; i<(kv_arr_p->arr_len);i++){
@@ -153,7 +128,6 @@ __device__ void EmitIntermediate2(void *key, void *val, int keySize, int valSize
//((char *)kv_arr_p->arr)[i] = p[i];
}//for
free(p);
//TODO

int current_map_output_index = (kv_arr_p->arr_len);
keyval_t *kv_p = &(kv_arr_p->arr[current_map_output_index]);
@@ -168,19 +142,8 @@ __device__ void EmitIntermediate2(void *key, void *val, int keySize, int valSize
kv_arr_p->arr_len++;
d_g_state->d_intermediate_keyval_total_count[map_task_idx] = kv_arr_p->arr_len;

//int i=0;
//if(TID>96&&TID<120)
//for(int i=0;i<(kv_arr_p->arr_len);i++)

//printf("EmitInterMediate2 TID[%d] map_task_id:%d, key:%s keyval_arr_len:%d\n",TID, map_task_idx, key, (kv_arr_p->arr_len));

//if(TID>300&&TID<320)
//for(int i=0;i<(kv_arr_p->arr_len);i++)
//printf("EmitInterMediate2 [%d] map_task_id:%d, key:%s arr_len:%d\n",TID, map_task_id, kv_arr_p->arr[i].key,(kv_arr_p->arr_len));

//printf("EmitIntermeidate2 kv_p->key:%s kv_p->val:%d \n",kv_p->key,*((int *)kv_p->val));
//__syncthreads();

}//__device__
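
Each emit above replaces the whole per-task array with a fresh malloc one element larger, copies, and frees: O(n) work per emit, O(n^2) per map task. The commented-out block this commit deletes hinted at a capacity-doubling scheme; a cleaner version of that idea might look like the sketch below. It is illustrative, not the PandaLib API: it assumes keyval_arr_t carries an arr_alloc_len capacity field (present only in the removed code), and device-side malloc/free require compute capability 2.0 or later.

// Amortized growth for a per-task device-side array: double the
// capacity when full instead of reallocating on every emit.
__device__ void push_keyval(keyval_arr_t *a, keyval_t kv) {
	if (a->arr_len == a->arr_alloc_len) {       // full: double the capacity
		int new_cap = a->arr_alloc_len * 2;
		keyval_t *bigger = (keyval_t *)malloc(sizeof(keyval_t) * new_cap);
		memcpy(bigger, a->arr, sizeof(keyval_t) * a->arr_len);
		free(a->arr);
		a->arr = bigger;
		a->arr_alloc_len = new_cap;
	}
	a->arr[a->arr_len++] = kv;                  // O(1) amortized per emit
}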


Expand All @@ -196,7 +159,6 @@ __global__ void Mapper2(
/*int index = TID;
int bid = BLOCK_ID;
int tid = THREAD_ID;*/


int num_records_per_thread = (d_g_state.h_num_input_record+(gridDim.x*blockDim.x)-1)/(gridDim.x*blockDim.x);
int block_start_idx = num_records_per_thread*blockIdx.x*blockDim.x;
@@ -205,38 +167,25 @@ __global__ void Mapper2(
+ (threadIdx.x%STRIDE);
int thread_end_idx = thread_start_idx+num_records_per_thread*STRIDE;

//if (TID>=d_g_state.h_num_input_record)return;
if(thread_end_idx>d_g_state.h_num_input_record)
thread_end_idx = d_g_state.h_num_input_record;
//printf("Mapper TID:%d, thread_start_idx:%d thread_end_idx:%d totalThreads:%d\n",TID, thread_start_idx,thread_end_idx,gridDim.x*blockDim.x);


for(int map_task_idx=thread_start_idx; map_task_idx < thread_end_idx; map_task_idx+=STRIDE){



void *val = d_g_state.d_input_keyval_arr[map_task_idx].val;
int valSize = d_g_state.d_input_keyval_arr[map_task_idx].valSize;

void *key = d_g_state.d_input_keyval_arr[map_task_idx].key;
int keySize = d_g_state.d_input_keyval_arr[map_task_idx].keySize;

/////////////////////////////////////////////
///////////////////////////////////////////////////////////
map2(key, val, keySize, valSize, &d_g_state, map_task_idx);
/////////////////////////////////////////////


//keyval_arr_t *kv_arr_p = (keyval_arr_t *)&(d_g_state.d_intermediate_keyval_arr_arr[map_task_idx]);

///////////////////////////////////////////////////////////
//printf("\tmap_task_idx:%d reduce_arrr_len:%d",map_task_idx,kv_arr_p->arr_len);
}//for

//int map_task_id = TID;
//Note:
//cindex is the map task id used in the intermediate record list;
//coalesced input data access; i+=blockDim.x

}//__global__
}//Mapper2
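
Mapper2 gives each thread num_records_per_thread records, interleaved with stride STRIDE, so on each pass of the loop the STRIDE neighboring threads read adjacent records (coalesced access). A host-side sketch that prints the assignment for a toy 2-block x 4-thread, 16-record configuration (the STRIDE value here is illustrative; the real one comes from the Panda headers):

#include <stdio.h>
#define STRIDE 4

int main(void) {
	int num_records = 16, gridDim_x = 2, blockDim_x = 4;
	int per_thread = (num_records + gridDim_x*blockDim_x - 1) / (gridDim_x*blockDim_x);
	for (int b = 0; b < gridDim_x; b++)
		for (int t = 0; t < blockDim_x; t++) {
			// same index math as Mapper2
			int block_start = per_thread * b * blockDim_x;
			int start = block_start + (t/STRIDE)*per_thread*STRIDE + (t%STRIDE);
			int end   = start + per_thread*STRIDE;
			if (end > num_records) end = num_records;
			printf("block %d thread %d: records", b, t);
			for (int i = start; i < end; i += STRIDE) printf(" %d", i);
			printf("\n");
		}
	return 0;
}

With these numbers, threads 0-3 of block 0 touch records 0,1,2,3 on their first pass and 4,5,6,7 on the second, which is exactly the coalescing the stride is meant to buy.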


//--------------------------------------------------
@@ -308,20 +257,16 @@ __global__ void Mapper(char* inputKeys,

int startMap2(d_global_state *d_g_state)
{
//cudaHostAlloc((void**)&int_array_host, 10 * sizeof(int), cudaHostAllocMapped);
//int_array_host = (int*)malloc(10*sizeof(int));
//int_array_host = (int *)cudaHostAlloc(10*sizeof(int));
//allocDeviceMemory<<<1, 1>>>();
//cudaDeviceSynchronize();
//printDeviceMemory<<<1,1>>>(int_array_host);

//-------------------------------------------------------
//0, Check status of d_g_state;
//-------------------------------------------------------

DoLog("startMap2 Check status of d_g_state");
DoLog("startMap2 Check parameters of d_g_state");
if (d_g_state->h_num_input_record<0) { DoLog("Error: num of input records < 0"); exit(0);}
if (d_g_state->h_input_keyval_arr == NULL) { DoLog("Error: no input keys"); exit(0);}


//-------------------------------------------------------
//1, upload map input data from host to device memory
//-------------------------------------------------------
@@ -356,9 +301,9 @@ int startMap2(d_global_state *d_g_state)
//----------------------------------------------
//3, determine the number of threads to run
//----------------------------------------------
DoLog("startMap2 determine the number of threads to run");
int num_threads = d_g_state->h_num_input_record;

DoLog("startMap2 determine the number of threads (NUM_BLOCKS, NUM_THREADS) to run");
//int num_threads = d_g_state->h_num_input_record;
//calculate NUM_BLOCKS, NUM_THREADS

//--------------------------------------------------
//4, start map
@@ -368,8 +313,6 @@ int startMap2(d_global_state *d_g_state)
dim3 h_dimGrid(4,1,1);
dim3 h_dimThread(1,1,1);
int sizeSmem = 128;*/



Mapper2<<<NUM_BLOCKS,NUM_THREADS>>>(*d_g_state);
cudaThreadSynchronize();
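
NUM_BLOCKS and NUM_THREADS are fixed macros here, with the per-thread loop in Mapper2 absorbing any excess records. If one instead wanted to derive the grid from the input size, a common pattern is the following sketch (the block size of 256 is an illustrative choice, not PandaLib code):

int threads_per_block = 256;                       // illustrative choice
int record_count = d_g_state->h_num_input_record;  // aim for one thread per record
int num_blocks = (record_count + threads_per_block - 1) / threads_per_block;
Mapper2<<<num_blocks, threads_per_block>>>(*d_g_state);
cudaThreadSynchronize();  // cudaDeviceSynchronize() on later toolkits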
@@ -409,7 +352,7 @@ int startMap(Spec_t* spec, d_global_state *d_g_state)
//-------------------------------------------------------
//1, get map input data on host
//-------------------------------------------------------

return 0;
}//return 0;


@@ -422,10 +365,7 @@ void startGroup2(d_global_state*state){
}


void startGroup(Spec_t* spec, d_global_state *state)
{
Spec_t* g_spec = spec;
}



//--------------------------------------------------------
@@ -600,7 +540,7 @@ void MapReduce2(d_global_state *state){
//endTimer("Reduce", &reduceTimer);

EXIT_MR:

DoLog( "\t\t----------exit map reduce--------");

}

@@ -614,12 +554,6 @@
//param : spec
//----------------------------------------------

void MapReduce(Spec_t *spec, d_global_state *state)
{
assert(NULL != spec);
Spec_t* g_spec = spec;
d_global_state* d_g_state = state;
}

//------------------------------------------
//the last step
