Skip to content

Commit

Permalink
Send rtsp events to the metadata thteread for handling. Begin to clea…
Browse files Browse the repository at this point in the history
…n up signals.
  • Loading branch information
mikebrady committed Mar 31, 2018
1 parent 8094bb4 commit 69b3b5d
Show file tree
Hide file tree
Showing 9 changed files with 301 additions and 263 deletions.
4 changes: 4 additions & 0 deletions RELEASENOTES.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
Version 3.2d33
====
A new metadata token -- 'pffr' The First Frame of a play session has been Received. Not sure we'll keep it...

Version 3.2d30
====
**Enhancements**
Expand Down
2 changes: 1 addition & 1 deletion audio_alsa.c
Original file line number Diff line number Diff line change
Expand Up @@ -947,7 +947,7 @@ void do_volume(double vol) { // caller is assumed to have the alsa_mutex when us
// debug(1,"Set alsa volume.");
do_snd_mixer_selem_set_playback_dB_all(alsa_mix_elem, vol);
} else {
debug(1, "Not setting volume because volume-based mute is active");
debug(2, "Not setting volume because volume-based mute is active");
}
}
volume_set_request = 0; // any external request that has been made is now satisfied
Expand Down
23 changes: 18 additions & 5 deletions common.c
Original file line number Diff line number Diff line change
Expand Up @@ -984,16 +984,29 @@ void memory_barrier() {
pthread_mutex_unlock(&barrier_mutex);
}

