Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,14 @@ tests/%.xml: tests/%.test
@echo -e '\033[0;33m Testing:\033[0m $<'
@CMOCKA_XML_FILE="$@" CMOCKA_MESSAGE_OUTPUT=XML "./$<" >/dev/null 2>&1

MALLOC_FUNCTIONS := $(strip malloc calloc strdup realloc json_object)
WRAP_ALLOC_FUNCTIONS := $(foreach fn, $(MALLOC_FUNCTIONS)\
,-Wl,-u,$(fn) -Wl,-wrap,$(fn))

tests/%.test: CPPFLAGS := -I. $(CPPFLAGS)
tests/%.test: tests/%.o $(filter-out src/engine/n2kafka.o,$(OBJS))
tests/%.test: tests/%.o tests/rb_mem_tests.o $(filter-out src/engine/n2kafka.o,$(OBJS))
@echo -e '\033[0;33m Building: $@ \033[0m'
@$(CC) $(CPPFLAGS) $(LDFLAGS) $< $(shell cat $(@:.test=.objdeps)) -o $@ $(LIBS) -lcmocka
@$(CC) $(WRAP_ALLOC_FUNCTIONS) $(CPPFLAGS) $(LDFLAGS) $< $(shell cat $(@:.test=.objdeps)) -o $@ $(LIBS) tests/rb_mem_tests.o -lcmocka

