Skip to content

Commit

Permalink
Change how rtsp_messages are memory managed. Add some diagnostics.
Browse files Browse the repository at this point in the history
  • Loading branch information
mikebrady committed Jan 12, 2019
1 parent ca72645 commit 9fbdc06
Showing 1 changed file with 63 additions and 24 deletions.
87 changes: 63 additions & 24 deletions rtsp.c
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,10 @@ typedef struct {
} pc_queue; // producer-consumer queue
#endif

static int msg_indexes = 1;

typedef struct {
int index_number;
uint32_t referenceCount; // we might start using this...
unsigned int nheaders;
char *name[16];
Expand Down Expand Up @@ -401,16 +404,17 @@ static char *nextline(char *in, int inbuf) {
}

void msg_retain(rtsp_message *msg) {
if (msg) {
int rc = pthread_mutex_lock(&reference_counter_lock);
if (rc)
debug(1, "Error %d locking reference counter lock");
if (msg > (rtsp_message *) 0x00010000) {
msg->referenceCount++;
debug(1,"msg_retain -- item %d reference count %d.", msg->index_number, msg->referenceCount);
rc = pthread_mutex_unlock(&reference_counter_lock);
if (rc)
debug(1, "Error %d unlocking reference counter lock");
} else {
debug(1, "null rtsp_message pointer passed to retain");
debug(1, "invalid rtsp_message pointer %d passed to retain", (int) msg);
}
}

Expand All @@ -419,9 +423,11 @@ rtsp_message *msg_init(void) {
if (msg) {
memset(msg, 0, sizeof(rtsp_message));
msg->referenceCount = 1; // from now on, any access to this must be protected with the lock
msg->index_number = msg_indexes++;
} else {
die("can not allocate memory for an rtsp_message.");
die("msg_init -- can not allocate memory for rtsp_message %d.", msg_indexes);
}
debug(1,"msg_init -- create item %d.", msg->index_number);
return msg;
}

Expand Down Expand Up @@ -476,6 +482,37 @@ static void debug_print_msg_content(int level, rtsp_message *msg) {
}
*/

void msg_free(rtsp_message **msgh) {
debug_mutex_lock(&reference_counter_lock, 1000, 0);
if (*msgh > (rtsp_message *)0x00010000) {
rtsp_message *msg = *msgh;
msg->referenceCount--;
if (msg->referenceCount == 0) {
unsigned int i;
for (i = 0; i < msg->nheaders; i++) {
free(msg->name[i]);
free(msg->value[i]);
}
if (msg->content)
free(msg->content);
debug(1,"msg_free item %d -- free.",msg->index_number);
*msgh = (rtsp_message *)(msg->index_number & 0xFFFF); // put the index number of the freed message in here
free(msg);
} else {
debug(1,"msg_free item %d -- decrement reference to %d.",msg->index_number,msg->referenceCount);
}

// else {
// debug(1,"rtsp_message reference count non-zero:
// %d!",msg->referenceCount);
//}
} else if (*msgh != NULL) {
debug(1, "msg_free: error attempting to free an allocated but already-freed rtsp_message %d.",(int)*msgh);
}
debug_mutex_unlock(&reference_counter_lock, 0);
}

/*
void msg_free(rtsp_message *msg) {
if (msg) {
Expand All @@ -499,6 +536,7 @@ void msg_free(rtsp_message *msg) {
debug(1, "null rtsp_message pointer passed to msg_free()");
}
}
*/

int msg_handle_line(rtsp_message **pmsg, char *line) {
rtsp_message *msg = *pmsg;
Expand Down Expand Up @@ -550,17 +588,22 @@ int msg_handle_line(rtsp_message **pmsg, char *line) {
}

fail:
*pmsg = NULL;
msg_free(msg);
msg_free(pmsg);
return 0;
}

enum rtsp_read_request_response rtsp_read_request(rtsp_conn_info *conn, rtsp_message **the_packet) {

*the_packet = NULL; // need this for erro handling

enum rtsp_read_request_response reply = rtsp_read_request_response_ok;
ssize_t buflen = 4096;
char *buf = malloc(buflen + 1); // add a NUL at the end

rtsp_message *msg = NULL;
if (!buf) {
warn("rtsp_read_request: can't get a buffer.");
reply = rtsp_read_request_response_error;
goto shutdown;
}

ssize_t nread;
ssize_t inbuf = 0;
Expand Down Expand Up @@ -607,9 +650,9 @@ do {

char *next;
while (msg_size < 0 && (next = nextline(buf, inbuf))) {
msg_size = msg_handle_line(&msg, buf);
msg_size = msg_handle_line(the_packet, buf);

if (!msg) {
if (!(*the_packet)) {
warn("no RTSP header received");
reply = rtsp_read_request_response_bad_packet;
goto shutdown;
Expand Down Expand Up @@ -690,6 +733,7 @@ do {
inbuf += nread;
}

rtsp_message *msg = *the_packet;
msg->contentlength = inbuf;
msg->content = buf;
char *jp = inbuf + buf;
Expand All @@ -698,13 +742,8 @@ do {
return reply;

shutdown:
if (msg) {
msg_free(msg); // which will free the content and everything else
}
// in case the message wasn't formed or wasn't fully initialised
if ((msg && (msg->content == NULL)) || (!msg))
free(buf);
*the_packet = NULL;
msg_free(the_packet);
free(buf);
return reply;
}

Expand Down Expand Up @@ -1396,7 +1435,7 @@ void metadata_pack_cleanup_function(void *arg) {
// debug(1, "metadata_pack_cleanup_function called");
metadata_package *pack = (metadata_package *)arg;
if (pack->carrier)
msg_free(pack->carrier); // release the message
msg_free(&pack->carrier); // release the message
else if (pack->data)
free(pack->data);
}
Expand Down Expand Up @@ -1474,13 +1513,13 @@ int send_metadata(uint32_t type, uint32_t code, char *data, uint32_t length, rts
pack.code = code;
pack.data = data;
pack.length = length;
if (carrier)
msg_retain(carrier);
pack.carrier = carrier;
if (pack.carrier)
msg_retain(pack.carrier);
int rc = pc_queue_add_item(&metadata_queue, &pack, block);
if (rc == EBUSY) {
if (carrier)
msg_free(carrier);
if (pack.carrier)
msg_free(&pack.carrier);
else if (data)
free(data);
warn("Metadata queue is busy, discarding message of type 0x%08X, code 0x%08X.", type, code);
Expand Down Expand Up @@ -2171,7 +2210,7 @@ void rtsp_conversation_thread_cleanup_function(void *arg) {

void msg_cleanup_function(void *arg) {
// debug(3, "msg_cleanup_function called.");
msg_free((rtsp_message *)arg);
msg_free((rtsp_message **)arg);
}

static void *rtsp_conversation_thread_func(void *pconn) {
Expand Down Expand Up @@ -2217,9 +2256,9 @@ static void *rtsp_conversation_thread_func(void *pconn) {
int debug_level = 3; // for printing the request and response
reply = rtsp_read_request(conn, &req);
if (reply == rtsp_read_request_response_ok) {
pthread_cleanup_push(msg_cleanup_function, (void *)req);
pthread_cleanup_push(msg_cleanup_function, (void *)&req);
resp = msg_init();
pthread_cleanup_push(msg_cleanup_function, (void *)resp);
pthread_cleanup_push(msg_cleanup_function, (void *)&resp);
resp->respcode = 400;

if (strcmp(req->method, "OPTIONS") !=
Expand Down

0 comments on commit 9fbdc06

Please sign in to comment.