Skip to content
This repository was archived by the owner on Apr 19, 2025. It is now read-only.

Commit ee602bc

Browse files
committed
Reverted back to single-threaded d_struct_buf(optimised version)
1 parent 281c542 commit ee602bc

File tree

3 files changed

+56
-153
lines changed

3 files changed

+56
-153
lines changed

drpc_struct.c

Lines changed: 56 additions & 116 deletions
Original file line numberDiff line numberDiff line change
@@ -275,131 +275,71 @@ enum drpc_types d_struct_get_type(struct d_struct* dstruct, char* key){
275275
return ret;
276276
}
277277

278-
struct __d_struct_thrd_serialise_param{
279-
struct d_struct* dstruct;
280-
char** keys;
281-
queue_t ready_type;
282-
283-
sem_t pre_serialise_wait;
284-
sem_t final_serialise_wait;
285-
};
286-
287-
static void* d_struct_thrd_data_gather(void* param_P){
288-
struct __d_struct_thrd_serialise_param* param = param_P;
289-
size_t j = 0;
290-
for(size_t i = 0; i < param->dstruct->hashtable->capacity; i++){
291-
if(param->dstruct->hashtable->body[i].value != NULL && param->dstruct->hashtable->body[i].key != NULL && param->dstruct->hashtable->body[i].key != (char*)0xDEAD){
292-
293-
param->keys[j] = param->dstruct->hashtable->body[i].key; j++;
294-
assert(sem_post(&param->pre_serialise_wait) == 0);
295-
}
296-
}
297-
return NULL;
298-
}
299-
300-
static void* d_struct_thrd_preserialise(void* param_P){
301-
struct __d_struct_thrd_serialise_param* param = param_P;
302-
303-
for(size_t i = 0; i < param->dstruct->current_len; i++){
304-
assert(sem_wait(&param->pre_serialise_wait) == 0);
305-
char* key = param->keys[i];
306-
struct d_struct_element* element = hashtable_get(param->dstruct->hashtable,key);
307-
308-
enum drpc_types type = element->type;
309-
struct drpc_type* packed_type = NULL;
310-
if(element->is_packed == 0){
311-
packed_type = malloc(sizeof(*packed_type)); assert(packed_type);
312-
switch(type){
313-
case d_sizedbuf:
314-
sizedbuf_to_drpc(packed_type,element->data,element->sizedbuf_len);
315-
break;
316-
case d_struct:
317-
d_struct_to_drpc(packed_type,element->data);
318-
break;
319-
case d_queue:
320-
d_queue_to_drpc(packed_type,element->data);
321-
break;
322-
case d_array:
323-
d_array_to_drpc(packed_type,element->data);
324-
break;
325-
case d_str:
326-
str_to_drpc(packed_type,element->data);
327-
break;
328-
default: break;
329-
}
330-
} else packed_type = element->data;
331-
332-
size_t type_len = strlen(key) + 1 + drpc_type_buflen(packed_type);
333-
char* keyed_buf = malloc(type_len); assert(keyed_buf);
334-
char* edit_buf = keyed_buf;
335-
336-
memcpy(edit_buf,key,strlen(key) + 1); edit_buf += strlen(key) + 1;
337-
338-
drpc_buf(packed_type,edit_buf);
339-
340-
struct drpc_type* ready = malloc(sizeof(*ready)); assert(ready);
341-
342-
ready->packed_data = keyed_buf;
343-
ready->type = type;
344-
ready->len = type_len;
345-
346-
queue_push(param->ready_type,ready);
347-
sem_post(&param->final_serialise_wait);
348-
349-
if(element->is_packed == 0) {
350-
drpc_type_free(packed_type);
351-
free(packed_type);
352-
}
353-
}
354-
return NULL;
355-
}
356-
357-
static void* d_struct_thrd_finalserialise(void* param_P){
358-
struct __d_struct_thrd_serialise_param* param = param_P;
359-
360-
return drpc_types_buf_threaded(param->ready_type,&param->final_serialise_wait,param->dstruct->current_len);
361-
}
362-
363278
char* d_struct_buf(struct d_struct* dstruct, size_t* buflen){
364279
if(dstruct->current_len == 0){
365280
*buflen = sizeof(uint64_t);
366-
char* ret = calloc(1,*buflen); assert(ret);
367-
return ret;
281+
char* empty_buf = calloc(1,*buflen);
282+
assert(empty_buf);
283+
return empty_buf;
368284
}
369-
struct __d_struct_thrd_serialise_param param;
370-
param.dstruct = dstruct;
371-
param.ready_type = queue_create();
372-
param.keys = malloc(dstruct->current_len * sizeof(char*));
373-
assert(param.keys);
374-
375-
sem_init(&param.pre_serialise_wait,0,0);
376-
sem_init(&param.final_serialise_wait,0,0);
285+
struct drpc_type* packed = malloc(dstruct->current_len * sizeof(*packed));
286+
assert(packed);
377287

378-
pthread_t data_gather;
379-
pthread_t pre_serialise;
380-
pthread_t final_serialise;
381-
382-
assert(pthread_create(&data_gather,NULL,d_struct_thrd_data_gather,&param) == 0);
383-
assert(pthread_create(&pre_serialise,NULL,d_struct_thrd_preserialise,&param) == 0);
384-
assert(pthread_create(&final_serialise,NULL,d_struct_thrd_finalserialise,&param) == 0);
385-
386-
void* ret = NULL;
387-
struct drpc_types_buf_threaded_output* output;
388-
assert(pthread_join(data_gather,NULL) == 0);
389-
assert(pthread_join(pre_serialise,NULL) == 0);
390-
assert(pthread_join(final_serialise,&ret) == 0);
391-
output = ret;
288+
size_t j = 0;
289+
for(size_t i = 0; i < dstruct->hashtable->capacity; i++){
290+
if(dstruct->hashtable->body[i].value != NULL && dstruct->hashtable->body[i].key != NULL && dstruct->hashtable->body[i].key != (char*)0xDEAD){
392291

393-
sem_destroy(&param.pre_serialise_wait);
394-
sem_destroy(&param.final_serialise_wait);
292+
struct drpc_type* packed_type = NULL;
293+
if(((struct d_struct_element*)dstruct->hashtable->body[i].value)->is_packed == 0){
294+
packed_type = malloc(sizeof(*packed_type)); assert(packed_type);
295+
switch(((struct d_struct_element*)dstruct->hashtable->body[i].value)->type){
296+
case d_sizedbuf:
297+
sizedbuf_to_drpc(packed_type,((struct d_struct_element*)dstruct->hashtable->body[i].value)->data,
298+
((struct d_struct_element*)dstruct->hashtable->body[i].value)->sizedbuf_len);
299+
break;
300+
case d_struct:
301+
d_struct_to_drpc(packed_type,((struct d_struct_element*)dstruct->hashtable->body[i].value)->data);
302+
break;
303+
case d_queue:
304+
d_queue_to_drpc(packed_type,((struct d_struct_element*)dstruct->hashtable->body[i].value)->data);
305+
break;
306+
case d_array:
307+
d_array_to_drpc(packed_type,((struct d_struct_element*)dstruct->hashtable->body[i].value)->data);
308+
break;
309+
case d_str:
310+
str_to_drpc(packed_type,((struct d_struct_element*)dstruct->hashtable->body[i].value)->data);
311+
break;
312+
default: break;
313+
}
314+
} else packed_type = ((struct d_struct_element*)dstruct->hashtable->body[i].value)->data;
315+
316+
size_t keylen = strlen(dstruct->hashtable->body[i].key);
317+
size_t packed_buflen = drpc_type_buflen(packed_type);
318+
char* keyed_buf = malloc(keylen + 1 + packed_buflen); assert(keyed_buf);
319+
char* edit_buf = keyed_buf;
320+
321+
memcpy(edit_buf,dstruct->hashtable->body[i].key,keylen + 1); edit_buf += keylen + 1;
322+
323+
drpc_buf(packed_type,edit_buf);
324+
325+
packed[j].packed_data = keyed_buf;
326+
packed[j].type = ((struct d_struct_element*)dstruct->hashtable->body[i].value)->type;
327+
packed[j].len = keylen + 1 + packed_buflen;
328+
j++;
329+
330+
if(((struct d_struct_element*)dstruct->hashtable->body[i].value)->is_packed == 0) {
331+
drpc_type_free(packed_type);
332+
free(packed_type);
333+
}
334+
}
335+
}
395336

396-
queue_free(param.ready_type);
397-
free(param.keys);
337+
*buflen = drpc_types_buflen(packed,dstruct->current_len);
338+
char* buf = malloc(*buflen); assert(buf);
398339

399-
char* buf = output->buf;
400-
*buflen = output->buflen;
340+
drpc_types_buf(packed,dstruct->current_len,buf);
341+
drpc_types_free(packed,dstruct->current_len);
401342

402-
free(output);
403343
return buf;
404344
}
405345
void buf_d_struct(char* buf, struct d_struct* dstruct){

drpc_types.c

Lines changed: 0 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -205,37 +205,6 @@ void drpc_types_buf(struct drpc_type* types,size_t len,char* buf){
205205
buf += drpc_buf(&types[i],buf);
206206
}
207207
}
208-
#include <stdio.h>
209-
struct drpc_types_buf_threaded_output* drpc_types_buf_threaded(queue_t types,sem_t* wait,size_t len){
210-
uint64_t len64 = len;
211-
size_t buflen = sizeof(uint64_t);
212-
size_t buf_offset = 0;
213-
char* buf = malloc(buflen); assert(buflen);
214-
memcpy(buf,&len64,sizeof(uint64_t)); buf_offset += sizeof(uint64_t);
215-
216-
for(size_t i = 0; i < len; i++){
217-
assert(sem_wait(wait) == 0);
218-
struct drpc_type* type = queue_pop(types);
219-
size_t type_len = drpc_type_buflen(type);
220-
if(buf_offset + type_len > buflen){
221-
size_t queue_len = queue_count(types);
222-
buflen += type_len + buf_offset;
223-
for(size_t i = 0; i < queue_len; i++){
224-
struct drpc_type* popped_type = queue_pop(types);
225-
buflen += drpc_type_buflen(popped_type);
226-
queue_push(types,popped_type);
227-
}
228-
assert((buf = realloc(buf,buflen)));
229-
}
230-
buf_offset += drpc_buf(type,buf + buf_offset);
231-
drpc_type_free(type);
232-
free(type);
233-
}
234-
struct drpc_types_buf_threaded_output* out = malloc(sizeof(*out)); assert(out);
235-
out->buf = buf;
236-
out->buflen = buflen;
237-
return out;
238-
}
239208

240209
struct drpc_type* buf_drpc_types(char* buf, size_t *len){
241210
uint64_t len64 = 0;

drpc_types.h

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,6 @@ struct drpc_type{
4646
size_t len;
4747
char* packed_data;
4848
};
49-
struct drpc_types_buf_threaded_output{
50-
char* buf;
51-
size_t buflen;
52-
};
5349

5450
#include "drpc_struct.h"
5551
#include "drpc_queue.h"
@@ -110,8 +106,6 @@ size_t drpc_types_buflen(struct drpc_type* types, size_t len);
110106

111107
void drpc_types_buf(struct drpc_type* types,size_t len,char* buf);
112108

113-
struct drpc_types_buf_threaded_output* drpc_types_buf_threaded(queue_t types,sem_t* wait,size_t len);
114-
115109
struct drpc_type* buf_drpc_types(char* buf, size_t *len);
116110

117111
void drpc_types_free(struct drpc_type* types, size_t len);

0 commit comments

Comments
 (0)