check_coverage:
@( if [[ "x$(WITH_COVERAGE)" == "xn" ]]; then \
Expand Down
2 changes: 1 addition & 1 deletion configure.n2kafka
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ function checks {
# TODO include some function that is only in v2, because CentOS bring us v1
mkl_meta_set "yajl" "desc" "Small event-driven (SAX-style) JSON parser"
mkl_lib_check --static=-lyajl "yajl" "" fail CC "-lyajl" \
"#include <yajl.h>"
"#include <yajl/yajl_parse.h>"

# Check that libcurl is available, and allow to link it statically.
mkl_meta_set "lcurl" "desc" "Free and easy-to-use client-side URL transfer library"
Expand Down
57 changes: 48 additions & 9 deletions src/decoder/mse/rb_mse.c
Original file line number Diff line number Diff line change
Expand Up @@ -167,11 +167,20 @@ static int parse_per_listener_opaque_config(struct mse_opaque *opaque,
topic_name = default_topic_name();
}

if (!topic_name) {
rdlog(LOG_ERR, "Can't create rkt with no topic");
return -1;
}

opaque->rkt = new_rkt_global_config(topic_name,
rb_client_mac_partitioner,err,sizeof(err));

if(NULL == opaque->rkt) {
rdlog(LOG_ERR, "Can't create MSE topic %s: %s", topic_name, err);
if (NULL == topic_name) {
rdlog(LOG_ERR, "Can't create MSE topic: %s", err);
} else {
rdlog(LOG_ERR, "Can't create MSE topic %s: %s", topic_name, err);
}
return -1;
}

Expand Down Expand Up @@ -243,6 +252,7 @@ static void mse_warn_timestamp(struct mse_data *data,
json_t *new_value = NULL;
json_int_t last_time_warned = 0;
struct mse_database *db = &decoder_info->mse_config->database;
int json_ret = 0;

pthread_mutex_lock(&db->warning_ht_lock);
if ((value = json_object_get(db->warning_ht, data->subscriptionName))
Expand All @@ -253,14 +263,19 @@ static void mse_warn_timestamp(struct mse_data *data,
rdlog(LOG_WARNING, "Timestamp out of date");
data->timestamp_warnings++;
new_value = json_integer(now);
json_object_set(db->warning_ht, data->subscriptionName, new_value);

json_ret = json_object_set(db->warning_ht, data->subscriptionName, new_value);
if (json_ret != 0)
rdlog(LOG_ERR, "Can't add new value in warning_ht");
}
} else {
rdlog(LOG_WARNING, "Timestamp out of date");
data->timestamp_warnings++;
new_value = json_integer(now);
json_object_set_new(db->warning_ht, data->subscriptionName,
new_value);
json_ret = json_object_set_new(db->warning_ht, data->subscriptionName,
new_value);
if (json_ret != 0)
rdlog(LOG_ERR, "Can't add new value in warning_ht");
}
pthread_mutex_unlock(&db->warning_ht_lock);
}
Expand Down Expand Up @@ -308,7 +323,8 @@ int mse_opaque_reload(json_t *config, void *_opaque) {
rb_client_mac_partitioner,err,sizeof(err));

if(NULL == rkt_aux) {
rdlog(LOG_ERR, "Can't create MSE topic %s: %s", topic_name, err);
if (NULL == topic_name)
rblog(LOG_DEBUG,"Empty topic_name conf in new_rkt_global_config.");
goto rkt_err;
}

Expand All @@ -319,6 +335,10 @@ int mse_opaque_reload(json_t *config, void *_opaque) {
decoder_info->max_time_offset = max_time_offset;
pthread_rwlock_unlock(&decoder_info->per_listener_enrichment_rwlock);

rd_kafka_topic_destroy(rkt_aux);
json_decref(enrichment_aux);
return 0;

rkt_err:
enrichment_err:
if(rkt_aux) {
Expand All @@ -329,7 +349,7 @@ int mse_opaque_reload(json_t *config, void *_opaque) {
json_decref(enrichment_aux);
}

return 0;
return -1;
}

void mse_opaque_done(void *_opaque) {
Expand Down Expand Up @@ -359,13 +379,17 @@ static int parse_sensor(json_t *sensor, json_t *streams_db) {
"{s:s,s?o}", "stream", &stream, MSE_ENRICHMENT_KEY, &enrichment);

if (unpack_rc != 0) {
rdlog(LOG_ERR, "Can't parse sensor (%s): %s", json_dumps(sensor, 0), err.text);
char *sensor_json = json_dumps(sensor, 0);
rdlog(LOG_ERR, "Can't parse sensor (%s): %s", sensor_json, err.text);
free(sensor_json);
return -1;
}

if (stream == NULL) {
rdlog(LOG_ERR, "Can't parse sensor (%s): %s", json_dumps(sensor, 0),
char *sensor_json = json_dumps(sensor, 0);
rdlog(LOG_ERR, "Can't parse sensor (%s): %s", sensor_json,
"No \"stream\"");
free(sensor_json);
return -1;
}

Expand All @@ -374,6 +398,7 @@ static int parse_sensor(json_t *sensor, json_t *streams_db) {
const int set_rc = json_object_set_new(streams_db, stream, _enrich);
if (set_rc != 0) {
rdlog(LOG_ERR, "Can't set new MSE enrichment db entry (out of memory?)");
return -1;
}

return 0;
Expand All @@ -399,7 +424,11 @@ int parse_mse_array(void *_db, const struct json_t *mse_array) {
}

json_array_foreach(mse_array, _index, value) {
parse_sensor(value, new_db);
const int ret = parse_sensor(value, new_db);
if (ret != 0) {
json_decref(new_db);
return -1;
}
}

pthread_rwlock_wrlock(&db->rwlock);
Expand Down Expand Up @@ -545,11 +574,21 @@ static struct mse_array *extract_mse10_rich_data(json_t *from,
return NULL;
}

if (0 == json_is_array(notifications_array)) {
rdlog(LOG_ERR, "The MSE10 JSON notifications array is not an array %s", err.text);
return NULL;
}

const size_t mse_array_size = json_array_size(notifications_array);
const size_t alloc_size = sizeof(struct mse_array) + mse_array_size * sizeof(
struct mse_data);

struct mse_array *mse_array = calloc(1, alloc_size);
if (NULL == mse_array) {
rdlog(LOG_ERR, "Can not alloc the mse_array (out of memory?)");
return NULL;
}

mse_array->size = mse_array_size;
mse_array->data = (void *)&mse_array[1];

Expand Down
2 changes: 1 addition & 1 deletion src/engine/global_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ struct n2kafka_config{
extern struct n2kafka_config global_config;

static inline bool only_stdout_output(){
return global_config.debug && !global_config.brokers;
return !global_config.brokers && !global_config.debug;
}

void init_global_config();
Expand Down
17 changes: 17 additions & 0 deletions src/util/in_addr_list.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
#include <sys/socket.h>
#include <stdlib.h>
#include <string.h>
#include <librd/rdlog.h>



typedef struct in_addr_list_node_s{
Expand All @@ -42,6 +44,11 @@ struct in_addr_list_s{
/// Init a sockaddr_in list.
in_addr_list_t *in_addr_list_new(){
in_addr_list_t *list = calloc(1,sizeof(*list));
if (NULL == list) {
rdlog(LOG_ERR, "Can't alloc a sockaddr_in list (out of memory?)");
return NULL;
}

LIST_INIT(syslist(list));
return list;
}
Expand All @@ -50,6 +57,11 @@ in_addr_list_t *in_addr_list_new(){
void in_addr_list_add(in_addr_list_t *list,const struct in_addr *addr){
in_addr_list_node_t *node = calloc(1,sizeof(*node));

if (NULL == node) {
rdlog(LOG_ERR, "Can't alloc the node (out of memory?)");
return;
}

memcpy(&node->addr,addr,sizeof(*addr));

LIST_INSERT_HEAD(syslist(list),node,entry);
Expand All @@ -67,6 +79,11 @@ int in_addr_list_contains(const in_addr_list_t *list,const struct in_addr *addr)

/// Deallocate a list.
void in_addr_list_done(in_addr_list_t *list){
if (NULL == list){
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is valid to free an empty list

rdlog(LOG_ERR, "The list parameters is NULL");
return;
}

in_addr_list_node_t *n=NULL;
while((n = LIST_FIRST(syslist(list)))){
LIST_REMOVE(n,entry);
Expand Down
7 changes: 7 additions & 0 deletions src/util/kafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,18 @@
rd_kafka_topic_t *new_rkt_global_config(const char *topic_name,
rb_rd_kafka_partitioner_t partitioner,char *err,size_t errsize) {
rd_kafka_topic_conf_t *template_config = global_config.kafka_topic_conf;

if (NULL == topic_name) {
rblog(LOG_DEBUG,"Empty topic_name conf in new_rkt_global_config.");
return NULL;
}

rd_kafka_topic_conf_t *my_rkt_conf
= rd_kafka_topic_conf_dup(template_config);

if(NULL == my_rkt_conf) {
rdlog(LOG_ERR,"Couldn't topic_conf_dup in topic %s",topic_name);
rd_kafka_topic_conf_destroy(template_config);
return NULL;
}

Expand Down
Loading