Skip to content

Commit

Permalink
Make more of the code amenable to pthread cancel
Browse files Browse the repository at this point in the history
  • Loading branch information
mikebrady committed Jul 14, 2018
1 parent bae7585 commit d3b79b9
Show file tree
Hide file tree
Showing 7 changed files with 114 additions and 53 deletions.
1 change: 1 addition & 0 deletions audio.c
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
/*
* Audio driver handler. This file is part of Shairport.
* Copyright (c) James Laird 2013
* Modifications (c) Mike Brady 2014 -- 2018
* All rights reserved.
*
* Permission is hereby granted, free of charge, to any person
Expand Down
5 changes: 3 additions & 2 deletions common.c
Original file line number Diff line number Diff line change
Expand Up @@ -419,14 +419,15 @@ static char super_secret_key[] =

#ifdef HAVE_LIBSSL
uint8_t *rsa_apply(uint8_t *input, int inlen, int *outlen, int mode) {
static RSA *rsa = NULL;

RSA *rsa = NULL;
if (!rsa) {
BIO *bmem = BIO_new_mem_buf(super_secret_key, -1);
rsa = PEM_read_bio_RSAPrivateKey(bmem, NULL, NULL, NULL);
BIO_free(bmem);
}

debug(1,"RSA_size(rsa) is %d",RSA_size(rsa));

uint8_t *out = malloc(RSA_size(rsa));
switch (mode) {
case RSA_MODE_AUTH:
Expand Down
1 change: 1 addition & 0 deletions dacp.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ int dacp_get_speaker_list(dacp_spkr_stuff *speaker_array, int max_size_of_array,
void set_dacp_server_information(rtsp_conn_info *conn); // tell the DACP conversation thread that
// the dacp server information has been set
// or changed
void relinquish_dacp_server_information(rtsp_conn_info *conn); // tell the DACP conversation thread that the player thread is no longer associated with it.
void dacp_monitor_port_update_callback(
char *dacp_id, uint16_t port); // a callback to say the port is no longer in use
int send_simple_dacp_command(const char *command);
Expand Down
3 changes: 0 additions & 3 deletions mdns_avahi.c
Original file line number Diff line number Diff line change
Expand Up @@ -468,9 +468,6 @@ void avahi_dacp_dont_monitor(void *userdata) {
if (userdata) {
dacp_browser_struct *dbs = (dacp_browser_struct *)userdata;
// stop and dispose of everything
/*if (dbs->service_poll)
avahi_threaded_poll_stop((dbs)->service_poll);
*/
if (dbs->service_poll) {
avahi_threaded_poll_stop(dbs->service_poll);
avahi_threaded_poll_lock(dbs->service_poll);
Expand Down
142 changes: 97 additions & 45 deletions player.c
Original file line number Diff line number Diff line change
Expand Up @@ -1419,10 +1419,61 @@ typedef struct stats { // statistics for running averages
void player_thread_cleanup_handler(void *arg) {
debug(1, "player_thread_cleanup_handler called");
rtsp_conn_info *conn = (rtsp_conn_info *)arg;

#ifdef HAVE_DACP_CLIENT

relinquish_dacp_server_information(conn); // say it doesn't belong to this conversation thread any more...

#else
// stop watching for DACP port number stuff
// this is only used for compatability, if dacp stuff isn't enabled.
if (conn->dapo_private_storage) {
mdns_dacp_dont_monitor(conn->dapo_private_storage);
conn->dapo_private_storage = NULL;
} else {
debug(2, "DACP Monitor already stopped");
}
#endif

debug(2, "Cancelling timing, control and audio threads...");
debug(2, "Cancel timing thread.");
pthread_cancel(conn->rtp_timing_thread);
debug(2, "Join timing thread.");
pthread_join(conn->rtp_timing_thread, NULL);
debug(2, "Timing thread terminated.");
debug(2, "Cancel control thread.");
pthread_cancel(conn->rtp_control_thread);
debug(2, "Join control thread.");
pthread_join(conn->rtp_control_thread, NULL);
debug(2, "Control thread terminated.");
debug(2, "Cancel audio thread.");
pthread_cancel(conn->rtp_audio_thread);
debug(2, "Join audio thread.");
pthread_join(conn->rtp_audio_thread, NULL);
debug(2, "Audio thread terminated.");

if (conn->outbuf) {
free(conn->outbuf);
conn->outbuf = NULL;
}
if (conn->sbuf) {
free(conn->sbuf);
conn->sbuf = NULL;
}
if (conn->tbuf) {
free(conn->tbuf);
conn->tbuf = NULL;
}
free_audio_buffers(conn);
terminate_decoders(conn);
if (config.output->stop)
config.output->stop();

clear_reference_timestamp(conn);
conn->rtp_running = 0;
}

void *player_thread_func(void *arg) {
pthread_cleanup_push(player_thread_cleanup_handler, arg);
rtsp_conn_info *conn = (rtsp_conn_info *)arg;

conn->packet_count = 0;
Expand Down Expand Up @@ -1497,12 +1548,6 @@ void *player_thread_func(void *arg) {

debug(3, "Output frame bytes is %d.", conn->output_bytes_per_frame);

// create and start the timing, control and audio receiver threads
pthread_t rtp_audio_thread, rtp_control_thread, rtp_timing_thread;
pthread_create(&rtp_audio_thread, NULL, &rtp_audio_receiver, (void *)conn);
pthread_create(&rtp_control_thread, NULL, &rtp_control_receiver, (void *)conn);
pthread_create(&rtp_timing_thread, NULL, &rtp_timing_receiver, (void *)conn);

conn->session_corrections = 0;
conn->play_segment_reference_frame = 0; // zero signals that we are not in a play segment

Expand Down Expand Up @@ -1550,12 +1595,7 @@ void *player_thread_func(void *arg) {
static char rnstate[256];
initstate(time(NULL), rnstate, 256);

signed short *inbuf, *tbuf;

int32_t *sbuf;

char *outbuf;

signed short *inbuf;
int inbuflength;

int output_bit_depth = 16; // default;
Expand Down Expand Up @@ -1595,25 +1635,24 @@ void *player_thread_func(void *arg) {
// if ((input_rate!=config.output_rate) || (input_bit_depth!=output_bit_depth)) {
// debug(1,"Define tbuf of length
// %d.",output_bytes_per_frame*(max_frames_per_packet*output_sample_ratio+max_frame_size_change));
tbuf = malloc(sizeof(int32_t) * 2 * (conn->max_frames_per_packet * conn->output_sample_ratio +
conn->tbuf = malloc(sizeof(int32_t) * 2 * (conn->max_frames_per_packet * conn->output_sample_ratio +
conn->max_frame_size_change));
if (tbuf == NULL)
if (conn->tbuf == NULL)
die("Failed to allocate memory for the transition buffer.");

sbuf = 0;
// initialise this, because soxr stuffing might be chosen later

sbuf = malloc(sizeof(int32_t) * 2 * (conn->max_frames_per_packet * conn->output_sample_ratio +
conn->sbuf = malloc(sizeof(int32_t) * 2 * (conn->max_frames_per_packet * conn->output_sample_ratio +
conn->max_frame_size_change));
if (sbuf == NULL)
debug(1, "Failed to allocate memory for the sbuf buffer.");
if (conn->sbuf == NULL)
die("Failed to allocate memory for the sbuf buffer.");

// The size of these dependents on the number of frames, the size of each frame and the maximum
// size change
outbuf = malloc(
conn->outbuf = malloc(
conn->output_bytes_per_frame *
(conn->max_frames_per_packet * conn->output_sample_ratio + conn->max_frame_size_change));
if (outbuf == NULL)
if (conn->outbuf == NULL)
die("Failed to allocate memory for an output buffer.");
conn->first_packet_timestamp = 0;
conn->missing_packets = conn->late_packets = conn->too_late_packets = conn->resend_requests = 0;
Expand All @@ -1624,7 +1663,7 @@ void *player_thread_func(void *arg) {

// stop looking elsewhere for DACP stuff
#ifdef HAVE_DACP_CLIENT
// this may have pthread cancellation points in it
// this does not have pthread cancellation points in it (assuming avahi doesn't)
set_dacp_server_information(conn); // this will start scanning when a port is registered by the
// code initiated by the mdns_dacp_monitor
#else
Expand All @@ -1634,6 +1673,7 @@ void *player_thread_func(void *arg) {
if (conn->dapo_private_storage)
debug(1, "DACP monitor already initialised?");
else
// this does not have pthread cancellation points in it (assuming avahi doesn't)
conn->dapo_private_storage = mdns_dacp_monitor(conn->dacp_id); // ??
#endif

Expand Down Expand Up @@ -1680,9 +1720,16 @@ void *player_thread_func(void *arg) {

// set the default volume to whaterver it was before, as stored in the config airplay_volume
debug(3, "Set initial volume to %f.", config.airplay_volume);

player_volume(config.airplay_volume, conn); // ??
int64_t frames_to_drop = 0;

// create and start the timing, control and audio receiver threads
pthread_create(&conn->rtp_audio_thread, NULL, &rtp_audio_receiver, (void *)conn);
pthread_create(&conn->rtp_control_thread, NULL, &rtp_control_receiver, (void *)conn);
pthread_create(&conn->rtp_timing_thread, NULL, &rtp_timing_receiver, (void *)conn);

pthread_cleanup_push(player_thread_cleanup_handler, arg); // undo what's been done so far

// debug(1, "Play begin");
while (!conn->player_thread_please_stop) {
abuf_t *inframe = buffer_get_frame(conn);
Expand Down Expand Up @@ -1759,7 +1806,7 @@ void *player_thread_func(void *arg) {
int32_t ll = 0, rl = 0;
int16_t *inps = inbuf;
// int16_t *outps = tbuf;
int32_t *outpl = (int32_t *)tbuf;
int32_t *outpl = (int32_t *)conn->tbuf;
for (i = 0; i < inbuflength; i++) {
ls = *inps++;
rs = *inps++;
Expand Down Expand Up @@ -2029,7 +2076,7 @@ void *player_thread_func(void *arg) {
|| config.convolution
#endif
) {
int32_t *tbuf32 = (int32_t *)tbuf;
int32_t *tbuf32 = (int32_t *)conn->tbuf;
float fbuf_l[inbuflength];
float fbuf_r[inbuflength];

Expand Down Expand Up @@ -2079,14 +2126,14 @@ void *player_thread_func(void *arg) {
case ST_basic:
// if (amount_to_stuff) debug(1,"Basic stuff...");
play_samples =
stuff_buffer_basic_32((int32_t *)tbuf, inbuflength, config.output_format,
outbuf, amount_to_stuff, enable_dither, conn);
stuff_buffer_basic_32((int32_t *)conn->tbuf, inbuflength, config.output_format,
conn->outbuf, amount_to_stuff, enable_dither, conn);
break;
case ST_soxr:
#ifdef HAVE_LIBSOXR
// if (amount_to_stuff) debug(1,"Soxr stuff...");
play_samples = stuff_buffer_soxr_32((int32_t *)tbuf, (int32_t *)sbuf, inbuflength,
config.output_format, outbuf, amount_to_stuff,
play_samples = stuff_buffer_soxr_32((int32_t *)conn->tbuf, (int32_t *)conn->sbuf, inbuflength,
config.output_format, conn->outbuf, amount_to_stuff,
enable_dither, conn);
#endif
break;
Expand All @@ -2107,13 +2154,13 @@ void *player_thread_func(void *arg) {
}
*/

if (outbuf == NULL)
if (conn->outbuf == NULL)
debug(1, "NULL outbuf to play -- skipping it.");
else {
if (play_samples == 0)
debug(1, "play_samples==0 skipping it (1).");
else
config.output->play(outbuf, play_samples);
config.output->play(conn->outbuf, play_samples);
}

// check for loss of sync
Expand Down Expand Up @@ -2142,12 +2189,12 @@ void *player_thread_func(void *arg) {
} else {
// if there is no delay procedure, or it's not working or not allowed, there can be no
// synchronising
play_samples = stuff_buffer_basic_32((int32_t *)tbuf, inbuflength, config.output_format,
outbuf, 0, enable_dither, conn);
if (outbuf == NULL)
play_samples = stuff_buffer_basic_32((int32_t *)conn->tbuf, inbuflength, config.output_format,
conn->outbuf, 0, enable_dither, conn);
if (conn->outbuf == NULL)
debug(1, "NULL outbuf to play -- skipping it.");
else
config.output->play(outbuf, play_samples); // remove the (short*)!
config.output->play(conn->outbuf, play_samples); // remove the (short*)!
}

// mark the frame as finished
Expand Down Expand Up @@ -2295,7 +2342,13 @@ void *player_thread_func(void *arg) {
elapsedSec);
}

#ifndef HAVE_DACP_CLIENT
/* all done in the cleanup...
#ifdef HAVE_DACP_CLIENT
relinquish_dacp_server_information(conn); // say it doesn't belong to this conversation thread any more...
#else
// stop watching for DACP port number stuff
// this is only used for compatability, if dacp stuff isn't enabled.
if (conn->dapo_private_storage) {
Expand All @@ -2306,11 +2359,6 @@ void *player_thread_func(void *arg) {
}
#endif
debug(3, "Connection %d: stopping output device.", conn->connection_number);

if (config.output->stop)
config.output->stop();

debug(2, "Cancelling timing, control and audio threads...");
debug(2, "Cancel timing thread.");
pthread_cancel(rtp_timing_thread);
Expand All @@ -2330,28 +2378,32 @@ void *player_thread_func(void *arg) {
clear_reference_timestamp(conn);
conn->rtp_running = 0;
debug(3, "Connection %d: stopping output device.", conn->connection_number);
if (config.output->stop)
config.output->stop();
debug(2, "Freeing audio buffers and decoders.");
free_audio_buffers(conn);
terminate_decoders(conn);
debug(2, "Connection %d: player thread terminated.", conn->connection_number);
if (conn->dacp_id) {
free(conn->dacp_id);
conn->dacp_id = NULL;
}
if (outbuf)
free(outbuf);
if (tbuf)
free(tbuf);
if (sbuf)
free(sbuf);
*/
pthread_cleanup_pop(1);
pthread_exit(NULL);
}

// takes the volume as specified by the airplay protocol
void player_volume_without_notification(double airplay_volume, rtsp_conn_info *conn) {

// no cancellation points here if we assume that the mute call to the back end has no cancellation points

// The volume ranges -144.0 (mute) or -30 -- 0. See
// http://git.zx2c4.com/Airtunes2/about/#setting-volume
// By examination, the -30 -- 0 range is linear on the slider; i.e. the slider is calibrated in 30
Expand Down
9 changes: 6 additions & 3 deletions player.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,14 @@ typedef struct {
SOCKADDR remote, local;
int stop;
int running;
pthread_t thread, timer_requester;

pthread_t thread, timer_requester, rtp_audio_thread, rtp_control_thread, rtp_timing_thread;
// pthread_t *ptp;

signed short *tbuf;
int32_t *sbuf;
char *outbuf;

pthread_t *player_thread;

abuf_t audio_buffer[BUFFER_FRAMES];
int max_frames_per_packet, input_num_channels, input_bit_depth, input_rate;
int input_bytes_per_frame, output_bytes_per_frame, output_sample_ratio;
Expand Down
6 changes: 6 additions & 0 deletions rtsp.c
Original file line number Diff line number Diff line change
Expand Up @@ -2021,6 +2021,12 @@ static void *rtsp_conversation_thread_func(void *pconn) {
playing_conn = NULL;
pthread_mutex_unlock(&play_lock);
}

if (conn->dacp_id) {
free(conn->dacp_id);
conn->dacp_id = NULL;
}

debug(2, "Connection %d: RTSP thread terminated.", conn->connection_number);
conn->running = 0;

Expand Down

0 comments on commit d3b79b9

Please sign in to comment.