int ss_pthread_mutex_timedlock(pthread_mutex_t *mutex, useconds_t dally_time,
const char *debugmessage, int debuglevel) {
void sps_nanosleep(const time_t sec, const long nanosec) {
struct timespec req, rem;
int result;
req.tv_sec = sec;
req.tv_nsec = nanosec;
do {
result = nanosleep(&req, &rem);
rem = req;
} while ((result == -1) && (errno == EINTR));
if (result == -1)
debug(1, "Error in sps_nanosleep of %d sec and %ld nanoseconds: %d.", sec, nanosec, errno);
}

int sps_pthread_mutex_timedlock(pthread_mutex_t *mutex, useconds_t dally_time,
const char *debugmessage, int debuglevel) {

int time_to_wait = dally_time;
useconds_t time_to_wait = dally_time;
int r = pthread_mutex_trylock(mutex);
while ((r) && (time_to_wait > 0)) {
int st = time_to_wait;
useconds_t st = time_to_wait;
if (st > 20000)
st = 20000;
usleep(st);
sps_nanosleep(0, st * 1000);
time_to_wait -= st;
r = pthread_mutex_trylock(mutex);
}
Expand Down
7 changes: 5 additions & 2 deletions common.h
Original file line number Diff line number Diff line change
Expand Up @@ -251,9 +251,12 @@ void shairport_shutdown();
extern sigset_t pselect_sigset;

// wait for the specified time in microseconds -- it checks every 20 milliseconds
int ss_pthread_mutex_timedlock(pthread_mutex_t *mutex, useconds_t dally_time,
const char *debugmessage, int debuglevel);
int sps_pthread_mutex_timedlock(pthread_mutex_t *mutex, useconds_t dally_time,
const char *debugmessage, int debuglevel);

char *get_version_string(); // mallocs a string space -- remember to free it afterwards

void sps_nanosleep(const time_t sec,
const long nanosec); // waits for this time, even through interruptions

#endif // _COMMON_H
69 changes: 46 additions & 23 deletions dacp.c
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,14 @@ struct HttpResponse {
int code;
};

static void *response_realloc(__attribute__((unused)) void *opaque, void *ptr, int size) {
return realloc(ptr, size);
void *response_realloc(__attribute__((unused)) void *opaque, void *ptr, int size) {
void *t = realloc(ptr, size);
if ((t == NULL) && (size != 0))
debug(1, "Response realloc of size %d failed!", size);
return t;
}

static void response_body(void *opaque, const char *data, int size) {
void response_body(void *opaque, const char *data, int size) {
struct HttpResponse *response = (struct HttpResponse *)opaque;

ssize_t space_available = response->malloced_size - response->size;
Expand Down Expand Up @@ -159,7 +162,8 @@ int dacp_send_command(const char *command, char **body, ssize_t *bodysize) {

// only do this one at a time -- not sure it is necessary, but better safe than sorry

int mutex_reply = ss_pthread_mutex_timedlock(&dacp_conversation_lock, 1000000, command, 1);
int mutex_reply = sps_pthread_mutex_timedlock(&dacp_conversation_lock, 2000000, command, 1);
// int mutex_reply = pthread_mutex_lock(&dacp_conversation_lock);
if (mutex_reply == 0) {
// debug(1,"dacp_conversation_lock acquired for command \"%s\".",command);

Expand All @@ -170,6 +174,13 @@ int dacp_send_command(const char *command, char **body, ssize_t *bodysize) {
// debug(1, "DACP socket could not be created -- error %d: \"%s\".",errno,strerror(errno));
response.code = 497; // Can't establish a socket to the DACP server
} else {
struct timeval tv;
tv.tv_sec = 0;
tv.tv_usec = 250000;
if (setsockopt(sockfd, SOL_SOCKET, SO_RCVTIMEO, (const char *)&tv, sizeof tv) == -1)
debug(1, "Error %d setting receive timeout for DACP service.", errno);
if (setsockopt(sockfd, SOL_SOCKET, SO_SNDTIMEO, (const char *)&tv, sizeof tv) == -1)
debug(1, "Error %d setting send timeout for DACP service.", errno);

// connect!
// debug(1, "DACP socket created.");
Expand All @@ -185,24 +196,28 @@ int dacp_send_command(const char *command, char **body, ssize_t *bodysize) {

// Send command
// debug(1,"DACP connect message: \"%s\".",message);
if (setsockopt(sockfd, SOL_SOCKET, SO_SNDTIMEO, (const char *)&tv, sizeof tv) == -1)
debug(1, "Error %d setting send timeout for DACP service.", errno);
if (send(sockfd, message, strlen(message), 0) != (ssize_t)strlen(message)) {
// debug(1, "Send failed");
response.code = 493; // Client failed to send a message

} else {

response.body = malloc(2048); // it can resize this if necessary
response.malloced_size = 2048;
response.body = malloc(32768); // it can resize this if necessary
response.malloced_size = 32768;

struct http_roundtripper rt;
http_init(&rt, responseFuncs, &response);

int needmore = 1;
int looperror = 0;
char buffer[1024];
char buffer[8192];
memset(buffer, 0, sizeof(buffer));
while (needmore && !looperror) {
const char *data = buffer;
if (setsockopt(sockfd, SOL_SOCKET, SO_RCVTIMEO, (const char *)&tv, sizeof tv) == -1)
debug(1, "Error %d setting receive timeout for DACP service.", errno);
int ndata = recv(sockfd, buffer, sizeof(buffer), 0);
// debug(1,"Received %d bytes: \"%s\".",ndata,buffer);
if (ndata <= 0) {
Expand Down Expand Up @@ -238,10 +253,12 @@ int dacp_send_command(const char *command, char **body, ssize_t *bodysize) {
// debug(1,"DACP socket closed.");
}
pthread_mutex_unlock(&dacp_conversation_lock);
// debug(1,"Sent command\"%s\" with a response body of size %d.",command,response.size);
// debug(1,"dacp_conversation_lock released.");
} else {
// debug(1, "Could not acquire a lock on the dacp transmit/receive section. Possible
// timeout?");
debug(1, "Could not acquire a lock on the dacp transmit/receive section when attempting to "
"send the command \"%s\". Possible timeout?",
command);
response.code = 494; // This client is already busy
}
}
Expand All @@ -268,7 +285,7 @@ void relinquish_dacp_server_information(rtsp_conn_info *conn) {
// as the conn's connection number
// this is to signify that the player has stopped, but only if another thread (with a different
// index) hasn't already taken over the dacp service
ss_pthread_mutex_timedlock(
sps_pthread_mutex_timedlock(
&dacp_server_information_lock, 500000,
"set_dacp_server_information couldn't get DACP server information lock in 0.5 second!.", 1);
if (dacp_server.players_connection_thread_index == conn->connection_number)
Expand All @@ -279,7 +296,7 @@ void relinquish_dacp_server_information(rtsp_conn_info *conn) {
// this will be running on the thread of its caller, not of the conversation thread...
void set_dacp_server_information(rtsp_conn_info *conn) { // tell the DACP conversation thread that
// the port has been set or changed
ss_pthread_mutex_timedlock(
sps_pthread_mutex_timedlock(
&dacp_server_information_lock, 500000,
"set_dacp_server_information couldn't get DACP server information lock in 0.5 second!.", 1);
dacp_server.players_connection_thread_index = conn->connection_number;
Expand All @@ -290,8 +307,10 @@ void set_dacp_server_information(rtsp_conn_info *conn) { // tell the DACP conver
dacp_server.active_remote_id = conn->dacp_active_remote;
if (dacp_server.port)
dacp_server.scan_enable = 1;
else
else {
debug(1, "DACP server port has been set to zero.");
dacp_server.scan_enable = 0;
}
metadata_hub_modify_prolog();
int ch = metadata_store.dacp_server_active != dacp_server.scan_enable;
metadata_store.dacp_server_active = dacp_server.scan_enable;
Expand Down Expand Up @@ -320,7 +339,7 @@ void *dacp_monitor_thread_code(__attribute__((unused)) void *na) {
int32_t revision_number = 1;
while (1) {
int result;
ss_pthread_mutex_timedlock(
sps_pthread_mutex_timedlock(
&dacp_server_information_lock, 500000,
"dacp_monitor_thread_code couldn't get DACP server information lock in 0.5 second!.", 1);
while (dacp_server.scan_enable == 0) {
Expand All @@ -331,7 +350,7 @@ void *dacp_monitor_thread_code(__attribute__((unused)) void *na) {
int32_t the_volume;
result = dacp_get_volume(&the_volume); // just want the http code
if ((result == 496) || (result == 403) || (result == 501)) {
// debug(1,"Stopping scan because the response to \"dacp_get_volume(NULL)\" is %d.",result);
debug(1, "Stopping scan because the response to \"dacp_get_volume(NULL)\" is %d.", result);
dacp_server.scan_enable = 0;
metadata_hub_modify_prolog();
int ch = metadata_store.dacp_server_active != 0;
Expand All @@ -342,17 +361,21 @@ void *dacp_monitor_thread_code(__attribute__((unused)) void *na) {
// debug(1, "DACP Server ID \"%u\" at \"%s:%u\", scan %d.", dacp_server.active_remote_id,
// dacp_server.ip_string, dacp_server.port, scan_index);

int adv = (result == 200);
// a result of 200 means the advanced features of, e.g., iTunes, are available
// so, turn the advanced_dacp_server flag on or off and flag if it's changed.
metadata_hub_modify_prolog();
int diff = metadata_store.advanced_dacp_server_active != adv;
if (diff)
metadata_store.advanced_dacp_server_active = adv;
metadata_hub_modify_epilog(diff);
if (result != 494) { // this just means that it couldn't send the query because something else
// was sending a command
int adv = (result == 200);
// a result of 200 means the advanced features of, e.g., iTunes, are available
// so, turn the advanced_dacp_server flag on or off and flag if it's changed.
metadata_hub_modify_prolog();
int diff = metadata_store.advanced_dacp_server_active != adv;
if (diff) {
metadata_store.advanced_dacp_server_active = adv;
debug(1, "Setting dacp_server_active to %d because of a response of %d.", adv, result);
}
metadata_hub_modify_epilog(diff);
}

if (result == 200) {

metadata_hub_modify_prolog();
int diff = metadata_store.speaker_volume != the_volume;
if (diff)
Expand Down
Loading

0 comments on commit 69b3b5d

Please sign in to comment.